1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-26 07:41:33 +03:00

Add type annotations to the certbot package (part 4) (#9087)

* Extract from #9084

* Cast/ignore types during the transition

* Remove useless casts and type ignore directives

* Fix lint

* Fix a cast

* Mandatory typing for certbot packages

* Update certbot/certbot/_internal/plugins/disco.py

Co-authored-by: alexzorin <alex@zor.io>

* Remove unused type import

* Fix iterator type

* Fix type

* Fix types in selection

Co-authored-by: alexzorin <alex@zor.io>
This commit is contained in:
Adrien Ferrand
2021-11-25 23:00:03 +01:00
committed by GitHub
parent 7d3a344d43
commit 86406ab63a
20 changed files with 471 additions and 283 deletions

View File

@@ -335,7 +335,7 @@ class Registration(ResourceBody):
@classmethod
def from_data(cls, phone: Optional[str] = None, email: Optional[str] = None,
external_account_binding: Optional[ExternalAccountBinding] = None,
external_account_binding: Optional[Dict[str, Any]] = None,
**kwargs: Any) -> 'Registration':
"""
Create registration resource from contact details.

View File

@@ -4,7 +4,10 @@ import argparse
import logging
import logging.handlers
import sys
from typing import Any
from typing import List
from typing import Optional
from typing import Type
import certbot
from certbot._internal import constants
@@ -40,9 +43,9 @@ from certbot._internal.cli.plugins_parsing import _plugins_parsing
from certbot._internal.cli.subparsers import _create_subparsers
from certbot._internal.cli.verb_help import VERB_HELP
from certbot._internal.cli.verb_help import VERB_HELP_MAP
from certbot.plugins import enhancements
from certbot._internal.plugins import disco as plugins_disco
import certbot._internal.plugins.selection as plugin_selection
from certbot.plugins import enhancements
logger = logging.getLogger(__name__)
@@ -51,7 +54,8 @@ logger = logging.getLogger(__name__)
helpful_parser: Optional[HelpfulArgumentParser] = None
def prepare_and_parse_args(plugins, args, detect_defaults=False):
def prepare_and_parse_args(plugins: plugins_disco.PluginsRegistry, args: List[str],
detect_defaults: bool = False) -> argparse.Namespace:
"""Returns parsed command line arguments.
:param .PluginsRegistry plugins: available plugins
@@ -443,7 +447,7 @@ def prepare_and_parse_args(plugins, args, detect_defaults=False):
return helpful.parse_args()
def set_by_cli(var):
def set_by_cli(var: str) -> bool:
"""
Return True if a particular config variable has been set by the user
(CLI or config file) including if the user explicitly set it to the
@@ -487,7 +491,7 @@ def set_by_cli(var):
set_by_cli.detector = None # type: ignore
def has_default_value(option, value):
def has_default_value(option: str, value: Any) -> bool:
"""Does option have the default value?
If the default value of option is not known, False is returned.
@@ -505,7 +509,7 @@ def has_default_value(option, value):
return False
def option_was_set(option, value):
def option_was_set(option: str, value: Any) -> bool:
"""Was option set by the user or does it differ from the default?
:param str option: configuration variable being considered
@@ -521,7 +525,7 @@ def option_was_set(option, value):
return set_by_cli(option) or not has_default_value(option, value)
def argparse_type(variable):
def argparse_type(variable: Any) -> Type:
"""Return our argparse type function for a config variable (default: str)"""
# pylint: disable=protected-access
if helpful_parser is not None:

View File

@@ -2,6 +2,14 @@
import argparse
import copy
import inspect
from typing import Any
from typing import Iterable
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import TYPE_CHECKING
from typing import Union
from acme import challenges
from certbot import configuration
@@ -10,24 +18,27 @@ from certbot import util
from certbot._internal import constants
from certbot.compat import os
if TYPE_CHECKING:
from certbot._internal.cli import helpful
class _Default:
"""A class to use as a default to detect if a value is set by a user"""
def __bool__(self):
def __bool__(self) -> bool:
return False
def __eq__(self, other):
def __eq__(self, other: Any) -> bool:
return isinstance(other, _Default)
def __hash__(self):
def __hash__(self) -> int:
return id(_Default)
def __nonzero__(self):
def __nonzero__(self) -> bool:
return self.__bool__()
def read_file(filename, mode="rb"):
def read_file(filename: str, mode: str = "rb") -> Tuple[str, Any]:
"""Returns the given file's contents.
:param str filename: path to file
@@ -48,7 +59,7 @@ def read_file(filename, mode="rb"):
raise argparse.ArgumentTypeError(exc.strerror)
def flag_default(name):
def flag_default(name: str) -> Any:
"""Default value for CLI flag."""
# XXX: this is an internal housekeeping notion of defaults before
# argparse has been set up; it is not accurate for all flags. Call it
@@ -57,7 +68,7 @@ def flag_default(name):
return copy.deepcopy(constants.CLI_DEFAULTS[name])
def config_help(name, hidden=False):
def config_help(name: str, hidden: bool = False) -> Optional[str]:
"""Extract the help message for a `configuration.NamespaceConfig` property docstring."""
if hidden:
return argparse.SUPPRESS
@@ -73,11 +84,11 @@ class HelpfulArgumentGroup:
HelpfulArgumentParser when necessary.
"""
def __init__(self, helpful_arg_parser, topic):
def __init__(self, helpful_arg_parser: "helpful.HelpfulArgumentParser", topic: str) -> None:
self._parser = helpful_arg_parser
self._topic = topic
def add_argument(self, *args, **kwargs):
def add_argument(self, *args: Any, **kwargs: Any) -> None:
"""Add a new command line argument to the argument group."""
self._parser.add(self._topic, *args, **kwargs)
@@ -88,12 +99,12 @@ class CustomHelpFormatter(argparse.HelpFormatter):
In particular we fix https://bugs.python.org/issue28742
"""
def _get_help_string(self, action):
def _get_help_string(self, action: argparse.Action) -> Optional[str]:
helpstr = action.help
if '%(default)' not in action.help and '(default:' not in action.help:
if action.help and '%(default)' not in action.help and '(default:' not in action.help:
if action.default != argparse.SUPPRESS:
defaulting_nargs = [argparse.OPTIONAL, argparse.ZERO_OR_MORE]
if action.option_strings or action.nargs in defaulting_nargs:
if helpstr and (action.option_strings or action.nargs in defaulting_nargs):
helpstr += ' (default: %(default)s)'
return helpstr
@@ -101,12 +112,15 @@ class CustomHelpFormatter(argparse.HelpFormatter):
class _DomainsAction(argparse.Action):
"""Action class for parsing domains."""
def __call__(self, parser, namespace, domain, option_string=None):
def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace,
domain: Union[str, Sequence[Any], None],
option_string: Optional[str] = None) -> None:
"""Just wrap add_domains in argparseese."""
add_domains(namespace, domain)
add_domains(namespace, str(domain) if domain is not None else None)
def add_domains(args_or_config, domains):
def add_domains(args_or_config: Union[argparse.Namespace, configuration.NamespaceConfig],
domains: Optional[str]) -> List[str]:
"""Registers new domains to be used during the current client run.
Domains are not added to the list of requested domains if they have
@@ -121,7 +135,10 @@ def add_domains(args_or_config, domains):
:rtype: `list` of `str`
"""
validated_domains = []
validated_domains: List[str] = []
if not domains:
return validated_domains
for domain in domains.split(","):
domain = util.enforce_domain_sanity(domain.strip())
validated_domains.append(domain)
@@ -137,11 +154,13 @@ class CaseInsensitiveList(list):
This class is passed to the `choices` argument of `argparse.add_arguments`
through the `helpful` wrapper. It is necessary due to special handling of
command line arguments by `set_by_cli` in which the `type_func` is not applied."""
def __contains__(self, element):
def __contains__(self, element: object) -> bool:
if not isinstance(element, str):
return False
return super().__contains__(element.lower())
def _user_agent_comment_type(value):
def _user_agent_comment_type(value: str) -> str:
if "(" in value or ")" in value:
raise argparse.ArgumentTypeError("may not contain parentheses")
return value
@@ -150,13 +169,17 @@ def _user_agent_comment_type(value):
class _EncodeReasonAction(argparse.Action):
"""Action class for parsing revocation reason."""
def __call__(self, parser, namespace, reason, option_string=None):
def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace,
reason: Union[str, Sequence[Any], None],
option_string: Optional[str] = None) -> None:
"""Encodes the reason for certificate revocation."""
code = constants.REVOCATION_REASONS[reason.lower()]
if reason is None:
raise ValueError("Unexpected null reason.")
code = constants.REVOCATION_REASONS[str(reason).lower()]
setattr(namespace, self.dest, code)
def parse_preferred_challenges(pref_challs):
def parse_preferred_challenges(pref_challs: Iterable[str]) -> List[str]:
"""Translate and validate preferred challenges.
:param pref_challs: list of preferred challenge types
@@ -183,9 +206,13 @@ def parse_preferred_challenges(pref_challs):
class _PrefChallAction(argparse.Action):
"""Action class for parsing preferred challenges."""
def __call__(self, parser, namespace, pref_challs, option_string=None):
def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace,
pref_challs: Union[str, Sequence[Any], None],
option_string: Optional[str] = None) -> None:
if pref_challs is None:
raise ValueError("Unexpected null pref_challs.")
try:
challs = parse_preferred_challenges(pref_challs.split(","))
challs = parse_preferred_challenges(str(pref_challs).split(","))
except errors.Error as error:
raise argparse.ArgumentError(self, str(error))
namespace.pref_challs.extend(challs)
@@ -194,7 +221,9 @@ class _PrefChallAction(argparse.Action):
class _DeployHookAction(argparse.Action):
"""Action class for parsing deploy hooks."""
def __call__(self, parser, namespace, values, option_string=None):
def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace,
values: Union[str, Sequence[Any], None],
option_string: Optional[str] = None) -> None:
renew_hook_set = namespace.deploy_hook != namespace.renew_hook
if renew_hook_set and namespace.renew_hook != values:
raise argparse.ArgumentError(
@@ -205,7 +234,9 @@ class _DeployHookAction(argparse.Action):
class _RenewHookAction(argparse.Action):
"""Action class for parsing renew hooks."""
def __call__(self, parser, namespace, values, option_string=None):
def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace,
values: Union[str, Sequence[Any], None],
option_string: Optional[str] = None) -> None:
deploy_hook_set = namespace.deploy_hook is not None
if deploy_hook_set and namespace.deploy_hook != values:
raise argparse.ArgumentError(
@@ -213,7 +244,7 @@ class _RenewHookAction(argparse.Action):
namespace.renew_hook = values
def nonnegative_int(value):
def nonnegative_int(value: str) -> int:
"""Converts value to an int and checks that it is not negative.
This function should used as the type parameter for argparse

View File

@@ -1,9 +1,14 @@
"""This module contains a function to add the groups of arguments for the help
display"""
from typing import TYPE_CHECKING
from certbot._internal.cli.verb_help import VERB_HELP
if TYPE_CHECKING:
from certbot._internal.cli import helpful
def _add_all_groups(helpful):
def _add_all_groups(helpful: "helpful.HelpfulArgumentParser") -> None:
helpful.add_group("automation", description="Flags for automating execution & other tweaks")
helpful.add_group("security", description="Security parameters & server settings")
helpful.add_group("testing",

View File

@@ -7,6 +7,10 @@ import glob
import sys
from typing import Any
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Union
import configargparse
@@ -29,6 +33,7 @@ from certbot._internal.cli.cli_utils import HelpfulArgumentGroup
from certbot._internal.cli.verb_help import VERB_HELP
from certbot._internal.cli.verb_help import VERB_HELP_MAP
from certbot._internal.display import obj as display_obj
from certbot._internal.plugins import disco
from certbot.compat import os
@@ -40,7 +45,8 @@ class HelpfulArgumentParser:
'certbot --help security' for security options.
"""
def __init__(self, args, plugins, detect_defaults=False):
def __init__(self, args: List[str], plugins: Iterable[str],
detect_defaults: bool = False) -> None:
from certbot._internal import main
self.VERBS = {
"auth": main.certonly,
@@ -80,6 +86,7 @@ class HelpfulArgumentParser:
self.determine_verb()
help1 = self.prescan_for_flag("-h", self.help_topics)
help2 = self.prescan_for_flag("--help", self.help_topics)
self.help_arg: Union[str, bool]
if isinstance(help1, bool) and isinstance(help2, bool):
self.help_arg = help1 or help2
else:
@@ -111,7 +118,7 @@ class HelpfulArgumentParser:
# Help that are synonyms for --help subcommands
COMMANDS_TOPICS = ["command", "commands", "subcommand", "subcommands", "verbs"]
def _list_subcommands(self):
def _list_subcommands(self) -> str:
longest = max(len(v) for v in VERB_HELP_MAP)
text = "The full list of available SUBCOMMANDS is:\n\n"
@@ -122,7 +129,7 @@ class HelpfulArgumentParser:
text += "\nYou can get more help on a specific subcommand with --help SUBCOMMAND\n"
return text
def _usage_string(self, plugins, help_arg):
def _usage_string(self, plugins: Iterable[str], help_arg: Union[str, bool]) -> str:
"""Make usage strings late so that plugins can be initialised late
:param plugins: all discovered plugins
@@ -150,13 +157,14 @@ class HelpfulArgumentParser:
# if we're doing --help all, the OVERVIEW is part of the SHORT_USAGE at
# the top; if we're doing --help someothertopic, it's OT so it's not
usage += COMMAND_OVERVIEW % (apache_doc, nginx_doc)
else:
elif isinstance(help_arg, str):
custom = VERB_HELP_MAP.get(help_arg, {}).get("usage", None)
usage = custom if custom else usage
# Only remaining case is help_arg == False, which gives effectively usage == SHORT_USAGE.
return usage
def remove_config_file_domains_for_renewal(self, parsed_args):
def remove_config_file_domains_for_renewal(self, parsed_args: argparse.Namespace) -> None:
"""Make "certbot renew" safe if domains are set in cli.ini."""
# Works around https://github.com/certbot/certbot/issues/4096
if self.verb == "renew":
@@ -164,7 +172,7 @@ class HelpfulArgumentParser:
if source.startswith("config_file") and "domains" in flags:
parsed_args.domains = _Default() if self.detect_defaults else []
def parse_args(self):
def parse_args(self) -> argparse.Namespace:
"""Parses command line arguments and returns the result.
:returns: parsed command line arguments
@@ -224,7 +232,7 @@ class HelpfulArgumentParser:
return parsed_args
def set_test_server(self, parsed_args):
def set_test_server(self, parsed_args: argparse.Namespace) -> None:
"""We have --staging/--dry-run; perform sanity check and set config.server"""
# Flag combinations should produce these results:
@@ -253,7 +261,7 @@ class HelpfulArgumentParser:
parsed_args.tos = True
parsed_args.register_unsafely_without_email = True
def handle_csr(self, parsed_args):
def handle_csr(self, parsed_args: argparse.Namespace) -> None:
"""Process a --csr flag."""
if parsed_args.verb != "certonly":
raise errors.Error("Currently, a CSR file may only be specified "
@@ -287,7 +295,7 @@ class HelpfulArgumentParser:
.format(", ".join(csr_domains), ", ".join(config_domains)))
def determine_verb(self):
def determine_verb(self) -> None:
"""Determines the verb/subcommand provided by the user.
This function works around some of the limitations of argparse.
@@ -311,7 +319,7 @@ class HelpfulArgumentParser:
self.verb = "run"
def prescan_for_flag(self, flag, possible_arguments):
def prescan_for_flag(self, flag: str, possible_arguments: Iterable[str]) -> Union[str, bool]:
"""Checks cli input for flags.
Check for a flag, which accepts a fixed set of possible arguments, in
@@ -332,7 +340,8 @@ class HelpfulArgumentParser:
pass
return True
def add(self, topics, *args, **kwargs):
def add(self, topics: Optional[Union[List[Optional[str]], str]], *args: Any,
**kwargs: Any) -> None:
"""Add a new command line argument.
:param topics: str or [str] help topic(s) this should be listed under,
@@ -367,7 +376,7 @@ class HelpfulArgumentParser:
if self.detect_defaults:
kwargs = self.modify_kwargs_for_default_detection(**kwargs)
if self.visible_topics[topic]:
if isinstance(topic, str) and self.visible_topics[topic]:
if topic in self.groups:
group = self.groups[topic]
group.add_argument(*args, **kwargs)
@@ -377,7 +386,7 @@ class HelpfulArgumentParser:
kwargs["help"] = argparse.SUPPRESS
self.parser.add_argument(*args, **kwargs)
def modify_kwargs_for_default_detection(self, **kwargs):
def modify_kwargs_for_default_detection(self, **kwargs: Any) -> Dict[str, Any]:
"""Modify an arg so we can check if it was set by the user.
Changes the parameters given to argparse when adding an argument
@@ -399,7 +408,7 @@ class HelpfulArgumentParser:
return kwargs
def add_deprecated_argument(self, argument_name, num_args):
def add_deprecated_argument(self, argument_name: str, num_args: int) -> None:
"""Adds a deprecated argument with the name argument_name.
Deprecated arguments are not shown in the help. If they are used
@@ -407,7 +416,7 @@ class HelpfulArgumentParser:
argument is deprecated and no other action is taken.
:param str argument_name: Name of deprecated argument.
:param int nargs: Number of arguments the option takes.
:param int num_args: Number of arguments the option takes.
"""
# certbot.util.add_deprecated_argument expects the normal add_argument
@@ -427,7 +436,8 @@ class HelpfulArgumentParser:
add_func = functools.partial(self.add, None)
util.add_deprecated_argument(add_func, argument_name, num_args)
def add_group(self, topic, verbs=(), **kwargs):
def add_group(self, topic: str, verbs: Iterable[str] = (),
**kwargs: Any) -> HelpfulArgumentGroup:
"""Create a new argument group.
This method must be called once for every topic, however, calls
@@ -449,7 +459,7 @@ class HelpfulArgumentParser:
self.groups[topic].add_argument(v, help=VERB_HELP_MAP[v]["short"])
return HelpfulArgumentGroup(self, topic)
def add_plugin_args(self, plugins):
def add_plugin_args(self, plugins: disco.PluginsRegistry) -> None:
"""
Let each of the plugins add its own command line arguments, which
@@ -461,7 +471,7 @@ class HelpfulArgumentParser:
description=plugin_ep.long_description)
plugin_ep.plugin_cls.inject_parser_options(parser_or_group, name)
def determine_help_topics(self, chosen_topic):
def determine_help_topics(self, chosen_topic: Union[str, bool]) -> Dict[str, bool]:
"""
The user may have requested help on a topic, return a dict of which

View File

@@ -1,13 +1,19 @@
"""This is a module that adds configuration to the argument parser regarding
paths for certificates"""
from typing import TYPE_CHECKING
from typing import Union
from certbot._internal.cli.cli_utils import config_help
from certbot._internal.cli.cli_utils import flag_default
from certbot.compat import os
if TYPE_CHECKING:
from certbot._internal.cli import helpful
def _paths_parser(helpful):
def _paths_parser(helpful: "helpful.HelpfulArgumentParser") -> None:
add = helpful.add
verb = helpful.verb
verb: Union[str, bool] = helpful.verb
if verb == "help":
verb = helpful.help_arg
@@ -21,7 +27,7 @@ def _paths_parser(helpful):
add(["paths", "install", "revoke", "certonly", "manage"], "--cert-path", **cpkwargs)
section = "paths"
if verb in ("install", "revoke"):
if isinstance(verb, str) and verb in ("install", "revoke"):
section = verb
add(section, "--key-path", type=os.path.abspath,
help="Path to private key for certificate installation "

View File

@@ -1,8 +1,15 @@
"""This is a module that handles parsing of plugins for the argument parser"""
from typing import TYPE_CHECKING
from certbot._internal.cli.cli_utils import flag_default
from certbot._internal.plugins import disco
if TYPE_CHECKING:
from certbot._internal.cli import helpful
def _plugins_parsing(helpful, plugins):
def _plugins_parsing(helpful: "helpful.HelpfulArgumentParser",
plugins: disco.PluginsRegistry) -> None:
# It's nuts, but there are two "plugins" topics. Somehow this works
helpful.add_group(
"plugins", description="Plugin Selection: Certbot client supports an "

View File

@@ -1,4 +1,6 @@
"""This module creates subparsers for the argument parser"""
from typing import TYPE_CHECKING
from certbot import interfaces
from certbot._internal import constants
from certbot._internal.cli.cli_utils import _EncodeReasonAction
@@ -7,8 +9,11 @@ from certbot._internal.cli.cli_utils import CaseInsensitiveList
from certbot._internal.cli.cli_utils import flag_default
from certbot._internal.cli.cli_utils import read_file
if TYPE_CHECKING:
from certbot._internal.cli import helpful
def _create_subparsers(helpful):
def _create_subparsers(helpful: "helpful.HelpfulArgumentParser") -> None:
from certbot._internal.client import sample_user_agent # avoid import loops
helpful.add(
None, "--user-agent", default=flag_default("user_agent"),

View File

@@ -2,14 +2,13 @@
import datetime
import logging
import platform
from typing import cast
from typing import Any
from typing import Callable
from typing import cast
from typing import Dict
from typing import IO
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Union
import warnings
@@ -241,10 +240,8 @@ def perform_registration(acme: acme_client.ClientV2, config: configuration.Names
raise errors.Error(msg)
try:
# TODO: Remove the cast once certbot package is fully typed
newreg = messages.NewRegistration.from_data(
email=config.email,
external_account_binding=cast(Optional[messages.ExternalAccountBinding], eab))
email=config.email, external_account_binding=eab)
# Until ACME v1 support is removed from Certbot, we actually need the provided
# ACME client to be a wrapper of type BackwardsCompatibleClientV2.
# TODO: Remove this cast and rewrite the logic when the client is actually a ClientV2
@@ -416,8 +413,7 @@ class Client:
elliptic_curve=elliptic_curve,
strict_permissions=self.config.strict_permissions,
)
# TODO: Remove the cast once certbot package is fully typed
csr = crypto_util.generate_csr(key, cast(Set[str], domains), self.config.csr_dir,
csr = crypto_util.generate_csr(key, domains, self.config.csr_dir,
self.config.must_staple, self.config.strict_permissions)
orderr = self._get_order_and_authorizations(csr.data, self.config.allow_subset_of_names)
@@ -668,8 +664,7 @@ class Client:
with error_handler.ErrorHandler(self._recovery_routine_with_msg, None):
for dom in domains:
try:
# TODO: Remove the cast once certbot package is fully typed
self.installer.enhance(dom, enhancement, cast(Optional[List[str]], options))
self.installer.enhance(dom, enhancement, options)
except errors.PluginEnhancementAlreadyPresent:
logger.info("Enhancement %s was already set.", enh_label)
except errors.PluginError:

View File

@@ -1,8 +1,14 @@
"""Provides Tab completion when prompting users for a path."""
import glob
from types import TracebackType
from typing import Callable
from typing import Iterator
from typing import Optional
from typing import Type
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing_extensions import Literal
# readline module is not available on all systems
try:
@@ -28,12 +34,12 @@ class Completer:
"""
def __init__(self):
def __init__(self) -> None:
self._iter: Iterator[str]
self._original_completer: Optional[Callable]
self._original_delims: str
def complete(self, text, state):
def complete(self, text: str, state: int) -> Optional[str]:
"""Provides path completion for use with readline.
:param str text: text to offer completions for
@@ -48,7 +54,7 @@ class Completer:
self._iter = glob.iglob(text + '*')
return next(self._iter, None)
def __enter__(self):
def __enter__(self) -> None:
self._original_completer = readline.get_completer()
self._original_delims = readline.get_completer_delims()
@@ -62,6 +68,9 @@ class Completer:
else:
readline.parse_and_bind('tab: complete')
def __exit__(self, unused_type, unused_value, unused_traceback):
def __exit__(self, unused_type: Optional[Type[BaseException]],
unused_value: Optional[BaseException],
unused_traceback: Optional[TracebackType]) -> 'Literal[False]':
readline.set_completer_delims(self._original_delims)
readline.set_completer(self._original_completer)
return False

View File

@@ -1,21 +1,25 @@
"""A dummy module with no effect for use on systems without readline."""
from typing import Callable
from typing import Iterable
from typing import List
from typing import Optional
def get_completer():
def get_completer() -> Optional[Callable[[], str]]:
"""An empty implementation of readline.get_completer."""
def get_completer_delims():
def get_completer_delims() -> List[str]:
"""An empty implementation of readline.get_completer_delims."""
def parse_and_bind(unused_command):
def parse_and_bind(unused_command: str) -> None:
"""An empty implementation of readline.parse_and_bind."""
def set_completer(unused_function=None):
def set_completer(unused_function: Optional[Callable[[], str]] = None) -> None:
"""An empty implementation of readline.set_completer."""
def set_completer_delims(unused_delims):
def set_completer_delims(unused_delims: Iterable[str]) -> None:
"""An empty implementation of readline.set_completer_delims."""

View File

@@ -1,7 +1,13 @@
"""This modules define the actual display implementations used in Certbot"""
import logging
import sys
from typing import Any
from typing import Iterable
from typing import List
from typing import Optional
from typing import TextIO
from typing import Tuple
from typing import TypeVar
from typing import Union
import zope.component
@@ -35,12 +41,14 @@ it as a heading)"""
# Adding a level of indirection causes the lookup of the global _DisplayService
# object to happen first avoiding this potential bug.
class _DisplayService:
def __init__(self):
def __init__(self) -> None:
self.display: Optional[Union[FileDisplay, NoninteractiveDisplay]] = None
_SERVICE = _DisplayService()
T = TypeVar("T")
# This use of IDisplay can be removed when this class is no longer accessible
# through the public API in certbot.display.util.
@@ -49,15 +57,14 @@ class FileDisplay:
"""File-based display."""
# see https://github.com/certbot/certbot/issues/3915
def __init__(self, outfile, force_interactive):
def __init__(self, outfile: TextIO, force_interactive: bool) -> None:
super().__init__()
self.outfile = outfile
self.force_interactive = force_interactive
self.skipped_interaction = False
def notification(self, message, pause=True,
wrap=True, force_interactive=False,
decorate=True):
def notification(self, message: str, pause: bool = True, wrap: bool = True,
force_interactive: bool = False, decorate: bool = True) -> None:
"""Displays a notification and waits for user acceptance.
:param str message: Message to display
@@ -89,9 +96,11 @@ class FileDisplay:
else:
logger.debug("Not pausing for user confirmation")
def menu(self, message, choices, ok_label=None, cancel_label=None, # pylint: disable=unused-argument
help_label=None, default=None, # pylint: disable=unused-argument
cli_flag=None, force_interactive=False, **unused_kwargs):
def menu(self, message: str, choices: Union[List[Tuple[str, str]], List[str]],
ok_label: Optional[str] = None, cancel_label: Optional[str] = None, # pylint: disable=unused-argument
help_label: Optional[str] = None, default: Optional[int] = None, # pylint: disable=unused-argument
cli_flag: Optional[str] = None, force_interactive: bool = False,
**unused_kwargs: Any) -> Tuple[str, int]:
"""Display a menu.
.. todo:: This doesn't enable the help label/button (I wasn't sold on
@@ -113,8 +122,9 @@ class FileDisplay:
:rtype: tuple
"""
if self._return_default(message, default, cli_flag, force_interactive):
return OK, default
return_default = self._return_default(message, default, cli_flag, force_interactive)
if return_default is not None:
return OK, return_default
self._print_menu(message, choices)
@@ -122,8 +132,8 @@ class FileDisplay:
return code, selection - 1
def input(self, message, default=None,
cli_flag=None, force_interactive=False, **unused_kwargs):
def input(self, message: str, default: Optional[str] = None, cli_flag: Optional[str] = None,
force_interactive: bool = False, **unused_kwargs: Any) -> Tuple[str, str]:
"""Accept input from the user.
:param str message: message to display to the user
@@ -138,8 +148,9 @@ class FileDisplay:
:rtype: tuple
"""
if self._return_default(message, default, cli_flag, force_interactive):
return OK, default
return_default = self._return_default(message, default, cli_flag, force_interactive)
if return_default is not None:
return OK, return_default
# Trailing space must be added outside of util.wrap_lines to
# be preserved
@@ -150,8 +161,9 @@ class FileDisplay:
return CANCEL, "-1"
return OK, ans
def yesno(self, message, yes_label="Yes", no_label="No", default=None,
cli_flag=None, force_interactive=False, **unused_kwargs):
def yesno(self, message: str, yes_label: str = "Yes", no_label: str = "No",
default: Optional[bool] = None, cli_flag: Optional[str] = None,
force_interactive: bool = False, **unused_kwargs: Any) -> bool:
"""Query the user with a yes/no question.
Yes and No label must begin with different letters, and must contain at
@@ -169,8 +181,9 @@ class FileDisplay:
:rtype: bool
"""
if self._return_default(message, default, cli_flag, force_interactive):
return default
return_default = self._return_default(message, default, cli_flag, force_interactive)
if return_default is not None:
return return_default
message = util.wrap_lines(message)
@@ -192,8 +205,9 @@ class FileDisplay:
ans.startswith(no_label[0].upper())):
return False
def checklist(self, message, tags, default=None,
cli_flag=None, force_interactive=False, **unused_kwargs):
def checklist(self, message: str, tags: List[str], default: Optional[List[str]] = None,
cli_flag: Optional[str] = None, force_interactive: bool = False,
**unused_kwargs: Any) -> Tuple[str, List[str]]:
"""Display a checklist.
:param str message: Message to display to user
@@ -209,8 +223,9 @@ class FileDisplay:
:rtype: tuple
"""
if self._return_default(message, default, cli_flag, force_interactive):
return OK, default
return_default = self._return_default(message, default, cli_flag, force_interactive)
if return_default is not None:
return OK, return_default
while True:
self._print_menu(message, tags)
@@ -233,22 +248,23 @@ class FileDisplay:
else:
return code, []
def _return_default(self, prompt, default, cli_flag, force_interactive):
def _return_default(self, prompt: str, default: Optional[T],
cli_flag: Optional[str], force_interactive: bool) -> Optional[T]:
"""Should we return the default instead of prompting the user?
:param str prompt: prompt for the user
:param default: default answer to prompt
:param T default: default answer to prompt
:param str cli_flag: command line option for setting an answer
to this question
:param bool force_interactive: if interactivity is forced
:returns: True if we should return the default without prompting
:rtype: bool
:returns: The default value if we should return it else `None`
:rtype: T or `None`
"""
# assert_valid_call(prompt, default, cli_flag, force_interactive)
if self._can_interact(force_interactive):
return False
return None
if default is None:
msg = "Unable to get an answer for the question:\n{0}".format(prompt)
if cli_flag:
@@ -259,9 +275,9 @@ class FileDisplay:
logger.debug(
"Falling back to default %s for the prompt:\n%s",
default, prompt)
return True
return default
def _can_interact(self, force_interactive):
def _can_interact(self, force_interactive: bool) -> bool:
"""Can we safely interact with the user?
:param bool force_interactive: if interactivity is forced
@@ -282,8 +298,9 @@ class FileDisplay:
self.skipped_interaction = True
return False
def directory_select(self, message, default=None, cli_flag=None,
force_interactive=False, **unused_kwargs):
def directory_select(self, message: str, default: Optional[str] = None,
cli_flag: Optional[str] = None, force_interactive: bool = False,
**unused_kwargs: Any) -> Tuple[str, str]:
"""Display a directory selection screen.
:param str message: prompt to give the user
@@ -300,7 +317,8 @@ class FileDisplay:
with completer.Completer():
return self.input(message, default, cli_flag, force_interactive)
def _scrub_checklist_input(self, indices, tags):
def _scrub_checklist_input(self, indices: Iterable[Union[str, int]],
tags: List[str]) -> List[str]:
"""Validate input and transform indices to appropriate tags.
:param list indices: input
@@ -312,21 +330,22 @@ class FileDisplay:
"""
# They should all be of type int
try:
indices = [int(index) for index in indices]
indices_int = [int(index) for index in indices]
except ValueError:
return []
# Remove duplicates
indices = list(set(indices))
indices_int = list(set(indices_int))
# Check all input is within range
for index in indices:
for index in indices_int:
if index < 1 or index > len(tags):
return []
# Transform indices to appropriate tags
return [tags[index - 1] for index in indices]
# Transform indices_int to appropriate tags
return [tags[index - 1] for index in indices_int]
def _print_menu(self, message, choices):
def _print_menu(self, message: str,
choices: Union[List[Tuple[str, str]], List[str]]) -> None:
"""Print a menu on the screen.
:param str message: title of menu
@@ -355,7 +374,7 @@ class FileDisplay:
self.outfile.write(SIDE_FRAME + os.linesep)
self.outfile.flush()
def _get_valid_int_ans(self, max_):
def _get_valid_int_ans(self, max_: int) -> Tuple[str, int]:
"""Get a numerical selection.
:param int max: The maximum entry (len of choices), must be positive
@@ -398,21 +417,23 @@ class FileDisplay:
class NoninteractiveDisplay:
"""A display utility implementation that never asks for interactive user input"""
def __init__(self, outfile, *unused_args, **unused_kwargs):
def __init__(self, outfile: TextIO, *unused_args: Any, **unused_kwargs: Any) -> None:
super().__init__()
self.outfile = outfile
def _interaction_fail(self, message, cli_flag, extra=""):
"""Error out in case of an attempt to interact in noninteractive mode"""
def _interaction_fail(self, message: str, cli_flag: Optional[str],
extra: str = "") -> errors.MissingCommandlineFlag:
"""Return error to raise in case of an attempt to interact in noninteractive mode"""
msg = "Missing command line flag or config entry for this setting:\n"
msg += message
if extra:
msg += "\n" + extra
if cli_flag:
msg += "\n\n(You can set this with the {0} flag)".format(cli_flag)
raise errors.MissingCommandlineFlag(msg)
return errors.MissingCommandlineFlag(msg)
def notification(self, message, pause=False, wrap=True, decorate=True, **unused_kwargs): # pylint: disable=unused-argument
def notification(self, message: str, pause: bool = False, wrap: bool = True, # pylint: disable=unused-argument
decorate: bool = True, **unused_kwargs: Any) -> None:
"""Displays a notification without waiting for user acceptance.
:param str message: Message to display to stdout
@@ -434,8 +455,10 @@ class NoninteractiveDisplay:
)
self.outfile.flush()
def menu(self, message, choices, ok_label=None, cancel_label=None,
help_label=None, default=None, cli_flag=None, **unused_kwargs):
def menu(self, message: str, choices: Union[List[Tuple[str, str]], List[str]],
ok_label: Optional[str] = None, cancel_label: Optional[str] = None,
help_label: Optional[str] = None, default: Optional[int] = None,
cli_flag: Optional[str] = None, **unused_kwargs: Any) -> Tuple[str, int]:
# pylint: disable=unused-argument
"""Avoid displaying a menu.
@@ -454,11 +477,12 @@ class NoninteractiveDisplay:
"""
if default is None:
self._interaction_fail(message, cli_flag, "Choices: " + repr(choices))
raise self._interaction_fail(message, cli_flag, "Choices: " + repr(choices))
return OK, default
def input(self, message, default=None, cli_flag=None, **unused_kwargs):
def input(self, message: str, default: Optional[str] = None, cli_flag: Optional[str] = None,
**unused_kwargs: Any) -> Tuple[str, str]:
"""Accept input from the user.
:param str message: message to display to the user
@@ -471,11 +495,12 @@ class NoninteractiveDisplay:
"""
if default is None:
self._interaction_fail(message, cli_flag)
raise self._interaction_fail(message, cli_flag)
return OK, default
def yesno(self, message, yes_label=None, no_label=None, # pylint: disable=unused-argument
default=None, cli_flag=None, **unused_kwargs):
def yesno(self, message: str, yes_label: Optional[str] = None, no_label: Optional[str] = None, # pylint: disable=unused-argument
default: Optional[bool] = None, cli_flag: Optional[str] = None,
**unused_kwargs: Any) -> bool:
"""Decide Yes or No, without asking anybody
:param str message: question for the user
@@ -487,11 +512,11 @@ class NoninteractiveDisplay:
"""
if default is None:
self._interaction_fail(message, cli_flag)
raise self._interaction_fail(message, cli_flag)
return default
def checklist(self, message, tags, default=None,
cli_flag=None, **unused_kwargs):
def checklist(self, message: str, tags: Iterable[str], default: Optional[List[str]] = None,
cli_flag: Optional[str] = None, **unused_kwargs: Any) -> Tuple[str, List[str]]:
"""Display a checklist.
:param str message: Message to display to user
@@ -505,11 +530,11 @@ class NoninteractiveDisplay:
"""
if default is None:
self._interaction_fail(message, cli_flag, "? ".join(tags) + "?")
raise self._interaction_fail(message, cli_flag, "? ".join(tags) + "?")
return OK, default
def directory_select(self, message, default=None,
cli_flag=None, **unused_kwargs):
def directory_select(self, message: str, default: Optional[str] = None,
cli_flag: Optional[str] = None, **unused_kwargs: Any) -> Tuple[str, str]:
"""Simulate prompting the user for a directory.
This function returns default if it is not ``None``, otherwise,

View File

@@ -1,12 +1,13 @@
"""Internal Certbot display utilities."""
from typing import List
import textwrap
import sys
import textwrap
from typing import List
from typing import Optional
from certbot.compat import misc
def wrap_lines(msg):
def wrap_lines(msg: str) -> str:
"""Format lines nicely to 80 chars.
:param str msg: Original message
@@ -28,7 +29,7 @@ def wrap_lines(msg):
return '\n'.join(fixed_l)
def parens_around_char(label):
def parens_around_char(label: str) -> str:
"""Place parens around first character of label.
:param str label: Must contain at least one character
@@ -37,7 +38,7 @@ def parens_around_char(label):
return "({first}){rest}".format(first=label[0], rest=label[1:])
def input_with_timeout(prompt=None, timeout=36000.0):
def input_with_timeout(prompt: Optional[str] = None, timeout: float = 36000.0) -> str:
"""Get user input with a timeout.
Behaves the same as the builtin input, however, an error is raised if
@@ -67,7 +68,7 @@ def input_with_timeout(prompt=None, timeout=36000.0):
return line.rstrip('\n')
def separate_list_input(input_):
def separate_list_input(input_: str) -> List[str]:
"""Separate a comma or space separated list.
:param str input_: input from the user
@@ -97,10 +98,10 @@ def summarize_domain_list(domains: List[str]) -> str:
if not domains:
return ""
l = len(domains)
if l == 1:
length = len(domains)
if length == 1:
return domains[0]
elif l == 2:
elif length == 2:
return " and ".join(domains)
else:
return "{0} and {1} more domains".format(domains[0], l-1)
return "{0} and {1} more domains".format(domains[0], length-1)

View File

@@ -1,9 +1,14 @@
"""Utilities for plugins discovery and selection."""
from collections.abc import Mapping
import itertools
import logging
import sys
from typing import Callable
from typing import cast
from typing import Dict
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Mapping
from typing import Optional
from typing import Type
from typing import Union
@@ -13,6 +18,7 @@ import pkg_resources
import zope.interface
import zope.interface.verify
from certbot import configuration
from certbot import errors
from certbot import interfaces
from certbot._internal import constants
@@ -49,7 +55,7 @@ class PluginEntryPoint:
# this object is mutable, don't allow it to be hashed!
__hash__ = None # type: ignore
def __init__(self, entry_point: pkg_resources.EntryPoint, with_prefix=False):
def __init__(self, entry_point: pkg_resources.EntryPoint, with_prefix: bool = False) -> None:
self.name = self.entry_point_to_plugin_name(entry_point, with_prefix)
self.plugin_cls: Type[interfaces.Plugin] = entry_point.load()
self.entry_point = entry_point
@@ -59,7 +65,7 @@ class PluginEntryPoint:
self._hidden = False
self._long_description: Optional[str] = None
def check_name(self, name):
def check_name(self, name: Optional[str]) -> bool:
"""Check if the name refers to this plugin."""
if name == self.name:
if self.warning_message:
@@ -68,43 +74,46 @@ class PluginEntryPoint:
return False
@classmethod
def entry_point_to_plugin_name(cls, entry_point, with_prefix):
def entry_point_to_plugin_name(cls, entry_point: pkg_resources.EntryPoint,
with_prefix: bool) -> str:
"""Unique plugin name for an ``entry_point``"""
if with_prefix:
if not entry_point.dist:
raise errors.Error(f"Entrypoint {entry_point.name} has no distribution!")
return entry_point.dist.key + ":" + entry_point.name
return entry_point.name
@property
def description(self):
def description(self) -> str:
"""Description of the plugin."""
return self.plugin_cls.description
@property
def description_with_name(self):
def description_with_name(self) -> str:
"""Description with name. Handy for UI."""
return "{0} ({1})".format(self.description, self.name)
@property
def long_description(self):
def long_description(self) -> str:
"""Long description of the plugin."""
if self._long_description:
return self._long_description
return getattr(self.plugin_cls, "long_description", self.description)
@long_description.setter
def long_description(self, description):
def long_description(self, description: str) -> None:
self._long_description = description
@property
def hidden(self):
def hidden(self) -> bool:
"""Should this plugin be hidden from UI?"""
return self._hidden or getattr(self.plugin_cls, "hidden", False)
@hidden.setter
def hidden(self, hide):
def hidden(self, hide: bool) -> None:
self._hidden = hide
def ifaces(self, *ifaces_groups):
def ifaces(self, *ifaces_groups: Iterable[Type]) -> bool:
"""Does plugin implements specified interface groups?"""
return not ifaces_groups or any(
all(_implements(self.plugin_cls, iface)
@@ -112,20 +121,20 @@ class PluginEntryPoint:
for ifaces in ifaces_groups)
@property
def initialized(self):
def initialized(self) -> bool:
"""Has the plugin been initialized already?"""
return self._initialized is not None
def init(self, config=None):
def init(self, config: Optional[configuration.NamespaceConfig] = None) -> interfaces.Plugin:
"""Memoized plugin initialization."""
if not self.initialized:
if not self._initialized:
self.entry_point.require() # fetch extras!
# For plugins implementing ABCs Plugin, Authenticator or Installer, the following
# line will raise an exception if some implementations of abstract methods are missing.
self._initialized = self.plugin_cls(config, self.name)
return self._initialized
def verify(self, ifaces):
def verify(self, ifaces: Iterable[Type]) -> bool:
"""Verify that the plugin conforms to the specified interfaces."""
if not self.initialized:
raise ValueError("Plugin is not initialized.")
@@ -136,13 +145,13 @@ class PluginEntryPoint:
return True
@property
def prepared(self):
def prepared(self) -> bool:
"""Has the plugin been prepared already?"""
if not self.initialized:
logger.debug(".prepared called on uninitialized %r", self)
return self._prepared is not None
def prepare(self):
def prepare(self) -> Union[bool, Error]:
"""Memoized plugin preparation."""
if self._initialized is None:
raise ValueError("Plugin is not initialized.")
@@ -161,29 +170,30 @@ class PluginEntryPoint:
self._prepared = error
else:
self._prepared = True
return self._prepared
# Mypy seems to fail to understand the actual type here, let's help it.
return cast(Union[bool, Error], self._prepared)
@property
def misconfigured(self):
def misconfigured(self) -> bool:
"""Is plugin misconfigured?"""
return isinstance(self._prepared, errors.MisconfigurationError)
@property
def problem(self):
def problem(self) -> Optional[Exception]:
"""Return the Exception raised during plugin setup, or None if all is well"""
if isinstance(self._prepared, Exception):
return self._prepared
return None
@property
def available(self):
def available(self) -> bool:
"""Is plugin available, i.e. prepared or misconfigured?"""
return self._prepared is True or self.misconfigured
def __repr__(self):
def __repr__(self) -> str:
return "PluginEntryPoint#{0}".format(self.name)
def __str__(self):
def __str__(self) -> str:
lines = [
"* {0}".format(self.name),
"Description: {0}".format(self.plugin_cls.description),
@@ -205,7 +215,7 @@ class PluginEntryPoint:
class PluginsRegistry(Mapping):
"""Plugins registry."""
def __init__(self, plugins):
def __init__(self, plugins: Mapping[str, PluginEntryPoint]) -> None:
# plugins are sorted so the same order is used between runs.
# This prevents deadlock caused by plugins acquiring a lock
# and ensures at least one concurrent Certbot instance will run
@@ -213,7 +223,7 @@ class PluginsRegistry(Mapping):
self._plugins = dict(sorted(plugins.items()))
@classmethod
def find_all(cls):
def find_all(cls) -> 'PluginsRegistry':
"""Find plugins using setuptools entry points."""
plugins: Dict[str, PluginEntryPoint] = {}
plugin_paths_string = os.getenv('CERTBOT_PLUGIN_PATH')
@@ -245,7 +255,9 @@ class PluginsRegistry(Mapping):
return cls(plugins)
@classmethod
def _load_entry_point(cls, entry_point, plugins, with_prefix):
def _load_entry_point(cls, entry_point: pkg_resources.EntryPoint,
plugins: Dict[str, PluginEntryPoint],
with_prefix: bool) -> PluginEntryPoint:
plugin_ep = PluginEntryPoint(entry_point, with_prefix)
if plugin_ep.name in plugins:
other_ep = plugins[plugin_ep.name]
@@ -261,47 +273,47 @@ class PluginsRegistry(Mapping):
return plugin_ep
def __getitem__(self, name):
def __getitem__(self, name: str) -> PluginEntryPoint:
return self._plugins[name]
def __iter__(self):
def __iter__(self) -> Iterator[str]:
return iter(self._plugins)
def __len__(self):
def __len__(self) -> int:
return len(self._plugins)
def init(self, config):
def init(self, config: configuration.NamespaceConfig) -> List[interfaces.Plugin]:
"""Initialize all plugins in the registry."""
return [plugin_ep.init(config) for plugin_ep
in self._plugins.values()]
def filter(self, pred):
def filter(self, pred: Callable[[PluginEntryPoint], bool]) -> "PluginsRegistry":
"""Filter plugins based on predicate."""
return type(self)({name: plugin_ep for name, plugin_ep
in self._plugins.items() if pred(plugin_ep)})
in self._plugins.items() if pred(plugin_ep)})
def visible(self):
def visible(self) -> "PluginsRegistry":
"""Filter plugins based on visibility."""
return self.filter(lambda plugin_ep: not plugin_ep.hidden)
def ifaces(self, *ifaces_groups):
def ifaces(self, *ifaces_groups: Iterable[Type]) -> "PluginsRegistry":
"""Filter plugins based on interfaces."""
return self.filter(lambda p_ep: p_ep.ifaces(*ifaces_groups))
def verify(self, ifaces):
def verify(self, ifaces: Iterable[Type]) -> "PluginsRegistry":
"""Filter plugins based on verification."""
return self.filter(lambda p_ep: p_ep.verify(ifaces))
def prepare(self):
def prepare(self) -> List[Union[bool, Error]]:
"""Prepare all plugins in the registry."""
return [plugin_ep.prepare() for plugin_ep in self._plugins.values()]
def available(self):
def available(self) -> "PluginsRegistry":
"""Filter plugins based on availability."""
return self.filter(lambda p_ep: p_ep.available)
# successfully prepared + misconfigured
def find_init(self, plugin):
def find_init(self, plugin: interfaces.Plugin) -> Optional[PluginEntryPoint]:
"""Find an initialized plugin.
This is particularly useful for finding a name for the plugin::
@@ -321,12 +333,12 @@ class PluginsRegistry(Mapping):
return candidates[0]
return None
def __repr__(self):
def __repr__(self) -> str:
return "{0}({1})".format(
self.__class__.__name__, ','.join(
repr(p_ep) for p_ep in self._plugins.values()))
def __str__(self):
def __str__(self) -> str:
if not self._plugins:
return "No plugins"
return "\n\n".join(str(p_ep) for p_ep in self._plugins.values())

