1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-26 07:41:33 +03:00

Replace (most) global state in cli/__init__.py (#9678)

* Rewrite helpful_test to appease the linter

* Use public interface to access argparse sources dict

* HelpfulParser builds ArgumentSources dict, stores it in NamespaceConfig

After arguments/config files/user prompted input have been parsed, we
build a mapping of Namespace options to an ArgumentSource value. These
generally come from argparse's builtin "source_to_settings" dict, but
we also add a source value representing dynamic values set at runtime.

This dict is then passed to NamespaceConfig, which can then be queried
directly or via the "set_by_user" method, which replaces the global
"set_by_cli" and "option_was_set" functions.

* Use NamespaceConfig.set_by_user instead of set_by_cli/option_was_set

This involves passing the NamespaceConfig around to more functions
than before, removes the need for most of the global state shenanigans
needed by set_by_cli and friends.

* Set runtime config values on the NamespaceConfig object

This'll correctly mark them as being "runtime" values in the
ArgumentSources dict

* Bump oldest configargparse version

We need a version that has get_source_to_settings_dict()

* Add more cli unit tests, use ArgumentSource.DEFAULT by default

One of the tests revealed that ConfigArgParse's source dict excludes
arguments it considers unimportant/irrelevant. We now mark all arguments
as having a DEFAULT source by default, and update them otherwise.

* Mark more argument sources as RUNTIME

* Removes some redundant helpful_test.py, moves one to cli_test.py

We were already testing most of these cases in cli_test.py, only
with a more complete HelpfulArgumentParser setup. And since the hsts/no-hsts
test was manually performing the kind of argument adding that cli
already does out of the box, I figured the cli tests were a more natural
place for it.

* appease the linter

* Various fixups from review

* Add windows compatability fix

* Add test ensuring relevant_values behaves properly

* Build sources dict in a more predictable manner

The dict is now built in a defined order: first defaults, then config
files, then env vars, then command line args. This way we eliminate the
possibility of undefined behavior if configargparse puts an arg's entry
in multiple source dicts.

* remove superfluous update to sources dict

* remove duplicate constant defines, resolve circular import situation
This commit is contained in:
Will Greenberg
2023-05-30 17:12:51 -07:00
committed by GitHub
parent b5661e84e8
commit a5d223d1e5
22 changed files with 491 additions and 551 deletions

View File

@@ -58,7 +58,7 @@ class Proxy(configurators_common.Proxy):
getattr(entrypoint.ENTRYPOINT.OS_DEFAULTS, k))
self._configurator = entrypoint.ENTRYPOINT(
config=configuration.NamespaceConfig(self.le_config),
config=configuration.NamespaceConfig(self.le_config, {}),
name="apache")
self._configurator.prepare()

View File

@@ -44,7 +44,7 @@ class Proxy(configurators_common.Proxy):
for k in constants.CLI_DEFAULTS:
setattr(self.le_config, "nginx_" + k, constants.os_constant(k))
conf = configuration.NamespaceConfig(self.le_config)
conf = configuration.NamespaceConfig(self.le_config, {})
self._configurator = configurator.NginxConfigurator(config=conf, name="nginx")
self._configurator.prepare()

View File

