diff --git a/acme/acme/messages.py b/acme/acme/messages.py index 0dd579a98..80dacb674 100644 --- a/acme/acme/messages.py +++ b/acme/acme/messages.py @@ -335,7 +335,7 @@ class Registration(ResourceBody): @classmethod def from_data(cls, phone: Optional[str] = None, email: Optional[str] = None, - external_account_binding: Optional[ExternalAccountBinding] = None, + external_account_binding: Optional[Dict[str, Any]] = None, **kwargs: Any) -> 'Registration': """ Create registration resource from contact details. diff --git a/certbot/certbot/_internal/cli/__init__.py b/certbot/certbot/_internal/cli/__init__.py index e7a1de49b..63b0dd097 100644 --- a/certbot/certbot/_internal/cli/__init__.py +++ b/certbot/certbot/_internal/cli/__init__.py @@ -4,7 +4,10 @@ import argparse import logging import logging.handlers import sys +from typing import Any +from typing import List from typing import Optional +from typing import Type import certbot from certbot._internal import constants @@ -40,9 +43,9 @@ from certbot._internal.cli.plugins_parsing import _plugins_parsing from certbot._internal.cli.subparsers import _create_subparsers from certbot._internal.cli.verb_help import VERB_HELP from certbot._internal.cli.verb_help import VERB_HELP_MAP -from certbot.plugins import enhancements from certbot._internal.plugins import disco as plugins_disco import certbot._internal.plugins.selection as plugin_selection +from certbot.plugins import enhancements logger = logging.getLogger(__name__) @@ -51,7 +54,8 @@ logger = logging.getLogger(__name__) helpful_parser: Optional[HelpfulArgumentParser] = None -def prepare_and_parse_args(plugins, args, detect_defaults=False): +def prepare_and_parse_args(plugins: plugins_disco.PluginsRegistry, args: List[str], + detect_defaults: bool = False) -> argparse.Namespace: """Returns parsed command line arguments. :param .PluginsRegistry plugins: available plugins @@ -443,7 +447,7 @@ def prepare_and_parse_args(plugins, args, detect_defaults=False): return helpful.parse_args() -def set_by_cli(var): +def set_by_cli(var: str) -> bool: """ Return True if a particular config variable has been set by the user (CLI or config file) including if the user explicitly set it to the @@ -487,7 +491,7 @@ def set_by_cli(var): set_by_cli.detector = None # type: ignore -def has_default_value(option, value): +def has_default_value(option: str, value: Any) -> bool: """Does option have the default value? If the default value of option is not known, False is returned. @@ -505,7 +509,7 @@ def has_default_value(option, value): return False -def option_was_set(option, value): +def option_was_set(option: str, value: Any) -> bool: """Was option set by the user or does it differ from the default? :param str option: configuration variable being considered @@ -521,7 +525,7 @@ def option_was_set(option, value): return set_by_cli(option) or not has_default_value(option, value) -def argparse_type(variable): +def argparse_type(variable: Any) -> Type: """Return our argparse type function for a config variable (default: str)""" # pylint: disable=protected-access if helpful_parser is not None: diff --git a/certbot/certbot/_internal/cli/cli_utils.py b/certbot/certbot/_internal/cli/cli_utils.py index 7c859509c..5f3267eb0 100644 --- a/certbot/certbot/_internal/cli/cli_utils.py +++ b/certbot/certbot/_internal/cli/cli_utils.py @@ -2,6 +2,14 @@ import argparse import copy import inspect +from typing import Any +from typing import Iterable +from typing import List +from typing import Optional +from typing import Sequence +from typing import Tuple +from typing import TYPE_CHECKING +from typing import Union from acme import challenges from certbot import configuration @@ -10,24 +18,27 @@ from certbot import util from certbot._internal import constants from certbot.compat import os +if TYPE_CHECKING: + from certbot._internal.cli import helpful + class _Default: """A class to use as a default to detect if a value is set by a user""" - def __bool__(self): + def __bool__(self) -> bool: return False - def __eq__(self, other): + def __eq__(self, other: Any) -> bool: return isinstance(other, _Default) - def __hash__(self): + def __hash__(self) -> int: return id(_Default) - def __nonzero__(self): + def __nonzero__(self) -> bool: return self.__bool__() -def read_file(filename, mode="rb"): +def read_file(filename: str, mode: str = "rb") -> Tuple[str, Any]: """Returns the given file's contents. :param str filename: path to file @@ -48,7 +59,7 @@ def read_file(filename, mode="rb"): raise argparse.ArgumentTypeError(exc.strerror) -def flag_default(name): +def flag_default(name: str) -> Any: """Default value for CLI flag.""" # XXX: this is an internal housekeeping notion of defaults before # argparse has been set up; it is not accurate for all flags. Call it @@ -57,7 +68,7 @@ def flag_default(name): return copy.deepcopy(constants.CLI_DEFAULTS[name]) -def config_help(name, hidden=False): +def config_help(name: str, hidden: bool = False) -> Optional[str]: """Extract the help message for a `configuration.NamespaceConfig` property docstring.""" if hidden: return argparse.SUPPRESS @@ -73,11 +84,11 @@ class HelpfulArgumentGroup: HelpfulArgumentParser when necessary. """ - def __init__(self, helpful_arg_parser, topic): + def __init__(self, helpful_arg_parser: "helpful.HelpfulArgumentParser", topic: str) -> None: self._parser = helpful_arg_parser self._topic = topic - def add_argument(self, *args, **kwargs): + def add_argument(self, *args: Any, **kwargs: Any) -> None: """Add a new command line argument to the argument group.""" self._parser.add(self._topic, *args, **kwargs) @@ -88,12 +99,12 @@ class CustomHelpFormatter(argparse.HelpFormatter): In particular we fix https://bugs.python.org/issue28742 """ - def _get_help_string(self, action): + def _get_help_string(self, action: argparse.Action) -> Optional[str]: helpstr = action.help - if '%(default)' not in action.help and '(default:' not in action.help: + if action.help and '%(default)' not in action.help and '(default:' not in action.help: if action.default != argparse.SUPPRESS: defaulting_nargs = [argparse.OPTIONAL, argparse.ZERO_OR_MORE] - if action.option_strings or action.nargs in defaulting_nargs: + if helpstr and (action.option_strings or action.nargs in defaulting_nargs): helpstr += ' (default: %(default)s)' return helpstr @@ -101,12 +112,15 @@ class CustomHelpFormatter(argparse.HelpFormatter): class _DomainsAction(argparse.Action): """Action class for parsing domains.""" - def __call__(self, parser, namespace, domain, option_string=None): + def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, + domain: Union[str, Sequence[Any], None], + option_string: Optional[str] = None) -> None: """Just wrap add_domains in argparseese.""" - add_domains(namespace, domain) + add_domains(namespace, str(domain) if domain is not None else None) -def add_domains(args_or_config, domains): +def add_domains(args_or_config: Union[argparse.Namespace, configuration.NamespaceConfig], + domains: Optional[str]) -> List[str]: """Registers new domains to be used during the current client run. Domains are not added to the list of requested domains if they have @@ -121,7 +135,10 @@ def add_domains(args_or_config, domains): :rtype: `list` of `str` """ - validated_domains = [] + validated_domains: List[str] = [] + if not domains: + return validated_domains + for domain in domains.split(","): domain = util.enforce_domain_sanity(domain.strip()) validated_domains.append(domain) @@ -137,11 +154,13 @@ class CaseInsensitiveList(list): This class is passed to the `choices` argument of `argparse.add_arguments` through the `helpful` wrapper. It is necessary due to special handling of command line arguments by `set_by_cli` in which the `type_func` is not applied.""" - def __contains__(self, element): + def __contains__(self, element: object) -> bool: + if not isinstance(element, str): + return False return super().__contains__(element.lower()) -def _user_agent_comment_type(value): +def _user_agent_comment_type(value: str) -> str: if "(" in value or ")" in value: raise argparse.ArgumentTypeError("may not contain parentheses") return value @@ -150,13 +169,17 @@ def _user_agent_comment_type(value): class _EncodeReasonAction(argparse.Action): """Action class for parsing revocation reason.""" - def __call__(self, parser, namespace, reason, option_string=None): + def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, + reason: Union[str, Sequence[Any], None], + option_string: Optional[str] = None) -> None: """Encodes the reason for certificate revocation.""" - code = constants.REVOCATION_REASONS[reason.lower()] + if reason is None: + raise ValueError("Unexpected null reason.") + code = constants.REVOCATION_REASONS[str(reason).lower()] setattr(namespace, self.dest, code) -def parse_preferred_challenges(pref_challs): +def parse_preferred_challenges(pref_challs: Iterable[str]) -> List[str]: """Translate and validate preferred challenges. :param pref_challs: list of preferred challenge types @@ -183,9 +206,13 @@ def parse_preferred_challenges(pref_challs): class _PrefChallAction(argparse.Action): """Action class for parsing preferred challenges.""" - def __call__(self, parser, namespace, pref_challs, option_string=None): + def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, + pref_challs: Union[str, Sequence[Any], None], + option_string: Optional[str] = None) -> None: + if pref_challs is None: + raise ValueError("Unexpected null pref_challs.") try: - challs = parse_preferred_challenges(pref_challs.split(",")) + challs = parse_preferred_challenges(str(pref_challs).split(",")) except errors.Error as error: raise argparse.ArgumentError(self, str(error)) namespace.pref_challs.extend(challs) @@ -194,7 +221,9 @@ class _PrefChallAction(argparse.Action): class _DeployHookAction(argparse.Action): """Action class for parsing deploy hooks.""" - def __call__(self, parser, namespace, values, option_string=None): + def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, + values: Union[str, Sequence[Any], None], + option_string: Optional[str] = None) -> None: renew_hook_set = namespace.deploy_hook != namespace.renew_hook if renew_hook_set and namespace.renew_hook != values: raise argparse.ArgumentError( @@ -205,7 +234,9 @@ class _DeployHookAction(argparse.Action): class _RenewHookAction(argparse.Action): """Action class for parsing renew hooks.""" - def __call__(self, parser, namespace, values, option_string=None): + def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, + values: Union[str, Sequence[Any], None], + option_string: Optional[str] = None) -> None: deploy_hook_set = namespace.deploy_hook is not None if deploy_hook_set and namespace.deploy_hook != values: raise argparse.ArgumentError( @@ -213,7 +244,7 @@ class _RenewHookAction(argparse.Action): namespace.renew_hook = values -def nonnegative_int(value): +def nonnegative_int(value: str) -> int: """Converts value to an int and checks that it is not negative. This function should used as the type parameter for argparse diff --git a/certbot/certbot/_internal/cli/group_adder.py b/certbot/certbot/_internal/cli/group_adder.py index 0c54c9fe1..96d58824b 100644 --- a/certbot/certbot/_internal/cli/group_adder.py +++ b/certbot/certbot/_internal/cli/group_adder.py @@ -1,9 +1,14 @@ """This module contains a function to add the groups of arguments for the help display""" +from typing import TYPE_CHECKING + from certbot._internal.cli.verb_help import VERB_HELP +if TYPE_CHECKING: + from certbot._internal.cli import helpful -def _add_all_groups(helpful): + +def _add_all_groups(helpful: "helpful.HelpfulArgumentParser") -> None: helpful.add_group("automation", description="Flags for automating execution & other tweaks") helpful.add_group("security", description="Security parameters & server settings") helpful.add_group("testing", diff --git a/certbot/certbot/_internal/cli/helpful.py b/certbot/certbot/_internal/cli/helpful.py index 0848829c4..714c487a4 100644 --- a/certbot/certbot/_internal/cli/helpful.py +++ b/certbot/certbot/_internal/cli/helpful.py @@ -7,6 +7,10 @@ import glob import sys from typing import Any from typing import Dict +from typing import Iterable +from typing import List +from typing import Optional +from typing import Union import configargparse @@ -29,6 +33,7 @@ from certbot._internal.cli.cli_utils import HelpfulArgumentGroup from certbot._internal.cli.verb_help import VERB_HELP from certbot._internal.cli.verb_help import VERB_HELP_MAP from certbot._internal.display import obj as display_obj +from certbot._internal.plugins import disco from certbot.compat import os @@ -40,7 +45,8 @@ class HelpfulArgumentParser: 'certbot --help security' for security options. """ - def __init__(self, args, plugins, detect_defaults=False): + def __init__(self, args: List[str], plugins: Iterable[str], + detect_defaults: bool = False) -> None: from certbot._internal import main self.VERBS = { "auth": main.certonly, @@ -80,6 +86,7 @@ class HelpfulArgumentParser: self.determine_verb() help1 = self.prescan_for_flag("-h", self.help_topics) help2 = self.prescan_for_flag("--help", self.help_topics) + self.help_arg: Union[str, bool] if isinstance(help1, bool) and isinstance(help2, bool): self.help_arg = help1 or help2 else: @@ -111,7 +118,7 @@ class HelpfulArgumentParser: # Help that are synonyms for --help subcommands COMMANDS_TOPICS = ["command", "commands", "subcommand", "subcommands", "verbs"] - def _list_subcommands(self): + def _list_subcommands(self) -> str: longest = max(len(v) for v in VERB_HELP_MAP) text = "The full list of available SUBCOMMANDS is:\n\n" @@ -122,7 +129,7 @@ class HelpfulArgumentParser: text += "\nYou can get more help on a specific subcommand with --help SUBCOMMAND\n" return text - def _usage_string(self, plugins, help_arg): + def _usage_string(self, plugins: Iterable[str], help_arg: Union[str, bool]) -> str: """Make usage strings late so that plugins can be initialised late :param plugins: all discovered plugins @@ -150,13 +157,14 @@ class HelpfulArgumentParser: # if we're doing --help all, the OVERVIEW is part of the SHORT_USAGE at # the top; if we're doing --help someothertopic, it's OT so it's not usage += COMMAND_OVERVIEW % (apache_doc, nginx_doc) - else: + elif isinstance(help_arg, str): custom = VERB_HELP_MAP.get(help_arg, {}).get("usage", None) usage = custom if custom else usage + # Only remaining case is help_arg == False, which gives effectively usage == SHORT_USAGE. return usage - def remove_config_file_domains_for_renewal(self, parsed_args): + def remove_config_file_domains_for_renewal(self, parsed_args: argparse.Namespace) -> None: """Make "certbot renew" safe if domains are set in cli.ini.""" # Works around https://github.com/certbot/certbot/issues/4096 if self.verb == "renew": @@ -164,7 +172,7 @@ class HelpfulArgumentParser: if source.startswith("config_file") and "domains" in flags: parsed_args.domains = _Default() if self.detect_defaults else [] - def parse_args(self): + def parse_args(self) -> argparse.Namespace: """Parses command line arguments and returns the result. :returns: parsed command line arguments @@ -224,7 +232,7 @@ class HelpfulArgumentParser: return parsed_args - def set_test_server(self, parsed_args): + def set_test_server(self, parsed_args: argparse.Namespace) -> None: """We have --staging/--dry-run; perform sanity check and set config.server""" # Flag combinations should produce these results: @@ -253,7 +261,7 @@ class HelpfulArgumentParser: parsed_args.tos = True parsed_args.register_unsafely_without_email = True - def handle_csr(self, parsed_args): + def handle_csr(self, parsed_args: argparse.Namespace) -> None: """Process a --csr flag.""" if parsed_args.verb != "certonly": raise errors.Error("Currently, a CSR file may only be specified " @@ -287,7 +295,7 @@ class HelpfulArgumentParser: .format(", ".join(csr_domains), ", ".join(config_domains))) - def determine_verb(self): + def determine_verb(self) -> None: """Determines the verb/subcommand provided by the user. This function works around some of the limitations of argparse. @@ -311,7 +319,7 @@ class HelpfulArgumentParser: self.verb = "run" - def prescan_for_flag(self, flag, possible_arguments): + def prescan_for_flag(self, flag: str, possible_arguments: Iterable[str]) -> Union[str, bool]: """Checks cli input for flags. Check for a flag, which accepts a fixed set of possible arguments, in @@ -332,7 +340,8 @@ class HelpfulArgumentParser: pass return True - def add(self, topics, *args, **kwargs): + def add(self, topics: Optional[Union[List[Optional[str]], str]], *args: Any, + **kwargs: Any) -> None: """Add a new command line argument. :param topics: str or [str] help topic(s) this should be listed under, @@ -367,7 +376,7 @@ class HelpfulArgumentParser: if self.detect_defaults: kwargs = self.modify_kwargs_for_default_detection(**kwargs) - if self.visible_topics[topic]: + if isinstance(topic, str) and self.visible_topics[topic]: if topic in self.groups: group = self.groups[topic] group.add_argument(*args, **kwargs) @@ -377,7 +386,7 @@ class HelpfulArgumentParser: kwargs["help"] = argparse.SUPPRESS self.parser.add_argument(*args, **kwargs) - def modify_kwargs_for_default_detection(self, **kwargs): + def modify_kwargs_for_default_detection(self, **kwargs: Any) -> Dict[str, Any]: """Modify an arg so we can check if it was set by the user. Changes the parameters given to argparse when adding an argument @@ -399,7 +408,7 @@ class HelpfulArgumentParser: return kwargs - def add_deprecated_argument(self, argument_name, num_args): + def add_deprecated_argument(self, argument_name: str, num_args: int) -> None: """Adds a deprecated argument with the name argument_name. Deprecated arguments are not shown in the help. If they are used @@ -407,7 +416,7 @@ class HelpfulArgumentParser: argument is deprecated and no other action is taken. :param str argument_name: Name of deprecated argument. - :param int nargs: Number of arguments the option takes. + :param int num_args: Number of arguments the option takes. """ # certbot.util.add_deprecated_argument expects the normal add_argument @@ -427,7 +436,8 @@ class HelpfulArgumentParser: add_func = functools.partial(self.add, None) util.add_deprecated_argument(add_func, argument_name, num_args) - def add_group(self, topic, verbs=(), **kwargs): + def add_group(self, topic: str, verbs: Iterable[str] = (), + **kwargs: Any) -> HelpfulArgumentGroup: """Create a new argument group. This method must be called once for every topic, however, calls @@ -449,7 +459,7 @@ class HelpfulArgumentParser: self.groups[topic].add_argument(v, help=VERB_HELP_MAP[v]["short"]) return HelpfulArgumentGroup(self, topic) - def add_plugin_args(self, plugins): + def add_plugin_args(self, plugins: disco.PluginsRegistry) -> None: """ Let each of the plugins add its own command line arguments, which @@ -461,7 +471,7 @@ class HelpfulArgumentParser: description=plugin_ep.long_description) plugin_ep.plugin_cls.inject_parser_options(parser_or_group, name) - def determine_help_topics(self, chosen_topic): + def determine_help_topics(self, chosen_topic: Union[str, bool]) -> Dict[str, bool]: """ The user may have requested help on a topic, return a dict of which diff --git a/certbot/certbot/_internal/cli/paths_parser.py b/certbot/certbot/_internal/cli/paths_parser.py index 6197a4bf9..69a112cfa 100644 --- a/certbot/certbot/_internal/cli/paths_parser.py +++ b/certbot/certbot/_internal/cli/paths_parser.py @@ -1,13 +1,19 @@ """This is a module that adds configuration to the argument parser regarding paths for certificates""" +from typing import TYPE_CHECKING +from typing import Union + from certbot._internal.cli.cli_utils import config_help from certbot._internal.cli.cli_utils import flag_default from certbot.compat import os +if TYPE_CHECKING: + from certbot._internal.cli import helpful -def _paths_parser(helpful): + +def _paths_parser(helpful: "helpful.HelpfulArgumentParser") -> None: add = helpful.add - verb = helpful.verb + verb: Union[str, bool] = helpful.verb if verb == "help": verb = helpful.help_arg @@ -21,7 +27,7 @@ def _paths_parser(helpful): add(["paths", "install", "revoke", "certonly", "manage"], "--cert-path", **cpkwargs) section = "paths" - if verb in ("install", "revoke"): + if isinstance(verb, str) and verb in ("install", "revoke"): section = verb add(section, "--key-path", type=os.path.abspath, help="Path to private key for certificate installation " diff --git a/certbot/certbot/_internal/cli/plugins_parsing.py b/certbot/certbot/_internal/cli/plugins_parsing.py index bbfdf22da..f0a976bf4 100644 --- a/certbot/certbot/_internal/cli/plugins_parsing.py +++ b/certbot/certbot/_internal/cli/plugins_parsing.py @@ -1,8 +1,15 @@ """This is a module that handles parsing of plugins for the argument parser""" +from typing import TYPE_CHECKING + from certbot._internal.cli.cli_utils import flag_default +from certbot._internal.plugins import disco + +if TYPE_CHECKING: + from certbot._internal.cli import helpful -def _plugins_parsing(helpful, plugins): +def _plugins_parsing(helpful: "helpful.HelpfulArgumentParser", + plugins: disco.PluginsRegistry) -> None: # It's nuts, but there are two "plugins" topics. Somehow this works helpful.add_group( "plugins", description="Plugin Selection: Certbot client supports an " diff --git a/certbot/certbot/_internal/cli/subparsers.py b/certbot/certbot/_internal/cli/subparsers.py index d872cf71a..5e3304ba1 100644 --- a/certbot/certbot/_internal/cli/subparsers.py +++ b/certbot/certbot/_internal/cli/subparsers.py @@ -1,4 +1,6 @@ """This module creates subparsers for the argument parser""" +from typing import TYPE_CHECKING + from certbot import interfaces from certbot._internal import constants from certbot._internal.cli.cli_utils import _EncodeReasonAction @@ -7,8 +9,11 @@ from certbot._internal.cli.cli_utils import CaseInsensitiveList from certbot._internal.cli.cli_utils import flag_default from certbot._internal.cli.cli_utils import read_file +if TYPE_CHECKING: + from certbot._internal.cli import helpful -def _create_subparsers(helpful): + +def _create_subparsers(helpful: "helpful.HelpfulArgumentParser") -> None: from certbot._internal.client import sample_user_agent # avoid import loops helpful.add( None, "--user-agent", default=flag_default("user_agent"), diff --git a/certbot/certbot/_internal/client.py b/certbot/certbot/_internal/client.py index f5793c895..4c472df55 100644 --- a/certbot/certbot/_internal/client.py +++ b/certbot/certbot/_internal/client.py @@ -2,14 +2,13 @@ import datetime import logging import platform +from typing import cast from typing import Any from typing import Callable -from typing import cast from typing import Dict from typing import IO from typing import List from typing import Optional -from typing import Set from typing import Tuple from typing import Union import warnings @@ -241,10 +240,8 @@ def perform_registration(acme: acme_client.ClientV2, config: configuration.Names raise errors.Error(msg) try: - # TODO: Remove the cast once certbot package is fully typed newreg = messages.NewRegistration.from_data( - email=config.email, - external_account_binding=cast(Optional[messages.ExternalAccountBinding], eab)) + email=config.email, external_account_binding=eab) # Until ACME v1 support is removed from Certbot, we actually need the provided # ACME client to be a wrapper of type BackwardsCompatibleClientV2. # TODO: Remove this cast and rewrite the logic when the client is actually a ClientV2 @@ -416,8 +413,7 @@ class Client: elliptic_curve=elliptic_curve, strict_permissions=self.config.strict_permissions, ) - # TODO: Remove the cast once certbot package is fully typed - csr = crypto_util.generate_csr(key, cast(Set[str], domains), self.config.csr_dir, + csr = crypto_util.generate_csr(key, domains, self.config.csr_dir, self.config.must_staple, self.config.strict_permissions) orderr = self._get_order_and_authorizations(csr.data, self.config.allow_subset_of_names) @@ -668,8 +664,7 @@ class Client: with error_handler.ErrorHandler(self._recovery_routine_with_msg, None): for dom in domains: try: - # TODO: Remove the cast once certbot package is fully typed - self.installer.enhance(dom, enhancement, cast(Optional[List[str]], options)) + self.installer.enhance(dom, enhancement, options) except errors.PluginEnhancementAlreadyPresent: logger.info("Enhancement %s was already set.", enh_label) except errors.PluginError: diff --git a/certbot/certbot/_internal/display/completer.py b/certbot/certbot/_internal/display/completer.py index b43859b19..821aba780 100644 --- a/certbot/certbot/_internal/display/completer.py +++ b/certbot/certbot/_internal/display/completer.py @@ -1,8 +1,14 @@ """Provides Tab completion when prompting users for a path.""" import glob +from types import TracebackType from typing import Callable from typing import Iterator from typing import Optional +from typing import Type +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing_extensions import Literal # readline module is not available on all systems try: @@ -28,12 +34,12 @@ class Completer: """ - def __init__(self): + def __init__(self) -> None: self._iter: Iterator[str] self._original_completer: Optional[Callable] self._original_delims: str - def complete(self, text, state): + def complete(self, text: str, state: int) -> Optional[str]: """Provides path completion for use with readline. :param str text: text to offer completions for @@ -48,7 +54,7 @@ class Completer: self._iter = glob.iglob(text + '*') return next(self._iter, None) - def __enter__(self): + def __enter__(self) -> None: self._original_completer = readline.get_completer() self._original_delims = readline.get_completer_delims() @@ -62,6 +68,9 @@ class Completer: else: readline.parse_and_bind('tab: complete') - def __exit__(self, unused_type, unused_value, unused_traceback): + def __exit__(self, unused_type: Optional[Type[BaseException]], + unused_value: Optional[BaseException], + unused_traceback: Optional[TracebackType]) -> 'Literal[False]': readline.set_completer_delims(self._original_delims) readline.set_completer(self._original_completer) + return False diff --git a/certbot/certbot/_internal/display/dummy_readline.py b/certbot/certbot/_internal/display/dummy_readline.py index fb3d807bb..2b6e4c310 100644 --- a/certbot/certbot/_internal/display/dummy_readline.py +++ b/certbot/certbot/_internal/display/dummy_readline.py @@ -1,21 +1,25 @@ """A dummy module with no effect for use on systems without readline.""" +from typing import Callable +from typing import Iterable +from typing import List +from typing import Optional -def get_completer(): +def get_completer() -> Optional[Callable[[], str]]: """An empty implementation of readline.get_completer.""" -def get_completer_delims(): +def get_completer_delims() -> List[str]: """An empty implementation of readline.get_completer_delims.""" -def parse_and_bind(unused_command): +def parse_and_bind(unused_command: str) -> None: """An empty implementation of readline.parse_and_bind.""" -def set_completer(unused_function=None): +def set_completer(unused_function: Optional[Callable[[], str]] = None) -> None: """An empty implementation of readline.set_completer.""" -def set_completer_delims(unused_delims): +def set_completer_delims(unused_delims: Iterable[str]) -> None: """An empty implementation of readline.set_completer_delims.""" diff --git a/certbot/certbot/_internal/display/obj.py b/certbot/certbot/_internal/display/obj.py index b30587b4e..39b737e80 100644 --- a/certbot/certbot/_internal/display/obj.py +++ b/certbot/certbot/_internal/display/obj.py @@ -1,7 +1,13 @@ """This modules define the actual display implementations used in Certbot""" import logging import sys +from typing import Any +from typing import Iterable +from typing import List from typing import Optional +from typing import TextIO +from typing import Tuple +from typing import TypeVar from typing import Union import zope.component @@ -35,12 +41,14 @@ it as a heading)""" # Adding a level of indirection causes the lookup of the global _DisplayService # object to happen first avoiding this potential bug. class _DisplayService: - def __init__(self): + def __init__(self) -> None: self.display: Optional[Union[FileDisplay, NoninteractiveDisplay]] = None _SERVICE = _DisplayService() +T = TypeVar("T") + # This use of IDisplay can be removed when this class is no longer accessible # through the public API in certbot.display.util. @@ -49,15 +57,14 @@ class FileDisplay: """File-based display.""" # see https://github.com/certbot/certbot/issues/3915 - def __init__(self, outfile, force_interactive): + def __init__(self, outfile: TextIO, force_interactive: bool) -> None: super().__init__() self.outfile = outfile self.force_interactive = force_interactive self.skipped_interaction = False - def notification(self, message, pause=True, - wrap=True, force_interactive=False, - decorate=True): + def notification(self, message: str, pause: bool = True, wrap: bool = True, + force_interactive: bool = False, decorate: bool = True) -> None: """Displays a notification and waits for user acceptance. :param str message: Message to display @@ -89,9 +96,11 @@ class FileDisplay: else: logger.debug("Not pausing for user confirmation") - def menu(self, message, choices, ok_label=None, cancel_label=None, # pylint: disable=unused-argument - help_label=None, default=None, # pylint: disable=unused-argument - cli_flag=None, force_interactive=False, **unused_kwargs): + def menu(self, message: str, choices: Union[List[Tuple[str, str]], List[str]], + ok_label: Optional[str] = None, cancel_label: Optional[str] = None, # pylint: disable=unused-argument + help_label: Optional[str] = None, default: Optional[int] = None, # pylint: disable=unused-argument + cli_flag: Optional[str] = None, force_interactive: bool = False, + **unused_kwargs: Any) -> Tuple[str, int]: """Display a menu. .. todo:: This doesn't enable the help label/button (I wasn't sold on @@ -113,8 +122,9 @@ class FileDisplay: :rtype: tuple """ - if self._return_default(message, default, cli_flag, force_interactive): - return OK, default + return_default = self._return_default(message, default, cli_flag, force_interactive) + if return_default is not None: + return OK, return_default self._print_menu(message, choices) @@ -122,8 +132,8 @@ class FileDisplay: return code, selection - 1 - def input(self, message, default=None, - cli_flag=None, force_interactive=False, **unused_kwargs): + def input(self, message: str, default: Optional[str] = None, cli_flag: Optional[str] = None, + force_interactive: bool = False, **unused_kwargs: Any) -> Tuple[str, str]: """Accept input from the user. :param str message: message to display to the user @@ -138,8 +148,9 @@ class FileDisplay: :rtype: tuple """ - if self._return_default(message, default, cli_flag, force_interactive): - return OK, default + return_default = self._return_default(message, default, cli_flag, force_interactive) + if return_default is not None: + return OK, return_default # Trailing space must be added outside of util.wrap_lines to # be preserved @@ -150,8 +161,9 @@ class FileDisplay: return CANCEL, "-1" return OK, ans - def yesno(self, message, yes_label="Yes", no_label="No", default=None, - cli_flag=None, force_interactive=False, **unused_kwargs): + def yesno(self, message: str, yes_label: str = "Yes", no_label: str = "No", + default: Optional[bool] = None, cli_flag: Optional[str] = None, + force_interactive: bool = False, **unused_kwargs: Any) -> bool: """Query the user with a yes/no question. Yes and No label must begin with different letters, and must contain at @@ -169,8 +181,9 @@ class FileDisplay: :rtype: bool """ - if self._return_default(message, default, cli_flag, force_interactive): - return default + return_default = self._return_default(message, default, cli_flag, force_interactive) + if return_default is not None: + return return_default message = util.wrap_lines(message) @@ -192,8 +205,9 @@ class FileDisplay: ans.startswith(no_label[0].upper())): return False - def checklist(self, message, tags, default=None, - cli_flag=None, force_interactive=False, **unused_kwargs): + def checklist(self, message: str, tags: List[str], default: Optional[List[str]] = None, + cli_flag: Optional[str] = None, force_interactive: bool = False, + **unused_kwargs: Any) -> Tuple[str, List[str]]: """Display a checklist. :param str message: Message to display to user @@ -209,8 +223,9 @@ class FileDisplay: :rtype: tuple """ - if self._return_default(message, default, cli_flag, force_interactive): - return OK, default + return_default = self._return_default(message, default, cli_flag, force_interactive) + if return_default is not None: + return OK, return_default while True: self._print_menu(message, tags) @@ -233,22 +248,23 @@ class FileDisplay: else: return code, [] - def _return_default(self, prompt, default, cli_flag, force_interactive): + def _return_default(self, prompt: str, default: Optional[T], + cli_flag: Optional[str], force_interactive: bool) -> Optional[T]: """Should we return the default instead of prompting the user? :param str prompt: prompt for the user - :param default: default answer to prompt + :param T default: default answer to prompt :param str cli_flag: command line option for setting an answer to this question :param bool force_interactive: if interactivity is forced - :returns: True if we should return the default without prompting - :rtype: bool + :returns: The default value if we should return it else `None` + :rtype: T or `None` """ # assert_valid_call(prompt, default, cli_flag, force_interactive) if self._can_interact(force_interactive): - return False + return None if default is None: msg = "Unable to get an answer for the question:\n{0}".format(prompt) if cli_flag: @@ -259,9 +275,9 @@ class FileDisplay: logger.debug( "Falling back to default %s for the prompt:\n%s", default, prompt) - return True + return default - def _can_interact(self, force_interactive): + def _can_interact(self, force_interactive: bool) -> bool: """Can we safely interact with the user? :param bool force_interactive: if interactivity is forced @@ -282,8 +298,9 @@ class FileDisplay: self.skipped_interaction = True return False - def directory_select(self, message, default=None, cli_flag=None, - force_interactive=False, **unused_kwargs): + def directory_select(self, message: str, default: Optional[str] = None, + cli_flag: Optional[str] = None, force_interactive: bool = False, + **unused_kwargs: Any) -> Tuple[str, str]: """Display a directory selection screen. :param str message: prompt to give the user @@ -300,7 +317,8 @@ class FileDisplay: with completer.Completer(): return self.input(message, default, cli_flag, force_interactive) - def _scrub_checklist_input(self, indices, tags): + def _scrub_checklist_input(self, indices: Iterable[Union[str, int]], + tags: List[str]) -> List[str]: """Validate input and transform indices to appropriate tags. :param list indices: input @@ -312,21 +330,22 @@ class FileDisplay: """ # They should all be of type int try: - indices = [int(index) for index in indices] + indices_int = [int(index) for index in indices] except ValueError: return [] # Remove duplicates - indices = list(set(indices)) + indices_int = list(set(indices_int)) # Check all input is within range - for index in indices: + for index in indices_int: if index < 1 or index > len(tags): return [] - # Transform indices to appropriate tags - return [tags[index - 1] for index in indices] + # Transform indices_int to appropriate tags + return [tags[index - 1] for index in indices_int] - def _print_menu(self, message, choices): + def _print_menu(self, message: str, + choices: Union[List[Tuple[str, str]], List[str]]) -> None: """Print a menu on the screen. :param str message: title of menu @@ -355,7 +374,7 @@ class FileDisplay: self.outfile.write(SIDE_FRAME + os.linesep) self.outfile.flush() - def _get_valid_int_ans(self, max_): + def _get_valid_int_ans(self, max_: int) -> Tuple[str, int]: """Get a numerical selection. :param int max: The maximum entry (len of choices), must be positive @@ -398,21 +417,23 @@ class FileDisplay: class NoninteractiveDisplay: """A display utility implementation that never asks for interactive user input""" - def __init__(self, outfile, *unused_args, **unused_kwargs): + def __init__(self, outfile: TextIO, *unused_args: Any, **unused_kwargs: Any) -> None: super().__init__() self.outfile = outfile - def _interaction_fail(self, message, cli_flag, extra=""): - """Error out in case of an attempt to interact in noninteractive mode""" + def _interaction_fail(self, message: str, cli_flag: Optional[str], + extra: str = "") -> errors.MissingCommandlineFlag: + """Return error to raise in case of an attempt to interact in noninteractive mode""" msg = "Missing command line flag or config entry for this setting:\n" msg += message if extra: msg += "\n" + extra if cli_flag: msg += "\n\n(You can set this with the {0} flag)".format(cli_flag) - raise errors.MissingCommandlineFlag(msg) + return errors.MissingCommandlineFlag(msg) - def notification(self, message, pause=False, wrap=True, decorate=True, **unused_kwargs): # pylint: disable=unused-argument + def notification(self, message: str, pause: bool = False, wrap: bool = True, # pylint: disable=unused-argument + decorate: bool = True, **unused_kwargs: Any) -> None: """Displays a notification without waiting for user acceptance. :param str message: Message to display to stdout @@ -434,8 +455,10 @@ class NoninteractiveDisplay: ) self.outfile.flush() - def menu(self, message, choices, ok_label=None, cancel_label=None, - help_label=None, default=None, cli_flag=None, **unused_kwargs): + def menu(self, message: str, choices: Union[List[Tuple[str, str]], List[str]], + ok_label: Optional[str] = None, cancel_label: Optional[str] = None, + help_label: Optional[str] = None, default: Optional[int] = None, + cli_flag: Optional[str] = None, **unused_kwargs: Any) -> Tuple[str, int]: # pylint: disable=unused-argument """Avoid displaying a menu. @@ -454,11 +477,12 @@ class NoninteractiveDisplay: """ if default is None: - self._interaction_fail(message, cli_flag, "Choices: " + repr(choices)) + raise self._interaction_fail(message, cli_flag, "Choices: " + repr(choices)) return OK, default - def input(self, message, default=None, cli_flag=None, **unused_kwargs): + def input(self, message: str, default: Optional[str] = None, cli_flag: Optional[str] = None, + **unused_kwargs: Any) -> Tuple[str, str]: """Accept input from the user. :param str message: message to display to the user @@ -471,11 +495,12 @@ class NoninteractiveDisplay: """ if default is None: - self._interaction_fail(message, cli_flag) + raise self._interaction_fail(message, cli_flag) return OK, default - def yesno(self, message, yes_label=None, no_label=None, # pylint: disable=unused-argument - default=None, cli_flag=None, **unused_kwargs): + def yesno(self, message: str, yes_label: Optional[str] = None, no_label: Optional[str] = None, # pylint: disable=unused-argument + default: Optional[bool] = None, cli_flag: Optional[str] = None, + **unused_kwargs: Any) -> bool: """Decide Yes or No, without asking anybody :param str message: question for the user @@ -487,11 +512,11 @@ class NoninteractiveDisplay: """ if default is None: - self._interaction_fail(message, cli_flag) + raise self._interaction_fail(message, cli_flag) return default - def checklist(self, message, tags, default=None, - cli_flag=None, **unused_kwargs): + def checklist(self, message: str, tags: Iterable[str], default: Optional[List[str]] = None, + cli_flag: Optional[str] = None, **unused_kwargs: Any) -> Tuple[str, List[str]]: """Display a checklist. :param str message: Message to display to user @@ -505,11 +530,11 @@ class NoninteractiveDisplay: """ if default is None: - self._interaction_fail(message, cli_flag, "? ".join(tags) + "?") + raise self._interaction_fail(message, cli_flag, "? ".join(tags) + "?") return OK, default - def directory_select(self, message, default=None, - cli_flag=None, **unused_kwargs): + def directory_select(self, message: str, default: Optional[str] = None, + cli_flag: Optional[str] = None, **unused_kwargs: Any) -> Tuple[str, str]: """Simulate prompting the user for a directory. This function returns default if it is not ``None``, otherwise, diff --git a/certbot/certbot/_internal/display/util.py b/certbot/certbot/_internal/display/util.py index b9aa132b6..d2115289e 100644 --- a/certbot/certbot/_internal/display/util.py +++ b/certbot/certbot/_internal/display/util.py @@ -1,12 +1,13 @@ """Internal Certbot display utilities.""" -from typing import List -import textwrap import sys +import textwrap +from typing import List +from typing import Optional from certbot.compat import misc -def wrap_lines(msg): +def wrap_lines(msg: str) -> str: """Format lines nicely to 80 chars. :param str msg: Original message @@ -28,7 +29,7 @@ def wrap_lines(msg): return '\n'.join(fixed_l) -def parens_around_char(label): +def parens_around_char(label: str) -> str: """Place parens around first character of label. :param str label: Must contain at least one character @@ -37,7 +38,7 @@ def parens_around_char(label): return "({first}){rest}".format(first=label[0], rest=label[1:]) -def input_with_timeout(prompt=None, timeout=36000.0): +def input_with_timeout(prompt: Optional[str] = None, timeout: float = 36000.0) -> str: """Get user input with a timeout. Behaves the same as the builtin input, however, an error is raised if @@ -67,7 +68,7 @@ def input_with_timeout(prompt=None, timeout=36000.0): return line.rstrip('\n') -def separate_list_input(input_): +def separate_list_input(input_: str) -> List[str]: """Separate a comma or space separated list. :param str input_: input from the user @@ -97,10 +98,10 @@ def summarize_domain_list(domains: List[str]) -> str: if not domains: return "" - l = len(domains) - if l == 1: + length = len(domains) + if length == 1: return domains[0] - elif l == 2: + elif length == 2: return " and ".join(domains) else: - return "{0} and {1} more domains".format(domains[0], l-1) + return "{0} and {1} more domains".format(domains[0], length-1) diff --git a/certbot/certbot/_internal/plugins/disco.py b/certbot/certbot/_internal/plugins/disco.py index 1a8cf22c7..30409aff0 100644 --- a/certbot/certbot/_internal/plugins/disco.py +++ b/certbot/certbot/_internal/plugins/disco.py @@ -1,9 +1,14 @@ """Utilities for plugins discovery and selection.""" -from collections.abc import Mapping import itertools import logging import sys +from typing import Callable +from typing import cast from typing import Dict +from typing import Iterable +from typing import Iterator +from typing import List +from typing import Mapping from typing import Optional from typing import Type from typing import Union @@ -13,6 +18,7 @@ import pkg_resources import zope.interface import zope.interface.verify +from certbot import configuration from certbot import errors from certbot import interfaces from certbot._internal import constants @@ -49,7 +55,7 @@ class PluginEntryPoint: # this object is mutable, don't allow it to be hashed! __hash__ = None # type: ignore - def __init__(self, entry_point: pkg_resources.EntryPoint, with_prefix=False): + def __init__(self, entry_point: pkg_resources.EntryPoint, with_prefix: bool = False) -> None: self.name = self.entry_point_to_plugin_name(entry_point, with_prefix) self.plugin_cls: Type[interfaces.Plugin] = entry_point.load() self.entry_point = entry_point @@ -59,7 +65,7 @@ class PluginEntryPoint: self._hidden = False self._long_description: Optional[str] = None - def check_name(self, name): + def check_name(self, name: Optional[str]) -> bool: """Check if the name refers to this plugin.""" if name == self.name: if self.warning_message: @@ -68,43 +74,46 @@ class PluginEntryPoint: return False @classmethod - def entry_point_to_plugin_name(cls, entry_point, with_prefix): + def entry_point_to_plugin_name(cls, entry_point: pkg_resources.EntryPoint, + with_prefix: bool) -> str: """Unique plugin name for an ``entry_point``""" if with_prefix: + if not entry_point.dist: + raise errors.Error(f"Entrypoint {entry_point.name} has no distribution!") return entry_point.dist.key + ":" + entry_point.name return entry_point.name @property - def description(self): + def description(self) -> str: """Description of the plugin.""" return self.plugin_cls.description @property - def description_with_name(self): + def description_with_name(self) -> str: """Description with name. Handy for UI.""" return "{0} ({1})".format(self.description, self.name) @property - def long_description(self): + def long_description(self) -> str: """Long description of the plugin.""" if self._long_description: return self._long_description return getattr(self.plugin_cls, "long_description", self.description) @long_description.setter - def long_description(self, description): + def long_description(self, description: str) -> None: self._long_description = description @property - def hidden(self): + def hidden(self) -> bool: """Should this plugin be hidden from UI?""" return self._hidden or getattr(self.plugin_cls, "hidden", False) @hidden.setter - def hidden(self, hide): + def hidden(self, hide: bool) -> None: self._hidden = hide - def ifaces(self, *ifaces_groups): + def ifaces(self, *ifaces_groups: Iterable[Type]) -> bool: """Does plugin implements specified interface groups?""" return not ifaces_groups or any( all(_implements(self.plugin_cls, iface) @@ -112,20 +121,20 @@ class PluginEntryPoint: for ifaces in ifaces_groups) @property - def initialized(self): + def initialized(self) -> bool: """Has the plugin been initialized already?""" return self._initialized is not None - def init(self, config=None): + def init(self, config: Optional[configuration.NamespaceConfig] = None) -> interfaces.Plugin: """Memoized plugin initialization.""" - if not self.initialized: + if not self._initialized: self.entry_point.require() # fetch extras! # For plugins implementing ABCs Plugin, Authenticator or Installer, the following # line will raise an exception if some implementations of abstract methods are missing. self._initialized = self.plugin_cls(config, self.name) return self._initialized - def verify(self, ifaces): + def verify(self, ifaces: Iterable[Type]) -> bool: """Verify that the plugin conforms to the specified interfaces.""" if not self.initialized: raise ValueError("Plugin is not initialized.") @@ -136,13 +145,13 @@ class PluginEntryPoint: return True @property - def prepared(self): + def prepared(self) -> bool: """Has the plugin been prepared already?""" if not self.initialized: logger.debug(".prepared called on uninitialized %r", self) return self._prepared is not None - def prepare(self): + def prepare(self) -> Union[bool, Error]: """Memoized plugin preparation.""" if self._initialized is None: raise ValueError("Plugin is not initialized.") @@ -161,29 +170,30 @@ class PluginEntryPoint: self._prepared = error else: self._prepared = True - return self._prepared + # Mypy seems to fail to understand the actual type here, let's help it. + return cast(Union[bool, Error], self._prepared) @property - def misconfigured(self): + def misconfigured(self) -> bool: """Is plugin misconfigured?""" return isinstance(self._prepared, errors.MisconfigurationError) @property - def problem(self): + def problem(self) -> Optional[Exception]: """Return the Exception raised during plugin setup, or None if all is well""" if isinstance(self._prepared, Exception): return self._prepared return None @property - def available(self): + def available(self) -> bool: """Is plugin available, i.e. prepared or misconfigured?""" return self._prepared is True or self.misconfigured - def __repr__(self): + def __repr__(self) -> str: return "PluginEntryPoint#{0}".format(self.name) - def __str__(self): + def __str__(self) -> str: lines = [ "* {0}".format(self.name), "Description: {0}".format(self.plugin_cls.description), @@ -205,7 +215,7 @@ class PluginEntryPoint: class PluginsRegistry(Mapping): """Plugins registry.""" - def __init__(self, plugins): + def __init__(self, plugins: Mapping[str, PluginEntryPoint]) -> None: # plugins are sorted so the same order is used between runs. # This prevents deadlock caused by plugins acquiring a lock # and ensures at least one concurrent Certbot instance will run @@ -213,7 +223,7 @@ class PluginsRegistry(Mapping): self._plugins = dict(sorted(plugins.items())) @classmethod - def find_all(cls): + def find_all(cls) -> 'PluginsRegistry': """Find plugins using setuptools entry points.""" plugins: Dict[str, PluginEntryPoint] = {} plugin_paths_string = os.getenv('CERTBOT_PLUGIN_PATH') @@ -245,7 +255,9 @@ class PluginsRegistry(Mapping): return cls(plugins) @classmethod - def _load_entry_point(cls, entry_point, plugins, with_prefix): + def _load_entry_point(cls, entry_point: pkg_resources.EntryPoint, + plugins: Dict[str, PluginEntryPoint], + with_prefix: bool) -> PluginEntryPoint: plugin_ep = PluginEntryPoint(entry_point, with_prefix) if plugin_ep.name in plugins: other_ep = plugins[plugin_ep.name] @@ -261,47 +273,47 @@ class PluginsRegistry(Mapping): return plugin_ep - def __getitem__(self, name): + def __getitem__(self, name: str) -> PluginEntryPoint: return self._plugins[name] - def __iter__(self): + def __iter__(self) -> Iterator[str]: return iter(self._plugins) - def __len__(self): + def __len__(self) -> int: return len(self._plugins) - def init(self, config): + def init(self, config: configuration.NamespaceConfig) -> List[interfaces.Plugin]: """Initialize all plugins in the registry.""" return [plugin_ep.init(config) for plugin_ep in self._plugins.values()] - def filter(self, pred): + def filter(self, pred: Callable[[PluginEntryPoint], bool]) -> "PluginsRegistry": """Filter plugins based on predicate.""" return type(self)({name: plugin_ep for name, plugin_ep - in self._plugins.items() if pred(plugin_ep)}) + in self._plugins.items() if pred(plugin_ep)}) - def visible(self): + def visible(self) -> "PluginsRegistry": """Filter plugins based on visibility.""" return self.filter(lambda plugin_ep: not plugin_ep.hidden) - def ifaces(self, *ifaces_groups): + def ifaces(self, *ifaces_groups: Iterable[Type]) -> "PluginsRegistry": """Filter plugins based on interfaces.""" return self.filter(lambda p_ep: p_ep.ifaces(*ifaces_groups)) - def verify(self, ifaces): + def verify(self, ifaces: Iterable[Type]) -> "PluginsRegistry": """Filter plugins based on verification.""" return self.filter(lambda p_ep: p_ep.verify(ifaces)) - def prepare(self): + def prepare(self) -> List[Union[bool, Error]]: """Prepare all plugins in the registry.""" return [plugin_ep.prepare() for plugin_ep in self._plugins.values()] - def available(self): + def available(self) -> "PluginsRegistry": """Filter plugins based on availability.""" return self.filter(lambda p_ep: p_ep.available) # successfully prepared + misconfigured - def find_init(self, plugin): + def find_init(self, plugin: interfaces.Plugin) -> Optional[PluginEntryPoint]: """Find an initialized plugin. This is particularly useful for finding a name for the plugin:: @@ -321,12 +333,12 @@ class PluginsRegistry(Mapping): return candidates[0] return None - def __repr__(self): + def __repr__(self) -> str: return "{0}({1})".format( self.__class__.__name__, ','.join( repr(p_ep) for p_ep in self._plugins.values())) - def __str__(self): + def __str__(self) -> str: if not self._plugins: return "No plugins" return "\n\n".join(str(p_ep) for p_ep in self._plugins.values()) diff --git a/certbot/certbot/_internal/plugins/manual.py b/certbot/certbot/_internal/plugins/manual.py index d2372e7dd..dc45ae271 100644 --- a/certbot/certbot/_internal/plugins/manual.py +++ b/certbot/certbot/_internal/plugins/manual.py @@ -1,6 +1,12 @@ """Manual authenticator plugin""" import logging +from typing import Any +from typing import Callable from typing import Dict +from typing import Iterable +from typing import List +from typing import Tuple +from typing import Type from acme import challenges from certbot import achallenges @@ -88,7 +94,7 @@ asked to create multiple distinct TXT records with the same name. This is permitted by DNS standards.) """ - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self.reverter = reverter.Reverter(self.config) self.reverter.recovery_routine() @@ -97,14 +103,14 @@ permitted by DNS standards.) self.subsequent_any_challenge = False @classmethod - def add_parser_arguments(cls, add): + def add_parser_arguments(cls, add: Callable[..., None]) -> None: add('auth-hook', help='Path or command to execute for the authentication script') add('cleanup-hook', help='Path or command to execute for the cleanup script') util.add_deprecated_argument(add, 'public-ip-logging-ok', 0) - def prepare(self): # pylint: disable=missing-function-docstring + def prepare(self) -> None: # pylint: disable=missing-function-docstring if self.config.noninteractive_mode and not self.conf('auth-hook'): raise errors.PluginError( 'An authentication script must be provided with --{0} when ' @@ -112,7 +118,7 @@ permitted by DNS standards.) self.option_name('auth-hook'))) self._validate_hooks() - def _validate_hooks(self): + def _validate_hooks(self) -> None: if self.config.validate_hooks: for name in ('auth-hook', 'cleanup-hook'): hook = self.conf(name) @@ -120,13 +126,13 @@ permitted by DNS standards.) hook_prefix = self.option_name(name)[:-len('-hook')] hooks.validate_hook(hook, hook_prefix) - def more_info(self): # pylint: disable=missing-function-docstring + def more_info(self) -> str: # pylint: disable=missing-function-docstring return ( 'This plugin allows the user to customize setup for domain ' 'validation challenges either through shell scripts provided by ' 'the user or by performing the setup manually.') - def auth_hint(self, failed_achalls): + def auth_hint(self, failed_achalls: Iterable[achallenges.AnnotatedChallenge]) -> str: has_chall = lambda cls: any(isinstance(achall.chall, cls) for achall in failed_achalls) has_dns = has_chall(challenges.DNS01) @@ -162,11 +168,12 @@ permitted by DNS standards.) ) ) - def get_chall_pref(self, domain): + def get_chall_pref(self, domain: str) -> Iterable[Type[challenges.Challenge]]: # pylint: disable=unused-argument,missing-function-docstring return [challenges.HTTP01, challenges.DNS01] - def perform(self, achalls): # pylint: disable=missing-function-docstring + def perform(self, achalls: List[achallenges.AnnotatedChallenge] + ) -> List[challenges.ChallengeResponse]: # pylint: disable=missing-function-docstring responses = [] last_dns_achall = 0 for i, achall in enumerate(achalls): @@ -180,7 +187,8 @@ permitted by DNS standards.) responses.append(achall.response(achall.account_key)) return responses - def _perform_achall_with_script(self, achall, achalls): + def _perform_achall_with_script(self, achall: achallenges.AnnotatedChallenge, + achalls: List[achallenges.AnnotatedChallenge]) -> None: env = dict(CERTBOT_DOMAIN=achall.domain, CERTBOT_VALIDATION=achall.validation(achall.account_key), CERTBOT_ALL_DOMAINS=','.join(one_achall.domain for one_achall in achalls), @@ -194,7 +202,8 @@ permitted by DNS standards.) env['CERTBOT_AUTH_OUTPUT'] = out.strip() self.env[achall] = env - def _perform_achall_manually(self, achall, last_dns_achall=False): + def _perform_achall_manually(self, achall: achallenges.AnnotatedChallenge, + last_dns_achall: bool = False) -> None: validation = achall.validation(achall.account_key) if isinstance(achall.chall, challenges.HTTP01): msg = self._HTTP_INSTRUCTIONS.format( @@ -225,7 +234,7 @@ permitted by DNS standards.) display_util.notification(msg, wrap=False, force_interactive=True) self.subsequent_any_challenge = True - def cleanup(self, achalls): # pylint: disable=missing-function-docstring + def cleanup(self, achalls: Iterable[achallenges.AnnotatedChallenge]) -> None: # pylint: disable=missing-function-docstring if self.conf('cleanup-hook'): for achall in achalls: env = self.env.pop(achall) @@ -235,7 +244,7 @@ permitted by DNS standards.) self._execute_hook('cleanup-hook', achall.domain) self.reverter.recovery_routine() - def _execute_hook(self, hook_name, achall_domain): + def _execute_hook(self, hook_name: str, achall_domain: str) -> Tuple[str, str]: returncode, err, out = misc.execute_command_status( self.option_name(hook_name), self.conf(hook_name), env=util.env_no_snap_for_external_calls() diff --git a/certbot/certbot/_internal/plugins/null.py b/certbot/certbot/_internal/plugins/null.py index b800c5c39..e79a88bb9 100644 --- a/certbot/certbot/_internal/plugins/null.py +++ b/certbot/certbot/_internal/plugins/null.py @@ -1,5 +1,9 @@ """Null plugin.""" import logging +from typing import Callable +from typing import List +from typing import Optional +from typing import Union from certbot import interfaces from certbot.plugins import common @@ -14,41 +18,42 @@ class Installer(common.Plugin, interfaces.Installer): hidden = True @classmethod - def add_parser_arguments(cls, add): + def add_parser_arguments(cls, add: Callable[..., None]) -> None: pass # pylint: disable=missing-function-docstring - def prepare(self): + def prepare(self) -> None: pass # pragma: no cover - def more_info(self): + def more_info(self) -> str: return "Installer that doesn't do anything (for testing)." - def get_all_names(self): + def get_all_names(self) -> List[str]: return [] - def deploy_cert(self, domain, cert_path, key_path, - chain_path=None, fullchain_path=None): + def deploy_cert(self, domain: str, cert_path: str, key_path: str, + chain_path: str, fullchain_path: str) -> None: pass # pragma: no cover - def enhance(self, domain, enhancement, options=None): + def enhance(self, domain: str, enhancement: str, + options: Optional[Union[List[str], str]] = None) -> None: pass # pragma: no cover - def supported_enhancements(self): + def supported_enhancements(self) -> List[str]: return [] - def save(self, title=None, temporary=False): + def save(self, title: Optional[str] = None, temporary: bool = False) -> None: pass # pragma: no cover - def rollback_checkpoints(self, rollback=1): + def rollback_checkpoints(self, rollback: int = 1) -> None: pass # pragma: no cover - def recovery_routine(self): + def recovery_routine(self) -> None: pass # pragma: no cover - def config_test(self): + def config_test(self) -> None: pass # pragma: no cover - def restart(self): + def restart(self) -> None: pass # pragma: no cover diff --git a/certbot/certbot/_internal/plugins/selection.py b/certbot/certbot/_internal/plugins/selection.py index 0e88e1324..f62db4089 100644 --- a/certbot/certbot/_internal/plugins/selection.py +++ b/certbot/certbot/_internal/plugins/selection.py @@ -1,8 +1,13 @@ """Decide which plugins to use for authentication & installation""" import logging +from typing import cast +from typing import Iterable +from typing import List from typing import Optional from typing import Tuple +from typing import Type +from typing import TypeVar from certbot import configuration from certbot import errors @@ -14,32 +19,36 @@ from certbot.display import util as display_util logger = logging.getLogger(__name__) -def pick_configurator( - config, default, plugins, - question="How would you like to authenticate and install " - "certificates?"): +def pick_configurator(config: configuration.NamespaceConfig, default: Optional[str], + plugins: disco.PluginsRegistry, + question: str = "How would you like to authenticate and install " + "certificates?") -> Optional[interfaces.Plugin]: """Pick configurator plugin.""" return pick_plugin( config, default, plugins, question, (interfaces.Authenticator, interfaces.Installer)) -def pick_installer(config, default, plugins, - question="How would you like to install certificates?"): +def pick_installer(config: configuration.NamespaceConfig, default: Optional[str], + plugins: disco.PluginsRegistry, + question: str = "How would you like to install certificates?" + ) -> Optional[interfaces.Installer]: """Pick installer plugin.""" - return pick_plugin( - config, default, plugins, question, (interfaces.Installer,)) + return pick_plugin(config, default, plugins, question, (interfaces.Installer,)) -def pick_authenticator( - config, default, plugins, question="How would you " - "like to authenticate with the ACME CA?"): +def pick_authenticator(config: configuration.NamespaceConfig, default: Optional[str], + plugins: disco.PluginsRegistry, + question: str = "How would you " + "like to authenticate with the ACME CA?" + ) -> Optional[interfaces.Authenticator]: """Pick authentication plugin.""" return pick_plugin( config, default, plugins, question, (interfaces.Authenticator,)) -def get_unprepared_installer(config, plugins): +def get_unprepared_installer(config: configuration.NamespaceConfig, + plugins: disco.PluginsRegistry) -> Optional[interfaces.Installer]: """ Get an unprepared interfaces.Installer object. @@ -69,10 +78,15 @@ def get_unprepared_installer(config, plugins): "Could not select or initialize the requested installer %s." % req_inst) -def pick_plugin(config, default, plugins, question, ifaces): +P = TypeVar('P', bound=interfaces.Plugin) + + +def pick_plugin(config: configuration.NamespaceConfig, default: Optional[str], + plugins: disco.PluginsRegistry, question: str, + ifaces: Iterable[Type]) -> Optional[P]: """Pick plugin. - :param certbot.configuration.NamespaceConfig: Configuration + :param certbot.configuration.NamespaceConfig config: Configuration :param str default: Plugin name supplied by user or ``None``. :param certbot._internal.plugins.disco.PluginsRegistry plugins: All plugins registered as entry points. @@ -108,22 +122,23 @@ def pick_plugin(config, default, plugins, question, ifaces): if len(prepared) > 1: logger.debug("Multiple candidate plugins: %s", prepared) - plugin_ep = choose_plugin(list(prepared.values()), question) - if plugin_ep is None: + plugin_ep1 = choose_plugin(list(prepared.values()), question) + if plugin_ep1 is None: return None - return plugin_ep.init() + return cast(P, plugin_ep1.init()) elif len(prepared) == 1: - plugin_ep = list(prepared.values())[0] - logger.debug("Single candidate plugin: %s", plugin_ep) - if plugin_ep.misconfigured: + plugin_ep2 = list(prepared.values())[0] + logger.debug("Single candidate plugin: %s", plugin_ep2) + if plugin_ep2.misconfigured: return None - return plugin_ep.init() + return plugin_ep2.init() else: logger.debug("No candidate plugin") return None -def choose_plugin(prepared, question): +def choose_plugin(prepared: List[disco.PluginEntryPoint], + question: str) -> Optional[disco.PluginEntryPoint]: """Allow the user to choose their plugin. :param list prepared: List of `~.PluginEntryPoint`. @@ -152,17 +167,29 @@ def choose_plugin(prepared, question): else: return None + noninstaller_plugins = ["webroot", "manual", "standalone", "dns-cloudflare", "dns-cloudxns", "dns-digitalocean", "dns-dnsimple", "dns-dnsmadeeasy", "dns-gehirn", "dns-google", "dns-linode", "dns-luadns", "dns-nsone", "dns-ovh", "dns-rfc2136", "dns-route53", "dns-sakuracloud"] -def record_chosen_plugins(config, plugins, auth, inst): - "Update the config entries to reflect the plugins we actually selected." - config.authenticator = plugins.find_init(auth).name if auth else None - config.installer = plugins.find_init(inst).name if inst else None + +def record_chosen_plugins(config: configuration.NamespaceConfig, plugins: disco.PluginsRegistry, + auth: Optional[interfaces.Authenticator], + inst: Optional[interfaces.Installer]) -> None: + """Update the config entries to reflect the plugins we actually selected.""" + config.authenticator = None + if auth: + auth_ep = plugins.find_init(auth) + if auth_ep: + config.authenticator = auth_ep.name + config.installer = None + if inst: + inst_ep = plugins.find_init(inst) + if inst_ep: + config.installer = inst_ep.name logger.info("Plugins selected: Authenticator %s, Installer %s", - config.authenticator, config.installer) + config.authenticator, config.installer) def choose_configurator_plugins(config: configuration.NamespaceConfig, @@ -181,7 +208,7 @@ def choose_configurator_plugins(config: configuration.NamespaceConfig, """ req_auth, req_inst = cli_plugin_requests(config) - installer_question = None + installer_question = "" if verb == "enhance": installer_question = ("Which installer would you like to use to " @@ -209,11 +236,14 @@ def choose_configurator_plugins(config: configuration.NamespaceConfig, logger.warning("Specifying an authenticator doesn't make sense when " "running Certbot with verb \"%s\"", verb) # Try to meet the user's request and/or ask them to pick plugins - authenticator = installer = None + authenticator: Optional[interfaces.Authenticator] = None + installer: Optional[interfaces.Installer] = None if verb == "run" and req_auth == req_inst: # Unless the user has explicitly asked for different auth/install, # only consider offering a single choice - authenticator = installer = pick_configurator(config, req_inst, plugins) + configurator = pick_configurator(config, req_inst, plugins) + authenticator = cast(Optional[interfaces.Authenticator], configurator) + installer = cast(Optional[interfaces.Installer], configurator) else: if need_inst or req_inst: installer = pick_installer(config, req_inst, plugins, installer_question) @@ -231,11 +261,11 @@ def choose_configurator_plugins(config: configuration.NamespaceConfig, return installer, authenticator -def set_configurator(previously, now): +def set_configurator(previously: Optional[str], now: Optional[str]) -> Optional[str]: """ Setting configurators multiple ways is okay, as long as they all agree :param str previously: previously identified request for the installer/authenticator - :param str requested: the request currently being processed + :param str now: the request currently being processed """ if not now: # we're not actually setting anything @@ -247,7 +277,8 @@ def set_configurator(previously, now): return now -def cli_plugin_requests(config): +def cli_plugin_requests(config: configuration.NamespaceConfig + ) -> Tuple[Optional[str], Optional[str]]: """ Figure out which plugins the user requested with CLI and config options @@ -302,7 +333,8 @@ def cli_plugin_requests(config): return req_auth, req_inst -def diagnose_configurator_problem(cfg_type, requested, plugins): +def diagnose_configurator_problem(cfg_type: str, requested: Optional[str], + plugins: disco.PluginsRegistry) -> None: """ Raise the most helpful error message about a plugin being unavailable diff --git a/certbot/certbot/_internal/plugins/standalone.py b/certbot/certbot/_internal/plugins/standalone.py index 45c801256..f70d2ed9e 100644 --- a/certbot/certbot/_internal/plugins/standalone.py +++ b/certbot/certbot/_internal/plugins/standalone.py @@ -3,14 +3,19 @@ import collections import errno import logging import socket +from typing import Any +from typing import Callable from typing import DefaultDict from typing import Dict +from typing import Iterable from typing import List +from typing import Mapping from typing import Set from typing import Tuple +from typing import Type from typing import TYPE_CHECKING -import OpenSSL +from OpenSSL import crypto from acme import challenges from acme import standalone as acme_standalone @@ -42,12 +47,15 @@ class ServerManager: will serve the same URLs! """ - def __init__(self, certs, http_01_resources): - self._instances: Dict[int, acme_standalone.BaseDualNetworkedServers] = {} + def __init__(self, certs: Mapping[bytes, Tuple[crypto.PKey, crypto.X509]], + http_01_resources: Set[acme_standalone.HTTP01RequestHandler.HTTP01Resource] + ) -> None: + self._instances: Dict[int, acme_standalone.HTTP01DualNetworkedServers] = {} self.certs = certs self.http_01_resources = http_01_resources - def run(self, port, challenge_type, listenaddr=""): + def run(self, port: int, challenge_type: Type[challenges.Challenge], + listenaddr: str = "") -> acme_standalone.HTTP01DualNetworkedServers: """Run ACME server on specified ``port``. This method is idempotent, i.e. all calls with the same pair of @@ -81,7 +89,7 @@ class ServerManager: self._instances[real_port] = servers return servers - def stop(self, port): + def stop(self, port: int) -> None: """Stop ACME server running on the specified ``port``. :param int port: @@ -94,7 +102,7 @@ class ServerManager: instance.shutdown_and_server_close() del self._instances[port] - def running(self): + def running(self) -> Dict[int, acme_standalone.HTTP01DualNetworkedServers]: """Return all running instances. Once the server is stopped using `stop`, it will not be @@ -118,7 +126,7 @@ class Authenticator(common.Plugin, interfaces.Authenticator): description = "Spin up a temporary webserver" - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self.served: ServedType = collections.defaultdict(set) @@ -127,44 +135,49 @@ class Authenticator(common.Plugin, interfaces.Authenticator): # values, main thread writes). Due to the nature of CPython's # GIL, the operations are safe, c.f. # https://docs.python.org/2/faq/library.html#what-kinds-of-global-value-mutation-are-thread-safe - self.certs: Dict[bytes, Tuple[OpenSSL.crypto.PKey, OpenSSL.crypto.X509]] = {} + self.certs: Mapping[bytes, Tuple[crypto.PKey, crypto.X509]] = {} self.http_01_resources: Set[acme_standalone.HTTP01RequestHandler.HTTP01Resource] = set() self.servers = ServerManager(self.certs, self.http_01_resources) @classmethod - def add_parser_arguments(cls, add): + def add_parser_arguments(cls, add: Callable[..., None]) -> None: pass # No additional argument for the standalone plugin parser - def more_info(self): # pylint: disable=missing-function-docstring + def more_info(self) -> str: # pylint: disable=missing-function-docstring return("This authenticator creates its own ephemeral TCP listener " "on the necessary port in order to respond to incoming " "http-01 challenges from the certificate authority. Therefore, " "it does not rely on any existing server program.") - def prepare(self): # pylint: disable=missing-function-docstring + def prepare(self) -> None: # pylint: disable=missing-function-docstring pass - def get_chall_pref(self, domain): + def get_chall_pref(self, domain: str) -> Iterable[Type[challenges.Challenge]]: # pylint: disable=unused-argument,missing-function-docstring return [challenges.HTTP01] - def perform(self, achalls): # pylint: disable=missing-function-docstring + def perform(self, achalls: Iterable[achallenges.AnnotatedChallenge] + ) -> List[challenges.ChallengeResponse]: # pylint: disable=missing-function-docstring return [self._try_perform_single(achall) for achall in achalls] - def _try_perform_single(self, achall): + def _try_perform_single(self, + achall: achallenges.AnnotatedChallenge) -> challenges.ChallengeResponse: while True: try: return self._perform_single(achall) except errors.StandaloneBindError as error: _handle_perform_error(error) - def _perform_single(self, achall): + def _perform_single(self, + achall: achallenges.AnnotatedChallenge) -> challenges.ChallengeResponse: servers, response = self._perform_http_01(achall) self.served[servers].add(achall) return response - def _perform_http_01(self, achall): + def _perform_http_01(self, achall: achallenges.AnnotatedChallenge + ) -> Tuple[acme_standalone.HTTP01DualNetworkedServers, + challenges.ChallengeResponse]: port = self.config.http01_port addr = self.config.http01_address servers = self.servers.run(port, challenges.HTTP01, listenaddr=addr) @@ -174,7 +187,7 @@ class Authenticator(common.Plugin, interfaces.Authenticator): self.http_01_resources.add(resource) return servers, response - def cleanup(self, achalls): # pylint: disable=missing-function-docstring + def cleanup(self, achalls: Iterable[achallenges.AnnotatedChallenge]) -> None: # pylint: disable=missing-function-docstring # reduce self.served and close servers if no challenges are served for unused_servers, server_achalls in self.served.items(): for achall in achalls: @@ -193,7 +206,7 @@ class Authenticator(common.Plugin, interfaces.Authenticator): "accept inbound connections from the internet.") -def _handle_perform_error(error): +def _handle_perform_error(error: errors.StandaloneBindError) -> None: if error.socket_error.errno == errno.EACCES: raise errors.PluginError( "Could not bind TCP port {0} because you don't have " diff --git a/certbot/certbot/_internal/plugins/webroot.py b/certbot/certbot/_internal/plugins/webroot.py index 1a4892ac9..bb61d8220 100644 --- a/certbot/certbot/_internal/plugins/webroot.py +++ b/certbot/certbot/_internal/plugins/webroot.py @@ -3,10 +3,17 @@ import argparse import collections import json import logging +from typing import Any +from typing import Callable from typing import DefaultDict from typing import Dict +from typing import Iterable from typing import List +from typing import Optional +from typing import Sequence from typing import Set +from typing import Type +from typing import Union from acme import challenges from certbot import crypto_util @@ -57,11 +64,11 @@ necessary validation resources to appropriate paths on the file system. It expects that there is some other HTTP server configured to serve all files under specified web root ({0}).""" - def more_info(self): # pylint: disable=missing-function-docstring + def more_info(self) -> str: # pylint: disable=missing-function-docstring return self.MORE_INFO.format(self.conf("path")) @classmethod - def add_parser_arguments(cls, add): + def add_parser_arguments(cls, add: Callable[..., None]) -> None: add("path", "-w", default=[], action=_WebrootPathAction, help="public_html / webroot path. This can be specified multiple " "times to handle different domains; each domain will have " @@ -78,34 +85,34 @@ to serve all files under specified web root ({0}).""" "file, it needs to be on a single line, like: webroot-map = " '{"example.com":"/var/www"}.') - def auth_hint(self, failed_achalls): # pragma: no cover + def auth_hint(self, failed_achalls: Iterable[AnnotatedChallenge]) -> str: # pragma: no cover return ("The Certificate Authority failed to download the temporary challenge files " "created by Certbot. Ensure that the listed domains serve their content from " "the provided --webroot-path/-w and that files created there can be downloaded " "from the internet.") - def get_chall_pref(self, domain): # pragma: no cover + def get_chall_pref(self, domain: str) -> Iterable[Type[challenges.Challenge]]: # pylint: disable=unused-argument,missing-function-docstring return [challenges.HTTP01] - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self.full_roots: Dict[str, str] = {} self.performed: DefaultDict[str, Set[AnnotatedChallenge]] = collections.defaultdict(set) # stack of dirs successfully created by this authenticator self._created_dirs: List[str] = [] - def prepare(self): # pylint: disable=missing-function-docstring + def prepare(self) -> None: # pylint: disable=missing-function-docstring pass - def perform(self, achalls): # pylint: disable=missing-function-docstring + def perform(self, achalls: Iterable[AnnotatedChallenge]) -> List[challenges.ChallengeResponse]: # pylint: disable=missing-function-docstring self._set_webroots(achalls) self._create_challenge_dirs() return [self._perform_single(achall) for achall in achalls] - def _set_webroots(self, achalls): + def _set_webroots(self, achalls: Iterable[AnnotatedChallenge]) -> None: if self.conf("path"): webroot_path = self.conf("path")[-1] logger.info("Using the webroot path %s for all unmatched domains.", @@ -127,7 +134,7 @@ to serve all files under specified web root ({0}).""" known_webroots.insert(0, new_webroot) self.conf("map")[achall.domain] = new_webroot - def _prompt_for_webroot(self, domain, known_webroots): + def _prompt_for_webroot(self, domain: str, known_webroots: List[str]) -> Optional[str]: webroot = None while webroot is None: @@ -142,7 +149,8 @@ to serve all files under specified web root ({0}).""" return webroot - def _prompt_with_webroot_list(self, domain, known_webroots): + def _prompt_with_webroot_list(self, domain: str, + known_webroots: List[str]) -> Optional[str]: path_flag = "--" + self.option_name("path") while True: @@ -156,7 +164,7 @@ to serve all files under specified web root ({0}).""" "webroot when using the webroot plugin.") return None if index == 0 else known_webroots[index - 1] # code == display_util.OK - def _prompt_for_new_webroot(self, domain, allowraise=False): + def _prompt_for_new_webroot(self, domain: str, allowraise: bool = False) -> Optional[str]: code, webroot = ops.validated_directory( _validate_webroot, "Input the webroot for {0}:".format(domain), @@ -169,7 +177,7 @@ to serve all files under specified web root ({0}).""" "webroot when using the webroot plugin.") return _validate_webroot(webroot) # code == display_util.OK - def _create_challenge_dirs(self): + def _create_challenge_dirs(self) -> None: path_map = self.conf("map") if not path_map: raise errors.PluginError( @@ -227,10 +235,10 @@ to serve all files under specified web root ({0}).""" with safe_open(web_config_path, mode="w", chmod=0o644) as web_config: web_config.write(_WEB_CONFIG_CONTENT) - def _get_validation_path(self, root_path, achall): + def _get_validation_path(self, root_path: str, achall: AnnotatedChallenge) -> str: return os.path.join(root_path, achall.chall.encode("token")) - def _perform_single(self, achall): + def _perform_single(self, achall: AnnotatedChallenge) -> challenges.ChallengeResponse: response, validation = achall.response_and_validation() root_path = self.full_roots[achall.domain] @@ -249,7 +257,7 @@ to serve all files under specified web root ({0}).""" self.performed[root_path].add(achall) return response - def cleanup(self, achalls): # pylint: disable=missing-function-docstring + def cleanup(self, achalls: Iterable[AnnotatedChallenge]) -> None: # pylint: disable=missing-function-docstring for achall in achalls: root_path = self.full_roots.get(achall.domain, None) if root_path is not None: @@ -270,7 +278,6 @@ to serve all files under specified web root ({0}).""" logger.info("Not cleaning up the web.config file in %s " "because it is not generated by Certbot.", root_path) - not_removed: List[str] = [] while self._created_dirs: path = self._created_dirs.pop() @@ -287,8 +294,12 @@ to serve all files under specified web root ({0}).""" class _WebrootMapAction(argparse.Action): """Action class for parsing webroot_map.""" - def __call__(self, parser, namespace, webroot_map, option_string=None): - for domains, webroot_path in json.loads(webroot_map).items(): + def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, + webroot_map: Union[str, Sequence[Any], None], + option_string: Optional[str] = None) -> None: + if webroot_map is None: + return + for domains, webroot_path in json.loads(str(webroot_map)).items(): webroot_path = _validate_webroot(webroot_path) namespace.webroot_map.update( (d, webroot_path) for d in cli.add_domains(namespace, domains)) @@ -297,11 +308,15 @@ class _WebrootMapAction(argparse.Action): class _WebrootPathAction(argparse.Action): """Action class for parsing webroot_path.""" - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self._domain_before_webroot = False - def __call__(self, parser, namespace, webroot_path, option_string=None): + def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, + webroot_path: Union[str, Sequence[Any], None], + option_string: Optional[str] = None) -> None: + if webroot_path is None: + return if self._domain_before_webroot: raise errors.PluginError( "If you specify multiple webroot paths, " @@ -316,10 +331,10 @@ class _WebrootPathAction(argparse.Action): elif namespace.domains: self._domain_before_webroot = True - namespace.webroot_path.append(_validate_webroot(webroot_path)) + namespace.webroot_path.append(_validate_webroot(str(webroot_path))) -def _validate_webroot(webroot_path): +def _validate_webroot(webroot_path: str) -> str: """Validates and returns the absolute path of webroot_path. :param str webroot_path: path to the webroot directory diff --git a/tox.ini b/tox.ini index 554ff5b68..ee3e4d7d2 100644 --- a/tox.ini +++ b/tox.ini @@ -20,8 +20,8 @@ install_and_test = python {toxinidir}/tools/install_and_test.py dns_packages = certbot-dns-cloudflare certbot-dns-cloudxns certbot-dns-digitalocean certbot-dns-dnsimple certbot-dns-dnsmadeeasy certbot-dns-gehirn certbot-dns-google certbot-dns-linode certbot-dns-luadns certbot-dns-nsone certbot-dns-ovh certbot-dns-rfc2136 certbot-dns-route53 certbot-dns-sakuracloud win_all_packages = acme[test] certbot[test] {[base]dns_packages} certbot-nginx all_packages = {[base]win_all_packages} certbot-apache -fully_typed_source_paths = acme/acme -partially_typed_source_paths = certbot/certbot certbot-ci/certbot_integration_tests certbot-apache/certbot_apache certbot-compatibility-test/certbot_compatibility_test certbot-dns-cloudflare/certbot_dns_cloudflare certbot-dns-cloudxns/certbot_dns_cloudxns certbot-dns-digitalocean/certbot_dns_digitalocean certbot-dns-dnsimple/certbot_dns_dnsimple certbot-dns-dnsmadeeasy/certbot_dns_dnsmadeeasy certbot-dns-gehirn/certbot_dns_gehirn certbot-dns-google/certbot_dns_google certbot-dns-linode/certbot_dns_linode certbot-dns-luadns/certbot_dns_luadns certbot-dns-nsone/certbot_dns_nsone certbot-dns-ovh/certbot_dns_ovh certbot-dns-rfc2136/certbot_dns_rfc2136 certbot-dns-route53/certbot_dns_route53 certbot-dns-sakuracloud/certbot_dns_sakuracloud certbot-nginx/certbot_nginx tests/lock_test.py +fully_typed_source_paths = acme/acme certbot/certbot +partially_typed_source_paths = certbot-ci/certbot_integration_tests certbot-apache/certbot_apache certbot-compatibility-test/certbot_compatibility_test certbot-dns-cloudflare/certbot_dns_cloudflare certbot-dns-cloudxns/certbot_dns_cloudxns certbot-dns-digitalocean/certbot_dns_digitalocean certbot-dns-dnsimple/certbot_dns_dnsimple certbot-dns-dnsmadeeasy/certbot_dns_dnsmadeeasy certbot-dns-gehirn/certbot_dns_gehirn certbot-dns-google/certbot_dns_google certbot-dns-linode/certbot_dns_linode certbot-dns-luadns/certbot_dns_luadns certbot-dns-nsone/certbot_dns_nsone certbot-dns-ovh/certbot_dns_ovh certbot-dns-rfc2136/certbot_dns_rfc2136 certbot-dns-route53/certbot_dns_route53 certbot-dns-sakuracloud/certbot_dns_sakuracloud certbot-nginx/certbot_nginx tests/lock_test.py [testenv] passenv =