View File

@@ -1,6 +1,12 @@
"""Manual authenticator plugin"""
import logging
from typing import Any
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import List
from typing import Tuple
from typing import Type
from acme import challenges
from certbot import achallenges
@@ -88,7 +94,7 @@ asked to create multiple distinct TXT records with the same name. This is
permitted by DNS standards.)
"""
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.reverter = reverter.Reverter(self.config)
self.reverter.recovery_routine()
@@ -97,14 +103,14 @@ permitted by DNS standards.)
self.subsequent_any_challenge = False
@classmethod
def add_parser_arguments(cls, add):
def add_parser_arguments(cls, add: Callable[..., None]) -> None:
add('auth-hook',
help='Path or command to execute for the authentication script')
add('cleanup-hook',
help='Path or command to execute for the cleanup script')
util.add_deprecated_argument(add, 'public-ip-logging-ok', 0)
def prepare(self): # pylint: disable=missing-function-docstring
def prepare(self) -> None: # pylint: disable=missing-function-docstring
if self.config.noninteractive_mode and not self.conf('auth-hook'):
raise errors.PluginError(
'An authentication script must be provided with --{0} when '
@@ -112,7 +118,7 @@ permitted by DNS standards.)
self.option_name('auth-hook')))
self._validate_hooks()
def _validate_hooks(self):
def _validate_hooks(self) -> None:
if self.config.validate_hooks:
for name in ('auth-hook', 'cleanup-hook'):
hook = self.conf(name)
@@ -120,13 +126,13 @@ permitted by DNS standards.)
hook_prefix = self.option_name(name)[:-len('-hook')]
hooks.validate_hook(hook, hook_prefix)
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str: # pylint: disable=missing-function-docstring
return (
'This plugin allows the user to customize setup for domain '
'validation challenges either through shell scripts provided by '
'the user or by performing the setup manually.')
def auth_hint(self, failed_achalls):
def auth_hint(self, failed_achalls: Iterable[achallenges.AnnotatedChallenge]) -> str:
has_chall = lambda cls: any(isinstance(achall.chall, cls) for achall in failed_achalls)
has_dns = has_chall(challenges.DNS01)
@@ -162,11 +168,12 @@ permitted by DNS standards.)
)
)
def get_chall_pref(self, domain):
def get_chall_pref(self, domain: str) -> Iterable[Type[challenges.Challenge]]:
# pylint: disable=unused-argument,missing-function-docstring
return [challenges.HTTP01, challenges.DNS01]
def perform(self, achalls): # pylint: disable=missing-function-docstring
def perform(self, achalls: List[achallenges.AnnotatedChallenge]
) -> List[challenges.ChallengeResponse]: # pylint: disable=missing-function-docstring
responses = []
last_dns_achall = 0
for i, achall in enumerate(achalls):
@@ -180,7 +187,8 @@ permitted by DNS standards.)
responses.append(achall.response(achall.account_key))
return responses
def _perform_achall_with_script(self, achall, achalls):
def _perform_achall_with_script(self, achall: achallenges.AnnotatedChallenge,
achalls: List[achallenges.AnnotatedChallenge]) -> None:
env = dict(CERTBOT_DOMAIN=achall.domain,
CERTBOT_VALIDATION=achall.validation(achall.account_key),
CERTBOT_ALL_DOMAINS=','.join(one_achall.domain for one_achall in achalls),
@@ -194,7 +202,8 @@ permitted by DNS standards.)
env['CERTBOT_AUTH_OUTPUT'] = out.strip()
self.env[achall] = env
def _perform_achall_manually(self, achall, last_dns_achall=False):
def _perform_achall_manually(self, achall: achallenges.AnnotatedChallenge,
last_dns_achall: bool = False) -> None:
validation = achall.validation(achall.account_key)
if isinstance(achall.chall, challenges.HTTP01):
msg = self._HTTP_INSTRUCTIONS.format(
@@ -225,7 +234,7 @@ permitted by DNS standards.)
display_util.notification(msg, wrap=False, force_interactive=True)
self.subsequent_any_challenge = True
def cleanup(self, achalls): # pylint: disable=missing-function-docstring
def cleanup(self, achalls: Iterable[achallenges.AnnotatedChallenge]) -> None: # pylint: disable=missing-function-docstring
if self.conf('cleanup-hook'):
for achall in achalls:
env = self.env.pop(achall)
@@ -235,7 +244,7 @@ permitted by DNS standards.)
self._execute_hook('cleanup-hook', achall.domain)
self.reverter.recovery_routine()
def _execute_hook(self, hook_name, achall_domain):
def _execute_hook(self, hook_name: str, achall_domain: str) -> Tuple[str, str]:
returncode, err, out = misc.execute_command_status(
self.option_name(hook_name), self.conf(hook_name),
env=util.env_no_snap_for_external_calls()

View File

@@ -1,5 +1,9 @@
"""Null plugin."""
import logging
from typing import Callable
from typing import List
from typing import Optional
from typing import Union
from certbot import interfaces
from certbot.plugins import common
@@ -14,41 +18,42 @@ class Installer(common.Plugin, interfaces.Installer):
hidden = True
@classmethod
def add_parser_arguments(cls, add):
def add_parser_arguments(cls, add: Callable[..., None]) -> None:
pass
# pylint: disable=missing-function-docstring
def prepare(self):
def prepare(self) -> None:
pass # pragma: no cover
def more_info(self):
def more_info(self) -> str:
return "Installer that doesn't do anything (for testing)."
def get_all_names(self):
def get_all_names(self) -> List[str]:
return []
def deploy_cert(self, domain, cert_path, key_path,
chain_path=None, fullchain_path=None):
def deploy_cert(self, domain: str, cert_path: str, key_path: str,
chain_path: str, fullchain_path: str) -> None:
pass # pragma: no cover
def enhance(self, domain, enhancement, options=None):
def enhance(self, domain: str, enhancement: str,
options: Optional[Union[List[str], str]] = None) -> None:
pass # pragma: no cover
def supported_enhancements(self):
def supported_enhancements(self) -> List[str]:
return []
def save(self, title=None, temporary=False):
def save(self, title: Optional[str] = None, temporary: bool = False) -> None:
pass # pragma: no cover
def rollback_checkpoints(self, rollback=1):
def rollback_checkpoints(self, rollback: int = 1) -> None:
pass # pragma: no cover
def recovery_routine(self):
def recovery_routine(self) -> None:
pass # pragma: no cover
def config_test(self):
def config_test(self) -> None:
pass # pragma: no cover
def restart(self):
def restart(self) -> None:
pass # pragma: no cover

View File

@@ -1,8 +1,13 @@
"""Decide which plugins to use for authentication & installation"""
import logging
from typing import cast
from typing import Iterable
from typing import List
from typing import Optional
from typing import Tuple
from typing import Type
from typing import TypeVar
from certbot import configuration
from certbot import errors
@@ -14,32 +19,36 @@ from certbot.display import util as display_util
logger = logging.getLogger(__name__)
def pick_configurator(
config, default, plugins,
question="How would you like to authenticate and install "
"certificates?"):
def pick_configurator(config: configuration.NamespaceConfig, default: Optional[str],
plugins: disco.PluginsRegistry,
question: str = "How would you like to authenticate and install "
"certificates?") -> Optional[interfaces.Plugin]:
"""Pick configurator plugin."""
return pick_plugin(
config, default, plugins, question,
(interfaces.Authenticator, interfaces.Installer))
def pick_installer(config, default, plugins,
question="How would you like to install certificates?"):
def pick_installer(config: configuration.NamespaceConfig, default: Optional[str],
plugins: disco.PluginsRegistry,
question: str = "How would you like to install certificates?"
) -> Optional[interfaces.Installer]:
"""Pick installer plugin."""
return pick_plugin(
config, default, plugins, question, (interfaces.Installer,))
return pick_plugin(config, default, plugins, question, (interfaces.Installer,))
def pick_authenticator(
config, default, plugins, question="How would you "
"like to authenticate with the ACME CA?"):
def pick_authenticator(config: configuration.NamespaceConfig, default: Optional[str],
plugins: disco.PluginsRegistry,
question: str = "How would you "
"like to authenticate with the ACME CA?"
) -> Optional[interfaces.Authenticator]:
"""Pick authentication plugin."""
return pick_plugin(
config, default, plugins, question, (interfaces.Authenticator,))
def get_unprepared_installer(config, plugins):
def get_unprepared_installer(config: configuration.NamespaceConfig,
plugins: disco.PluginsRegistry) -> Optional[interfaces.Installer]:
"""
Get an unprepared interfaces.Installer object.
@@ -69,10 +78,15 @@ def get_unprepared_installer(config, plugins):
"Could not select or initialize the requested installer %s." % req_inst)
def pick_plugin(config, default, plugins, question, ifaces):
P = TypeVar('P', bound=interfaces.Plugin)
def pick_plugin(config: configuration.NamespaceConfig, default: Optional[str],
plugins: disco.PluginsRegistry, question: str,
ifaces: Iterable[Type]) -> Optional[P]:
"""Pick plugin.
:param certbot.configuration.NamespaceConfig: Configuration
:param certbot.configuration.NamespaceConfig config: Configuration
:param str default: Plugin name supplied by user or ``None``.
:param certbot._internal.plugins.disco.PluginsRegistry plugins:
All plugins registered as entry points.
@@ -108,22 +122,23 @@ def pick_plugin(config, default, plugins, question, ifaces):
if len(prepared) > 1:
logger.debug("Multiple candidate plugins: %s", prepared)
plugin_ep = choose_plugin(list(prepared.values()), question)
if plugin_ep is None:
plugin_ep1 = choose_plugin(list(prepared.values()), question)
if plugin_ep1 is None:
return None
return plugin_ep.init()
return cast(P, plugin_ep1.init())
elif len(prepared) == 1:
plugin_ep = list(prepared.values())[0]
logger.debug("Single candidate plugin: %s", plugin_ep)
if plugin_ep.misconfigured:
plugin_ep2 = list(prepared.values())[0]
logger.debug("Single candidate plugin: %s", plugin_ep2)
if plugin_ep2.misconfigured:
return None
return plugin_ep.init()
return plugin_ep2.init()
else:
logger.debug("No candidate plugin")
return None
def choose_plugin(prepared, question):
def choose_plugin(prepared: List[disco.PluginEntryPoint],
question: str) -> Optional[disco.PluginEntryPoint]:
"""Allow the user to choose their plugin.
:param list prepared: List of `~.PluginEntryPoint`.
@@ -152,17 +167,29 @@ def choose_plugin(prepared, question):
else:
return None
noninstaller_plugins = ["webroot", "manual", "standalone", "dns-cloudflare", "dns-cloudxns",
"dns-digitalocean", "dns-dnsimple", "dns-dnsmadeeasy", "dns-gehirn",
"dns-google", "dns-linode", "dns-luadns", "dns-nsone", "dns-ovh",
"dns-rfc2136", "dns-route53", "dns-sakuracloud"]
def record_chosen_plugins(config, plugins, auth, inst):
"Update the config entries to reflect the plugins we actually selected."
config.authenticator = plugins.find_init(auth).name if auth else None
config.installer = plugins.find_init(inst).name if inst else None
def record_chosen_plugins(config: configuration.NamespaceConfig, plugins: disco.PluginsRegistry,
auth: Optional[interfaces.Authenticator],
inst: Optional[interfaces.Installer]) -> None:
"""Update the config entries to reflect the plugins we actually selected."""
config.authenticator = None
if auth:
auth_ep = plugins.find_init(auth)
if auth_ep:
config.authenticator = auth_ep.name
config.installer = None
if inst:
inst_ep = plugins.find_init(inst)
if inst_ep:
config.installer = inst_ep.name
logger.info("Plugins selected: Authenticator %s, Installer %s",
config.authenticator, config.installer)
config.authenticator, config.installer)
def choose_configurator_plugins(config: configuration.NamespaceConfig,
@@ -181,7 +208,7 @@ def choose_configurator_plugins(config: configuration.NamespaceConfig,
"""
req_auth, req_inst = cli_plugin_requests(config)
installer_question = None
installer_question = ""
if verb == "enhance":
installer_question = ("Which installer would you like to use to "
@@ -209,11 +236,14 @@ def choose_configurator_plugins(config: configuration.NamespaceConfig,
logger.warning("Specifying an authenticator doesn't make sense when "
"running Certbot with verb \"%s\"", verb)
# Try to meet the user's request and/or ask them to pick plugins
authenticator = installer = None
authenticator: Optional[interfaces.Authenticator] = None
installer: Optional[interfaces.Installer] = None
if verb == "run" and req_auth == req_inst:
# Unless the user has explicitly asked for different auth/install,
# only consider offering a single choice
authenticator = installer = pick_configurator(config, req_inst, plugins)
configurator = pick_configurator(config, req_inst, plugins)
authenticator = cast(Optional[interfaces.Authenticator], configurator)
installer = cast(Optional[interfaces.Installer], configurator)
else:
if need_inst or req_inst:
installer = pick_installer(config, req_inst, plugins, installer_question)
@@ -231,11 +261,11 @@ def choose_configurator_plugins(config: configuration.NamespaceConfig,
return installer, authenticator
def set_configurator(previously, now):
def set_configurator(previously: Optional[str], now: Optional[str]) -> Optional[str]:
"""
Setting configurators multiple ways is okay, as long as they all agree
:param str previously: previously identified request for the installer/authenticator
:param str requested: the request currently being processed
:param str now: the request currently being processed
"""
if not now:
# we're not actually setting anything
@@ -247,7 +277,8 @@ def set_configurator(previously, now):
return now
def cli_plugin_requests(config):
def cli_plugin_requests(config: configuration.NamespaceConfig
) -> Tuple[Optional[str], Optional[str]]:
"""
Figure out which plugins the user requested with CLI and config options
@@ -302,7 +333,8 @@ def cli_plugin_requests(config):
return req_auth, req_inst
def diagnose_configurator_problem(cfg_type, requested, plugins):
def diagnose_configurator_problem(cfg_type: str, requested: Optional[str],
plugins: disco.PluginsRegistry) -> None:
"""
Raise the most helpful error message about a plugin being unavailable

View File

@@ -3,14 +3,19 @@ import collections
import errno
import logging
import socket
from typing import Any
from typing import Callable
from typing import DefaultDict
from typing import Dict
from typing import Iterable
from typing import List
from typing import Mapping
from typing import Set
from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
import OpenSSL
from OpenSSL import crypto
from acme import challenges
from acme import standalone as acme_standalone
@@ -42,12 +47,15 @@ class ServerManager:
will serve the same URLs!
"""
def __init__(self, certs, http_01_resources):
self._instances: Dict[int, acme_standalone.BaseDualNetworkedServers] = {}
def __init__(self, certs: Mapping[bytes, Tuple[crypto.PKey, crypto.X509]],
http_01_resources: Set[acme_standalone.HTTP01RequestHandler.HTTP01Resource]
) -> None:
self._instances: Dict[int, acme_standalone.HTTP01DualNetworkedServers] = {}
self.certs = certs
self.http_01_resources = http_01_resources
def run(self, port, challenge_type, listenaddr=""):
def run(self, port: int, challenge_type: Type[challenges.Challenge],
listenaddr: str = "") -> acme_standalone.HTTP01DualNetworkedServers:
"""Run ACME server on specified ``port``.
This method is idempotent, i.e. all calls with the same pair of
@@ -81,7 +89,7 @@ class ServerManager:
self._instances[real_port] = servers
return servers
def stop(self, port):
def stop(self, port: int) -> None:
"""Stop ACME server running on the specified ``port``.
:param int port:
@@ -94,7 +102,7 @@ class ServerManager:
instance.shutdown_and_server_close()
del self._instances[port]
def running(self):
def running(self) -> Dict[int, acme_standalone.HTTP01DualNetworkedServers]:
"""Return all running instances.
Once the server is stopped using `stop`, it will not be
@@ -118,7 +126,7 @@ class Authenticator(common.Plugin, interfaces.Authenticator):
description = "Spin up a temporary webserver"
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.served: ServedType = collections.defaultdict(set)
@@ -127,44 +135,49 @@ class Authenticator(common.Plugin, interfaces.Authenticator):
# values, main thread writes). Due to the nature of CPython's
# GIL, the operations are safe, c.f.
# https://docs.python.org/2/faq/library.html#what-kinds-of-global-value-mutation-are-thread-safe
self.certs: Dict[bytes, Tuple[OpenSSL.crypto.PKey, OpenSSL.crypto.X509]] = {}
self.certs: Mapping[bytes, Tuple[crypto.PKey, crypto.X509]] = {}
self.http_01_resources: Set[acme_standalone.HTTP01RequestHandler.HTTP01Resource] = set()
self.servers = ServerManager(self.certs, self.http_01_resources)
@classmethod
def add_parser_arguments(cls, add):
def add_parser_arguments(cls, add: Callable[..., None]) -> None:
pass # No additional argument for the standalone plugin parser
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str: # pylint: disable=missing-function-docstring
return("This authenticator creates its own ephemeral TCP listener "
"on the necessary port in order to respond to incoming "
"http-01 challenges from the certificate authority. Therefore, "
"it does not rely on any existing server program.")
def prepare(self): # pylint: disable=missing-function-docstring
def prepare(self) -> None: # pylint: disable=missing-function-docstring
pass
def get_chall_pref(self, domain):
def get_chall_pref(self, domain: str) -> Iterable[Type[challenges.Challenge]]:
# pylint: disable=unused-argument,missing-function-docstring
return [challenges.HTTP01]
def perform(self, achalls): # pylint: disable=missing-function-docstring
def perform(self, achalls: Iterable[achallenges.AnnotatedChallenge]
) -> List[challenges.ChallengeResponse]: # pylint: disable=missing-function-docstring
return [self._try_perform_single(achall) for achall in achalls]
def _try_perform_single(self, achall):
def _try_perform_single(self,
achall: achallenges.AnnotatedChallenge) -> challenges.ChallengeResponse:
while True:
try:
return self._perform_single(achall)
except errors.StandaloneBindError as error:
_handle_perform_error(error)
def _perform_single(self, achall):
def _perform_single(self,
achall: achallenges.AnnotatedChallenge) -> challenges.ChallengeResponse:
servers, response = self._perform_http_01(achall)
self.served[servers].add(achall)
return response
def _perform_http_01(self, achall):
def _perform_http_01(self, achall: achallenges.AnnotatedChallenge
) -> Tuple[acme_standalone.HTTP01DualNetworkedServers,
challenges.ChallengeResponse]:
port = self.config.http01_port
addr = self.config.http01_address
servers = self.servers.run(port, challenges.HTTP01, listenaddr=addr)
@@ -174,7 +187,7 @@ class Authenticator(common.Plugin, interfaces.Authenticator):
self.http_01_resources.add(resource)
return servers, response
def cleanup(self, achalls): # pylint: disable=missing-function-docstring
def cleanup(self, achalls: Iterable[achallenges.AnnotatedChallenge]) -> None: # pylint: disable=missing-function-docstring
# reduce self.served and close servers if no challenges are served
for unused_servers, server_achalls in self.served.items():
for achall in achalls:
@@ -193,7 +206,7 @@ class Authenticator(common.Plugin, interfaces.Authenticator):
"accept inbound connections from the internet.")
def _handle_perform_error(error):
def _handle_perform_error(error: errors.StandaloneBindError) -> None:
if error.socket_error.errno == errno.EACCES:
raise errors.PluginError(
"Could not bind TCP port {0} because you don't have "

View File

@@ -3,10 +3,17 @@ import argparse
import collections
import json
import logging
from typing import Any
from typing import Callable
from typing import DefaultDict
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Type
from typing import Union
from acme import challenges
from certbot import crypto_util
@@ -57,11 +64,11 @@ necessary validation resources to appropriate paths on the file
system. It expects that there is some other HTTP server configured
to serve all files under specified web root ({0})."""
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str: # pylint: disable=missing-function-docstring
return self.MORE_INFO.format(self.conf("path"))
@classmethod
def add_parser_arguments(cls, add):
def add_parser_arguments(cls, add: Callable[..., None]) -> None:
add("path", "-w", default=[], action=_WebrootPathAction,
help="public_html / webroot path. This can be specified multiple "
"times to handle different domains; each domain will have "
@@ -78,34 +85,34 @@ to serve all files under specified web root ({0})."""
"file, it needs to be on a single line, like: webroot-map = "
'{"example.com":"/var/www"}.')
def auth_hint(self, failed_achalls): # pragma: no cover
def auth_hint(self, failed_achalls: Iterable[AnnotatedChallenge]) -> str: # pragma: no cover
return ("The Certificate Authority failed to download the temporary challenge files "
"created by Certbot. Ensure that the listed domains serve their content from "
"the provided --webroot-path/-w and that files created there can be downloaded "
"from the internet.")
def get_chall_pref(self, domain): # pragma: no cover
def get_chall_pref(self, domain: str) -> Iterable[Type[challenges.Challenge]]:
# pylint: disable=unused-argument,missing-function-docstring
return [challenges.HTTP01]
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.full_roots: Dict[str, str] = {}
self.performed: DefaultDict[str, Set[AnnotatedChallenge]] = collections.defaultdict(set)
# stack of dirs successfully created by this authenticator
self._created_dirs: List[str] = []
def prepare(self): # pylint: disable=missing-function-docstring
def prepare(self) -> None: # pylint: disable=missing-function-docstring
pass
def perform(self, achalls): # pylint: disable=missing-function-docstring
def perform(self, achalls: Iterable[AnnotatedChallenge]) -> List[challenges.ChallengeResponse]: # pylint: disable=missing-function-docstring
self._set_webroots(achalls)
self._create_challenge_dirs()
return [self._perform_single(achall) for achall in achalls]
def _set_webroots(self, achalls):
def _set_webroots(self, achalls: Iterable[AnnotatedChallenge]) -> None:
if self.conf("path"):
webroot_path = self.conf("path")[-1]
logger.info("Using the webroot path %s for all unmatched domains.",
@@ -127,7 +134,7 @@ to serve all files under specified web root ({0})."""
known_webroots.insert(0, new_webroot)
self.conf("map")[achall.domain] = new_webroot
def _prompt_for_webroot(self, domain, known_webroots):
def _prompt_for_webroot(self, domain: str, known_webroots: List[str]) -> Optional[str]:
webroot = None
while webroot is None:
@@ -142,7 +149,8 @@ to serve all files under specified web root ({0})."""
return webroot
def _prompt_with_webroot_list(self, domain, known_webroots):
def _prompt_with_webroot_list(self, domain: str,
known_webroots: List[str]) -> Optional[str]:
path_flag = "--" + self.option_name("path")
while True:
@@ -156,7 +164,7 @@ to serve all files under specified web root ({0})."""
"webroot when using the webroot plugin.")
return None if index == 0 else known_webroots[index - 1] # code == display_util.OK
def _prompt_for_new_webroot(self, domain, allowraise=False):
def _prompt_for_new_webroot(self, domain: str, allowraise: bool = False) -> Optional[str]:
code, webroot = ops.validated_directory(
_validate_webroot,
"Input the webroot for {0}:".format(domain),
@@ -169,7 +177,7 @@ to serve all files under specified web root ({0})."""
"webroot when using the webroot plugin.")
return _validate_webroot(webroot) # code == display_util.OK
def _create_challenge_dirs(self):
def _create_challenge_dirs(self) -> None:
path_map = self.conf("map")
if not path_map:
raise errors.PluginError(
@@ -227,10 +235,10 @@ to serve all files under specified web root ({0})."""
with safe_open(web_config_path, mode="w", chmod=0o644) as web_config:
web_config.write(_WEB_CONFIG_CONTENT)
def _get_validation_path(self, root_path, achall):
def _get_validation_path(self, root_path: str, achall: AnnotatedChallenge) -> str:
return os.path.join(root_path, achall.chall.encode("token"))
def _perform_single(self, achall):
def _perform_single(self, achall: AnnotatedChallenge) -> challenges.ChallengeResponse:
response, validation = achall.response_and_validation()
root_path = self.full_roots[achall.domain]
@@ -249,7 +257,7 @@ to serve all files under specified web root ({0})."""
self.performed[root_path].add(achall)
return response
def cleanup(self, achalls): # pylint: disable=missing-function-docstring
def cleanup(self, achalls: Iterable[AnnotatedChallenge]) -> None: # pylint: disable=missing-function-docstring
for achall in achalls:
root_path = self.full_roots.get(achall.domain, None)
if root_path is not None:
@@ -270,7 +278,6 @@ to serve all files under specified web root ({0})."""
logger.info("Not cleaning up the web.config file in %s "
"because it is not generated by Certbot.", root_path)
not_removed: List[str] = []
while self._created_dirs:
path = self._created_dirs.pop()
@@ -287,8 +294,12 @@ to serve all files under specified web root ({0})."""
class _WebrootMapAction(argparse.Action):
"""Action class for parsing webroot_map."""
def __call__(self, parser, namespace, webroot_map, option_string=None):
for domains, webroot_path in json.loads(webroot_map).items():
def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace,
webroot_map: Union[str, Sequence[Any], None],
option_string: Optional[str] = None) -> None:
if webroot_map is None:
return
for domains, webroot_path in json.loads(str(webroot_map)).items():
webroot_path = _validate_webroot(webroot_path)
namespace.webroot_map.update(
(d, webroot_path) for d in cli.add_domains(namespace, domains))
@@ -297,11 +308,15 @@ class _WebrootMapAction(argparse.Action):
class _WebrootPathAction(argparse.Action):
"""Action class for parsing webroot_path."""
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self._domain_before_webroot = False
def __call__(self, parser, namespace, webroot_path, option_string=None):
def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace,
webroot_path: Union[str, Sequence[Any], None],
option_string: Optional[str] = None) -> None:
if webroot_path is None:
return
if self._domain_before_webroot:
raise errors.PluginError(
"If you specify multiple webroot paths, "
@@ -316,10 +331,10 @@ class _WebrootPathAction(argparse.Action):
elif namespace.domains:
self._domain_before_webroot = True
namespace.webroot_path.append(_validate_webroot(webroot_path))
namespace.webroot_path.append(_validate_webroot(str(webroot_path)))
def _validate_webroot(webroot_path):
def _validate_webroot(webroot_path: str) -> str:
"""Validates and returns the absolute path of webroot_path.
:param str webroot_path: path to the webroot directory

View File

@@ -20,8 +20,8 @@ install_and_test = python {toxinidir}/tools/install_and_test.py
dns_packages = certbot-dns-cloudflare certbot-dns-cloudxns certbot-dns-digitalocean certbot-dns-dnsimple certbot-dns-dnsmadeeasy certbot-dns-gehirn certbot-dns-google certbot-dns-linode certbot-dns-luadns certbot-dns-nsone certbot-dns-ovh certbot-dns-rfc2136 certbot-dns-route53 certbot-dns-sakuracloud
win_all_packages = acme[test] certbot[test] {[base]dns_packages} certbot-nginx
all_packages = {[base]win_all_packages} certbot-apache
fully_typed_source_paths = acme/acme
partially_typed_source_paths = certbot/certbot certbot-ci/certbot_integration_tests certbot-apache/certbot_apache certbot-compatibility-test/certbot_compatibility_test certbot-dns-cloudflare/certbot_dns_cloudflare certbot-dns-cloudxns/certbot_dns_cloudxns certbot-dns-digitalocean/certbot_dns_digitalocean certbot-dns-dnsimple/certbot_dns_dnsimple certbot-dns-dnsmadeeasy/certbot_dns_dnsmadeeasy certbot-dns-gehirn/certbot_dns_gehirn certbot-dns-google/certbot_dns_google certbot-dns-linode/certbot_dns_linode certbot-dns-luadns/certbot_dns_luadns certbot-dns-nsone/certbot_dns_nsone certbot-dns-ovh/certbot_dns_ovh certbot-dns-rfc2136/certbot_dns_rfc2136 certbot-dns-route53/certbot_dns_route53 certbot-dns-sakuracloud/certbot_dns_sakuracloud certbot-nginx/certbot_nginx tests/lock_test.py
fully_typed_source_paths = acme/acme certbot/certbot
partially_typed_source_paths = certbot-ci/certbot_integration_tests certbot-apache/certbot_apache certbot-compatibility-test/certbot_compatibility_test certbot-dns-cloudflare/certbot_dns_cloudflare certbot-dns-cloudxns/certbot_dns_cloudxns certbot-dns-digitalocean/certbot_dns_digitalocean certbot-dns-dnsimple/certbot_dns_dnsimple certbot-dns-dnsmadeeasy/certbot_dns_dnsmadeeasy certbot-dns-gehirn/certbot_dns_gehirn certbot-dns-google/certbot_dns_google certbot-dns-linode/certbot_dns_linode certbot-dns-luadns/certbot_dns_luadns certbot-dns-nsone/certbot_dns_nsone certbot-dns-ovh/certbot_dns_ovh certbot-dns-rfc2136/certbot_dns_rfc2136 certbot-dns-route53/certbot_dns_route53 certbot-dns-sakuracloud/certbot_dns_sakuracloud certbot-nginx/certbot_nginx tests/lock_test.py
[testenv]
passenv =