@@ -10,6 +10,7 @@ from typing import Optional
from typing import Type
import certbot
from certbot.configuration import NamespaceConfig
from certbot._internal import constants
from certbot._internal.cli.cli_constants import ARGPARSE_PARAMS_TO_REMOVE
from certbot._internal.cli.cli_constants import cli_command
@@ -20,7 +21,6 @@ from certbot._internal.cli.cli_constants import HELP_AND_VERSION_USAGE
from certbot._internal.cli.cli_constants import SHORT_USAGE
from certbot._internal.cli.cli_constants import VAR_MODIFIERS
from certbot._internal.cli.cli_constants import ZERO_ARG_ACTIONS
from certbot._internal.cli.cli_utils import _Default
from certbot._internal.cli.cli_utils import _DeployHookAction
from certbot._internal.cli.cli_utils import _DomainsAction
from certbot._internal.cli.cli_utils import _EncodeReasonAction
@@ -54,19 +54,19 @@ logger = logging.getLogger(__name__)
helpful_parser: Optional[HelpfulArgumentParser] = None
def prepare_and_parse_args(plugins: plugins_disco.PluginsRegistry, args: List[str],
detect_defaults: bool = False) -> argparse.Namespace:
def prepare_and_parse_args(plugins: plugins_disco.PluginsRegistry, args: List[str]
) -> NamespaceConfig:
"""Returns parsed command line arguments.
:param .PluginsRegistry plugins: available plugins
:param list args: command line arguments with the program name removed
:returns: parsed command line arguments
:rtype: argparse.Namespace
:rtype: configuration.NamespaceConfig
"""
helpful = HelpfulArgumentParser(args, plugins, detect_defaults)
helpful = HelpfulArgumentParser(args, plugins)
_add_all_groups(helpful)
# --help is automatically provided by argparse
@@ -471,95 +471,16 @@ def prepare_and_parse_args(plugins: plugins_disco.PluginsRegistry, args: List[st
# parser (--help should display plugin-specific options last)
_plugins_parsing(helpful, plugins)
if not detect_defaults:
global helpful_parser # pylint: disable=global-statement
helpful_parser = helpful
global helpful_parser # pylint: disable=global-statement
helpful_parser = helpful
return helpful.parse_args()
def set_by_cli(var: str) -> bool:
"""
Return True if a particular config variable has been set by the user
(CLI or config file) including if the user explicitly set it to the
default. Returns False if the variable was assigned a default value.
"""
# We should probably never actually hit this code. But if we do,
# a deprecated option has logically never been set by the CLI.
if var in DEPRECATED_OPTIONS:
return False
detector = set_by_cli.detector # type: ignore
if detector is None and helpful_parser is not None:
# Setup on first run: `detector` is a weird version of config in which
# the default value of every attribute is wrangled to be boolean-false
plugins = plugins_disco.PluginsRegistry.find_all()
# reconstructed_args == sys.argv[1:], or whatever was passed to main()
reconstructed_args = helpful_parser.args + [helpful_parser.verb]
detector = set_by_cli.detector = prepare_and_parse_args( # type: ignore
plugins, reconstructed_args, detect_defaults=True)
# propagate plugin requests: eg --standalone modifies config.authenticator
detector.authenticator, detector.installer = (
plugin_selection.cli_plugin_requests(detector))
if not isinstance(getattr(detector, var), _Default):
logger.debug("Var %s=%s (set by user).", var, getattr(detector, var))
return True
for modifier in VAR_MODIFIERS.get(var, []):
if set_by_cli(modifier):
logger.debug("Var %s=%s (set by user).",
var, VAR_MODIFIERS.get(var, []))
return True
return False
# static housekeeping var
# functions attributed are not supported by mypy
# https://github.com/python/mypy/issues/2087
set_by_cli.detector = None # type: ignore
def has_default_value(option: str, value: Any) -> bool:
"""Does option have the default value?
If the default value of option is not known, False is returned.
:param str option: configuration variable being considered
:param value: value of the configuration variable named option
:returns: True if option has the default value, otherwise, False
:rtype: bool
"""
if helpful_parser is not None:
return (option in helpful_parser.defaults and
helpful_parser.defaults[option] == value)
return False
def option_was_set(option: str, value: Any) -> bool:
"""Was option set by the user or does it differ from the default?
:param str option: configuration variable being considered
:param value: value of the configuration variable named option
:returns: True if the option was set, otherwise, False
:rtype: bool
"""
# If an option is deprecated, it was effectively not set by the user.
if option in DEPRECATED_OPTIONS:
return False
return set_by_cli(option) or not has_default_value(option, value)
def argparse_type(variable: Any) -> Type:
"""Return our argparse type function for a config variable (default: str)"""
# pylint: disable=protected-access
if helpful_parser is not None:
for action in helpful_parser.parser._actions:
for action in helpful_parser.actions:
if action.type is not None and action.dest == variable:
return action.type
return str

View File

@@ -22,22 +22,6 @@ if TYPE_CHECKING:
from certbot._internal.cli import helpful
class _Default:
"""A class to use as a default to detect if a value is set by a user"""
def __bool__(self) -> bool:
return False
def __eq__(self, other: Any) -> bool:
return isinstance(other, _Default)
def __hash__(self) -> int:
return id(_Default)
def __nonzero__(self) -> bool:
return self.__bool__()
def read_file(filename: str, mode: str = "rb") -> Tuple[str, Any]:
"""Returns the given file's contents.

View File

@@ -1,7 +1,6 @@
"""Certbot command line argument parser"""
import argparse
import copy
import functools
import glob
import sys
@@ -10,6 +9,7 @@ from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
import configargparse
@@ -19,13 +19,9 @@ from certbot import errors
from certbot import util
from certbot._internal import constants
from certbot._internal import hooks
from certbot._internal.cli.cli_constants import ARGPARSE_PARAMS_TO_REMOVE
from certbot._internal.cli.cli_constants import COMMAND_OVERVIEW
from certbot._internal.cli.cli_constants import EXIT_ACTIONS
from certbot._internal.cli.cli_constants import HELP_AND_VERSION_USAGE
from certbot._internal.cli.cli_constants import SHORT_USAGE
from certbot._internal.cli.cli_constants import ZERO_ARG_ACTIONS
from certbot._internal.cli.cli_utils import _Default
from certbot._internal.cli.cli_utils import add_domains
from certbot._internal.cli.cli_utils import CustomHelpFormatter
from certbot._internal.cli.cli_utils import flag_default
@@ -35,6 +31,8 @@ from certbot._internal.cli.verb_help import VERB_HELP_MAP
from certbot._internal.display import obj as display_obj
from certbot._internal.plugins import disco
from certbot.compat import os
from certbot.configuration import ArgumentSource
from certbot.configuration import NamespaceConfig
class HelpfulArgumentParser:
@@ -45,8 +43,7 @@ class HelpfulArgumentParser:
'certbot --help security' for security options.
"""
def __init__(self, args: List[str], plugins: Iterable[str],
detect_defaults: bool = False) -> None:
def __init__(self, args: List[str], plugins: Iterable[str]) -> None:
from certbot._internal import main
self.VERBS = {
"auth": main.certonly,
@@ -72,6 +69,8 @@ class HelpfulArgumentParser:
# Get notification function for printing
self.notify = display_obj.NoninteractiveDisplay(sys.stdout).notification
self.actions: List[configargparse.Action] = []
# List of topics for which additional help can be provided
HELP_TOPICS: List[Optional[str]] = ["all", "security", "paths", "automation", "testing"]
HELP_TOPICS += list(self.VERBS) + self.COMMANDS_TOPICS + ["manage"]
@@ -79,7 +78,6 @@ class HelpfulArgumentParser:
plugin_names: List[Optional[str]] = list(plugins)
self.help_topics: List[Optional[str]] = HELP_TOPICS + plugin_names + [None]
self.detect_defaults = detect_defaults
self.args = args
if self.args and self.args[0] == 'help':
@@ -100,8 +98,6 @@ class HelpfulArgumentParser:
# elements are added by .add_group()
self.groups: Dict[str, argparse._ArgumentGroup] = {}
# elements are added by .parse_args()
self.defaults: Dict[str, Any] = {}
self.parser = configargparse.ArgParser(
prog="certbot",
@@ -166,75 +162,114 @@ class HelpfulArgumentParser:
return usage
def remove_config_file_domains_for_renewal(self, parsed_args: argparse.Namespace) -> None:
def remove_config_file_domains_for_renewal(self, config: NamespaceConfig) -> None:
"""Make "certbot renew" safe if domains are set in cli.ini."""
# Works around https://github.com/certbot/certbot/issues/4096
if self.verb == "renew":
for source, flags in self.parser._source_to_settings.items(): # pylint: disable=protected-access
if source.startswith("config_file") and "domains" in flags:
parsed_args.domains = _Default() if self.detect_defaults else []
if (config.argument_sources['domains'] == ArgumentSource.CONFIG_FILE and
self.verb == "renew"):
config.domains = []
def parse_args(self) -> argparse.Namespace:
def _build_sources_dict(self) -> Dict[str, ArgumentSource]:
# ConfigArgparse's get_source_to_settings_dict doesn't actually create
# default entries for each argument with a default value, omitting many
# args we'd otherwise care about. So in general, unless an argument was
# specified in a config file/environment variable/command line arg,
# consider it as having a "default" value
result = { action.dest: ArgumentSource.DEFAULT for action in self.actions }
source_to_settings_dict: Dict[str, Dict[str, Tuple[configargparse.Action, str]]]
source_to_settings_dict = self.parser.get_source_to_settings_dict()
# We'll process the sources dict in order of each source's "priority",
# i.e. the order in which ConfigArgparse ultimately sets argument
# values:
# 1. defaults (`result` already has everything marked as such)
# 2. config files
# 3. env vars (shouldn't be any)
# 4. command line
def update_result(settings_dict: Dict[str, Tuple[configargparse.Action, str]],
source: ArgumentSource) -> None:
actions = [action for _, (action, _) in settings_dict.items()]
result.update({ action.dest: source for action in actions})
# config file sources look like "config_file|<name of file>"
for source_key in source_to_settings_dict:
if source_key.startswith('config_file'):
update_result(source_to_settings_dict[source_key], ArgumentSource.CONFIG_FILE)
update_result(source_to_settings_dict.get('env_var', {}), ArgumentSource.ENV_VAR)
# The command line settings dict is weird, so handle it separately
if 'command_line' in source_to_settings_dict:
settings_dict: Dict[str, Tuple[None, List[str]]]
settings_dict = source_to_settings_dict['command_line'] # type: ignore
(_, args) = settings_dict['']
args = [arg for arg in args if arg.startswith('-')]
for arg in args:
# find the action corresponding to this arg
for action in self.actions:
if arg in action.option_strings:
result[action.dest] = ArgumentSource.COMMAND_LINE
continue
return result
def parse_args(self) -> NamespaceConfig:
"""Parses command line arguments and returns the result.
:returns: parsed command line arguments
:rtype: argparse.Namespace
:rtype: configuration.NamespaceConfig
"""
parsed_args = self.parser.parse_args(self.args)
parsed_args.func = self.VERBS[self.verb]
parsed_args.verb = self.verb
config = NamespaceConfig(parsed_args, self._build_sources_dict())
self.remove_config_file_domains_for_renewal(parsed_args)
if self.detect_defaults:
return parsed_args
self.defaults = {key: copy.deepcopy(self.parser.get_default(key))
for key in vars(parsed_args)}
self.remove_config_file_domains_for_renewal(config)
# Do any post-parsing homework here
if self.verb == "renew":
if parsed_args.force_interactive:
if config.force_interactive:
raise errors.Error(
"{0} cannot be used with renew".format(
constants.FORCE_INTERACTIVE_FLAG))
parsed_args.noninteractive_mode = True
config.noninteractive_mode = True
if parsed_args.force_interactive and parsed_args.noninteractive_mode:
if config.force_interactive and config.noninteractive_mode:
raise errors.Error(
"Flag for non-interactive mode and {0} conflict".format(
constants.FORCE_INTERACTIVE_FLAG))
if parsed_args.staging or parsed_args.dry_run:
self.set_test_server(parsed_args)
if config.staging or config.dry_run:
self.set_test_server(config)
if parsed_args.csr:
self.handle_csr(parsed_args)
if config.csr:
self.handle_csr(config)
if parsed_args.must_staple:
parsed_args.staple = True
if config.must_staple and not config.staple:
config.staple = True
if parsed_args.validate_hooks:
hooks.validate_hooks(parsed_args)
if config.validate_hooks:
hooks.validate_hooks(config)
if parsed_args.allow_subset_of_names:
if any(util.is_wildcard_domain(d) for d in parsed_args.domains):
if config.allow_subset_of_names:
if any(util.is_wildcard_domain(d) for d in config.domains):
raise errors.Error("Using --allow-subset-of-names with a"
" wildcard domain is not supported.")
if parsed_args.hsts and parsed_args.auto_hsts:
if config.hsts and config.auto_hsts:
raise errors.Error(
"Parameters --hsts and --auto-hsts cannot be used simultaneously.")
if isinstance(parsed_args.key_type, list) and len(parsed_args.key_type) > 1:
if isinstance(config.key_type, list) and len(config.key_type) > 1:
raise errors.Error(
"Only *one* --key-type type may be provided at this time.")
return parsed_args
return config
def set_test_server(self, parsed_args: argparse.Namespace) -> None:
def set_test_server(self, config: NamespaceConfig) -> None:
"""We have --staging/--dry-run; perform sanity check and set config.server"""
# Flag combinations should produce these results:
@@ -246,51 +281,51 @@ class HelpfulArgumentParser:
default_servers = (flag_default("server"), constants.STAGING_URI)
if parsed_args.staging and parsed_args.server not in default_servers:
if config.staging and config.server not in default_servers:
raise errors.Error("--server value conflicts with --staging")
if parsed_args.server in default_servers:
parsed_args.server = constants.STAGING_URI
if config.server == flag_default("server"):
config.server = constants.STAGING_URI
if parsed_args.dry_run:
if config.dry_run:
if self.verb not in ["certonly", "renew"]:
raise errors.Error("--dry-run currently only works with the "
"'certonly' or 'renew' subcommands (%r)" % self.verb)
parsed_args.break_my_certs = parsed_args.staging = True
if glob.glob(os.path.join(parsed_args.config_dir, constants.ACCOUNTS_DIR, "*")):
config.break_my_certs = config.staging = True
if glob.glob(os.path.join(config.config_dir, constants.ACCOUNTS_DIR, "*")):
# The user has a prod account, but might not have a staging
# one; we don't want to start trying to perform interactive registration
parsed_args.tos = True
parsed_args.register_unsafely_without_email = True
config.tos = True
config.register_unsafely_without_email = True
def handle_csr(self, parsed_args: argparse.Namespace) -> None:
def handle_csr(self, config: NamespaceConfig) -> None:
"""Process a --csr flag."""
if parsed_args.verb != "certonly":
if config.verb != "certonly":
raise errors.Error("Currently, a CSR file may only be specified "
"when obtaining a new or replacement "
"via the certonly command. Please try the "
"certonly command instead.")
if parsed_args.allow_subset_of_names:
if config.allow_subset_of_names:
raise errors.Error("--allow-subset-of-names cannot be used with --csr")
csrfile, contents = parsed_args.csr[0:2]
csrfile, contents = config.csr[0:2]
typ, csr, domains = crypto_util.import_csr_file(csrfile, contents)
# This is not necessary for webroot to work, however,
# obtain_certificate_from_csr requires parsed_args.domains to be set
# obtain_certificate_from_csr requires config.domains to be set
for domain in domains:
add_domains(parsed_args, domain)
add_domains(config, domain)
if not domains:
# TODO: add CN to domains instead:
raise errors.Error(
"Unfortunately, your CSR %s needs to have a SubjectAltName for every domain"
% parsed_args.csr[0])
% config.csr[0])
parsed_args.actual_csr = (csr, typ)
config.actual_csr = (csr, typ)
csr_domains = {d.lower() for d in domains}
config_domains = set(parsed_args.domains)
config_domains = set(config.domains)
if csr_domains != config_domains:
raise errors.ConfigurationError(
"Inconsistent domain requests:\nFrom the CSR: {0}\nFrom command line/config: {1}"
@@ -356,6 +391,10 @@ class HelpfulArgumentParser:
:param dict **kwargs: various argparse settings for this argument
"""
self.actions.append(self._add(topics, *args, **kwargs))
def _add(self, topics: Optional[Union[List[Optional[str]], str]], *args: Any,
**kwargs: Any) -> configargparse.Action:
action = kwargs.get("action")
if action is util.DeprecatedArgumentAction:
# If the argument is deprecated through
@@ -366,8 +405,7 @@ class HelpfulArgumentParser:
# handling default detection since these actions aren't needed and
# can cause bugs like
# https://github.com/certbot/certbot/issues/8495.
self.parser.add_argument(*args, **kwargs)
return
return self.parser.add_argument(*args, **kwargs)
if isinstance(topics, list):
# if this flag can be listed in multiple sections, try to pick the one
@@ -376,40 +414,15 @@ class HelpfulArgumentParser:
else:
topic = topics # there's only one
if self.detect_defaults:
kwargs = self.modify_kwargs_for_default_detection(**kwargs)
if not isinstance(topic, bool) and self.visible_topics[topic]:
if topic in self.groups:
group = self.groups[topic]
group.add_argument(*args, **kwargs)
return group.add_argument(*args, **kwargs)
else:
self.parser.add_argument(*args, **kwargs)
return self.parser.add_argument(*args, **kwargs)
else:
kwargs["help"] = argparse.SUPPRESS
self.parser.add_argument(*args, **kwargs)
def modify_kwargs_for_default_detection(self, **kwargs: Any) -> Dict[str, Any]:
"""Modify an arg so we can check if it was set by the user.
Changes the parameters given to argparse when adding an argument
so we can properly detect if the value was set by the user.
:param dict kwargs: various argparse settings for this argument
:returns: a modified versions of kwargs
:rtype: dict
"""
action = kwargs.get("action", None)
if action not in EXIT_ACTIONS:
kwargs["action"] = ("store_true" if action in ZERO_ARG_ACTIONS else
"store")
kwargs["default"] = _Default()
for param in ARGPARSE_PARAMS_TO_REMOVE:
kwargs.pop(param, None)
return kwargs
return self.parser.add_argument(*args, **kwargs)
def add_deprecated_argument(self, argument_name: str, num_args: int) -> None:
"""Adds a deprecated argument with the name argument_name.

View File

@@ -615,15 +615,16 @@ class Client:
for path in cert_path, chain_path, fullchain_path:
util.make_or_verify_dir(os.path.dirname(path), 0o755, self.config.strict_permissions)
cert_file, abs_cert_path = _open_pem_file('cert_path', cert_path)
cert_file, abs_cert_path = _open_pem_file(self.config, 'cert_path', cert_path)
try:
cert_file.write(cert_pem)
finally:
cert_file.close()
chain_file, abs_chain_path = _open_pem_file('chain_path', chain_path)
fullchain_file, abs_fullchain_path = _open_pem_file('fullchain_path', fullchain_path)
chain_file, abs_chain_path = _open_pem_file(self.config, 'chain_path', chain_path)
fullchain_file, abs_fullchain_path = _open_pem_file(
self.config, 'fullchain_path', fullchain_path)
_save_chain(chain_pem, chain_file)
_save_chain(cert_pem + chain_pem, fullchain_file)
@@ -847,7 +848,8 @@ def rollback(default_installer: str, checkpoints: int,
installer.restart()
def _open_pem_file(cli_arg_path: str, pem_path: str) -> Tuple[IO, str]:
def _open_pem_file(config: configuration.NamespaceConfig,
cli_arg_path: str, pem_path: str) -> Tuple[IO, str]:
"""Open a pem file.
If cli_arg_path was set by the client, open that.
@@ -859,7 +861,7 @@ def _open_pem_file(cli_arg_path: str, pem_path: str) -> Tuple[IO, str]:
:returns: a tuple of file object and its absolute file path
"""
if cli.set_by_cli(cli_arg_path):
if config.set_by_user(cli_arg_path):
return util.safe_open(pem_path, chmod=0o644, mode="wb"),\
os.path.abspath(pem_path)
uniq = util.unique_file(pem_path, 0o644, "wb")

View File

@@ -168,7 +168,7 @@ def _handle_unexpected_key_type_migration(config: configuration.NamespaceConfig,
# If both --key-type and --cert-name are provided, we consider the user's intent to
# be unambiguous: to change the key type of this lineage.
is_confirmed_via_cli = cli.set_by_cli("key_type") and cli.set_by_cli("certname")
is_confirmed_via_cli = config.set_by_user("key_type") and config.set_by_user("certname")
# Failing that, we interactively prompt the user to confirm the change.
if is_confirmed_via_cli or display_util.yesno(
@@ -181,7 +181,7 @@ def _handle_unexpected_key_type_migration(config: configuration.NamespaceConfig,
# If --key-type was set on the CLI but the user did not confirm the key type change using
# one of the two above methods, their intent is ambiguous. Error out.
if cli.set_by_cli("key_type"):
if config.set_by_user("key_type"):
raise errors.Error(
'Are you trying to change the key type of the certificate named '
f'{cert.lineagename} from {cur_key_type} to {new_key_type}? Please provide '
@@ -1126,13 +1126,13 @@ def _populate_from_certname(config: configuration.NamespaceConfig) -> configurat
if not lineage:
return config
if not config.key_path:
config.namespace.key_path = lineage.key_path
config.key_path = lineage.key_path
if not config.cert_path:
config.namespace.cert_path = lineage.cert_path
config.cert_path = lineage.cert_path
if not config.chain_path:
config.namespace.chain_path = lineage.chain_path
config.chain_path = lineage.chain_path
if not config.fullchain_path:
config.namespace.fullchain_path = lineage.fullchain_path
config.fullchain_path = lineage.fullchain_path
return config
@@ -1364,7 +1364,7 @@ def revoke(config: configuration.NamespaceConfig,
storage.renewal_file_for_certname(config, config.certname), config)
config.cert_path = lineage.cert_path
# --server takes priority over lineage.server
if lineage.server and not cli.set_by_cli("server"):
if lineage.server and not config.set_by_user("server"):
config.server = lineage.server
elif not config.cert_path or (config.cert_path and config.certname):
# intentionally not supporting --cert-path & --cert-name together,
@@ -1843,8 +1843,7 @@ def main(cli_args: Optional[List[str]] = None) -> Optional[Union[str, int]]:
misc.prepare_virtual_console()
# note: arg parser internally handles --help (and exits afterwards)
args = cli.prepare_and_parse_args(plugins, cli_args)
config = configuration.NamespaceConfig(args)
config = cli.prepare_and_parse_args(plugins, cli_args)
# On windows, shell without administrative right cannot create symlinks required by certbot.
# So we check the rights before continuing.

View File

@@ -127,11 +127,11 @@ def _restore_webroot_config(config: configuration.NamespaceConfig,
restoring logic is not able to correctly parse it from the serialized
form.
"""
if "webroot_map" in renewalparams and not cli.set_by_cli("webroot_map"):
if "webroot_map" in renewalparams and not config.set_by_user("webroot_map"):
config.webroot_map = renewalparams["webroot_map"]
# To understand why webroot_path and webroot_map processing are not mutually exclusive,
# see https://github.com/certbot/certbot/pull/7095
if "webroot_path" in renewalparams and not cli.set_by_cli("webroot_path"):
if "webroot_path" in renewalparams and not config.set_by_user("webroot_path"):
wp = renewalparams["webroot_path"]
if isinstance(wp, str): # prior to 0.1.0, webroot_path was a string
wp = [wp]
@@ -170,7 +170,7 @@ def _restore_plugin_configs(config: configuration.NamespaceConfig,
for plugin_prefix in set(plugin_prefixes):
plugin_prefix = plugin_prefix.replace('-', '_')
for config_item, config_value in renewalparams.items():
if config_item.startswith(plugin_prefix + "_") and not cli.set_by_cli(config_item):
if config_item.startswith(plugin_prefix + "_") and not config.set_by_user(config_item):
# Values None, True, and False need to be treated specially,
# As their types aren't handled correctly by configobj
if config_value in ("None", "True", "False"):
@@ -199,9 +199,9 @@ def restore_required_config_elements(config: configuration.NamespaceConfig,
zip(INT_CONFIG_ITEMS, itertools.repeat(_restore_int)),
zip(STR_CONFIG_ITEMS, itertools.repeat(_restore_str)))
for item_name, restore_func in required_items:
if item_name in renewalparams and not cli.set_by_cli(item_name):
if item_name in renewalparams and not config.set_by_user(item_name):
value = restore_func(item_name, renewalparams[item_name])
setattr(config.namespace, item_name, value)
setattr(config, item_name, value)
def _remove_deprecated_config_elements(renewalparams: Mapping[str, Any]) -> Dict[str, Any]:
@@ -339,7 +339,7 @@ def _avoid_reuse_key_conflicts(config: configuration.NamespaceConfig,
--new-key is also set.
"""
# If --no-reuse-key is set, no conflict
if cli.set_by_cli("reuse_key") and not config.reuse_key:
if config.set_by_user("reuse_key") and not config.reuse_key:
return
# If reuse_key is not set on the lineage and --reuse-key is not

View File

@@ -32,7 +32,6 @@ from certbot import errors
from certbot import interfaces
from certbot import ocsp
from certbot import util
from certbot._internal import cli
from certbot._internal import constants
from certbot._internal import error_handler
from certbot._internal.plugins import disco as plugins_disco
@@ -217,7 +216,7 @@ def update_configuration(lineagename: str, archive_dir: str, target: Mapping[str
os.unlink(temp_filename)
# Save only the config items that are relevant to renewal
values = relevant_values(vars(cli_config.namespace))
values = relevant_values(cli_config)
write_renewal_config(config_filename, temp_filename, archive_dir, target, values)
filesystem.replace(temp_filename, config_filename)
@@ -283,22 +282,23 @@ def _relevant(namespaces: Iterable[str], option: str) -> bool:
any(option.startswith(namespace) for namespace in namespaces))
def relevant_values(all_values: Mapping[str, Any]) -> Dict[str, Any]:
def relevant_values(config: configuration.NamespaceConfig) -> Dict[str, Any]:
"""Return a new dict containing only items relevant for renewal.
:param dict all_values: The original values.
:param .NamespaceConfig config: parsed command line
:returns: A new dictionary containing items that can be used in renewal.
:rtype dict:
"""
all_values = config.to_dict()
plugins = plugins_disco.PluginsRegistry.find_all()
namespaces = [plugins_common.dest_namespace(plugin) for plugin in plugins]
rv = {
option: value
for option, value in all_values.items()
if _relevant(namespaces, option) and cli.option_was_set(option, value)
if _relevant(namespaces, option) and config.set_by_user(option)
}
# We always save the server value to help with forward compatibility
# and behavioral consistency when versions of Certbot with different
@@ -1121,7 +1121,7 @@ class RenewableCert(interfaces.RenewableCert):
config_file.close()
# Save only the config items that are relevant to renewal
values = relevant_values(vars(cli_config.namespace))
values = relevant_values(cli_config)
new_config = write_renewal_config(config_filename, config_filename, archive,
target, values)

View File

@@ -236,7 +236,7 @@ class CertificatesTest(BaseCertManagerTest):
work_dir=os.path.join(empty_tempdir, "work"),
logs_dir=os.path.join(empty_tempdir, "logs"),
quiet=False
))
), {})
filesystem.makedirs(empty_config.renewal_configs_dir)
self._certificates(empty_config)

View File

@@ -5,6 +5,7 @@ from importlib import reload as reload_module
import io
import sys
import tempfile
from typing import Any, List
import unittest
from unittest import mock
@@ -12,8 +13,10 @@ import pytest
from acme import challenges
from certbot import errors
from certbot.configuration import ArgumentSource, NamespaceConfig
from certbot._internal import cli
from certbot._internal import constants
from certbot._internal.cli.cli_utils import flag_default
from certbot._internal.plugins import disco
from certbot.compat import filesystem
from certbot.compat import os
@@ -69,25 +72,31 @@ class FlagDefaultTest(unittest.TestCase):
assert cli.flag_default('logs_dir') == 'C:\\Certbot\\log'
def assert_set_by_user_with_value(namespace, attr: str, value: Any):
assert getattr(namespace, attr) == value
assert namespace.set_by_user(attr)
def assert_value_and_source(namespace, attr: str, value: Any, source: ArgumentSource):
assert getattr(namespace, attr) == value
assert namespace.argument_sources[attr] == source
class ParseTest(unittest.TestCase):
'''Test the cli args entrypoint'''
def setUp(self):
reload_module(cli)
@staticmethod
def _unmocked_parse(*args, **kwargs):
def _unmocked_parse(args: List[str]) -> NamespaceConfig:
"""Get result of cli.prepare_and_parse_args."""
return cli.prepare_and_parse_args(PLUGINS, *args, **kwargs)
return cli.prepare_and_parse_args(PLUGINS, args)
@staticmethod
def parse(*args, **kwargs):
def parse(args: List[str]) -> NamespaceConfig:
"""Mocks certbot._internal.display.obj.get_display and calls _unmocked_parse."""
with test_util.patch_display_util():
return ParseTest._unmocked_parse(*args, **kwargs)
return ParseTest._unmocked_parse(args)
def _help_output(self, args):
def _help_output(self, args: List[str]):
"Run a command, and return the output string for scrutiny"
output = io.StringIO()
@@ -100,7 +109,7 @@ class ParseTest(unittest.TestCase):
mock_get_utility().notification.side_effect = write_msg
with mock.patch('certbot._internal.main.sys.stderr'):
with pytest.raises(SystemExit):
self._unmocked_parse(args, output)
self._unmocked_parse(args)
return output.getvalue()
@@ -117,18 +126,19 @@ class ParseTest(unittest.TestCase):
mock_flag_default.side_effect = shim
namespace = self.parse(["certonly"])
assert namespace.domains == []
assert_value_and_source(namespace, 'domains', [], ArgumentSource.DEFAULT)
with open(tmp_config.name, 'w') as file_h:
file_h.write("domains = example.com")
namespace = self.parse(["certonly"])
assert namespace.domains == ["example.com"]
assert_value_and_source(namespace, 'domains', ["example.com"], ArgumentSource.CONFIG_FILE)
namespace = self.parse(["renew"])
assert namespace.domains == []
assert_value_and_source(namespace, 'domains', [], ArgumentSource.RUNTIME)
def test_no_args(self):
namespace = self.parse([])
for d in ('config_dir', 'logs_dir', 'work_dir'):
assert getattr(namespace, d) == cli.flag_default(d)
assert not namespace.set_by_user(d)
def test_install_abspath(self):
cert = 'cert'
@@ -226,35 +236,35 @@ class ParseTest(unittest.TestCase):
def test_parse_domains(self):
short_args = ['-d', 'example.com']
namespace = self.parse(short_args)
assert namespace.domains == ['example.com']
assert_set_by_user_with_value(namespace, 'domains', ['example.com'])
short_args = ['-d', 'trailing.period.com.']
namespace = self.parse(short_args)
assert namespace.domains == ['trailing.period.com']
assert_set_by_user_with_value(namespace, 'domains', ['trailing.period.com'])
short_args = ['-d', 'example.com,another.net,third.org,example.com']
namespace = self.parse(short_args)
assert namespace.domains == ['example.com', 'another.net',
'third.org']
assert_set_by_user_with_value(namespace, 'domains',
['example.com', 'another.net', 'third.org'])
long_args = ['--domains', 'example.com']
namespace = self.parse(long_args)
assert namespace.domains == ['example.com']
assert_set_by_user_with_value(namespace, 'domains', ['example.com'])
long_args = ['--domains', 'trailing.period.com.']
namespace = self.parse(long_args)
assert namespace.domains == ['trailing.period.com']
assert_set_by_user_with_value(namespace, 'domains', ['trailing.period.com'])
long_args = ['--domains', 'example.com,another.net,example.com']
namespace = self.parse(long_args)
assert namespace.domains == ['example.com', 'another.net']
assert_set_by_user_with_value(namespace, 'domains', ['example.com', 'another.net'])
def test_preferred_challenges(self):
short_args = ['--preferred-challenges', 'http, dns']
namespace = self.parse(short_args)
expected = [challenges.HTTP01.typ, challenges.DNS01.typ]
assert namespace.pref_challs == expected
assert_set_by_user_with_value(namespace, 'pref_challs', expected)
short_args = ['--preferred-challenges', 'jumping-over-the-moon']
# argparse.ArgumentError makes argparse print more information
@@ -265,13 +275,17 @@ class ParseTest(unittest.TestCase):
def test_server_flag(self):
namespace = self.parse('--server example.com'.split())
assert namespace.server == 'example.com'
assert_set_by_user_with_value(namespace, 'server', 'example.com')
def test_must_staple_flag(self):
short_args = ['--must-staple']
namespace = self.parse(short_args)
assert namespace.must_staple is True
assert namespace.staple is True
namespace = self.parse(['--must-staple'])
assert_set_by_user_with_value(namespace, 'must_staple', True)
assert_value_and_source(namespace, 'staple', True, ArgumentSource.RUNTIME)
def test_must_staple_and_staple_ocsp_flags(self):
namespace = self.parse(['--must-staple', '--staple-ocsp'])
assert_set_by_user_with_value(namespace, 'must_staple', True)
assert_set_by_user_with_value(namespace, 'staple', True)
def _check_server_conflict_message(self, parser_args, conflicting_args):
try:
@@ -287,24 +301,24 @@ class ParseTest(unittest.TestCase):
def test_staging_flag(self):
short_args = ['--staging']
namespace = self.parse(short_args)
assert namespace.staging is True
assert namespace.server == constants.STAGING_URI
assert_set_by_user_with_value(namespace, 'staging', True)
assert_set_by_user_with_value(namespace, 'server', constants.STAGING_URI)
short_args += '--server example.com'.split()
self._check_server_conflict_message(short_args, '--staging')
def _assert_dry_run_flag_worked(self, namespace, existing_account):
assert namespace.dry_run is True
assert namespace.break_my_certs is True
assert namespace.staging is True
assert namespace.server == constants.STAGING_URI
assert_set_by_user_with_value(namespace, 'dry_run', True)
assert_value_and_source(namespace, 'break_my_certs', True, ArgumentSource.RUNTIME)
assert_value_and_source(namespace, 'staging', True, ArgumentSource.RUNTIME)
assert_value_and_source(namespace, 'server', constants.STAGING_URI, ArgumentSource.RUNTIME)
if existing_account:
assert namespace.tos is True
assert namespace.register_unsafely_without_email is True
assert_value_and_source(namespace, 'tos', True, ArgumentSource.RUNTIME)
assert_value_and_source(namespace, 'register_unsafely_without_email', True, ArgumentSource.RUNTIME)
else:
assert namespace.tos is False
assert namespace.register_unsafely_without_email is False
assert_value_and_source(namespace, 'tos', False, ArgumentSource.DEFAULT)
assert_value_and_source(namespace, 'register_unsafely_without_email', False, ArgumentSource.DEFAULT)
def test_dry_run_flag(self):
config_dir = tempfile.mkdtemp()
@@ -330,51 +344,73 @@ class ParseTest(unittest.TestCase):
short_args += ['certonly']
# `--dry-run --server example.com` should emit example.com
assert self.parse(short_args + ['--server', 'example.com']).server == \
'example.com'
config = self.parse(short_args + ['--server', 'example.com'])
assert_set_by_user_with_value(config, 'server', 'example.com')
# `--dry-run --server STAGING_URI` should emit STAGING_URI
assert self.parse(short_args + ['--server', constants.STAGING_URI]).server == \
constants.STAGING_URI
config = self.parse(short_args + ['--server', constants.STAGING_URI])
assert_set_by_user_with_value(config, 'server', constants.STAGING_URI)
# `--dry-run --server LIVE` should emit STAGING_URI
assert self.parse(short_args + ['--server', cli.flag_default("server")]).server == \
constants.STAGING_URI
config = self.parse(short_args + ['--server', cli.flag_default("server")])
assert_value_and_source(config, 'server', constants.STAGING_URI, ArgumentSource.RUNTIME)
# `--dry-run --server example.com --staging` should emit an error
conflicts = ['--staging']
self._check_server_conflict_message(short_args + ['--server', 'example.com', '--staging'],
conflicts)
def test_option_was_set(self):
def test_user_set_rsa_key_size(self):
key_size_option = 'rsa_key_size'
key_size_value = cli.flag_default(key_size_option)
self.parse('--rsa-key-size {0}'.format(key_size_value).split())
config = self.parse('--rsa-key-size {0}'.format(key_size_value).split())
assert cli.option_was_set(key_size_option, key_size_value) is True
assert cli.option_was_set('no_verify_ssl', True) is True
assert config.set_by_user(key_size_option)
config_dir_option = 'config_dir'
assert not cli.option_was_set(
config_dir_option, cli.flag_default(config_dir_option))
assert not cli.option_was_set(
'authenticator', cli.flag_default('authenticator'))
assert not config.set_by_user(
config_dir_option)
assert not config.set_by_user('authenticator')
def test_ecdsa_key_option(self):
def test_user_set_installer_and_authenticator(self):
config = self.parse('--apache')
assert config.set_by_user('installer')
assert config.set_by_user('authenticator')
config = self.parse('--installer webroot')
assert config.set_by_user('installer')
assert not config.set_by_user('authenticator')
def test_user_set_ecdsa_key_option(self):
elliptic_curve_option = 'elliptic_curve'
elliptic_curve_option_value = cli.flag_default(elliptic_curve_option)
self.parse('--elliptic-curve {0}'.format(elliptic_curve_option_value).split())
assert cli.option_was_set(elliptic_curve_option, elliptic_curve_option_value) is True
config = self.parse('--elliptic-curve {0}'.format(elliptic_curve_option_value).split())
assert config.set_by_user(elliptic_curve_option)
def test_invalid_key_type(self):
def test_user_set_invalid_key_type(self):
key_type_option = 'key_type'
key_type_value = cli.flag_default(key_type_option)
self.parse('--key-type {0}'.format(key_type_value).split())
assert cli.option_was_set(key_type_option, key_type_value) is True
config = self.parse('--key-type {0}'.format(key_type_value).split())
assert config.set_by_user(key_type_option)
with pytest.raises(SystemExit):
self.parse("--key-type foo")
@mock.patch('certbot._internal.hooks.validate_hooks')
def test_user_set_deploy_hook(self, unused_mock_validate_hooks):
args = 'renew --deploy-hook foo'.split()
plugins = disco.PluginsRegistry.find_all()
config = cli.prepare_and_parse_args(plugins, args)
assert config.set_by_user('renew_hook')
@mock.patch('certbot._internal.plugins.webroot._validate_webroot')
def test_user_set_webroot_map(self, mock_validate_webroot):
args = 'renew -w /var/www/html -d example.com'.split()
mock_validate_webroot.return_value = '/var/www/html'
plugins = disco.PluginsRegistry.find_all()
config = cli.prepare_and_parse_args(plugins, args)
assert config.set_by_user('webroot_map')
def test_encode_revocation_reason(self):
for reason, code in constants.REVOCATION_REASONS.items():
namespace = self.parse(['--reason', reason])
@@ -399,15 +435,15 @@ class ParseTest(unittest.TestCase):
namespace = self.parse(["--renew-hook", value,
"--deploy-hook", value,
"--disable-hook-validation"])
assert namespace.deploy_hook == value
assert namespace.renew_hook == value
assert_set_by_user_with_value(namespace, 'deploy_hook', value)
assert_set_by_user_with_value(namespace, 'renew_hook', value)
def test_deploy_hook_sets_renew_hook(self):
value = "foo"
namespace = self.parse(
["--deploy-hook", value, "--disable-hook-validation"])
assert namespace.deploy_hook == value
assert namespace.renew_hook == value
assert_set_by_user_with_value(namespace, 'deploy_hook', value)
assert_set_by_user_with_value(namespace, 'renew_hook', value)
def test_renew_hook_conflict(self):
with mock.patch("certbot._internal.cli.sys.stderr"):
@@ -419,15 +455,15 @@ class ParseTest(unittest.TestCase):
namespace = self.parse(["--deploy-hook", value,
"--renew-hook", value,
"--disable-hook-validation"])
assert namespace.deploy_hook == value
assert namespace.renew_hook == value
assert_set_by_user_with_value(namespace, 'deploy_hook', value)
assert_set_by_user_with_value(namespace, 'renew_hook', value)
def test_renew_hook_does_not_set_renew_hook(self):
value = "foo"
namespace = self.parse(
["--renew-hook", value, "--disable-hook-validation"])
assert namespace.deploy_hook is None
assert namespace.renew_hook == value
assert_set_by_user_with_value(namespace, 'renew_hook', value)
def test_max_log_backups_error(self):
with mock.patch('certbot._internal.cli.sys.stderr'):
@@ -439,37 +475,40 @@ class ParseTest(unittest.TestCase):
def test_max_log_backups_success(self):
value = "42"
namespace = self.parse(["--max-log-backups", value])
assert namespace.max_log_backups == int(value)
assert_set_by_user_with_value(namespace, 'max_log_backups', int(value))
def test_unchanging_defaults(self):
namespace = self.parse([])
assert namespace.domains == []
assert namespace.pref_challs == []
assert_value_and_source(namespace, 'domains', [], ArgumentSource.DEFAULT)
assert_value_and_source(namespace, 'pref_challs', [], ArgumentSource.DEFAULT)
namespace.pref_challs = [challenges.HTTP01.typ]
namespace.domains = ['example.com']
namespace = self.parse([])
assert namespace.domains == []
assert namespace.pref_challs == []
assert_value_and_source(namespace, 'domains', [], ArgumentSource.DEFAULT)
assert_value_and_source(namespace, 'pref_challs', [], ArgumentSource.DEFAULT)
def test_no_directory_hooks_set(self):
assert not self.parse(["--no-directory-hooks"]).directory_hooks
namespace = self.parse(["--no-directory-hooks"])
assert_set_by_user_with_value(namespace, 'directory_hooks', False)
def test_no_directory_hooks_unset(self):
assert self.parse([]).directory_hooks is True
namespace = self.parse([])
assert_value_and_source(namespace, 'directory_hooks', True, ArgumentSource.DEFAULT)
def test_delete_after_revoke(self):
namespace = self.parse(["--delete-after-revoke"])
assert namespace.delete_after_revoke is True
assert_set_by_user_with_value(namespace, 'delete_after_revoke', True)
def test_delete_after_revoke_default(self):
namespace = self.parse([])
assert namespace.delete_after_revoke is None
assert not namespace.set_by_user('delete_after_revoke')
def test_no_delete_after_revoke(self):
namespace = self.parse(["--no-delete-after-revoke"])
assert namespace.delete_after_revoke is False
assert_set_by_user_with_value(namespace, 'delete_after_revoke', False)
def test_allow_subset_with_wildcard(self):
with pytest.raises(errors.Error):
@@ -480,50 +519,38 @@ class ParseTest(unittest.TestCase):
for topic in ['all', 'plugins', 'dns-route53']:
assert 'certbot-route53:auth' not in self._help_output([help_flag, topic])
def test_parse_args_hosts_and_auto_hosts(self):
with pytest.raises(errors.Error):
self.parse(['--hsts', '--auto-hsts'])
class DefaultTest(unittest.TestCase):
"""Tests for certbot._internal.cli._Default."""
def test_parse_with_multiple_argument_sources(self):
DEFAULT_VALUE = flag_default('server')
CONFIG_FILE_VALUE = 'configfile.biz'
COMMAND_LINE_VALUE = 'commandline.edu'
# check that the default is set
namespace = self.parse(['certonly'])
assert_value_and_source(namespace, 'server', DEFAULT_VALUE, ArgumentSource.DEFAULT)
def setUp(self):
# pylint: disable=protected-access
self.default1 = cli._Default()
self.default2 = cli._Default()
with tempfile.NamedTemporaryFile() as tmp_config:
tmp_config.close() # close now because of compatibility issues on Windows
with open(tmp_config.name, 'w') as file_h:
file_h.write(f'server = {CONFIG_FILE_VALUE}')
def test_boolean(self):
assert bool(self.default1) is False
assert bool(self.default2) is False
# first, just provide a value from a config file
namespace = self.parse([
'certonly',
'-c', tmp_config.name,
])
assert_value_and_source(namespace, 'server', CONFIG_FILE_VALUE, ArgumentSource.CONFIG_FILE)
def test_equality(self):
assert self.default1 == self.default2
def test_hash(self):
assert hash(self.default1) == hash(self.default2)
class SetByCliTest(unittest.TestCase):
"""Tests for certbot.set_by_cli and related functions."""
def setUp(self):
reload_module(cli)
def test_deploy_hook(self):
assert _call_set_by_cli(
'renew_hook', '--deploy-hook foo'.split(), 'renew')
def test_webroot_map(self):
args = '-w /var/www/html -d example.com'.split()
verb = 'renew'
assert _call_set_by_cli('webroot_map', args, verb) is True
def _call_set_by_cli(var, args, verb):
with mock.patch('certbot._internal.cli.helpful_parser') as mock_parser:
with test_util.patch_display_util():
mock_parser.args = args
mock_parser.verb = verb
return cli.set_by_cli(var)
# now provide config file + command line values
namespace = self.parse([
'certonly',
'-c', tmp_config.name,
'--server', COMMAND_LINE_VALUE,
])
assert_value_and_source(namespace, 'server', COMMAND_LINE_VALUE, ArgumentSource.COMMAND_LINE)
if __name__ == '__main__':

View File

@@ -7,7 +7,9 @@ import warnings
import pytest
from certbot import errors
from certbot._internal import cli
from certbot._internal import constants
from certbot._internal.plugins import disco
from certbot.compat import misc
from certbot.compat import os
from certbot.tests import util as test_util
@@ -24,10 +26,10 @@ class NamespaceConfigTest(test_util.ConfigTestCase):
self.config.http01_port = 4321
def test_init_same_ports(self):
self.config.namespace.https_port = 4321
self.config.https_port = 4321
from certbot.configuration import NamespaceConfig
with pytest.raises(errors.Error):
NamespaceConfig(self.config.namespace)
NamespaceConfig(self.config.namespace, {})
def test_proxy_getattr(self):
assert self.config.foo == 'bar'
@@ -37,7 +39,7 @@ class NamespaceConfigTest(test_util.ConfigTestCase):
assert ['acme-server.org:443', 'new'] == \
self.config.server_path.split(os.path.sep)
self.config.namespace.server = ('http://user:pass@acme.server:443'
self.config.server = ('http://user:pass@acme.server:443'
'/p/a/t/h;parameters?query#fragment')
assert ['user:pass@acme.server:443', 'p', 'a', 't', 'h'] == \
self.config.server_path.split(os.path.sep)
@@ -85,7 +87,7 @@ class NamespaceConfigTest(test_util.ConfigTestCase):
mock_namespace.work_dir = work_base
mock_namespace.logs_dir = logs_base
mock_namespace.server = server
config = NamespaceConfig(mock_namespace)
config = NamespaceConfig(mock_namespace, {})
assert os.path.isabs(config.config_dir)
assert config.config_dir == \
@@ -130,7 +132,7 @@ class NamespaceConfigTest(test_util.ConfigTestCase):
mock_namespace.config_dir = config_base
mock_namespace.work_dir = work_base
mock_namespace.logs_dir = logs_base
config = NamespaceConfig(mock_namespace)
config = NamespaceConfig(mock_namespace, {})
assert os.path.isabs(config.default_archive_dir)
assert os.path.isabs(config.live_dir)

View File

@@ -1,8 +0,0 @@
import pytest
from certbot._internal import cli
@pytest.fixture(autouse=True)
def reset_cli_global():
cli.set_by_cli.detector = None

View File

@@ -1,12 +1,8 @@
"""Tests for certbot.helpful_parser"""
import sys
from unittest import mock
import pytest
from certbot import errors
from certbot._internal import constants
from certbot._internal.cli import _DomainsAction
from certbot._internal.cli import HelpfulArgumentParser
@@ -77,7 +73,7 @@ class TestAdd:
arg_parser.add(None, "--hello-world")
parsed_args = arg_parser.parser.parse_args(['--hello-world',
'Hello World!'])
assert parsed_args.hello_world is 'Hello World!'
assert parsed_args.hello_world == 'Hello World!'
assert not hasattr(parsed_args, 'potato')
def test_add_expected_argument(self):
@@ -117,92 +113,5 @@ class TestAddGroup:
assert arg_parser.groups["certonly"] is False
class TestParseArgsErrors:
'''Tests for errors that should be met for some cases in parse_args method
in HelpfulArgumentParser'''
def test_parse_args_renew_force_interactive(self):
arg_parser = HelpfulArgumentParser(['renew', '--force-interactive'],
{})
arg_parser.add(
None, constants.FORCE_INTERACTIVE_FLAG, action="store_true")
with pytest.raises(errors.Error):
arg_parser.parse_args()
def test_parse_args_non_interactive_and_force_interactive(self):
arg_parser = HelpfulArgumentParser(['--force-interactive',
'--non-interactive'], {})
arg_parser.add(
None, constants.FORCE_INTERACTIVE_FLAG, action="store_true")
arg_parser.add(
None, "--non-interactive", dest="noninteractive_mode",
action="store_true"
)
with pytest.raises(errors.Error):
arg_parser.parse_args()
def test_parse_args_subset_names_wildcard_domain(self):
arg_parser = HelpfulArgumentParser(['--domain',
'*.example.com,potato.example.com',
'--allow-subset-of-names'], {})
# The following arguments are added because they have to be defined
# in order for arg_parser to run completely. They are not used for the
# test.
arg_parser.add(
None, constants.FORCE_INTERACTIVE_FLAG, action="store_true")
arg_parser.add(
None, "--non-interactive", dest="noninteractive_mode",
action="store_true")
arg_parser.add(
None, "--staging"
)
arg_parser.add(None, "--dry-run")
arg_parser.add(None, "--csr")
arg_parser.add(None, "--must-staple")
arg_parser.add(None, "--validate-hooks")
arg_parser.add(None, "-d", "--domain", dest="domains",
metavar="DOMAIN", action=_DomainsAction)
arg_parser.add(None, "--allow-subset-of-names")
# with self.assertRaises(errors.Error):
# arg_parser.parse_args()
def test_parse_args_hosts_and_auto_hosts(self):
arg_parser = HelpfulArgumentParser(['--hsts', '--auto-hsts'], {})
arg_parser.add(
None, "--hsts", action="store_true", dest="hsts")
arg_parser.add(
None, "--auto-hsts", action="store_true", dest="auto_hsts")
# The following arguments are added because they have to be defined
# in order for arg_parser to run completely. They are not used for the
# test.
arg_parser.add(
None, constants.FORCE_INTERACTIVE_FLAG, action="store_true")
arg_parser.add(
None, "--non-interactive", dest="noninteractive_mode",
action="store_true")
arg_parser.add(None, "--staging")
arg_parser.add(None, "--dry-run")
arg_parser.add(None, "--csr")
arg_parser.add(None, "--must-staple")
arg_parser.add(None, "--validate-hooks")
arg_parser.add(None, "--allow-subset-of-names")
with pytest.raises(errors.Error):
arg_parser.parse_args()
class TestAddDeprecatedArgument:
"""Tests for add_deprecated_argument method of HelpfulArgumentParser"""
@mock.patch.object(HelpfulArgumentParser, "modify_kwargs_for_default_detection")
def test_no_default_detection_modifications(self, mock_modify):
arg_parser = HelpfulArgumentParser(["run"], {}, detect_defaults=True)
arg_parser.add_deprecated_argument("--foo", 0)
arg_parser.parse_args()
mock_modify.assert_not_called()
if __name__ == '__main__':
sys.exit(pytest.main(sys.argv[1:] + [__file__])) # pragma: no cover

View File

@@ -21,7 +21,6 @@ import pytest
import pytz
from acme.messages import Error as acme_error
from certbot import configuration
from certbot import crypto_util
from certbot import errors
from certbot import interfaces
@@ -86,9 +85,10 @@ class TestHandleCerts(unittest.TestCase):
assert mock_handle_migration.called
@mock.patch("certbot._internal.main.display_util.yesno")
@mock.patch("certbot._internal.main.cli.set_by_cli")
def test_handle_unexpected_key_type_migration(self, mock_set, mock_yesno):
def test_handle_unexpected_key_type_migration(self, mock_yesno):
config = mock.Mock()
mock_set = mock.Mock()
config.set_by_user = mock_set
cert = mock.Mock()
# If the key types do not differ, it should be a no-op.
@@ -163,8 +163,7 @@ class RunTest(test_util.ConfigTestCase):
def _call(self):
args = '-a webroot -i null -d {0}'.format(self.domain).split()
plugins = disco.PluginsRegistry.find_all()
config = configuration.NamespaceConfig(
cli.prepare_and_parse_args(plugins, args))
config = cli.prepare_and_parse_args(plugins, args)
from certbot._internal.main import run
run(config, plugins)
@@ -231,8 +230,7 @@ class CertonlyTest(unittest.TestCase):
def _call(self, args):
plugins = disco.PluginsRegistry.find_all()
config = configuration.NamespaceConfig(
cli.prepare_and_parse_args(plugins, args))
config = cli.prepare_and_parse_args(plugins, args)
with mock.patch('certbot._internal.main._init_le_client') as mock_init:
with mock.patch('certbot._internal.main._suggest_donation_if_appropriate'):
@@ -433,19 +431,11 @@ class RevokeTest(test_util.TempDirTestCase):
args = 'revoke --cert-path={0} '
args = args.format(self.tmp_cert_path).split()
plugins = disco.PluginsRegistry.find_all()
config = configuration.NamespaceConfig(
cli.prepare_and_parse_args(plugins, args))
config = cli.prepare_and_parse_args(plugins, args)
from certbot._internal.main import revoke
revoke(config, plugins)
def _mock_set_by_cli(self, mocked: mock.MagicMock, key: str, value: bool) -> None:
def set_by_cli(k: str) -> bool:
if key == k:
return value
return mock.DEFAULT
mocked.side_effect = set_by_cli
@mock.patch('certbot._internal.main._delete_if_appropriate')
@mock.patch('certbot._internal.main.client.acme_client')
def test_revoke_with_reason(self, mock_acme_client,
@@ -467,11 +457,9 @@ class RevokeTest(test_util.TempDirTestCase):
@mock.patch('certbot._internal.storage.RenewableCert')
@mock.patch('certbot._internal.storage.renewal_file_for_certname')
@mock.patch('certbot._internal.client.acme_from_config_key')
@mock.patch('certbot._internal.cli.set_by_cli')
def test_revoke_by_certname(self, mock_set_by_cli, mock_acme_from_config,
def test_revoke_by_certname(self, mock_acme_from_config,
unused_mock_renewal_file_for_certname, mock_cert,
mock_delete_if_appropriate):
self._mock_set_by_cli(mock_set_by_cli, "server", False)
mock_acme_from_config.return_value = self.mock_acme_client
mock_cert.return_value = mock.MagicMock(cert_path=self.tmp_cert_path,
server="https://acme.example")
@@ -486,12 +474,10 @@ class RevokeTest(test_util.TempDirTestCase):
@mock.patch('certbot._internal.storage.RenewableCert')
@mock.patch('certbot._internal.storage.renewal_file_for_certname')
@mock.patch('certbot._internal.client.acme_from_config_key')
@mock.patch('certbot._internal.cli.set_by_cli')
def test_revoke_by_certname_and_server(self, mock_set_by_cli, mock_acme_from_config,
def test_revoke_by_certname_and_server(self, mock_acme_from_config,
unused_mock_renewal_file_for_certname, mock_cert,
mock_delete_if_appropriate):
"""Revoking with --server should use the server from the CLI"""
self._mock_set_by_cli(mock_set_by_cli, "server", True)
mock_cert.return_value = mock.MagicMock(cert_path=self.tmp_cert_path,
server="https://acme.example")
args = 'revoke --cert-name=example.com --server https://other.example'.split()
@@ -505,8 +491,7 @@ class RevokeTest(test_util.TempDirTestCase):
@mock.patch('certbot._internal.storage.RenewableCert')
@mock.patch('certbot._internal.storage.renewal_file_for_certname')
@mock.patch('certbot._internal.client.acme_from_config_key')
@mock.patch('certbot._internal.cli.set_by_cli')
def test_revoke_by_certname_empty_server(self, mock_set_by_cli, mock_acme_from_config,
def test_revoke_by_certname_empty_server(self, mock_acme_from_config,
unused_mock_renewal_file_for_certname,
mock_cert, mock_delete_if_appropriate):
"""Revoking with --cert-name where the lineage server is empty shouldn't crash """
@@ -599,8 +584,7 @@ class ReconfigureTest(test_util.TempDirTestCase):
def _call(self, passed_args):
full_args = passed_args + ['--config-dir', self.config_dir]
plugins = disco.PluginsRegistry.find_all()
config = configuration.NamespaceConfig(
cli.prepare_and_parse_args(plugins, full_args))
config = cli.prepare_and_parse_args(plugins, full_args)
from certbot._internal.main import reconfigure
reconfigure(config, plugins)
@@ -616,6 +600,12 @@ class ReconfigureTest(test_util.TempDirTestCase):
@mock.patch('certbot._internal.cert_manager.get_certnames')
def test_asks_for_certname(self, mock_cert_manager):
named_mock = mock.Mock()
named_mock.name = 'nginx'
self.mocks['pick_installer'].return_value = named_mock
self.mocks['pick_auth'].return_value = named_mock
self.mocks['find_init'].return_value = named_mock
mock_cert_manager.return_value = ['example.com']
self._call('--nginx'.split())
assert mock_cert_manager.call_count == 1
@@ -774,7 +764,7 @@ class DeleteIfAppropriateTest(test_util.ConfigTestCase):
mock_match_and_check_overlaps, mock_renewal_file_for_certname):
# pylint: disable = unused-argument
config = self.config
config.namespace.noninteractive_mode = True
config.noninteractive_mode = True
config.cert_path = "/some/reasonable/path"
config.certname = ""
mock_cert_path_to_lineage.return_value = "example.com"
@@ -793,7 +783,7 @@ class DeleteIfAppropriateTest(test_util.ConfigTestCase):
mock_cert_path_to_lineage, mock_full_archive_dir,
mock_match_and_check_overlaps, mock_renewal_file_for_certname):
config = self.config
config.namespace.delete_after_revoke = True
config.delete_after_revoke = True
config.cert_path = "/some/reasonable/path"
config.certname = ""
mock_cert_path_to_lineage.return_value = "example.com"
@@ -1932,8 +1922,7 @@ class EnhanceTest(test_util.ConfigTestCase):
def _call(self, args):
plugins = disco.PluginsRegistry.find_all()
config = configuration.NamespaceConfig(
cli.prepare_and_parse_args(plugins, args))
config = cli.prepare_and_parse_args(plugins, args)
with mock.patch('certbot._internal.cert_manager.get_certnames') as mock_certs:
mock_certs.return_value = ['example.com']

View File

@@ -222,8 +222,7 @@ class TestChooseConfiguratorPlugins(unittest.TestCase):
def _parseArgs(self, args):
from certbot import configuration
from certbot._internal import cli
return configuration.NamespaceConfig(
cli.prepare_and_parse_args(self.plugins, args.split()))
return cli.prepare_and_parse_args(self.plugins, args.split())
def setUp(self):
self.plugins = PluginsRegistry({

View File

@@ -14,15 +14,15 @@ import certbot.tests.util as test_util
class RenewalTest(test_util.ConfigTestCase):
@mock.patch('certbot._internal.cli.set_by_cli')
def test_ancient_webroot_renewal_conf(self, mock_set_by_cli):
mock_set_by_cli.return_value = False
@mock.patch.object(configuration.NamespaceConfig, 'set_by_user')
def test_ancient_webroot_renewal_conf(self, mock_set_by_user):
mock_set_by_user.return_value = False
rc_path = test_util.make_lineage(
self.config.config_dir, 'sample-renewal-ancient.conf')
self.config.account = None
self.config.email = None
self.config.webroot_path = None
config = configuration.NamespaceConfig(self.config)
config = configuration.NamespaceConfig(self.config, {})
lineage = storage.RenewableCert(rc_path, config)
renewalparams = lineage.configuration['renewalparams']
# pylint: disable=protected-access
@@ -30,13 +30,13 @@ class RenewalTest(test_util.ConfigTestCase):
renewal._restore_webroot_config(config, renewalparams)
assert config.webroot_path == ['/var/www/']
@mock.patch('certbot._internal.renewal.cli.set_by_cli')
def test_webroot_params_conservation(self, mock_set_by_cli):
@mock.patch.object(configuration.NamespaceConfig, 'set_by_user')
def test_webroot_params_conservation(self, mock_set_by_user):
# For more details about why this test is important, see:
# certbot._internal.plugins.webroot_test::
# WebrootActionTest::test_webroot_map_partial_without_perform
from certbot._internal import renewal
mock_set_by_cli.return_value = False
mock_set_by_user.return_value = False
renewalparams = {
'webroot_map': {'test.example.com': '/var/www/test'},
@@ -59,7 +59,7 @@ class RenewalTest(test_util.ConfigTestCase):
self.config.elliptic_curve = 'INVALID_VALUE'
self.config.reuse_key = True
self.config.dry_run = True
config = configuration.NamespaceConfig(self.config)
config = configuration.NamespaceConfig(self.config, {})
rc_path = test_util.make_lineage(
self.config.config_dir, 'sample-renewal.conf')
@@ -81,7 +81,7 @@ class RenewalTest(test_util.ConfigTestCase):
self.config.reuse_key = True
self.config.dry_run = True
self.config.key_type = 'ecdsa'
config = configuration.NamespaceConfig(self.config)
config = configuration.NamespaceConfig(self.config, {})
rc_path = test_util.make_lineage(
self.config.config_dir,
@@ -100,15 +100,15 @@ class RenewalTest(test_util.ConfigTestCase):
assert self.config.elliptic_curve == 'secp256r1'
@mock.patch('certbot._internal.renewal.cli.set_by_cli')
def test_new_key(self, mock_set_by_cli):
mock_set_by_cli.return_value = False
@mock.patch.object(configuration.NamespaceConfig, 'set_by_user')
def test_new_key(self, mock_set_by_user):
mock_set_by_user.return_value = False
# When renewing with both reuse_key and new_key, the key should be regenerated,
# the key type, key parameters and reuse_key should be kept.
self.config.reuse_key = True
self.config.new_key = True
self.config.dry_run = True
config = configuration.NamespaceConfig(self.config)
config = configuration.NamespaceConfig(self.config, {})
rc_path = test_util.make_lineage(
self.config.config_dir, 'sample-renewal.conf')
@@ -129,9 +129,9 @@ class RenewalTest(test_util.ConfigTestCase):
le_client.obtain_certificate.assert_called_with(mock.ANY, None)
@mock.patch('certbot._internal.renewal.hooks.renew_hook')
@mock.patch('certbot._internal.renewal.cli.set_by_cli')
def test_reuse_key_conflicts(self, mock_set_by_cli, unused_mock_renew_hook):
mock_set_by_cli.return_value = False
@mock.patch.object(configuration.NamespaceConfig, 'set_by_user')
def test_reuse_key_conflicts(self, mock_set_by_user, unused_mock_renew_hook):
mock_set_by_user.return_value = False
# When renewing with reuse_key and a conflicting key parameter (size, curve)
# an error should be raised ...
@@ -140,7 +140,7 @@ class RenewalTest(test_util.ConfigTestCase):
self.config.rsa_key_size = 4096
self.config.dry_run = True
config = configuration.NamespaceConfig(self.config)
config = configuration.NamespaceConfig(self.config, {})
rc_path = test_util.make_lineage(
self.config.config_dir, 'sample-renewal.conf')
@@ -156,15 +156,15 @@ class RenewalTest(test_util.ConfigTestCase):
renewal.renew_cert(self.config, None, le_client, lineage)
# ... unless --no-reuse-key is set
mock_set_by_cli.side_effect = lambda var: var == "reuse_key"
mock_set_by_user.side_effect = lambda var: var == "reuse_key"
self.config.reuse_key = False
renewal.renew_cert(self.config, None, le_client, lineage)
@test_util.patch_display_util()
@mock.patch('certbot._internal.renewal.cli.set_by_cli')
def test_remove_deprecated_config_elements(self, mock_set_by_cli, unused_mock_get_utility):
mock_set_by_cli.return_value = False
config = configuration.NamespaceConfig(self.config)
@mock.patch.object(configuration.NamespaceConfig, 'set_by_user')
def test_remove_deprecated_config_elements(self, mock_set_by_user, unused_mock_get_utility):
mock_set_by_user.return_value = False
config = configuration.NamespaceConfig(self.config, {})
config.certname = "sample-renewal-deprecated-option"
rc_path = test_util.make_lineage(
@@ -177,9 +177,9 @@ class RenewalTest(test_util.ConfigTestCase):
# value in the renewal conf file
assert isinstance(lineage_config.manual_public_ip_logging_ok, mock.MagicMock)
@mock.patch('certbot._internal.renewal.cli.set_by_cli')
def test_absent_key_type_restored(self, mock_set_by_cli):
mock_set_by_cli.return_value = False
@mock.patch.object(configuration.NamespaceConfig, 'set_by_user')
def test_absent_key_type_restored(self, mock_set_by_user):
mock_set_by_user.return_value = False
rc_path = test_util.make_lineage(self.config.config_dir, 'sample-renewal.conf', ec=False)
@@ -196,60 +196,60 @@ class RestoreRequiredConfigElementsTest(test_util.ConfigTestCase):
from certbot._internal.renewal import restore_required_config_elements
return restore_required_config_elements(*args, **kwargs)
@mock.patch('certbot._internal.renewal.cli.set_by_cli')
def test_allow_subset_of_names_success(self, mock_set_by_cli):
mock_set_by_cli.return_value = False
@mock.patch.object(configuration.NamespaceConfig, 'set_by_user')
def test_allow_subset_of_names_success(self, mock_set_by_user):
mock_set_by_user.return_value = False
self._call(self.config, {'allow_subset_of_names': 'True'})
assert self.config.allow_subset_of_names is True
@mock.patch('certbot._internal.renewal.cli.set_by_cli')
def test_allow_subset_of_names_failure(self, mock_set_by_cli):
mock_set_by_cli.return_value = False
@mock.patch.object(configuration.NamespaceConfig, 'set_by_user')
def test_allow_subset_of_names_failure(self, mock_set_by_user):
mock_set_by_user.return_value = False
renewalparams = {'allow_subset_of_names': 'maybe'}
with pytest.raises(errors.Error):
self._call(self.config, renewalparams)
@mock.patch('certbot._internal.renewal.cli.set_by_cli')
def test_pref_challs_list(self, mock_set_by_cli):
mock_set_by_cli.return_value = False
@mock.patch.object(configuration.NamespaceConfig, 'set_by_user')
def test_pref_challs_list(self, mock_set_by_user):
mock_set_by_user.return_value = False
renewalparams = {'pref_challs': 'http-01, dns'.split(',')}
self._call(self.config, renewalparams)
expected = [challenges.HTTP01.typ, challenges.DNS01.typ]
assert self.config.pref_challs == expected
@mock.patch('certbot._internal.renewal.cli.set_by_cli')
def test_pref_challs_str(self, mock_set_by_cli):
mock_set_by_cli.return_value = False
@mock.patch.object(configuration.NamespaceConfig, 'set_by_user')
def test_pref_challs_str(self, mock_set_by_user):
mock_set_by_user.return_value = False
renewalparams = {'pref_challs': 'dns'}
self._call(self.config, renewalparams)
expected = [challenges.DNS01.typ]
assert self.config.pref_challs == expected
@mock.patch('certbot._internal.renewal.cli.set_by_cli')
def test_pref_challs_failure(self, mock_set_by_cli):
mock_set_by_cli.return_value = False
@mock.patch.object(configuration.NamespaceConfig, 'set_by_user')
def test_pref_challs_failure(self, mock_set_by_user):
mock_set_by_user.return_value = False
renewalparams = {'pref_challs': 'finding-a-shrubbery'}
with pytest.raises(errors.Error):
self._call(self.config, renewalparams)
@mock.patch('certbot._internal.renewal.cli.set_by_cli')
def test_must_staple_success(self, mock_set_by_cli):
mock_set_by_cli.return_value = False
@mock.patch.object(configuration.NamespaceConfig, 'set_by_user')
def test_must_staple_success(self, mock_set_by_user):
mock_set_by_user.return_value = False
self._call(self.config, {'must_staple': 'True'})
assert self.config.must_staple is True
@mock.patch('certbot._internal.renewal.cli.set_by_cli')
def test_must_staple_failure(self, mock_set_by_cli):
mock_set_by_cli.return_value = False
@mock.patch.object(configuration.NamespaceConfig, 'set_by_user')
def test_must_staple_failure(self, mock_set_by_user):
mock_set_by_user.return_value = False
renewalparams = {'must_staple': 'maybe'}
with pytest.raises(errors.Error):
self._call(self.config, renewalparams)
@mock.patch('certbot._internal.renewal.cli.set_by_cli')
def test_ancient_server_renewal_conf(self, mock_set_by_cli):
@mock.patch.object(configuration.NamespaceConfig, 'set_by_user')
def test_ancient_server_renewal_conf(self, mock_set_by_user):
from certbot._internal import constants
self.config.server = None
mock_set_by_cli.return_value = False
mock_set_by_user.return_value = False
self._call(self.config, {'server': constants.V1_URI})
assert self.config.server == constants.CLI_DEFAULTS['server']

View File

@@ -12,6 +12,7 @@ import pytest
import pytz
import certbot
from certbot import configuration
from certbot import errors
from certbot._internal.storage import ALL_FOUR
from certbot.compat import filesystem
@@ -39,23 +40,24 @@ class RelevantValuesTest(unittest.TestCase):
def setUp(self):
self.values = {"server": "example.org", "key_type": "rsa"}
self.mock_config = mock.MagicMock()
self.mock_config.set_by_user = mock.MagicMock()
def _call(self, *args, **kwargs):
def _call(self, values):
from certbot._internal.storage import relevant_values
return relevant_values(*args, **kwargs)
self.mock_config.to_dict.return_value = values
return relevant_values(self.mock_config)
@mock.patch("certbot._internal.cli.option_was_set")
@mock.patch("certbot._internal.plugins.disco.PluginsRegistry.find_all")
def test_namespace(self, mock_find_all, mock_option_was_set):
def test_namespace(self, mock_find_all):
mock_find_all.return_value = ["certbot-foo:bar"]
mock_option_was_set.return_value = True
self.mock_config.set_by_user.return_value = True
self.values["certbot_foo:bar_baz"] = 42
assert self._call(self.values.copy()) == self.values
@mock.patch("certbot._internal.cli.option_was_set")
def test_option_set(self, mock_option_was_set):
mock_option_was_set.return_value = True
def test_option_set(self):
self.mock_config.set_by_user.return_value = True
self.values["allow_subset_of_names"] = True
self.values["authenticator"] = "apache"
@@ -65,26 +67,46 @@ class RelevantValuesTest(unittest.TestCase):
assert self._call(self.values) == expected_relevant_values
@mock.patch("certbot._internal.cli.option_was_set")
def test_option_unset(self, mock_option_was_set):
mock_option_was_set.return_value = False
def test_option_unset(self):
self.mock_config.set_by_user.return_value = False
expected_relevant_values = self.values.copy()
self.values["rsa_key_size"] = 2048
assert self._call(self.values) == expected_relevant_values
@mock.patch("certbot._internal.cli.set_by_cli")
def test_deprecated_item(self, unused_mock_set_by_cli):
def test_deprecated_item(self):
deprected_option = 'manual_public_ip_logging_ok'
self.mock_config.set_by_user = lambda v: False if v == deprected_option else True
# deprecated items should never be relevant to store
expected_relevant_values = self.values.copy()
self.values["manual_public_ip_logging_ok"] = None
self.values[deprected_option] = None
assert self._call(self.values) == expected_relevant_values
self.values["manual_public_ip_logging_ok"] = True
self.values[deprected_option] = True
assert self._call(self.values) == expected_relevant_values
self.values["manual_public_ip_logging_ok"] = False
self.values[deprected_option] = False
assert self._call(self.values) == expected_relevant_values
def test_with_real_parser(self):
from certbot._internal.storage import relevant_values
from certbot._internal.plugins import disco
from certbot._internal import cli
from certbot._internal import constants
PLUGINS = disco.PluginsRegistry.find_all()
namespace = cli.prepare_and_parse_args(PLUGINS, [
'--allow-subset-of-names',
'--authenticator', 'apache',
])
expected_relevant_values = {
'server': constants.CLI_DEFAULTS['server'],
'key_type': 'ecdsa',
'allow_subset_of_names': True,
'authenticator': 'apache',
}
assert relevant_values(namespace) == expected_relevant_values
class BaseRenewableCertTest(test_util.ConfigTestCase):
"""Base class for setting up Renewable Cert tests.
@@ -419,9 +441,9 @@ class RenewableCertTests(BaseRenewableCertTest):
with pytest.raises(errors.CertStorageError):
self.test_rc.names()
@mock.patch("certbot._internal.storage.cli")
@mock.patch.object(configuration.NamespaceConfig, 'set_by_user')
@mock.patch("certbot._internal.storage.datetime")
def test_time_interval_judgments(self, mock_datetime, mock_cli):
def test_time_interval_judgments(self, mock_datetime, mock_set_by_user):
"""Test should_autorenew() on the basis of expiry time windows."""
test_cert = test_util.load_vector("cert_512.pem")
@@ -435,7 +457,7 @@ class RenewableCertTests(BaseRenewableCertTest):
f.write(test_cert)
mock_datetime.timedelta = datetime.timedelta
mock_cli.set_by_cli.return_value = False
mock_set_by_user.return_value = False
self.test_rc.configuration["renewalparams"] = {}
for (current_time, interval, result) in [
@@ -473,12 +495,12 @@ class RenewableCertTests(BaseRenewableCertTest):
self.test_rc.configuration["renewalparams"]["autorenew"] = "False"
assert not self.test_rc.autorenewal_is_enabled()
@mock.patch("certbot._internal.storage.cli")
@mock.patch.object(configuration.NamespaceConfig, 'set_by_user')
@mock.patch("certbot._internal.storage.RenewableCert.ocsp_revoked")
def test_should_autorenew(self, mock_ocsp, mock_cli):
def test_should_autorenew(self, mock_ocsp, mock_set_by_user):
"""Test should_autorenew on the basis of reasons other than
expiry time window."""
mock_cli.set_by_cli.return_value = False
mock_set_by_user.return_value = False
# Autorenewal turned off
self.test_rc.configuration["renewalparams"] = {"autorenew": "False"}
assert not self.test_rc.should_autorenew()
@@ -494,7 +516,7 @@ class RenewableCertTests(BaseRenewableCertTest):
def test_save_successor(self, mock_rv):
# Mock relevant_values() to claim that all values are relevant here
# (to avoid instantiating parser)
mock_rv.side_effect = lambda x: x
mock_rv.side_effect = lambda x: x.to_dict()
for ver in range(1, 6):
for kind in ALL_FOUR:
@@ -553,7 +575,7 @@ class RenewableCertTests(BaseRenewableCertTest):
def test_save_successor_maintains_group_mode(self, mock_rv):
# Mock relevant_values() to claim that all values are relevant here
# (to avoid instantiating parser)
mock_rv.side_effect = lambda x: x
mock_rv.side_effect = lambda x: x.to_dict()
for kind in ALL_FOUR:
self._write_out_kind(kind, 1)
self.test_rc.update_all_links_to(1)
@@ -575,7 +597,7 @@ class RenewableCertTests(BaseRenewableCertTest):
def test_save_successor_maintains_gid(self, mock_ownership, mock_rv):
# Mock relevant_values() to claim that all values are relevant here
# (to avoid instantiating parser)
mock_rv.side_effect = lambda x: x
mock_rv.side_effect = lambda x: x.to_dict()
for kind in ALL_FOUR:
self._write_out_kind(kind, 1)
self.test_rc.update_all_links_to(1)
@@ -589,7 +611,7 @@ class RenewableCertTests(BaseRenewableCertTest):
"""Test for new_lineage() class method."""
# Mock relevant_values to say everything is relevant here (so we
# don't have to mock the parser to help it decide!)
mock_rv.side_effect = lambda x: x
mock_rv.side_effect = lambda x: x.to_dict()
from certbot._internal import storage
result = storage.RenewableCert.new_lineage(
@@ -643,7 +665,7 @@ class RenewableCertTests(BaseRenewableCertTest):
"""Test that directories can be created if they don't exist."""
# Mock relevant_values to say everything is relevant here (so we
# don't have to mock the parser to help it decide!)
mock_rv.side_effect = lambda x: x
mock_rv.side_effect = lambda x: x.to_dict()
from certbot._internal import storage
shutil.rmtree(self.config.renewal_configs_dir)

View File

@@ -1,7 +1,10 @@
"""Certbot user-supplied configuration."""
import argparse
import copy
import enum
import logging
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from urllib import parse
@@ -14,6 +17,24 @@ from certbot.compat import misc
from certbot.compat import os
logger = logging.getLogger(__name__)
class ArgumentSource(enum.Enum):
"""Enum for describing where a configuration argument was set."""
COMMAND_LINE = enum.auto()
"""Argument was specified on the command line"""
CONFIG_FILE = enum.auto()
"""Argument was specified in a .ini config file"""
DEFAULT = enum.auto()
"""Argument was not set by the user, and was assigned its default value"""
ENV_VAR = enum.auto()
"""Argument was specified in an environment variable"""
RUNTIME = enum.auto()
"""Argument was set at runtime by certbot"""
class NamespaceConfig:
"""Configuration wrapper around :class:`argparse.Namespace`.
@@ -38,13 +59,17 @@ class NamespaceConfig:
:ivar namespace: Namespace typically produced by
:meth:`argparse.ArgumentParser.parse_args`.
:type namespace: :class:`argparse.Namespace`
:ivar argument_sources: dictionary of argument names to their :class:`ArgumentSource`
:type argument_sources: :class:`Dict[str, ArgumentSource]`
"""
def __init__(self, namespace: argparse.Namespace) -> None:
def __init__(self, namespace: argparse.Namespace,
argument_sources: Dict[str, ArgumentSource]) -> None:
self.namespace: argparse.Namespace
# Avoid recursion loop because of the delegation defined in __setattr__
object.__setattr__(self, 'namespace', namespace)
object.__setattr__(self, 'argument_sources', argument_sources)
self.namespace.config_dir = os.path.abspath(self.namespace.config_dir)
self.namespace.work_dir = os.path.abspath(self.namespace.work_dir)
@@ -53,12 +78,61 @@ class NamespaceConfig:
# Check command line parameters sanity, and error out in case of problem.
_check_config_sanity(self)
def set_by_user(self, var: str) -> bool:
"""
Return True if a particular config variable has been set by the user
(via CLI or config file) including if the user explicitly set it to the
default, or if it was dynamically set at runtime. Returns False if the
variable was assigned a default value.
"""
from certbot._internal.cli.cli_constants import DEPRECATED_OPTIONS
from certbot._internal.cli.cli_constants import VAR_MODIFIERS
from certbot._internal.plugins import selection
# We should probably never actually hit this code. But if we do,
# a deprecated option has logically never been set by the CLI.
if var in DEPRECATED_OPTIONS:
return False
if var in ['authenticator', 'installer']:
auth, inst = selection.cli_plugin_requests(self)
if var == 'authenticator':
return auth is not None
if var == 'installer':
return inst is not None
if var in self.argument_sources and self.argument_sources[var] != ArgumentSource.DEFAULT:
logger.debug("Var %s=%s (set by user).", var, getattr(self, var))
return True
for modifier in VAR_MODIFIERS.get(var, []):
if self.set_by_user(modifier):
logger.debug("Var %s=%s (set by user).",
var, VAR_MODIFIERS.get(var, []))
return True
return False
def to_dict(self) -> Dict[str, Any]:
"""
Returns a dictionary mapping all argument names to their values
"""
return vars(self.namespace)
def _mark_runtime_override(self, name: str) -> None:
"""
Overwrites an argument's source to be ArgumentSource.RUNTIME. Used when certbot sets an
argument's values at runtime.
"""
self.argument_sources[name] = ArgumentSource.RUNTIME
# Delegate any attribute not explicitly defined to the underlying namespace object.
def __getattr__(self, name: str) -> Any:
return getattr(self.namespace, name)
def __setattr__(self, name: str, value: Any) -> None:
self._mark_runtime_override(name)
setattr(self.namespace, name, value)
@property
@@ -68,6 +142,7 @@ class NamespaceConfig:
@server.setter
def server(self, server_: str) -> None:
self._mark_runtime_override('server')
self.namespace.server = server_
@property
@@ -81,6 +156,7 @@ class NamespaceConfig:
@email.setter
def email(self, mail: str) -> None:
self._mark_runtime_override('email')
self.namespace.email = mail
@property
@@ -91,6 +167,7 @@ class NamespaceConfig:
@rsa_key_size.setter
def rsa_key_size(self, ksize: int) -> None:
"""Set the rsa_key_size property"""
self._mark_runtime_override('rsa_key_size')
self.namespace.rsa_key_size = ksize
@property
@@ -104,6 +181,7 @@ class NamespaceConfig:
@elliptic_curve.setter
def elliptic_curve(self, ecurve: str) -> None:
"""Set the elliptic_curve property"""
self._mark_runtime_override('elliptic_curve')
self.namespace.elliptic_curve = ecurve
@property
@@ -117,6 +195,7 @@ class NamespaceConfig:
@key_type.setter
def key_type(self, ktype: str) -> None:
"""Set the key_type property"""
self._mark_runtime_override('key_type')
self.namespace.key_type = ktype
@property
@@ -322,7 +401,8 @@ class NamespaceConfig:
# Work around https://bugs.python.org/issue1515 for py26 tests :( :(
# https://travis-ci.org/letsencrypt/letsencrypt/jobs/106900743#L3276
new_ns = copy.deepcopy(self.namespace)
return type(self)(new_ns)
new_sources = copy.deepcopy(self.argument_sources)
return type(self)(new_ns, new_sources)
def _check_config_sanity(config: NamespaceConfig) -> None:

View File

@@ -395,7 +395,8 @@ class ConfigTestCase(TempDirTestCase):
def setUp(self) -> None:
super().setUp()
self.config = configuration.NamespaceConfig(
mock.MagicMock(**constants.CLI_DEFAULTS)
mock.MagicMock(**constants.CLI_DEFAULTS),
{},
)
self.config.namespace.verb = "certonly"
self.config.namespace.config_dir = os.path.join(self.tempdir, 'config')

View File

@@ -2,7 +2,7 @@
# that script.
apacheconfig==0.3.2 ; python_full_version < "3.8.0" and python_version >= "3.7"
asn1crypto==0.24.0 ; python_full_version >= "3.7.0" and python_full_version < "3.8.0"
astroid==2.15.2 ; python_full_version >= "3.7.2" and python_full_version < "3.8.0"
astroid==2.15.3 ; python_full_version >= "3.7.2" and python_full_version < "3.8.0"
boto3==1.15.15 ; python_full_version < "3.8.0" and python_version >= "3.7"
botocore==1.18.15 ; python_full_version < "3.8.0" and python_version >= "3.7"
cachetools==5.3.0 ; python_version >= "3.7" and python_full_version < "3.8.0"
@@ -11,7 +11,7 @@ cffi==1.11.5 ; python_full_version < "3.8.0" and python_version >= "3.7"
chardet==3.0.4 ; python_full_version < "3.8.0" and python_version >= "3.7"
cloudflare==1.5.1 ; python_full_version < "3.8.0" and python_version >= "3.7"
colorama==0.4.6 ; python_full_version < "3.8.0" and sys_platform == "win32" and python_version >= "3.7"
configargparse==0.10.0 ; python_full_version < "3.8.0" and python_version >= "3.7"
configargparse==1.5.3 ; python_full_version < "3.8.0" and python_version >= "3.7"
configobj==5.0.6 ; python_full_version < "3.8.0" and python_version >= "3.7"
coverage==7.2.3 ; python_version >= "3.7" and python_full_version < "3.8.0"
cryptography==3.2.1 ; python_full_version < "3.8.0" and python_version >= "3.7"
@@ -86,7 +86,7 @@ types-python-dateutil==2.8.19.12 ; python_version >= "3.7" and python_full_versi
types-pytz==2023.3.0.0 ; python_version >= "3.7" and python_full_version < "3.8.0"
types-pywin32==306.0.0.1 ; python_version >= "3.7" and python_full_version < "3.8.0"
types-requests==2.28.11.17 ; python_version >= "3.7" and python_full_version < "3.8.0"
types-setuptools==67.6.0.7 ; python_version >= "3.7" and python_full_version < "3.8.0"
types-setuptools==67.6.0.8 ; python_version >= "3.7" and python_full_version < "3.8.0"
types-six==1.16.21.8 ; python_version >= "3.7" and python_full_version < "3.8.0"
types-urllib3==1.26.25.10 ; python_version >= "3.7" and python_full_version < "3.8.0"
typing-extensions==4.5.0 ; python_version < "3.8" and python_version >= "3.7"

View File

@@ -43,7 +43,7 @@ acme = {path = "../../../acme", extras = ["test"]}
# dependency should be updated in our setup.py files as well to communicate
# this information to our users.
ConfigArgParse = "0.10.0"
ConfigArgParse = "1.5.3"
apacheconfig = "0.3.2"
asn1crypto = "0.24.0"
boto3 = "1.15.15"