mirror of
https://github.com/certbot/certbot.git
synced 2026-01-26 07:41:33 +03:00
Add type annotations to the certbot package (part 3) (#9086)
* Extract from #9084 * Cast/ignore types during the transition * Fix after review * Fix lint * Update certbot/certbot/_internal/storage.py Co-authored-by: alexzorin <alex@zor.io> * Update certbot/certbot/_internal/storage.py Co-authored-by: alexzorin <alex@zor.io> * Update certbot/certbot/_internal/main.py Co-authored-by: alexzorin <alex@zor.io> * Update certbot/certbot/_internal/main.py Co-authored-by: alexzorin <alex@zor.io> * Update certbot/certbot/_internal/client.py Co-authored-by: alexzorin <alex@zor.io> * Update certbot/certbot/_internal/client.py Co-authored-by: alexzorin <alex@zor.io> * Update certbot/certbot/_internal/auth_handler.py Co-authored-by: alexzorin <alex@zor.io> * Update certbot/certbot/_internal/auth_handler.py Co-authored-by: alexzorin <alex@zor.io> * Update certbot/certbot/_internal/auth_handler.py Co-authored-by: alexzorin <alex@zor.io> * Remove a cast usage * Fix import * Remove now useless cast * Update certbot/certbot/_internal/client.py Co-authored-by: alexzorin <alex@zor.io> Co-authored-by: alexzorin <alex@zor.io>
This commit is contained in:
@@ -5,9 +5,13 @@ import hashlib
|
||||
import logging
|
||||
import shutil
|
||||
import socket
|
||||
from typing import cast
|
||||
from typing import Any
|
||||
from typing import Callable
|
||||
from typing import cast
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
from typing import Mapping
|
||||
from typing import Optional
|
||||
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
import josepy as jose
|
||||
@@ -17,6 +21,7 @@ import pytz
|
||||
from acme import fields as acme_fields
|
||||
from acme import messages
|
||||
from acme.client import ClientBase # pylint: disable=unused-import
|
||||
from certbot import configuration
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
from certbot import util
|
||||
@@ -53,7 +58,8 @@ class Account:
|
||||
creation_host = jose.Field("creation_host")
|
||||
register_to_eff = jose.Field("register_to_eff", omitempty=True)
|
||||
|
||||
def __init__(self, regr, key, meta=None):
|
||||
def __init__(self, regr: messages.RegistrationResource, key: jose.JWK,
|
||||
meta: Optional['Meta'] = None) -> None:
|
||||
self.key = key
|
||||
self.regr = regr
|
||||
self.meta = self.Meta(
|
||||
@@ -84,16 +90,16 @@ class Account:
|
||||
# account key (and thus its fingerprint) to be updated...
|
||||
|
||||
@property
|
||||
def slug(self):
|
||||
def slug(self) -> str:
|
||||
"""Short account identification string, useful for UI."""
|
||||
return "{1}@{0} ({2})".format(pyrfc3339.generate(
|
||||
self.meta.creation_dt), self.meta.creation_host, self.id[:4])
|
||||
|
||||
def __repr__(self):
|
||||
def __repr__(self) -> str:
|
||||
return "<{0}({1}, {2}, {3})>".format(
|
||||
self.__class__.__name__, self.regr, self.id, self.meta)
|
||||
|
||||
def __eq__(self, other):
|
||||
def __eq__(self, other: Any) -> bool:
|
||||
return (isinstance(other, self.__class__) and
|
||||
self.key == other.key and self.regr == other.regr and
|
||||
self.meta == other.meta)
|
||||
@@ -102,18 +108,18 @@ class Account:
|
||||
class AccountMemoryStorage(interfaces.AccountStorage):
|
||||
"""In-memory account storage."""
|
||||
|
||||
def __init__(self, initial_accounts=None):
|
||||
def __init__(self, initial_accounts: Dict[str, Account] = None) -> None:
|
||||
self.accounts = initial_accounts if initial_accounts is not None else {}
|
||||
|
||||
def find_all(self):
|
||||
def find_all(self) -> List[Account]:
|
||||
return list(self.accounts.values())
|
||||
|
||||
def save(self, account, client):
|
||||
def save(self, account: Account, client: ClientBase) -> None:
|
||||
if account.id in self.accounts:
|
||||
logger.debug("Overwriting account: %s", account.id)
|
||||
self.accounts[account.id] = account
|
||||
|
||||
def load(self, account_id):
|
||||
def load(self, account_id: str) -> Account:
|
||||
try:
|
||||
return self.accounts[account_id]
|
||||
except KeyError:
|
||||
@@ -136,30 +142,30 @@ class AccountFileStorage(interfaces.AccountStorage):
|
||||
:ivar certbot.configuration.NamespaceConfig config: Client configuration
|
||||
|
||||
"""
|
||||
def __init__(self, config):
|
||||
def __init__(self, config: configuration.NamespaceConfig) -> None:
|
||||
self.config = config
|
||||
util.make_or_verify_dir(config.accounts_dir, 0o700, self.config.strict_permissions)
|
||||
|
||||
def _account_dir_path(self, account_id):
|
||||
def _account_dir_path(self, account_id: str) -> str:
|
||||
return self._account_dir_path_for_server_path(account_id, self.config.server_path)
|
||||
|
||||
def _account_dir_path_for_server_path(self, account_id, server_path):
|
||||
def _account_dir_path_for_server_path(self, account_id: str, server_path: str) -> str:
|
||||
accounts_dir = self.config.accounts_dir_for_server_path(server_path)
|
||||
return os.path.join(accounts_dir, account_id)
|
||||
|
||||
@classmethod
|
||||
def _regr_path(cls, account_dir_path):
|
||||
def _regr_path(cls, account_dir_path: str) -> str:
|
||||
return os.path.join(account_dir_path, "regr.json")
|
||||
|
||||
@classmethod
|
||||
def _key_path(cls, account_dir_path):
|
||||
def _key_path(cls, account_dir_path: str) -> str:
|
||||
return os.path.join(account_dir_path, "private_key.json")
|
||||
|
||||
@classmethod
|
||||
def _metadata_path(cls, account_dir_path):
|
||||
def _metadata_path(cls, account_dir_path: str) -> str:
|
||||
return os.path.join(account_dir_path, "meta.json")
|
||||
|
||||
def _find_all_for_server_path(self, server_path):
|
||||
def _find_all_for_server_path(self, server_path: str) -> List[Account]:
|
||||
accounts_dir = self.config.accounts_dir_for_server_path(server_path)
|
||||
try:
|
||||
candidates = os.listdir(accounts_dir)
|
||||
@@ -186,15 +192,16 @@ class AccountFileStorage(interfaces.AccountStorage):
|
||||
accounts = prev_accounts
|
||||
return accounts
|
||||
|
||||
def find_all(self):
|
||||
def find_all(self) -> List[Account]:
|
||||
return self._find_all_for_server_path(self.config.server_path)
|
||||
|
||||
def _symlink_to_account_dir(self, prev_server_path, server_path, account_id):
|
||||
def _symlink_to_account_dir(self, prev_server_path: str, server_path: str,
|
||||
account_id: str) -> None:
|
||||
prev_account_dir = self._account_dir_path_for_server_path(account_id, prev_server_path)
|
||||
new_account_dir = self._account_dir_path_for_server_path(account_id, server_path)
|
||||
os.symlink(prev_account_dir, new_account_dir)
|
||||
|
||||
def _symlink_to_accounts_dir(self, prev_server_path, server_path):
|
||||
def _symlink_to_accounts_dir(self, prev_server_path: str, server_path: str) -> None:
|
||||
accounts_dir = self.config.accounts_dir_for_server_path(server_path)
|
||||
if os.path.islink(accounts_dir):
|
||||
os.unlink(accounts_dir)
|
||||
@@ -203,7 +210,7 @@ class AccountFileStorage(interfaces.AccountStorage):
|
||||
prev_account_dir = self.config.accounts_dir_for_server_path(prev_server_path)
|
||||
os.symlink(prev_account_dir, accounts_dir)
|
||||
|
||||
def _load_for_server_path(self, account_id, server_path):
|
||||
def _load_for_server_path(self, account_id: str, server_path: str) -> Account:
|
||||
account_dir_path = self._account_dir_path_for_server_path(account_id, server_path)
|
||||
if not os.path.isdir(account_dir_path): # isdir is also true for symlinks
|
||||
if server_path in constants.LE_REUSE_SERVERS:
|
||||
@@ -222,17 +229,21 @@ class AccountFileStorage(interfaces.AccountStorage):
|
||||
|
||||
try:
|
||||
with open(self._regr_path(account_dir_path)) as regr_file:
|
||||
regr = messages.RegistrationResource.json_loads(regr_file.read())
|
||||
# TODO: Remove cast when https://github.com/certbot/certbot/pull/9073 is merged.
|
||||
regr = cast(messages.RegistrationResource,
|
||||
messages.RegistrationResource.json_loads(regr_file.read()))
|
||||
with open(self._key_path(account_dir_path)) as key_file:
|
||||
key = jose.JWK.json_loads(key_file.read())
|
||||
# TODO: Remove cast when https://github.com/certbot/certbot/pull/9073 is merged.
|
||||
key = cast(jose.JWK, jose.JWK.json_loads(key_file.read()))
|
||||
with open(self._metadata_path(account_dir_path)) as metadata_file:
|
||||
meta = Account.Meta.json_loads(metadata_file.read())
|
||||
# TODO: Remove cast when https://github.com/certbot/certbot/pull/9073 is merged.
|
||||
meta = cast(Account.Meta, Account.Meta.json_loads(metadata_file.read()))
|
||||
except IOError as error:
|
||||
raise errors.AccountStorageError(error)
|
||||
|
||||
return Account(regr, key, meta)
|
||||
|
||||
def load(self, account_id):
|
||||
def load(self, account_id: str) -> Account:
|
||||
return self._load_for_server_path(account_id, self.config.server_path)
|
||||
|
||||
def save(self, account: Account, client: ClientBase) -> None:
|
||||
@@ -275,7 +286,7 @@ class AccountFileStorage(interfaces.AccountStorage):
|
||||
except IOError as error:
|
||||
raise errors.AccountStorageError(error)
|
||||
|
||||
def delete(self, account_id):
|
||||
def delete(self, account_id: str) -> None:
|
||||
"""Delete registration info from disk
|
||||
|
||||
:param account_id: id of account which should be deleted
|
||||
@@ -292,17 +303,18 @@ class AccountFileStorage(interfaces.AccountStorage):
|
||||
if not os.listdir(self.config.accounts_dir):
|
||||
self._delete_accounts_dir_for_server_path(self.config.server_path)
|
||||
|
||||
def _delete_account_dir_for_server_path(self, account_id, server_path):
|
||||
def _delete_account_dir_for_server_path(self, account_id: str, server_path: str) -> None:
|
||||
link_func = functools.partial(self._account_dir_path_for_server_path, account_id)
|
||||
nonsymlinked_dir = self._delete_links_and_find_target_dir(server_path, link_func)
|
||||
shutil.rmtree(nonsymlinked_dir)
|
||||
|
||||
def _delete_accounts_dir_for_server_path(self, server_path):
|
||||
def _delete_accounts_dir_for_server_path(self, server_path: str) -> None:
|
||||
link_func = self.config.accounts_dir_for_server_path
|
||||
nonsymlinked_dir = self._delete_links_and_find_target_dir(server_path, link_func)
|
||||
os.rmdir(nonsymlinked_dir)
|
||||
|
||||
def _delete_links_and_find_target_dir(self, server_path, link_func):
|
||||
def _delete_links_and_find_target_dir(self, server_path: str,
|
||||
link_func: Callable[[str], str]) -> str:
|
||||
"""Delete symlinks and return the nonsymlinked directory path.
|
||||
|
||||
:param str server_path: file path based on server
|
||||
@@ -368,6 +380,6 @@ class AccountFileStorage(interfaces.AccountStorage):
|
||||
uri=regr.uri)
|
||||
regr_file.write(regr.json_dumps())
|
||||
|
||||
def _update_meta(self, account, dir_path):
|
||||
def _update_meta(self, account: Account, dir_path: str) -> None:
|
||||
with open(self._metadata_path(dir_path), "w") as metadata_file:
|
||||
metadata_file.write(account.meta.json_dumps())
|
||||
|
||||
@@ -3,15 +3,25 @@ import datetime
|
||||
import logging
|
||||
import time
|
||||
from typing import Dict
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
from typing import Type
|
||||
|
||||
import josepy
|
||||
from requests.models import Response
|
||||
|
||||
from acme import challenges
|
||||
from acme import client
|
||||
from acme import errors as acme_errors
|
||||
from acme import messages
|
||||
from certbot import achallenges
|
||||
from certbot import configuration
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
from certbot._internal import error_handler
|
||||
from certbot._internal.account import Account
|
||||
from certbot.display import util as display_util
|
||||
from certbot.plugins import common as plugin_common
|
||||
|
||||
@@ -34,14 +44,17 @@ class AuthHandler:
|
||||
type strings with the most preferred challenge listed first
|
||||
|
||||
"""
|
||||
def __init__(self, auth, acme_client, account, pref_challs):
|
||||
def __init__(self, auth: interfaces.Authenticator, acme_client: Optional[client.ClientV2],
|
||||
account: Optional[Account], pref_challs: List[str]) -> None:
|
||||
self.auth = auth
|
||||
self.acme = acme_client
|
||||
|
||||
self.account = account
|
||||
self.pref_challs = pref_challs
|
||||
|
||||
def handle_authorizations(self, orderr, config, best_effort=False, max_retries=30):
|
||||
def handle_authorizations(self, orderr: messages.OrderResource,
|
||||
config: configuration.NamespaceConfig, best_effort: bool = False,
|
||||
max_retries: int = 30) -> List[messages.AuthorizationResource]:
|
||||
"""
|
||||
Retrieve all authorizations, perform all challenges required to validate
|
||||
these authorizations, then poll and wait for the authorization to be checked.
|
||||
@@ -57,6 +70,8 @@ class AuthHandler:
|
||||
authzrs = orderr.authorizations[:]
|
||||
if not authzrs:
|
||||
raise errors.AuthorizationError('No authorization to handle.')
|
||||
if not self.acme:
|
||||
raise errors.Error("No ACME client defined, authorizations cannot be handled.")
|
||||
|
||||
# Retrieve challenges that need to be performed to validate authorizations.
|
||||
achalls = self._choose_challenges(authzrs)
|
||||
@@ -97,6 +112,8 @@ class AuthHandler:
|
||||
|
||||
return authzrs_validated
|
||||
|
||||
raise errors.Error("An unexpected error occurred while handling the authorizations.")
|
||||
|
||||
def deactivate_valid_authorizations(self, orderr: messages.OrderResource) -> Tuple[List, List]:
|
||||
"""
|
||||
Deactivate all `valid` authorizations in the order, so that they cannot be re-used
|
||||
@@ -106,6 +123,9 @@ class AuthHandler:
|
||||
list of unsuccessfully deactivated authorizations.
|
||||
:rtype: tuple
|
||||
"""
|
||||
if not self.acme:
|
||||
raise errors.Error("No ACME client defined, cannot deactivate valid authorizations.")
|
||||
|
||||
to_deactivate = [authzr for authzr in orderr.authorizations
|
||||
if authzr.body.status == messages.STATUS_VALID]
|
||||
deactivated = []
|
||||
@@ -121,17 +141,22 @@ class AuthHandler:
|
||||
|
||||
return (deactivated, failed)
|
||||
|
||||
def _poll_authorizations(self, authzrs, max_retries, best_effort):
|
||||
def _poll_authorizations(self, authzrs: List[messages.AuthorizationResource], max_retries: int,
|
||||
best_effort: bool) -> None:
|
||||
"""
|
||||
Poll the ACME CA server, to wait for confirmation that authorizations have their challenges
|
||||
all verified. The poll may occur several times, until all authorizations are checked
|
||||
(valid or invalid), or after a maximum of retries.
|
||||
"""
|
||||
authzrs_to_check = {index: (authzr, None)
|
||||
if not self.acme:
|
||||
raise errors.Error("No ACME client defined, cannot poll authorizations.")
|
||||
|
||||
authzrs_to_check: Dict[int, Tuple[messages.AuthorizationResource,
|
||||
Optional[Response]]] = {index: (authzr, None)
|
||||
for index, authzr in enumerate(authzrs)}
|
||||
authzrs_failed_to_report = []
|
||||
# Give an initial second to the ACME CA server to check the authorizations
|
||||
sleep_seconds = 1
|
||||
sleep_seconds: float = 1
|
||||
for _ in range(max_retries):
|
||||
# Wait for appropriate time (from Retry-After, initial wait, or no wait)
|
||||
if sleep_seconds > 0:
|
||||
@@ -166,8 +191,10 @@ class AuthHandler:
|
||||
# and wait this time before polling again in next loop iteration.
|
||||
# From all the pending authorizations, we take the greatest Retry-After value
|
||||
# to avoid polling an authorization before its relevant Retry-After value.
|
||||
# (by construction resp cannot be None at that time, but mypy do not know it).
|
||||
retry_after = max(self.acme.retry_after(resp, 3)
|
||||
for _, resp in authzrs_to_check.values())
|
||||
for _, resp in authzrs_to_check.values()
|
||||
if resp is not None)
|
||||
sleep_seconds = (retry_after - datetime.datetime.now()).total_seconds()
|
||||
|
||||
# In case of failed authzrs, create a report to the user.
|
||||
@@ -181,12 +208,16 @@ class AuthHandler:
|
||||
# Here authzrs_to_check is still not empty, meaning we exceeded the max polling attempt.
|
||||
raise errors.AuthorizationError('All authorizations were not finalized by the CA.')
|
||||
|
||||
def _choose_challenges(self, authzrs):
|
||||
def _choose_challenges(self, authzrs: Iterable[messages.AuthorizationResource]
|
||||
) -> List[achallenges.AnnotatedChallenge]:
|
||||
"""
|
||||
Retrieve necessary and pending challenges to satisfy server.
|
||||
NB: Necessary and already validated challenges are not retrieved,
|
||||
as they can be reused for a certificate issuance.
|
||||
"""
|
||||
if not self.acme:
|
||||
raise errors.Error("No ACME client defined, cannot choose the challenges.")
|
||||
|
||||
pending_authzrs = [authzr for authzr in authzrs
|
||||
if authzr.body.status != messages.STATUS_VALID]
|
||||
achalls: List[achallenges.AnnotatedChallenge] = []
|
||||
@@ -208,7 +239,7 @@ class AuthHandler:
|
||||
|
||||
return achalls
|
||||
|
||||
def _get_chall_pref(self, domain):
|
||||
def _get_chall_pref(self, domain: str) -> List[Type[challenges.Challenge]]:
|
||||
"""Return list of challenge preferences.
|
||||
|
||||
:param str domain: domain for which you are requesting preferences
|
||||
@@ -230,7 +261,7 @@ class AuthHandler:
|
||||
chall_prefs.extend(plugin_pref)
|
||||
return chall_prefs
|
||||
|
||||
def _cleanup_challenges(self, achalls):
|
||||
def _cleanup_challenges(self, achalls: List[achallenges.AnnotatedChallenge]) -> None:
|
||||
"""Cleanup challenges.
|
||||
|
||||
:param achalls: annotated challenges to cleanup
|
||||
@@ -240,7 +271,8 @@ class AuthHandler:
|
||||
logger.info("Cleaning up challenges")
|
||||
self.auth.cleanup(achalls)
|
||||
|
||||
def _challenge_factory(self, authzr, path):
|
||||
def _challenge_factory(self, authzr: messages.AuthorizationResource,
|
||||
path: List[int]) -> List[achallenges.AnnotatedChallenge]:
|
||||
"""Construct Namedtuple Challenges
|
||||
|
||||
:param messages.AuthorizationResource authzr: authorization
|
||||
@@ -248,12 +280,14 @@ class AuthHandler:
|
||||
:param list path: List of indices from `challenges`.
|
||||
|
||||
:returns: achalls, list of challenge type
|
||||
:class:`certbot.achallenges.Indexed`
|
||||
:class:`certbot.achallenges.AnnotatedChallenge`
|
||||
:rtype: list
|
||||
|
||||
:raises .errors.Error: if challenge type is not recognized
|
||||
|
||||
"""
|
||||
if not self.account:
|
||||
raise errors.Error("Account is not set.")
|
||||
achalls = []
|
||||
|
||||
for index in path:
|
||||
@@ -265,6 +299,8 @@ class AuthHandler:
|
||||
|
||||
def _report_failed_authzrs(self, failed_authzrs: List[messages.AuthorizationResource]) -> None:
|
||||
"""Notifies the user about failed authorizations."""
|
||||
if not self.account:
|
||||
raise errors.Error("Account is not set.")
|
||||
problems: Dict[str, List[achallenges.AnnotatedChallenge]] = {}
|
||||
failed_achalls = [challb_to_achall(challb, self.account.key, authzr.body.identifier.value)
|
||||
for authzr in failed_authzrs for challb in authzr.body.challenges
|
||||
@@ -273,8 +309,9 @@ class AuthHandler:
|
||||
for achall in failed_achalls:
|
||||
problems.setdefault(achall.error.typ, []).append(achall)
|
||||
|
||||
msg = [f"\nCertbot failed to authenticate some domains (authenticator: {self.auth.name})."
|
||||
" The Certificate Authority reported these problems:"]
|
||||
msg = ["\nCertbot failed to authenticate some domains "
|
||||
f"(authenticator: {self.auth.name})."
|
||||
" The Certificate Authority reported these problems:"]
|
||||
|
||||
for _, achalls in sorted(problems.items(), key=lambda item: item[0]):
|
||||
msg.append(_generate_failed_chall_msg(achalls))
|
||||
@@ -287,7 +324,8 @@ class AuthHandler:
|
||||
display_util.notify("".join(msg))
|
||||
|
||||
|
||||
def challb_to_achall(challb, account_key, domain):
|
||||
def challb_to_achall(challb: messages.ChallengeBody, account_key: josepy.JWK,
|
||||
domain: str) -> achallenges.AnnotatedChallenge:
|
||||
"""Converts a ChallengeBody object to an AnnotatedChallenge.
|
||||
|
||||
:param .ChallengeBody challb: ChallengeBody
|
||||
@@ -310,7 +348,9 @@ def challb_to_achall(challb, account_key, domain):
|
||||
"Received unsupported challenge of type: {0}".format(chall.typ))
|
||||
|
||||
|
||||
def gen_challenge_path(challbs, preferences, combinations):
|
||||
def gen_challenge_path(challbs: List[messages.ChallengeBody],
|
||||
preferences: List[Type[challenges.Challenge]],
|
||||
combinations: Tuple[List[int], ...]) -> List[int]:
|
||||
"""Generate a plan to get authority over the identity.
|
||||
|
||||
.. todo:: This can be possibly be rewritten to use resolved_combinations.
|
||||
@@ -328,8 +368,8 @@ def gen_challenge_path(challbs, preferences, combinations):
|
||||
:class:`acme.messages.Challenge`, each of which would
|
||||
be sufficient to prove possession of the identifier.
|
||||
|
||||
:returns: tuple of indices from ``challenges``.
|
||||
:rtype: tuple
|
||||
:returns: list of indices from ``challenges``.
|
||||
:rtype: list
|
||||
|
||||
:raises certbot.errors.AuthorizationError: If a
|
||||
path cannot be created that satisfies the CA given the preferences and
|
||||
@@ -341,7 +381,10 @@ def gen_challenge_path(challbs, preferences, combinations):
|
||||
return _find_dumb_path(challbs, preferences)
|
||||
|
||||
|
||||
def _find_smart_path(challbs, preferences, combinations):
|
||||
def _find_smart_path(challbs: List[messages.ChallengeBody],
|
||||
preferences: List[Type[challenges.Challenge]],
|
||||
combinations: Tuple[List[int], ...]
|
||||
) -> List[int]:
|
||||
"""Find challenge path with server hints.
|
||||
|
||||
Can be called if combinations is included. Function uses a simple
|
||||
@@ -356,7 +399,7 @@ def _find_smart_path(challbs, preferences, combinations):
|
||||
|
||||
# max_cost is now equal to sum(indices) + 1
|
||||
|
||||
best_combo = None
|
||||
best_combo: Optional[List[int]] = None
|
||||
# Set above completing all of the available challenges
|
||||
best_combo_cost = max_cost
|
||||
|
||||
@@ -373,12 +416,13 @@ def _find_smart_path(challbs, preferences, combinations):
|
||||
combo_total = 0
|
||||
|
||||
if not best_combo:
|
||||
_report_no_chall_path(challbs)
|
||||
raise _report_no_chall_path(challbs)
|
||||
|
||||
return best_combo
|
||||
|
||||
|
||||
def _find_dumb_path(challbs, preferences):
|
||||
def _find_dumb_path(challbs: List[messages.ChallengeBody],
|
||||
preferences: List[Type[challenges.Challenge]]) -> List[int]:
|
||||
"""Find challenge path without server hints.
|
||||
|
||||
Should be called if the combinations hint is not included by the
|
||||
@@ -394,16 +438,19 @@ def _find_dumb_path(challbs, preferences):
|
||||
if supported:
|
||||
path.append(i)
|
||||
else:
|
||||
_report_no_chall_path(challbs)
|
||||
raise _report_no_chall_path(challbs)
|
||||
|
||||
return path
|
||||
|
||||
|
||||
def _report_no_chall_path(challbs):
|
||||
"""Logs and raises an error that no satisfiable chall path exists.
|
||||
def _report_no_chall_path(challbs: List[messages.ChallengeBody]) -> errors.AuthorizationError:
|
||||
"""Logs and return a raisable error reporting that no satisfiable chall path exists.
|
||||
|
||||
:param challbs: challenges from the authorization that can't be satisfied
|
||||
|
||||
:returns: An authorization error
|
||||
:rtype: certbot.errors.AuthorizationError
|
||||
|
||||
"""
|
||||
msg = ("Client with the currently selected authenticator does not support "
|
||||
"any combination of challenges that will satisfy the CA.")
|
||||
@@ -412,7 +459,7 @@ def _report_no_chall_path(challbs):
|
||||
" You may need to use an authenticator "
|
||||
"plugin that can do challenges over DNS.")
|
||||
logger.critical(msg)
|
||||
raise errors.AuthorizationError(msg)
|
||||
return errors.AuthorizationError(msg)
|
||||
|
||||
|
||||
def _generate_failed_chall_msg(failed_achalls: List[achallenges.AnnotatedChallenge]) -> str:
|
||||
|
||||
@@ -3,10 +3,18 @@ import datetime
|
||||
import logging
|
||||
import re
|
||||
import traceback
|
||||
from typing import Any
|
||||
from typing import Callable
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
from typing import TypeVar
|
||||
from typing import Union
|
||||
|
||||
import pytz
|
||||
|
||||
from certbot import configuration
|
||||
from certbot import crypto_util
|
||||
from certbot import errors
|
||||
from certbot import ocsp
|
||||
@@ -22,7 +30,7 @@ logger = logging.getLogger(__name__)
|
||||
###################
|
||||
|
||||
|
||||
def update_live_symlinks(config):
|
||||
def update_live_symlinks(config: configuration.NamespaceConfig) -> None:
|
||||
"""Update the certificate file family symlinks to use archive_dir.
|
||||
|
||||
Use the information in the config file to make symlinks point to
|
||||
@@ -38,7 +46,7 @@ def update_live_symlinks(config):
|
||||
storage.RenewableCert(renewal_file, config, update_symlinks=True)
|
||||
|
||||
|
||||
def rename_lineage(config):
|
||||
def rename_lineage(config: configuration.NamespaceConfig) -> None:
|
||||
"""Rename the specified lineage to the new name.
|
||||
|
||||
:param config: Configuration.
|
||||
@@ -64,7 +72,7 @@ def rename_lineage(config):
|
||||
.format(certname, new_certname), pause=False)
|
||||
|
||||
|
||||
def certificates(config):
|
||||
def certificates(config: configuration.NamespaceConfig) -> None:
|
||||
"""Display information about certs configured with Certbot
|
||||
|
||||
:param config: Configuration.
|
||||
@@ -87,7 +95,7 @@ def certificates(config):
|
||||
_describe_certs(config, parsed_certs, parse_failures)
|
||||
|
||||
|
||||
def delete(config):
|
||||
def delete(config: configuration.NamespaceConfig) -> None:
|
||||
"""Delete Certbot files associated with a certificate lineage."""
|
||||
certnames = get_certnames(config, "delete", allow_multiple=True)
|
||||
msg = ["The following certificate(s) are selected for deletion:\n"]
|
||||
@@ -112,7 +120,9 @@ def delete(config):
|
||||
# Public Helpers
|
||||
###################
|
||||
|
||||
def lineage_for_certname(cli_config, certname):
|
||||
|
||||
def lineage_for_certname(cli_config: configuration.NamespaceConfig,
|
||||
certname: str) -> Optional[storage.RenewableCert]:
|
||||
"""Find a lineage object with name certname."""
|
||||
configs_dir = cli_config.renewal_configs_dir
|
||||
# Verify the directory is there
|
||||
@@ -129,13 +139,16 @@ def lineage_for_certname(cli_config, certname):
|
||||
return None
|
||||
|
||||
|
||||
def domains_for_certname(config, certname):
|
||||
def domains_for_certname(config: configuration.NamespaceConfig,
|
||||
certname: str) -> Optional[List[str]]:
|
||||
"""Find the domains in the cert with name certname."""
|
||||
lineage = lineage_for_certname(config, certname)
|
||||
return lineage.names() if lineage else None
|
||||
|
||||
|
||||
def find_duplicative_certs(config, domains):
|
||||
def find_duplicative_certs(config: configuration.NamespaceConfig,
|
||||
domains: List[str]) -> Tuple[Optional[storage.RenewableCert],
|
||||
Optional[storage.RenewableCert]]:
|
||||
"""Find existing certs that match the given domain names.
|
||||
|
||||
This function searches for certificates whose domains are equal to
|
||||
@@ -158,7 +171,11 @@ def find_duplicative_certs(config, domains):
|
||||
:rtype: `tuple` of `storage.RenewableCert` or `None`
|
||||
|
||||
"""
|
||||
def update_certs_for_domain_matches(candidate_lineage, rv):
|
||||
def update_certs_for_domain_matches(candidate_lineage: storage.RenewableCert,
|
||||
rv: Tuple[Optional[storage.RenewableCert],
|
||||
Optional[storage.RenewableCert]]
|
||||
) -> Tuple[Optional[storage.RenewableCert],
|
||||
Optional[storage.RenewableCert]]:
|
||||
"""Return cert as identical_names_cert if it matches,
|
||||
or subset_names_cert if it matches as subset
|
||||
"""
|
||||
@@ -177,10 +194,12 @@ def find_duplicative_certs(config, domains):
|
||||
subset_names_cert = candidate_lineage
|
||||
return (identical_names_cert, subset_names_cert)
|
||||
|
||||
return _search_lineages(config, update_certs_for_domain_matches, (None, None))
|
||||
init: Tuple[Optional[storage.RenewableCert], Optional[storage.RenewableCert]] = (None, None)
|
||||
|
||||
return _search_lineages(config, update_certs_for_domain_matches, init)
|
||||
|
||||
|
||||
def _archive_files(candidate_lineage, filetype):
|
||||
def _archive_files(candidate_lineage: storage.RenewableCert, filetype: str) -> Optional[List[str]]:
|
||||
""" In order to match things like:
|
||||
/etc/letsencrypt/archive/example.com/chain1.pem.
|
||||
|
||||
@@ -202,7 +221,8 @@ def _archive_files(candidate_lineage, filetype):
|
||||
return None
|
||||
|
||||
|
||||
def _acceptable_matches():
|
||||
def _acceptable_matches() -> List[Union[Callable[[storage.RenewableCert], str],
|
||||
Callable[[storage.RenewableCert], Optional[List[str]]]]]:
|
||||
""" Generates the list that's passed to match_and_check_overlaps. Is its own function to
|
||||
make unit testing easier.
|
||||
|
||||
@@ -213,7 +233,7 @@ def _acceptable_matches():
|
||||
lambda x: _archive_files(x, "cert"), lambda x: _archive_files(x, "fullchain")]
|
||||
|
||||
|
||||
def cert_path_to_lineage(cli_config):
|
||||
def cert_path_to_lineage(cli_config: configuration.NamespaceConfig) -> str:
|
||||
""" If config.cert_path is defined, try to find an appropriate value for config.certname.
|
||||
|
||||
:param `configuration.NamespaceConfig` cli_config: parsed command line arguments
|
||||
@@ -226,11 +246,16 @@ def cert_path_to_lineage(cli_config):
|
||||
"""
|
||||
acceptable_matches = _acceptable_matches()
|
||||
match = match_and_check_overlaps(cli_config, acceptable_matches,
|
||||
lambda x: cli_config.cert_path, lambda x: x.lineagename)
|
||||
lambda x: cli_config.cert_path, lambda x: x.lineagename)
|
||||
return match[0]
|
||||
|
||||
|
||||
def match_and_check_overlaps(cli_config, acceptable_matches, match_func, rv_func):
|
||||
def match_and_check_overlaps(cli_config: configuration.NamespaceConfig,
|
||||
acceptable_matches: Iterable[Union[
|
||||
Callable[[storage.RenewableCert], str],
|
||||
Callable[[storage.RenewableCert], Optional[List[str]]]]],
|
||||
match_func: Callable[[storage.RenewableCert], str],
|
||||
rv_func: Callable[[storage.RenewableCert], str]) -> List[str]:
|
||||
""" Searches through all lineages for a match, and checks for duplicates.
|
||||
If a duplicate is found, an error is raised, as performing operations on lineages
|
||||
that have their properties incorrectly duplicated elsewhere is probably a bad idea.
|
||||
@@ -241,21 +266,24 @@ def match_and_check_overlaps(cli_config, acceptable_matches, match_func, rv_func
|
||||
:param function rv_func: specifies what to return
|
||||
|
||||
"""
|
||||
def find_matches(candidate_lineage, return_value, acceptable_matches):
|
||||
def find_matches(candidate_lineage: storage.RenewableCert, return_value: List[str],
|
||||
acceptable_matches: Iterable[Union[
|
||||
Callable[[storage.RenewableCert], str],
|
||||
Callable[[storage.RenewableCert], Optional[List[str]]]]]) -> List[str]:
|
||||
"""Returns a list of matches using _search_lineages."""
|
||||
acceptable_matches = [func(candidate_lineage) for func in acceptable_matches]
|
||||
acceptable_matches_resolved = [func(candidate_lineage) for func in acceptable_matches]
|
||||
acceptable_matches_rv: List[str] = []
|
||||
for item in acceptable_matches:
|
||||
for item in acceptable_matches_resolved:
|
||||
if isinstance(item, list):
|
||||
acceptable_matches_rv += item
|
||||
else:
|
||||
elif item:
|
||||
acceptable_matches_rv.append(item)
|
||||
match = match_func(candidate_lineage)
|
||||
if match in acceptable_matches_rv:
|
||||
return_value.append(rv_func(candidate_lineage))
|
||||
return return_value
|
||||
|
||||
matched = _search_lineages(cli_config, find_matches, [], acceptable_matches)
|
||||
matched: List[str] = _search_lineages(cli_config, find_matches, [], acceptable_matches)
|
||||
if not matched:
|
||||
raise errors.Error("No match found for cert-path {0}!".format(cli_config.cert_path))
|
||||
elif len(matched) > 1:
|
||||
@@ -263,7 +291,8 @@ def match_and_check_overlaps(cli_config, acceptable_matches, match_func, rv_func
|
||||
return matched
|
||||
|
||||
|
||||
def human_readable_cert_info(config, cert, skip_filter_checks=False):
|
||||
def human_readable_cert_info(config: configuration.NamespaceConfig, cert: storage.RenewableCert,
|
||||
skip_filter_checks: bool = False) -> Optional[str]:
|
||||
""" Returns a human readable description of info about a RenewableCert object"""
|
||||
certinfo = []
|
||||
checker = ocsp.RevocationChecker()
|
||||
@@ -312,7 +341,8 @@ def human_readable_cert_info(config, cert, skip_filter_checks=False):
|
||||
return "".join(certinfo)
|
||||
|
||||
|
||||
def get_certnames(config, verb, allow_multiple=False, custom_prompt=None):
|
||||
def get_certnames(config: configuration.NamespaceConfig, verb: str, allow_multiple: bool = False,
|
||||
custom_prompt: Optional[str] = None) -> List[str]:
|
||||
"""Get certname from flag, interactively, or error out."""
|
||||
certname = config.certname
|
||||
if certname:
|
||||
@@ -350,12 +380,13 @@ def get_certnames(config, verb, allow_multiple=False, custom_prompt=None):
|
||||
###################
|
||||
|
||||
|
||||
def _report_lines(msgs):
|
||||
def _report_lines(msgs: Iterable[str]) -> str:
|
||||
"""Format a results report for a category of single-line renewal outcomes"""
|
||||
return " " + "\n ".join(str(msg) for msg in msgs)
|
||||
|
||||
|
||||
def _report_human_readable(config, parsed_certs):
|
||||
def _report_human_readable(config: configuration.NamespaceConfig,
|
||||
parsed_certs: Iterable[storage.RenewableCert]) -> str:
|
||||
"""Format a results report for a parsed cert"""
|
||||
certinfo = []
|
||||
for cert in parsed_certs:
|
||||
@@ -365,7 +396,9 @@ def _report_human_readable(config, parsed_certs):
|
||||
return "\n".join(certinfo)
|
||||
|
||||
|
||||
def _describe_certs(config, parsed_certs, parse_failures):
|
||||
def _describe_certs(config: configuration.NamespaceConfig,
|
||||
parsed_certs: Iterable[storage.RenewableCert],
|
||||
parse_failures: Iterable[str]) -> None:
|
||||
"""Print information about the certs we know about"""
|
||||
out: List[str] = []
|
||||
|
||||
@@ -386,7 +419,10 @@ def _describe_certs(config, parsed_certs, parse_failures):
|
||||
display_util.notification("\n".join(out), pause=False, wrap=False)
|
||||
|
||||
|
||||
def _search_lineages(cli_config, func, initial_rv, *args):
|
||||
T = TypeVar('T')
|
||||
|
||||
def _search_lineages(cli_config: configuration.NamespaceConfig, func: Callable[..., T],
|
||||
initial_rv: T, *args: Any) -> T:
|
||||
"""Iterate func over unbroken lineages, allowing custom return conditions.
|
||||
|
||||
Allows flexible customization of return values, including multiple
|
||||
|
||||
@@ -2,11 +2,15 @@
|
||||
import datetime
|
||||
import logging
|
||||
import platform
|
||||
from typing import cast
|
||||
from typing import Any
|
||||
from typing import Callable
|
||||
from typing import cast
|
||||
from typing import Dict
|
||||
from typing import IO
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Set
|
||||
from typing import Tuple
|
||||
from typing import Union
|
||||
import warnings
|
||||
|
||||
@@ -20,8 +24,10 @@ from acme import crypto_util as acme_crypto_util
|
||||
from acme import errors as acme_errors
|
||||
from acme import messages
|
||||
import certbot
|
||||
from certbot import configuration
|
||||
from certbot import crypto_util
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
from certbot import util
|
||||
from certbot._internal import account
|
||||
from certbot._internal import auth_handler
|
||||
@@ -30,15 +36,20 @@ from certbot._internal import constants
|
||||
from certbot._internal import eff
|
||||
from certbot._internal import error_handler
|
||||
from certbot._internal import storage
|
||||
from certbot._internal.plugins import disco as plugin_disco
|
||||
from certbot._internal.plugins import selection as plugin_selection
|
||||
from certbot.compat import os
|
||||
from certbot.display import ops as display_ops
|
||||
from certbot.display import util as display_util
|
||||
from certbot.interfaces import AccountStorage
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def acme_from_config_key(config, key, regr=None):
|
||||
"Wrangle ACME client construction"
|
||||
|
||||
def acme_from_config_key(config: configuration.NamespaceConfig, key: jose.JWK,
|
||||
regr: Optional[messages.RegistrationResource] = None
|
||||
) -> acme_client.ClientV2:
|
||||
"""Wrangle ACME client construction"""
|
||||
# TODO: Allow for other alg types besides RS256
|
||||
net = acme_client.ClientNetwork(key, account=regr, verify_ssl=(not config.no_verify_ssl),
|
||||
user_agent=determine_user_agent(config))
|
||||
@@ -52,10 +63,10 @@ def acme_from_config_key(config, key, regr=None):
|
||||
"Certbot is configured to use an ACMEv1 server (%s). ACMEv1 support is deprecated"
|
||||
" and will soon be removed. See https://community.letsencrypt.org/t/143839 for "
|
||||
"more information.", config.server)
|
||||
return client
|
||||
return cast(acme_client.ClientV2, client)
|
||||
|
||||
|
||||
def determine_user_agent(config):
|
||||
def determine_user_agent(config: configuration.NamespaceConfig) -> str:
|
||||
"""
|
||||
Set a user_agent string in the config based on the choice of plugins.
|
||||
(this wasn't knowable at construction time)
|
||||
@@ -86,8 +97,9 @@ def determine_user_agent(config):
|
||||
ua = config.user_agent
|
||||
return ua
|
||||
|
||||
def ua_flags(config):
|
||||
"Turn some very important CLI flags into clues in the user agent."
|
||||
|
||||
def ua_flags(config: configuration.NamespaceConfig) -> str:
|
||||
"""Turn some very important CLI flags into clues in the user agent."""
|
||||
if isinstance(config, DummyConfig):
|
||||
return "FLAGS"
|
||||
flags = []
|
||||
@@ -105,25 +117,30 @@ def ua_flags(config):
|
||||
flags.append("hook")
|
||||
return " ".join(flags)
|
||||
|
||||
|
||||
class DummyConfig:
|
||||
"Shim for computing a sample user agent."
|
||||
def __init__(self):
|
||||
"""Shim for computing a sample user agent."""
|
||||
def __init__(self) -> None:
|
||||
self.authenticator = "XXX"
|
||||
self.installer = "YYY"
|
||||
self.user_agent = None
|
||||
self.verb = "SUBCOMMAND"
|
||||
|
||||
def __getattr__(self, name):
|
||||
"Any config properties we might have are None."
|
||||
def __getattr__(self, name: str) -> Any:
|
||||
"""Any config properties we might have are None."""
|
||||
return None
|
||||
|
||||
def sample_user_agent():
|
||||
"Document what this Certbot's user agent string will be like."
|
||||
|
||||
return determine_user_agent(DummyConfig())
|
||||
def sample_user_agent() -> str:
|
||||
"""Document what this Certbot's user agent string will be like."""
|
||||
# DummyConfig is designed to mock certbot.configuration.NamespaceConfig.
|
||||
# Let mypy accept that.
|
||||
return determine_user_agent(cast(configuration.NamespaceConfig, DummyConfig()))
|
||||
|
||||
|
||||
def register(config, account_storage, tos_cb=None):
|
||||
def register(config: configuration.NamespaceConfig, account_storage: AccountStorage,
|
||||
tos_cb: Optional[Callable[[str], None]] = None
|
||||
) -> Tuple[account.Account, acme_client.ClientV2]:
|
||||
"""Register new account with an ACME CA.
|
||||
|
||||
This function takes care of generating fresh private key,
|
||||
@@ -143,13 +160,11 @@ def register(config, account_storage, tos_cb=None):
|
||||
Service before registering account, client action is
|
||||
necessary. For example, a CLI tool would prompt the user
|
||||
acceptance. `tos_cb` must be a callable that should accept
|
||||
`.RegistrationResource` and return a `bool`: ``True`` iff the
|
||||
Terms of Service present in the contained
|
||||
`.Registration.terms_of_service` is accepted by the client, and
|
||||
``False`` otherwise. ``tos_cb`` will be called only if the
|
||||
client action is necessary, i.e. when ``terms_of_service is not
|
||||
None``. This argument is optional, if not supplied it will
|
||||
default to automatic acceptance!
|
||||
a Term of Service URL as a string, and raise an exception
|
||||
if the TOS is not accepted by the client. ``tos_cb`` will be
|
||||
called only if the client action is necessary, i.e. when
|
||||
``terms_of_service is not None``. This argument is optional,
|
||||
if not supplied it will default to automatic acceptance!
|
||||
|
||||
:raises certbot.errors.Error: In case of any client problems, in
|
||||
particular registration failure, or unaccepted Terms of Service.
|
||||
@@ -194,12 +209,13 @@ def register(config, account_storage, tos_cb=None):
|
||||
return acc, acme
|
||||
|
||||
|
||||
def perform_registration(acme, config, tos_cb):
|
||||
def perform_registration(acme: acme_client.ClientV2, config: configuration.NamespaceConfig,
|
||||
tos_cb: Optional[Callable[[str], None]]) -> messages.RegistrationResource:
|
||||
"""
|
||||
Actually register new account, trying repeatedly if there are email
|
||||
problems
|
||||
|
||||
:param acme.client.Client client: ACME client object.
|
||||
:param acme.client.Client acme: ACME client object.
|
||||
:param certbot.configuration.NamespaceConfig config: Client configuration.
|
||||
:param Callable tos_cb: a callback to handle Term of Service agreement.
|
||||
|
||||
@@ -210,11 +226,11 @@ def perform_registration(acme, config, tos_cb):
|
||||
eab_credentials_supplied = config.eab_kid and config.eab_hmac_key
|
||||
eab: Optional[Dict[str, Any]]
|
||||
if eab_credentials_supplied:
|
||||
account_public_key = acme.client.net.key.public_key()
|
||||
account_public_key = acme.net.key.public_key()
|
||||
eab = messages.ExternalAccountBinding.from_data(account_public_key=account_public_key,
|
||||
kid=config.eab_kid,
|
||||
hmac_key=config.eab_hmac_key,
|
||||
directory=acme.client.directory)
|
||||
directory=acme.directory)
|
||||
else:
|
||||
eab = None
|
||||
|
||||
@@ -229,7 +245,15 @@ def perform_registration(acme, config, tos_cb):
|
||||
newreg = messages.NewRegistration.from_data(
|
||||
email=config.email,
|
||||
external_account_binding=cast(Optional[messages.ExternalAccountBinding], eab))
|
||||
return acme.new_account_and_tos(newreg, tos_cb)
|
||||
# Until ACME v1 support is removed from Certbot, we actually need the provided
|
||||
# ACME client to be a wrapper of type BackwardsCompatibleClientV2.
|
||||
# TODO: Remove this cast and rewrite the logic when the client is actually a ClientV2
|
||||
try:
|
||||
return cast(acme_client.BackwardsCompatibleClientV2,
|
||||
acme).new_account_and_tos(newreg, tos_cb)
|
||||
except AttributeError:
|
||||
raise errors.Error("The ACME client must be an instance of "
|
||||
"acme.client.BackwardsCompatibleClientV2")
|
||||
except messages.Error as e:
|
||||
if e.code in ('invalidEmail', 'invalidContact'):
|
||||
if config.noninteractive_mode:
|
||||
@@ -258,7 +282,10 @@ class Client:
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, config, account_, auth, installer, acme=None):
|
||||
def __init__(self, config: configuration.NamespaceConfig, account_: Optional[account.Account],
|
||||
auth: Optional[interfaces.Authenticator],
|
||||
installer: Optional[interfaces.Installer],
|
||||
acme: Optional[acme_client.ClientV2] = None) -> None:
|
||||
"""Initialize a client."""
|
||||
self.config = config
|
||||
self.account = account_
|
||||
@@ -277,7 +304,9 @@ class Client:
|
||||
else:
|
||||
self.auth_handler = None
|
||||
|
||||
def obtain_certificate_from_csr(self, csr, orderr=None):
|
||||
def obtain_certificate_from_csr(self, csr: util.CSR,
|
||||
orderr: Optional[messages.OrderResource] = None
|
||||
) -> Tuple[bytes, bytes]:
|
||||
"""Obtain certificate.
|
||||
|
||||
:param .util.CSR csr: PEM-encoded Certificate Signing
|
||||
@@ -294,8 +323,10 @@ class Client:
|
||||
"not set.")
|
||||
logger.error(msg)
|
||||
raise errors.Error(msg)
|
||||
if self.account.regr is None:
|
||||
if self.account is None or self.account.regr is None:
|
||||
raise errors.Error("Please register with the ACME server first.")
|
||||
if self.acme is None:
|
||||
raise errors.Error("ACME client is not set.")
|
||||
|
||||
logger.debug("CSR: %s", csr)
|
||||
|
||||
@@ -303,19 +334,18 @@ class Client:
|
||||
orderr = self._get_order_and_authorizations(csr.data, best_effort=False)
|
||||
|
||||
deadline = datetime.datetime.now() + datetime.timedelta(seconds=90)
|
||||
get_alt_chains = self.config.preferred_chain is not None
|
||||
orderr = self.acme.finalize_order(orderr, deadline,
|
||||
fetch_alternative_chains=get_alt_chains)
|
||||
orderr = self.acme.finalize_order(
|
||||
orderr, deadline, fetch_alternative_chains=self.config.preferred_chain is not None)
|
||||
fullchain = orderr.fullchain_pem
|
||||
if get_alt_chains and orderr.alternative_fullchains_pem:
|
||||
fullchain = crypto_util.find_chain_with_issuer([fullchain] + \
|
||||
orderr.alternative_fullchains_pem,
|
||||
self.config.preferred_chain,
|
||||
not self.config.dry_run)
|
||||
if self.config.preferred_chain and orderr.alternative_fullchains_pem:
|
||||
fullchain = crypto_util.find_chain_with_issuer(
|
||||
[fullchain] + orderr.alternative_fullchains_pem,
|
||||
self.config.preferred_chain, not self.config.dry_run)
|
||||
cert, chain = crypto_util.cert_and_chain_from_fullchain(fullchain)
|
||||
return cert.encode(), chain.encode()
|
||||
|
||||
def obtain_certificate(self, domains, old_keypath=None):
|
||||
def obtain_certificate(self, domains: List[str], old_keypath: Optional[str] = None
|
||||
) -> Tuple[bytes, bytes, util.Key, util.CSR]:
|
||||
"""Obtains a certificate from the ACME server.
|
||||
|
||||
`.register` must be called before `.obtain_certificate`
|
||||
@@ -386,7 +416,8 @@ class Client:
|
||||
elliptic_curve=elliptic_curve,
|
||||
strict_permissions=self.config.strict_permissions,
|
||||
)
|
||||
csr = crypto_util.generate_csr(key, domains, self.config.csr_dir,
|
||||
# TODO: Remove the cast once certbot package is fully typed
|
||||
csr = crypto_util.generate_csr(key, cast(Set[str], domains), self.config.csr_dir,
|
||||
self.config.must_staple, self.config.strict_permissions)
|
||||
|
||||
orderr = self._get_order_and_authorizations(csr.data, self.config.allow_subset_of_names)
|
||||
@@ -408,11 +439,11 @@ class Client:
|
||||
cert, chain = self.obtain_certificate_from_csr(csr, orderr)
|
||||
return cert, chain, key, csr
|
||||
|
||||
def _get_order_and_authorizations(self, csr_pem: str,
|
||||
def _get_order_and_authorizations(self, csr_pem: bytes,
|
||||
best_effort: bool) -> messages.OrderResource:
|
||||
"""Request a new order and complete its authorizations.
|
||||
|
||||
:param str csr_pem: A CSR in PEM format.
|
||||
:param bytes csr_pem: A CSR in PEM format.
|
||||
:param bool best_effort: True if failing to complete all
|
||||
authorizations should not raise an exception
|
||||
|
||||
@@ -420,6 +451,8 @@ class Client:
|
||||
:rtype: acme.messages.OrderResource
|
||||
|
||||
"""
|
||||
if not self.acme:
|
||||
raise errors.Error("ACME client is not set.")
|
||||
try:
|
||||
orderr = self.acme.new_order(csr_pem)
|
||||
except acme_errors.WildcardUnsupportedError:
|
||||
@@ -442,7 +475,8 @@ class Client:
|
||||
authzr = self.auth_handler.handle_authorizations(orderr, self.config, best_effort)
|
||||
return orderr.update(authorizations=authzr)
|
||||
|
||||
def obtain_and_enroll_certificate(self, domains, certname):
|
||||
def obtain_and_enroll_certificate(self, domains: List[str], certname: Optional[str]
|
||||
) -> Optional[storage.RenewableCert]:
|
||||
"""Obtain and enroll certificate.
|
||||
|
||||
Get a new certificate for the specified domains using the specified
|
||||
@@ -455,8 +489,7 @@ class Client:
|
||||
:type certname: `str` or `None`
|
||||
|
||||
:returns: A new :class:`certbot._internal.storage.RenewableCert` instance
|
||||
referred to the enrolled cert lineage, False if the cert could not
|
||||
be obtained, or None if doing a successful dry run.
|
||||
referred to the enrolled cert lineage, or None if doing a successful dry run.
|
||||
|
||||
"""
|
||||
cert, chain, key, _ = self.obtain_certificate(domains)
|
||||
@@ -470,15 +503,14 @@ class Client:
|
||||
new_name = self._choose_lineagename(domains, certname)
|
||||
|
||||
if self.config.dry_run:
|
||||
logger.debug("Dry run: Skipping creating new lineage for %s",
|
||||
new_name)
|
||||
logger.debug("Dry run: Skipping creating new lineage for %s", new_name)
|
||||
return None
|
||||
return storage.RenewableCert.new_lineage(
|
||||
new_name, cert,
|
||||
key.pem, chain,
|
||||
self.config)
|
||||
|
||||
def _choose_lineagename(self, domains, certname):
|
||||
def _choose_lineagename(self, domains: List[str], certname: Optional[str]) -> str:
|
||||
"""Chooses a name for the new lineage.
|
||||
|
||||
:param domains: domains in certificate request
|
||||
@@ -497,12 +529,13 @@ class Client:
|
||||
return domains[0][2:]
|
||||
return domains[0]
|
||||
|
||||
def save_certificate(self, cert_pem, chain_pem,
|
||||
cert_path, chain_path, fullchain_path):
|
||||
def save_certificate(self, cert_pem: bytes, chain_pem: bytes,
|
||||
cert_path: str, chain_path: str, fullchain_path: str
|
||||
) -> Tuple[str, str, str]:
|
||||
"""Saves the certificate received from the ACME server.
|
||||
|
||||
:param str cert_pem:
|
||||
:param str chain_pem:
|
||||
:param bytes cert_pem:
|
||||
:param bytes chain_pem:
|
||||
:param str cert_path: Candidate path to a certificate.
|
||||
:param str chain_path: Candidate path to a certificate chain.
|
||||
:param str fullchain_path: Candidate path to a full cert chain.
|
||||
@@ -517,7 +550,6 @@ class Client:
|
||||
for path in cert_path, chain_path, fullchain_path:
|
||||
util.make_or_verify_dir(os.path.dirname(path), 0o755, self.config.strict_permissions)
|
||||
|
||||
|
||||
cert_file, abs_cert_path = _open_pem_file('cert_path', cert_path)
|
||||
|
||||
try:
|
||||
@@ -525,17 +557,16 @@ class Client:
|
||||
finally:
|
||||
cert_file.close()
|
||||
|
||||
chain_file, abs_chain_path =\
|
||||
_open_pem_file('chain_path', chain_path)
|
||||
fullchain_file, abs_fullchain_path =\
|
||||
_open_pem_file('fullchain_path', fullchain_path)
|
||||
chain_file, abs_chain_path = _open_pem_file('chain_path', chain_path)
|
||||
fullchain_file, abs_fullchain_path = _open_pem_file('fullchain_path', fullchain_path)
|
||||
|
||||
_save_chain(chain_pem, chain_file)
|
||||
_save_chain(cert_pem + chain_pem, fullchain_file)
|
||||
|
||||
return abs_cert_path, abs_chain_path, abs_fullchain_path
|
||||
|
||||
def deploy_certificate(self, domains, privkey_path, cert_path, chain_path, fullchain_path):
|
||||
def deploy_certificate(self, domains: List[str], privkey_path: str, cert_path: str,
|
||||
chain_path: str, fullchain_path: str) -> None:
|
||||
"""Install certificate
|
||||
|
||||
:param list domains: list of domains to install the certificate
|
||||
@@ -572,7 +603,8 @@ class Client:
|
||||
# sites may have been enabled / final cleanup
|
||||
self.installer.restart()
|
||||
|
||||
def enhance_config(self, domains, chain_path, redirect_default=True):
|
||||
def enhance_config(self, domains: List[str], chain_path: str,
|
||||
redirect_default: bool = True) -> None:
|
||||
"""Enhance the configuration.
|
||||
|
||||
:param list domains: list of domains to configure
|
||||
@@ -630,11 +662,14 @@ class Client:
|
||||
|
||||
|
||||
"""
|
||||
if not self.installer:
|
||||
raise errors.Error("No installer plugin has been set.")
|
||||
enh_label = options if enhancement == "ensure-http-header" else enhancement
|
||||
with error_handler.ErrorHandler(self._recovery_routine_with_msg, None):
|
||||
for dom in domains:
|
||||
try:
|
||||
self.installer.enhance(dom, enhancement, options)
|
||||
# TODO: Remove the cast once certbot package is fully typed
|
||||
self.installer.enhance(dom, enhancement, cast(Optional[List[str]], options))
|
||||
except errors.PluginEnhancementAlreadyPresent:
|
||||
logger.info("Enhancement %s was already set.", enh_label)
|
||||
except errors.PluginError:
|
||||
@@ -649,32 +684,34 @@ class Client:
|
||||
:param str success_msg: message to show on successful recovery
|
||||
|
||||
"""
|
||||
self.installer.recovery_routine()
|
||||
if success_msg:
|
||||
display_util.notify(success_msg)
|
||||
if self.installer:
|
||||
self.installer.recovery_routine()
|
||||
if success_msg:
|
||||
display_util.notify(success_msg)
|
||||
|
||||
def _rollback_and_restart(self, success_msg):
|
||||
def _rollback_and_restart(self, success_msg: str) -> None:
|
||||
"""Rollback the most recent checkpoint and restart the webserver
|
||||
|
||||
:param str success_msg: message to show on successful rollback
|
||||
|
||||
"""
|
||||
logger.info("Rolling back to previous server configuration...")
|
||||
try:
|
||||
self.installer.rollback_checkpoints()
|
||||
self.installer.restart()
|
||||
except:
|
||||
logger.error(
|
||||
"An error occurred and we failed to restore your config and "
|
||||
"restart your server. Please post to "
|
||||
"https://community.letsencrypt.org/c/help "
|
||||
"with details about your configuration and this error you received."
|
||||
)
|
||||
raise
|
||||
display_util.notify(success_msg)
|
||||
if self.installer:
|
||||
logger.info("Rolling back to previous server configuration...")
|
||||
try:
|
||||
self.installer.rollback_checkpoints()
|
||||
self.installer.restart()
|
||||
except:
|
||||
logger.error(
|
||||
"An error occurred and we failed to restore your config and "
|
||||
"restart your server. Please post to "
|
||||
"https://community.letsencrypt.org/c/help "
|
||||
"with details about your configuration and this error you received."
|
||||
)
|
||||
raise
|
||||
display_util.notify(success_msg)
|
||||
|
||||
|
||||
def validate_key_csr(privkey, csr=None):
|
||||
def validate_key_csr(privkey: util.Key, csr: Optional[util.CSR] = None) -> None:
|
||||
"""Validate Key and CSR files.
|
||||
|
||||
Verifies that the client key and csr arguments are valid and correspond to
|
||||
@@ -720,13 +757,16 @@ def validate_key_csr(privkey, csr=None):
|
||||
raise errors.Error("The key and CSR do not match")
|
||||
|
||||
|
||||
def rollback(default_installer, checkpoints, config, plugins):
|
||||
def rollback(default_installer: str, checkpoints: int,
|
||||
config: configuration.NamespaceConfig, plugins: plugin_disco.PluginsRegistry) -> None:
|
||||
"""Revert configuration the specified number of checkpoints.
|
||||
|
||||
:param str default_installer: Default installer name to use for the rollback
|
||||
:param int checkpoints: Number of checkpoints to revert.
|
||||
|
||||
:param config: Configuration.
|
||||
:type config: :class:`certbot.configuration.NamespaceConfiguration`
|
||||
:param plugins: Plugins available
|
||||
:type plugins: :class:`certbot._internal.plugins.disco.PluginsRegistry`
|
||||
|
||||
"""
|
||||
# Misconfigurations are only a slight problems... allow the user to rollback
|
||||
@@ -741,7 +781,8 @@ def rollback(default_installer, checkpoints, config, plugins):
|
||||
installer.rollback_checkpoints(checkpoints)
|
||||
installer.restart()
|
||||
|
||||
def _open_pem_file(cli_arg_path, pem_path):
|
||||
|
||||
def _open_pem_file(cli_arg_path: str, pem_path: str) -> Tuple[IO, str]:
|
||||
"""Open a pem file.
|
||||
|
||||
If cli_arg_path was set by the client, open that.
|
||||
@@ -759,10 +800,11 @@ def _open_pem_file(cli_arg_path, pem_path):
|
||||
uniq = util.unique_file(pem_path, 0o644, "wb")
|
||||
return uniq[0], os.path.abspath(uniq[1])
|
||||
|
||||
def _save_chain(chain_pem, chain_file):
|
||||
|
||||
def _save_chain(chain_pem: bytes, chain_file: IO) -> None:
|
||||
"""Saves chain_pem at a unique path based on chain_path.
|
||||
|
||||
:param str chain_pem: certificate chain in PEM format
|
||||
:param bytes chain_pem: certificate chain in PEM format
|
||||
:param str chain_file: chain file object
|
||||
|
||||
"""
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
"""Certbot constants."""
|
||||
import logging
|
||||
from typing import Any
|
||||
from typing import Dict
|
||||
|
||||
import pkg_resources
|
||||
|
||||
@@ -13,7 +15,7 @@ SETUPTOOLS_PLUGINS_ENTRY_POINT = "certbot.plugins"
|
||||
OLD_SETUPTOOLS_PLUGINS_ENTRY_POINT = "letsencrypt.plugins"
|
||||
"""Plugins Setuptools entry point before rename."""
|
||||
|
||||
CLI_DEFAULTS = dict(
|
||||
CLI_DEFAULTS: Dict[str, Any] = dict(
|
||||
config_files=[
|
||||
os.path.join(misc.get_default_folder('config'), 'cli.ini'),
|
||||
# https://freedesktop.org/wiki/Software/xdg-user-dirs/
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
"""Subscribes users to the EFF newsletter."""
|
||||
import logging
|
||||
from typing import cast
|
||||
from typing import Optional
|
||||
|
||||
import requests
|
||||
@@ -32,16 +33,18 @@ def prepare_subscription(config: configuration.NamespaceConfig, acc: Account) ->
|
||||
if config.email is None:
|
||||
_report_failure("you didn't provide an e-mail address")
|
||||
else:
|
||||
acc.meta = acc.meta.update(register_to_eff=config.email)
|
||||
# TODO: Remove cast when https://github.com/certbot/certbot/pull/9073 is merged.
|
||||
acc.meta = cast(Account.Meta, acc.meta.update(register_to_eff=config.email))
|
||||
elif config.email and _want_subscription():
|
||||
acc.meta = acc.meta.update(register_to_eff=config.email)
|
||||
# TODO: Remove cast when https://github.com/certbot/certbot/pull/9073 is merged.
|
||||
acc.meta = cast(Account.Meta, acc.meta.update(register_to_eff=config.email))
|
||||
|
||||
if acc.meta.register_to_eff:
|
||||
storage = AccountFileStorage(config)
|
||||
storage.update_meta(acc)
|
||||
|
||||
|
||||
def handle_subscription(config: configuration.NamespaceConfig, acc: Account) -> None:
|
||||
def handle_subscription(config: configuration.NamespaceConfig, acc: Optional[Account]) -> None:
|
||||
"""High level function to take care of EFF newsletter subscriptions.
|
||||
|
||||
Once subscription is handled, it will not be handled again.
|
||||
@@ -50,12 +53,14 @@ def handle_subscription(config: configuration.NamespaceConfig, acc: Account) ->
|
||||
:param Account acc: Current client account.
|
||||
|
||||
"""
|
||||
if config.dry_run:
|
||||
if config.dry_run or not acc:
|
||||
return
|
||||
if acc.meta.register_to_eff:
|
||||
subscribe(acc.meta.register_to_eff)
|
||||
# TODO: Remove cast when https://github.com/certbot/certbot/pull/9073 is merged.
|
||||
subscribe(cast(str, acc.meta.register_to_eff))
|
||||
|
||||
acc.meta = acc.meta.update(register_to_eff=None)
|
||||
# TODO: Remove cast when https://github.com/certbot/certbot/pull/9073 is merged.
|
||||
acc.meta = cast(Account.Meta, acc.meta.update(register_to_eff=None))
|
||||
storage = AccountFileStorage(config)
|
||||
storage.update_meta(acc)
|
||||
|
||||
|
||||
@@ -3,10 +3,13 @@ import functools
|
||||
import logging
|
||||
import signal
|
||||
import traceback
|
||||
from types import TracebackType
|
||||
from typing import Any
|
||||
from typing import Callable
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Type
|
||||
from typing import Union
|
||||
|
||||
from certbot import errors
|
||||
@@ -74,7 +77,7 @@ class ErrorHandler:
|
||||
deferred until they finish.
|
||||
|
||||
"""
|
||||
def __init__(self, func, *args, **kwargs):
|
||||
def __init__(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
|
||||
self.call_on_regular_exit = False
|
||||
self.body_executed = False
|
||||
self.funcs: List[Callable[[], Any]] = []
|
||||
@@ -83,11 +86,13 @@ class ErrorHandler:
|
||||
if func is not None:
|
||||
self.register(func, *args, **kwargs)
|
||||
|
||||
def __enter__(self):
|
||||
def __enter__(self) -> None:
|
||||
self.body_executed = False
|
||||
self._set_signal_handlers()
|
||||
|
||||
def __exit__(self, exec_type, exec_value, trace):
|
||||
def __exit__(self, exec_type: Optional[Type[BaseException]],
|
||||
exec_value: Optional[BaseException],
|
||||
trace: Optional[TracebackType]) -> bool:
|
||||
self.body_executed = True
|
||||
retval = False
|
||||
# SystemExit is ignored to properly handle forks that don't exec
|
||||
@@ -108,7 +113,7 @@ class ErrorHandler:
|
||||
self._call_signals()
|
||||
return retval
|
||||
|
||||
def register(self, func: Callable, *args: Any, **kwargs: Any) -> None:
|
||||
def register(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
|
||||
"""Sets func to be run with the given arguments during cleanup.
|
||||
|
||||
:param function func: function to be called in case of an error
|
||||
@@ -116,7 +121,7 @@ class ErrorHandler:
|
||||
"""
|
||||
self.funcs.append(functools.partial(func, *args, **kwargs))
|
||||
|
||||
def _call_registered(self):
|
||||
def _call_registered(self) -> None:
|
||||
"""Calls all registered functions"""
|
||||
logger.debug("Calling registered functions")
|
||||
while self.funcs:
|
||||
@@ -128,7 +133,7 @@ class ErrorHandler:
|
||||
''.join(output).rstrip())
|
||||
self.funcs.pop()
|
||||
|
||||
def _set_signal_handlers(self):
|
||||
def _set_signal_handlers(self) -> None:
|
||||
"""Sets signal handlers for signals in _SIGNALS."""
|
||||
for signum in _SIGNALS:
|
||||
prev_handler = signal.getsignal(signum)
|
||||
@@ -137,13 +142,13 @@ class ErrorHandler:
|
||||
self.prev_handlers[signum] = prev_handler
|
||||
signal.signal(signum, self._signal_handler)
|
||||
|
||||
def _reset_signal_handlers(self):
|
||||
def _reset_signal_handlers(self) -> None:
|
||||
"""Resets signal handlers for signals in _SIGNALS."""
|
||||
for signum, handler in self.prev_handlers.items():
|
||||
signal.signal(signum, handler)
|
||||
self.prev_handlers.clear()
|
||||
|
||||
def _signal_handler(self, signum, unused_frame):
|
||||
def _signal_handler(self, signum: int, unused_frame: Any) -> None:
|
||||
"""Replacement function for handling received signals.
|
||||
|
||||
Store the received signal. If we are executing the code block in
|
||||
@@ -156,7 +161,7 @@ class ErrorHandler:
|
||||
if not self.body_executed:
|
||||
raise errors.SignalExit
|
||||
|
||||
def _call_signals(self):
|
||||
def _call_signals(self) -> None:
|
||||
"""Finally call the deferred signals."""
|
||||
for signum in self.received_signals:
|
||||
logger.debug("Calling signal %s", signum)
|
||||
@@ -169,6 +174,6 @@ class ExitHandler(ErrorHandler):
|
||||
In addition to cleaning up on all signals, also cleans up on
|
||||
regular exit.
|
||||
"""
|
||||
def __init__(self, func, *args, **kwargs):
|
||||
def __init__(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
|
||||
ErrorHandler.__init__(self, func, *args, **kwargs)
|
||||
self.call_on_regular_exit = True
|
||||
|
||||
@@ -2,8 +2,10 @@
|
||||
|
||||
import logging
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Set
|
||||
|
||||
from certbot import configuration
|
||||
from certbot import errors
|
||||
from certbot import util
|
||||
from certbot.compat import filesystem
|
||||
@@ -15,7 +17,7 @@ from certbot.plugins import util as plug_util
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def validate_hooks(config):
|
||||
def validate_hooks(config: configuration.NamespaceConfig) -> None:
|
||||
"""Check hook commands are executable."""
|
||||
validate_hook(config.pre_hook, "pre")
|
||||
validate_hook(config.post_hook, "post")
|
||||
@@ -23,7 +25,7 @@ def validate_hooks(config):
|
||||
validate_hook(config.renew_hook, "renew")
|
||||
|
||||
|
||||
def _prog(shell_cmd):
|
||||
def _prog(shell_cmd: str) -> Optional[str]:
|
||||
"""Extract the program run by a shell command.
|
||||
|
||||
:param str shell_cmd: command to be executed
|
||||
@@ -36,10 +38,11 @@ def _prog(shell_cmd):
|
||||
plug_util.path_surgery(shell_cmd)
|
||||
if not util.exe_exists(shell_cmd):
|
||||
return None
|
||||
|
||||
return os.path.basename(shell_cmd)
|
||||
|
||||
|
||||
def validate_hook(shell_cmd, hook_name):
|
||||
def validate_hook(shell_cmd: str, hook_name: str) -> None:
|
||||
"""Check that a command provided as a hook is plausibly executable.
|
||||
|
||||
:raises .errors.HookCommandNotFound: if the command is not found
|
||||
@@ -57,7 +60,7 @@ def validate_hook(shell_cmd, hook_name):
|
||||
raise errors.HookCommandNotFound(msg)
|
||||
|
||||
|
||||
def pre_hook(config):
|
||||
def pre_hook(config: configuration.NamespaceConfig) -> None:
|
||||
"""Run pre-hooks if they exist and haven't already been run.
|
||||
|
||||
When Certbot is running with the renew subcommand, this function
|
||||
@@ -81,7 +84,7 @@ def pre_hook(config):
|
||||
executed_pre_hooks: Set[str] = set()
|
||||
|
||||
|
||||
def _run_pre_hook_if_necessary(command):
|
||||
def _run_pre_hook_if_necessary(command: str) -> None:
|
||||
"""Run the specified pre-hook if we haven't already.
|
||||
|
||||
If we've already run this exact command before, a message is logged
|
||||
@@ -97,7 +100,7 @@ def _run_pre_hook_if_necessary(command):
|
||||
executed_pre_hooks.add(command)
|
||||
|
||||
|
||||
def post_hook(config):
|
||||
def post_hook(config: configuration.NamespaceConfig) -> None:
|
||||
"""Run post-hooks if defined.
|
||||
|
||||
This function also registers any executables found in
|
||||
@@ -131,7 +134,7 @@ def post_hook(config):
|
||||
post_hooks: List[str] = []
|
||||
|
||||
|
||||
def _run_eventually(command):
|
||||
def _run_eventually(command: str) -> None:
|
||||
"""Registers a post-hook to be run eventually.
|
||||
|
||||
All commands given to this function will be run exactly once in the
|
||||
@@ -144,13 +147,14 @@ def _run_eventually(command):
|
||||
post_hooks.append(command)
|
||||
|
||||
|
||||
def run_saved_post_hooks():
|
||||
def run_saved_post_hooks() -> None:
|
||||
"""Run any post hooks that were saved up in the course of the 'renew' verb"""
|
||||
for cmd in post_hooks:
|
||||
_run_hook("post-hook", cmd)
|
||||
|
||||
|
||||
def deploy_hook(config, domains, lineage_path):
|
||||
def deploy_hook(config: configuration.NamespaceConfig, domains: List[str],
|
||||
lineage_path: str) -> None:
|
||||
"""Run post-issuance hook if defined.
|
||||
|
||||
:param configuration.NamespaceConfig config: Certbot settings
|
||||
@@ -164,7 +168,8 @@ def deploy_hook(config, domains, lineage_path):
|
||||
lineage_path, config.dry_run)
|
||||
|
||||
|
||||
def renew_hook(config, domains, lineage_path):
|
||||
def renew_hook(config: configuration.NamespaceConfig, domains: List[str],
|
||||
lineage_path: str) -> None:
|
||||
"""Run post-renewal hooks.
|
||||
|
||||
This function runs any hooks found in
|
||||
@@ -196,7 +201,7 @@ def renew_hook(config, domains, lineage_path):
|
||||
lineage_path, config.dry_run)
|
||||
|
||||
|
||||
def _run_deploy_hook(command, domains, lineage_path, dry_run):
|
||||
def _run_deploy_hook(command: str, domains: List[str], lineage_path: str, dry_run: bool) -> None:
|
||||
"""Run the specified deploy-hook (if not doing a dry run).
|
||||
|
||||
If dry_run is True, command is not run and a message is logged
|
||||
@@ -220,7 +225,7 @@ def _run_deploy_hook(command, domains, lineage_path, dry_run):
|
||||
_run_hook("deploy-hook", command)
|
||||
|
||||
|
||||
def _run_hook(cmd_name, shell_cmd):
|
||||
def _run_hook(cmd_name: str, shell_cmd: str) -> str:
|
||||
"""Run a hook command.
|
||||
|
||||
:param str cmd_name: the user facing name of the hook being run
|
||||
@@ -234,7 +239,7 @@ def _run_hook(cmd_name, shell_cmd):
|
||||
return err
|
||||
|
||||
|
||||
def list_hooks(dir_path):
|
||||
def list_hooks(dir_path: str) -> List[str]:
|
||||
"""List paths to all hooks found in dir_path in sorted order.
|
||||
|
||||
:param str dir_path: directory to search
|
||||
|
||||
@@ -89,10 +89,10 @@ class _BaseLockMechanism:
|
||||
"""
|
||||
return self._fd is not None
|
||||
|
||||
def acquire(self): # pylint: disable=missing-function-docstring
|
||||
def acquire(self) -> None: # pylint: disable=missing-function-docstring
|
||||
pass # pragma: no cover
|
||||
|
||||
def release(self): # pylint: disable=missing-function-docstring
|
||||
def release(self) -> None: # pylint: disable=missing-function-docstring
|
||||
pass # pragma: no cover
|
||||
|
||||
|
||||
@@ -143,7 +143,8 @@ class _UnixLockMechanism(_BaseLockMechanism):
|
||||
# Normally os module should not be imported in certbot codebase except in certbot.compat
|
||||
# for the sake of compatibility over Windows and Linux.
|
||||
# We make an exception here, since _lock_success is private and called only on Linux.
|
||||
from os import stat, fstat # pylint: disable=os-module-forbidden
|
||||
from os import fstat # pylint: disable=os-module-forbidden
|
||||
from os import stat # pylint: disable=os-module-forbidden
|
||||
try:
|
||||
stat1 = stat(self._path)
|
||||
except OSError as err:
|
||||
@@ -196,7 +197,7 @@ class _WindowsLockMechanism(_BaseLockMechanism):
|
||||
Consequently, mscvrt.locking is sufficient to obtain an effective lock, and the race
|
||||
condition encountered on Linux is not possible on Windows, leading to a simpler workflow.
|
||||
"""
|
||||
def acquire(self):
|
||||
def acquire(self) -> None:
|
||||
"""Acquire the lock"""
|
||||
open_mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC
|
||||
|
||||
@@ -220,7 +221,7 @@ class _WindowsLockMechanism(_BaseLockMechanism):
|
||||
|
||||
self._fd = fd
|
||||
|
||||
def release(self):
|
||||
def release(self) -> None:
|
||||
"""Release the lock."""
|
||||
try:
|
||||
if not self._fd:
|
||||
|
||||
@@ -29,9 +29,14 @@ import sys
|
||||
import tempfile
|
||||
import traceback
|
||||
from types import TracebackType
|
||||
from typing import Any
|
||||
from typing import IO
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
from typing import Type
|
||||
|
||||
from acme import messages
|
||||
from certbot import configuration
|
||||
from certbot import errors
|
||||
from certbot import util
|
||||
from certbot._internal import constants
|
||||
@@ -45,7 +50,7 @@ FILE_FMT = "%(asctime)s:%(levelname)s:%(name)s:%(message)s"
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def pre_arg_parse_setup():
|
||||
def pre_arg_parse_setup() -> None:
|
||||
"""Setup logging before command line arguments are parsed.
|
||||
|
||||
Terminal logging is setup using
|
||||
@@ -85,7 +90,7 @@ def pre_arg_parse_setup():
|
||||
log_path=temp_handler.path)
|
||||
|
||||
|
||||
def post_arg_parse_setup(config):
|
||||
def post_arg_parse_setup(config: configuration.NamespaceConfig) -> None:
|
||||
"""Setup logging after command line arguments are parsed.
|
||||
|
||||
This function assumes `pre_arg_parse_setup` was called earlier and
|
||||
@@ -137,7 +142,8 @@ def post_arg_parse_setup(config):
|
||||
debug=config.debug, quiet=config.quiet, log_path=file_path)
|
||||
|
||||
|
||||
def setup_log_file_handler(config, logfile, fmt):
|
||||
def setup_log_file_handler(config: configuration.NamespaceConfig, logfile: str,
|
||||
fmt: str) -> Tuple[logging.Handler, str]:
|
||||
"""Setup file debug logging.
|
||||
|
||||
:param certbot.configuration.NamespaceConfig config: Configuration object
|
||||
@@ -179,13 +185,13 @@ class ColoredStreamHandler(logging.StreamHandler):
|
||||
:ivar bool red_level: The level at which to output
|
||||
|
||||
"""
|
||||
def __init__(self, stream=None):
|
||||
def __init__(self, stream: Optional[IO] = None) -> None:
|
||||
super().__init__(stream)
|
||||
self.colored = (sys.stderr.isatty() if stream is None else
|
||||
stream.isatty())
|
||||
self.red_level = logging.WARNING
|
||||
|
||||
def format(self, record):
|
||||
def format(self, record: logging.LogRecord) -> str:
|
||||
"""Formats the string representation of record.
|
||||
|
||||
:param logging.LogRecord record: Record to be formatted
|
||||
@@ -207,11 +213,12 @@ class MemoryHandler(logging.handlers.MemoryHandler):
|
||||
only happens when flush(force=True) is called.
|
||||
|
||||
"""
|
||||
def __init__(self, target=None, capacity=10000):
|
||||
def __init__(self, target: Optional[logging.Handler] = None,
|
||||
capacity: int = 10000) -> None:
|
||||
# capacity doesn't matter because should_flush() is overridden
|
||||
super().__init__(capacity, target=target)
|
||||
|
||||
def close(self):
|
||||
def close(self) -> None:
|
||||
"""Close the memory handler, but don't set the target to None."""
|
||||
# This allows the logging module which may only have a weak
|
||||
# reference to the target handler to properly flush and close it.
|
||||
@@ -219,7 +226,7 @@ class MemoryHandler(logging.handlers.MemoryHandler):
|
||||
super().close()
|
||||
self.target = target
|
||||
|
||||
def flush(self, force=False): # pylint: disable=arguments-differ
|
||||
def flush(self, force: bool = False) -> None: # pylint: disable=arguments-differ
|
||||
"""Flush the buffer if force=True.
|
||||
|
||||
If force=False, this call is a noop.
|
||||
@@ -232,7 +239,7 @@ class MemoryHandler(logging.handlers.MemoryHandler):
|
||||
if force:
|
||||
super().flush()
|
||||
|
||||
def shouldFlush(self, record):
|
||||
def shouldFlush(self, record: logging.LogRecord) -> bool:
|
||||
"""Should the buffer be automatically flushed?
|
||||
|
||||
:param logging.LogRecord record: log record to be considered
|
||||
@@ -254,7 +261,7 @@ class TempHandler(logging.StreamHandler):
|
||||
:ivar str path: file system path to the temporary log file
|
||||
|
||||
"""
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
self._workdir = tempfile.mkdtemp()
|
||||
self.path = os.path.join(self._workdir, 'log')
|
||||
stream = util.safe_open(self.path, mode='w', chmod=0o600)
|
||||
@@ -264,7 +271,7 @@ class TempHandler(logging.StreamHandler):
|
||||
self.stream: IO[str]
|
||||
self._delete = True
|
||||
|
||||
def emit(self, record):
|
||||
def emit(self, record: logging.LogRecord) -> None:
|
||||
"""Log the specified logging record.
|
||||
|
||||
:param logging.LogRecord record: Record to be formatted
|
||||
@@ -273,7 +280,7 @@ class TempHandler(logging.StreamHandler):
|
||||
self._delete = False
|
||||
super().emit(record)
|
||||
|
||||
def close(self):
|
||||
def close(self) -> None:
|
||||
"""Close the handler and the temporary log file.
|
||||
|
||||
The temporary log file is deleted if it wasn't used.
|
||||
@@ -292,7 +299,8 @@ class TempHandler(logging.StreamHandler):
|
||||
self.release()
|
||||
|
||||
|
||||
def pre_arg_parse_except_hook(memory_handler, *args, **kwargs):
|
||||
def pre_arg_parse_except_hook(memory_handler: MemoryHandler,
|
||||
*args: Any, **kwargs: Any) -> None:
|
||||
"""A simple wrapper around post_arg_parse_except_hook.
|
||||
|
||||
The additional functionality provided by this wrapper is the memory
|
||||
@@ -319,8 +327,9 @@ def pre_arg_parse_except_hook(memory_handler, *args, **kwargs):
|
||||
memory_handler.flush(force=True)
|
||||
|
||||
|
||||
def post_arg_parse_except_hook(exc_type: type, exc_value: BaseException, trace: TracebackType,
|
||||
debug: bool, quiet: bool, log_path: str):
|
||||
def post_arg_parse_except_hook(exc_type: Type[BaseException], exc_value: BaseException,
|
||||
trace: TracebackType, debug: bool, quiet: bool,
|
||||
log_path: str) -> None:
|
||||
"""Logs fatal exceptions and reports them to the user.
|
||||
|
||||
If debug is True, the full exception and traceback is shown to the
|
||||
@@ -369,7 +378,7 @@ def post_arg_parse_except_hook(exc_type: type, exc_value: BaseException, trace:
|
||||
exit_func()
|
||||
|
||||
|
||||
def exit_with_advice(log_path: str):
|
||||
def exit_with_advice(log_path: str) -> None:
|
||||
"""Print a link to the community forums, the debug log path, and exit
|
||||
|
||||
The message is printed to stderr and the program will exit with a
|
||||
|
||||
@@ -11,6 +11,7 @@ from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
from typing import TypeVar
|
||||
from typing import Union
|
||||
|
||||
import configobj
|
||||
@@ -18,6 +19,7 @@ import josepy as jose
|
||||
import zope.component
|
||||
import zope.interface
|
||||
|
||||
from acme import client as acme_client
|
||||
from acme import errors as acme_errors
|
||||
import certbot
|
||||
from certbot import configuration
|
||||
@@ -56,7 +58,7 @@ USER_CANCELLED = ("User chose to cancel the operation and may "
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _suggest_donation_if_appropriate(config):
|
||||
def _suggest_donation_if_appropriate(config: configuration.NamespaceConfig) -> None:
|
||||
"""Potentially suggest a donation to support Certbot.
|
||||
|
||||
:param config: Configuration object
|
||||
@@ -82,7 +84,10 @@ def _suggest_donation_if_appropriate(config):
|
||||
)
|
||||
|
||||
|
||||
def _get_and_save_cert(le_client, config, domains=None, certname=None, lineage=None):
|
||||
def _get_and_save_cert(le_client: client.Client, config: configuration.NamespaceConfig,
|
||||
domains: Optional[List[str]] = None, certname: Optional[str] = None,
|
||||
lineage: Optional[storage.RenewableCert] = None
|
||||
) -> Optional[storage.RenewableCert]:
|
||||
"""Authenticate and enroll certificate.
|
||||
|
||||
This method finds the relevant lineage, figures out what to do with it,
|
||||
@@ -115,14 +120,15 @@ def _get_and_save_cert(le_client, config, domains=None, certname=None, lineage=N
|
||||
display_util.notify(
|
||||
"{action} for {domains}".format(
|
||||
action="Simulating renewal of an existing certificate"
|
||||
if config.dry_run else "Renewing an existing certificate",
|
||||
if config.dry_run else "Renewing an existing certificate",
|
||||
domains=internal_display_util.summarize_domain_list(domains or lineage.names())
|
||||
)
|
||||
)
|
||||
renewal.renew_cert(config, domains, le_client, lineage)
|
||||
else:
|
||||
# TREAT AS NEW REQUEST
|
||||
assert domains is not None
|
||||
if domains is None:
|
||||
raise errors.Error("Domain list cannot be none if the lineage is not set.")
|
||||
display_util.notify(
|
||||
"{action} for {domains}".format(
|
||||
action="Simulating a certificate request" if config.dry_run else
|
||||
@@ -164,7 +170,7 @@ def _handle_unexpected_key_type_migration(config: configuration.NamespaceConfig,
|
||||
|
||||
|
||||
def _handle_subset_cert_request(config: configuration.NamespaceConfig,
|
||||
domains: List[str],
|
||||
domains: Iterable[str],
|
||||
cert: storage.RenewableCert
|
||||
) -> Tuple[str, Optional[storage.RenewableCert]]:
|
||||
"""Figure out what to do if a previous cert had a subset of the names now requested
|
||||
@@ -264,7 +270,8 @@ def _handle_identical_cert_request(config: configuration.NamespaceConfig,
|
||||
raise AssertionError('This is impossible')
|
||||
|
||||
|
||||
def _find_lineage_for_domains(config, domains):
|
||||
def _find_lineage_for_domains(config: configuration.NamespaceConfig, domains: List[str]
|
||||
) -> Tuple[Optional[str], Optional[storage.RenewableCert]]:
|
||||
"""Determine whether there are duplicated names and how to handle
|
||||
them (renew, reinstall, newcert, or raising an error to stop
|
||||
the client run if the user chooses to cancel the operation when
|
||||
@@ -304,7 +311,8 @@ def _find_lineage_for_domains(config, domains):
|
||||
return None, None
|
||||
|
||||
|
||||
def _find_cert(config, domains, certname):
|
||||
def _find_cert(config: configuration.NamespaceConfig, domains: List[str], certname: str
|
||||
) -> Tuple[bool, Optional[storage.RenewableCert]]:
|
||||
"""Finds an existing certificate object given domains and/or a certificate name.
|
||||
|
||||
:param config: Configuration object
|
||||
@@ -328,10 +336,9 @@ def _find_cert(config, domains, certname):
|
||||
return (action != "reinstall"), lineage
|
||||
|
||||
|
||||
def _find_lineage_for_domains_and_certname(config: configuration.NamespaceConfig,
|
||||
domains: List[str],
|
||||
certname: str
|
||||
) -> Tuple[str, Optional[storage.RenewableCert]]:
|
||||
def _find_lineage_for_domains_and_certname(
|
||||
config: configuration.NamespaceConfig, domains: List[str],
|
||||
certname: str) -> Tuple[Optional[str], Optional[storage.RenewableCert]]:
|
||||
"""Find appropriate lineage based on given domains and/or certname.
|
||||
|
||||
:param config: Configuration object
|
||||
@@ -357,7 +364,8 @@ def _find_lineage_for_domains_and_certname(config: configuration.NamespaceConfig
|
||||
lineage = cert_manager.lineage_for_certname(config, certname)
|
||||
if lineage:
|
||||
if domains:
|
||||
if set(cert_manager.domains_for_certname(config, certname)) != set(domains):
|
||||
computed_domains = cert_manager.domains_for_certname(config, certname)
|
||||
if computed_domains and set(computed_domains) != set(domains):
|
||||
_handle_unexpected_key_type_migration(config, lineage)
|
||||
_ask_user_to_confirm_new_names(config, domains, certname,
|
||||
lineage.names()) # raises if no
|
||||
@@ -367,10 +375,14 @@ def _find_lineage_for_domains_and_certname(config: configuration.NamespaceConfig
|
||||
elif domains:
|
||||
return "newcert", None
|
||||
raise errors.ConfigurationError("No certificate with name {0} found. "
|
||||
"Use -d to specify domains, or run certbot certificates to see "
|
||||
"possible certificate names.".format(certname))
|
||||
"Use -d to specify domains, or run certbot certificates to see "
|
||||
"possible certificate names.".format(certname))
|
||||
|
||||
def _get_added_removed(after, before):
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
def _get_added_removed(after: Iterable[T], before: Iterable[T]) -> Tuple[List[T], List[T]]:
|
||||
"""Get lists of items removed from `before`
|
||||
and a lists of items added to `after`
|
||||
"""
|
||||
@@ -380,7 +392,8 @@ def _get_added_removed(after, before):
|
||||
removed.sort()
|
||||
return added, removed
|
||||
|
||||
def _format_list(character, strings):
|
||||
|
||||
def _format_list(character: str, strings: Iterable[str]) -> str:
|
||||
"""Format list with given character
|
||||
"""
|
||||
if not strings:
|
||||
@@ -392,7 +405,10 @@ def _format_list(character, strings):
|
||||
br=os.linesep
|
||||
)
|
||||
|
||||
def _ask_user_to_confirm_new_names(config, new_domains, certname, old_domains):
|
||||
|
||||
def _ask_user_to_confirm_new_names(config: configuration.NamespaceConfig,
|
||||
new_domains: Iterable[str], certname: str,
|
||||
old_domains: Iterable[str]) -> None:
|
||||
"""Ask user to confirm update cert certname to contain new_domains.
|
||||
|
||||
:param config: Configuration object
|
||||
@@ -429,7 +445,9 @@ def _ask_user_to_confirm_new_names(config, new_domains, certname, old_domains):
|
||||
raise errors.ConfigurationError("Specified mismatched certificate name and domains.")
|
||||
|
||||
|
||||
def _find_domains_or_certname(config, installer, question=None):
|
||||
def _find_domains_or_certname(config: configuration.NamespaceConfig,
|
||||
installer: Optional[interfaces.Installer],
|
||||
question: Optional[str] = None) -> Tuple[List[str], str]:
|
||||
"""Retrieve domains and certname from config or user input.
|
||||
|
||||
:param config: Configuration object
|
||||
@@ -596,7 +614,7 @@ def _is_interactive_only_auth(config: configuration.NamespaceConfig) -> bool:
|
||||
|
||||
|
||||
def _csr_report_new_cert(config: configuration.NamespaceConfig, cert_path: Optional[str],
|
||||
chain_path: Optional[str], fullchain_path: Optional[str]):
|
||||
chain_path: Optional[str], fullchain_path: Optional[str]) -> None:
|
||||
""" --csr variant of _report_new_cert.
|
||||
|
||||
Until --csr is overhauled (#8332) this is transitional function to report the creation
|
||||
@@ -633,7 +651,9 @@ def _csr_report_new_cert(config: configuration.NamespaceConfig, cert_path: Optio
|
||||
)
|
||||
|
||||
|
||||
def _determine_account(config):
|
||||
def _determine_account(config: configuration.NamespaceConfig
|
||||
) -> Tuple[account.Account,
|
||||
Optional[acme_client.ClientV2]]:
|
||||
"""Determine which account to use.
|
||||
|
||||
If ``config.account`` is ``None``, it will be updated based on the
|
||||
@@ -649,9 +669,9 @@ def _determine_account(config):
|
||||
:raises errors.Error: If unable to register an account with ACME server
|
||||
|
||||
"""
|
||||
def _tos_cb(terms_of_service):
|
||||
def _tos_cb(terms_of_service: str) -> None:
|
||||
if config.tos:
|
||||
return True
|
||||
return
|
||||
msg = ("Please read the Terms of Service at {0}. You "
|
||||
"must agree in order to register with the ACME "
|
||||
"server. Do you agree?".format(terms_of_service))
|
||||
@@ -660,17 +680,19 @@ def _determine_account(config):
|
||||
raise errors.Error(
|
||||
"Registration cannot proceed without accepting "
|
||||
"Terms of Service.")
|
||||
return None
|
||||
|
||||
account_storage = account.AccountFileStorage(config)
|
||||
acme = None
|
||||
acme: Optional[acme_client.ClientV2] = None
|
||||
|
||||
if config.account is not None:
|
||||
acc = account_storage.load(config.account)
|
||||
else:
|
||||
accounts = account_storage.find_all()
|
||||
if len(accounts) > 1:
|
||||
acc = display_ops.choose_account(accounts)
|
||||
potential_acc = display_ops.choose_account(accounts)
|
||||
if not potential_acc:
|
||||
raise errors.Error("No account has been chosen.")
|
||||
acc = potential_acc
|
||||
elif len(accounts) == 1:
|
||||
acc = accounts[0]
|
||||
else: # no account registered yet
|
||||
@@ -691,7 +713,7 @@ def _determine_account(config):
|
||||
return acc, acme
|
||||
|
||||
|
||||
def _delete_if_appropriate(config):
|
||||
def _delete_if_appropriate(config: configuration.NamespaceConfig) -> None:
|
||||
"""Does the user want to delete their now-revoked certs? If run in non-interactive mode,
|
||||
deleting happens automatically.
|
||||
|
||||
@@ -729,21 +751,23 @@ def _delete_if_appropriate(config):
|
||||
config, config.certname)
|
||||
try:
|
||||
cert_manager.match_and_check_overlaps(config, [lambda x: archive_dir],
|
||||
lambda x: x.archive_dir, lambda x: x)
|
||||
lambda x: x.archive_dir, lambda x: x.lineagename)
|
||||
except errors.OverlappingMatchFound:
|
||||
logger.warning("Not deleting revoked certificates due to overlapping archive dirs. "
|
||||
"More than one certificate is using %s", archive_dir)
|
||||
return
|
||||
except Exception as e:
|
||||
msg = ('config.default_archive_dir: {0}, config.live_dir: {1}, archive_dir: {2},'
|
||||
'original exception: {3}')
|
||||
'original exception: {3}')
|
||||
msg = msg.format(config.default_archive_dir, config.live_dir, archive_dir, e)
|
||||
raise errors.Error(msg)
|
||||
|
||||
cert_manager.delete(config)
|
||||
|
||||
|
||||
def _init_le_client(config, authenticator, installer):
|
||||
def _init_le_client(config: configuration.NamespaceConfig,
|
||||
authenticator: Optional[interfaces.Authenticator],
|
||||
installer: Optional[interfaces.Installer]) -> client.Client:
|
||||
"""Initialize Let's Encrypt Client
|
||||
|
||||
:param config: Configuration object
|
||||
@@ -758,19 +782,19 @@ def _init_le_client(config, authenticator, installer):
|
||||
:rtype: client.Client
|
||||
|
||||
"""
|
||||
acc: Optional[account.Account]
|
||||
if authenticator is not None:
|
||||
# if authenticator was given, then we will need account...
|
||||
acc, acme = _determine_account(config)
|
||||
logger.debug("Picked account: %r", acc)
|
||||
# XXX
|
||||
#crypto_util.validate_key_csr(acc.key)
|
||||
else:
|
||||
acc, acme = None, None
|
||||
|
||||
return client.Client(config, acc, authenticator, installer, acme=acme)
|
||||
|
||||
|
||||
def unregister(config, unused_plugins):
|
||||
def unregister(config: configuration.NamespaceConfig,
|
||||
unused_plugins: plugins_disco.PluginsRegistry) -> Optional[str]:
|
||||
"""Deactivate account on server
|
||||
|
||||
:param config: Configuration object
|
||||
@@ -779,8 +803,8 @@ def unregister(config, unused_plugins):
|
||||
:param unused_plugins: List of plugins (deprecated)
|
||||
:type unused_plugins: plugins_disco.PluginsRegistry
|
||||
|
||||
:returns: `None`
|
||||
:rtype: None
|
||||
:returns: `None` or a string indicating an error
|
||||
:rtype: None or str
|
||||
|
||||
"""
|
||||
account_storage = account.AccountFileStorage(config)
|
||||
@@ -799,6 +823,9 @@ def unregister(config, unused_plugins):
|
||||
acc, acme = _determine_account(config)
|
||||
cb_client = client.Client(config, acc, None, None, acme=acme)
|
||||
|
||||
if not cb_client.acme:
|
||||
raise errors.Error("ACME client is not set.")
|
||||
|
||||
# delete on boulder
|
||||
cb_client.acme.deactivate_registration(acc.regr)
|
||||
account_files = account.AccountFileStorage(config)
|
||||
@@ -809,7 +836,8 @@ def unregister(config, unused_plugins):
|
||||
return None
|
||||
|
||||
|
||||
def register(config, unused_plugins):
|
||||
def register(config: configuration.NamespaceConfig,
|
||||
unused_plugins: plugins_disco.PluginsRegistry) -> Optional[str]:
|
||||
"""Create accounts on the server.
|
||||
|
||||
:param config: Configuration object
|
||||
@@ -839,7 +867,8 @@ def register(config, unused_plugins):
|
||||
return None
|
||||
|
||||
|
||||
def update_account(config, unused_plugins):
|
||||
def update_account(config: configuration.NamespaceConfig,
|
||||
unused_plugins: plugins_disco.PluginsRegistry) -> Optional[str]:
|
||||
"""Modify accounts on the server.
|
||||
|
||||
:param config: Configuration object
|
||||
@@ -864,8 +893,11 @@ def update_account(config, unused_plugins):
|
||||
|
||||
acc, acme = _determine_account(config)
|
||||
cb_client = client.Client(config, acc, None, None, acme=acme)
|
||||
# Empty list of contacts in case the user is removing all emails
|
||||
|
||||
if not cb_client.acme:
|
||||
raise errors.Error("ACME client is not set.")
|
||||
|
||||
# Empty list of contacts in case the user is removing all emails
|
||||
acc_contacts: Iterable[str] = ()
|
||||
if config.email:
|
||||
acc_contacts = ['mailto:' + email for email in config.email.split(',')]
|
||||
@@ -904,7 +936,8 @@ def _cert_name_from_config_or_lineage(config: configuration.NamespaceConfig,
|
||||
return None
|
||||
|
||||
|
||||
def _install_cert(config, le_client, domains, lineage=None):
|
||||
def _install_cert(config: configuration.NamespaceConfig, le_client: client.Client,
|
||||
domains: List[str], lineage: Optional[storage.RenewableCert] = None) -> None:
|
||||
"""Install a cert
|
||||
|
||||
:param config: Configuration object
|
||||
@@ -923,7 +956,8 @@ def _install_cert(config, le_client, domains, lineage=None):
|
||||
:rtype: None
|
||||
|
||||
"""
|
||||
path_provider = lineage if lineage else config
|
||||
path_provider: Union[storage.RenewableCert,
|
||||
configuration.NamespaceConfig] = lineage if lineage else config
|
||||
assert path_provider.cert_path is not None
|
||||
|
||||
le_client.deploy_certificate(domains, path_provider.key_path, path_provider.cert_path,
|
||||
@@ -931,7 +965,8 @@ def _install_cert(config, le_client, domains, lineage=None):
|
||||
le_client.enhance_config(domains, path_provider.chain_path)
|
||||
|
||||
|
||||
def install(config, plugins):
|
||||
def install(config: configuration.NamespaceConfig,
|
||||
plugins: plugins_disco.PluginsRegistry) -> Optional[str]:
|
||||
"""Install a previously obtained cert in a server.
|
||||
|
||||
:param config: Configuration object
|
||||
@@ -940,8 +975,8 @@ def install(config, plugins):
|
||||
:param plugins: List of plugins
|
||||
:type plugins: plugins_disco.PluginsRegistry
|
||||
|
||||
:returns: `None`
|
||||
:rtype: None
|
||||
:returns: `None` or the error message
|
||||
:rtype: None or str
|
||||
|
||||
"""
|
||||
# XXX: Update for renewer/RenewableCert
|
||||
@@ -990,7 +1025,8 @@ def install(config, plugins):
|
||||
|
||||
return None
|
||||
|
||||
def _populate_from_certname(config):
|
||||
|
||||
def _populate_from_certname(config: configuration.NamespaceConfig) -> configuration.NamespaceConfig:
|
||||
"""Helper function for install to populate missing config values from lineage
|
||||
defined by --cert-name."""
|
||||
|
||||
@@ -1007,14 +1043,18 @@ def _populate_from_certname(config):
|
||||
config.namespace.fullchain_path = lineage.fullchain_path
|
||||
return config
|
||||
|
||||
def _check_certificate_and_key(config):
|
||||
|
||||
def _check_certificate_and_key(config: configuration.NamespaceConfig) -> None:
|
||||
if not os.path.isfile(filesystem.realpath(config.cert_path)):
|
||||
raise errors.ConfigurationError("Error while reading certificate from path "
|
||||
"{0}".format(config.cert_path))
|
||||
if not os.path.isfile(filesystem.realpath(config.key_path)):
|
||||
raise errors.ConfigurationError("Error while reading private key from path "
|
||||
"{0}".format(config.key_path))
|
||||
def plugins_cmd(config, plugins):
|
||||
|
||||
|
||||
def plugins_cmd(config: configuration.NamespaceConfig,
|
||||
plugins: plugins_disco.PluginsRegistry) -> None:
|
||||
"""List server software plugins.
|
||||
|
||||
:param config: Configuration object
|
||||
@@ -1052,7 +1092,8 @@ def plugins_cmd(config, plugins):
|
||||
notify(str(available))
|
||||
|
||||
|
||||
def enhance(config, plugins):
|
||||
def enhance(config: configuration.NamespaceConfig,
|
||||
plugins: plugins_disco.PluginsRegistry) -> Optional[str]:
|
||||
"""Add security enhancements to existing configuration
|
||||
|
||||
:param config: Configuration object
|
||||
@@ -1061,8 +1102,8 @@ def enhance(config, plugins):
|
||||
:param plugins: List of plugins
|
||||
:type plugins: plugins_disco.PluginsRegistry
|
||||
|
||||
:returns: `None`
|
||||
:rtype: None
|
||||
:returns: `None` or a string indicating an error
|
||||
:rtype: None or str
|
||||
|
||||
"""
|
||||
supported_enhancements = ["hsts", "redirect", "uir", "staple"]
|
||||
@@ -1089,6 +1130,8 @@ def enhance(config, plugins):
|
||||
config, "enhance", allow_multiple=False,
|
||||
custom_prompt=certname_question)[0]
|
||||
cert_domains = cert_manager.domains_for_certname(config, config.certname)
|
||||
if cert_domains is None:
|
||||
raise errors.Error("Could not find the list of domains for the given certificate name.")
|
||||
if config.noninteractive_mode:
|
||||
domains = cert_domains
|
||||
else:
|
||||
@@ -1100,6 +1143,8 @@ def enhance(config, plugins):
|
||||
"defined, exiting.")
|
||||
|
||||
lineage = cert_manager.lineage_for_certname(config, config.certname)
|
||||
if not lineage:
|
||||
raise errors.Error("Could not find the lineage for the given certificate name.")
|
||||
if not config.chain_path:
|
||||
config.chain_path = lineage.chain_path
|
||||
if oldstyle_enh:
|
||||
@@ -1111,7 +1156,7 @@ def enhance(config, plugins):
|
||||
return None
|
||||
|
||||
|
||||
def rollback(config, plugins):
|
||||
def rollback(config: configuration.NamespaceConfig, plugins: plugins_disco.PluginsRegistry) -> None:
|
||||
"""Rollback server configuration changes made during install.
|
||||
|
||||
:param config: Configuration object
|
||||
@@ -1126,7 +1171,9 @@ def rollback(config, plugins):
|
||||
"""
|
||||
client.rollback(config.installer, config.checkpoints, config, plugins)
|
||||
|
||||
def update_symlinks(config, unused_plugins):
|
||||
|
||||
def update_symlinks(config: configuration.NamespaceConfig,
|
||||
unused_plugins: plugins_disco.PluginsRegistry) -> None:
|
||||
"""Update the certificate file family symlinks
|
||||
|
||||
Use the information in the config file to make symlinks point to
|
||||
@@ -1144,7 +1191,9 @@ def update_symlinks(config, unused_plugins):
|
||||
"""
|
||||
cert_manager.update_live_symlinks(config)
|
||||
|
||||
def rename(config, unused_plugins):
|
||||
|
||||
def rename(config: configuration.NamespaceConfig,
|
||||
unused_plugins: plugins_disco.PluginsRegistry) -> None:
|
||||
"""Rename a certificate
|
||||
|
||||
Use the information in the config file to rename an existing
|
||||
@@ -1162,7 +1211,9 @@ def rename(config, unused_plugins):
|
||||
"""
|
||||
cert_manager.rename_lineage(config)
|
||||
|
||||
def delete(config, unused_plugins):
|
||||
|
||||
def delete(config: configuration.NamespaceConfig,
|
||||
unused_plugins: plugins_disco.PluginsRegistry) -> None:
|
||||
"""Delete a certificate
|
||||
|
||||
Use the information in the config file to delete an existing
|
||||
@@ -1181,7 +1232,8 @@ def delete(config, unused_plugins):
|
||||
cert_manager.delete(config)
|
||||
|
||||
|
||||
def certificates(config, unused_plugins):
|
||||
def certificates(config: configuration.NamespaceConfig,
|
||||
unused_plugins: plugins_disco.PluginsRegistry) -> None:
|
||||
"""Display information about certs configured with Certbot
|
||||
|
||||
:param config: Configuration object
|
||||
@@ -1197,7 +1249,8 @@ def certificates(config, unused_plugins):
|
||||
cert_manager.certificates(config)
|
||||
|
||||
|
||||
def revoke(config, unused_plugins: plugins_disco.PluginsRegistry) -> Optional[str]:
|
||||
def revoke(config: configuration.NamespaceConfig,
|
||||
unused_plugins: plugins_disco.PluginsRegistry) -> Optional[str]:
|
||||
"""Revoke a previously obtained certificate.
|
||||
|
||||
:param config: Configuration object
|
||||
@@ -1251,7 +1304,8 @@ def revoke(config, unused_plugins: plugins_disco.PluginsRegistry) -> Optional[st
|
||||
return None
|
||||
|
||||
|
||||
def run(config, plugins):
|
||||
def run(config: configuration.NamespaceConfig,
|
||||
plugins: plugins_disco.PluginsRegistry) -> Optional[str]:
|
||||
"""Obtain a certificate and install.
|
||||
|
||||
:param config: Configuration object
|
||||
@@ -1361,7 +1415,8 @@ def _csr_get_and_save_cert(config: configuration.NamespaceConfig,
|
||||
return cert_path, chain_path, fullchain_path
|
||||
|
||||
|
||||
def renew_cert(config, plugins, lineage):
|
||||
def renew_cert(config: configuration.NamespaceConfig, plugins: plugins_disco.PluginsRegistry,
|
||||
lineage: storage.RenewableCert) -> None:
|
||||
"""Renew & save an existing cert. Do not install it.
|
||||
|
||||
:param config: Configuration object
|
||||
@@ -1385,6 +1440,9 @@ def renew_cert(config, plugins, lineage):
|
||||
|
||||
renewed_lineage = _get_and_save_cert(le_client, config, lineage=lineage)
|
||||
|
||||
if not renewed_lineage:
|
||||
raise errors.Error("An existing certificate for the given name could not be found.")
|
||||
|
||||
if installer and not config.dry_run:
|
||||
# In case of a renewal, reload server to pick up new certificate.
|
||||
updater.run_renewal_deployer(config, renewed_lineage, installer)
|
||||
@@ -1392,7 +1450,7 @@ def renew_cert(config, plugins, lineage):
|
||||
installer.restart()
|
||||
|
||||
|
||||
def certonly(config, plugins):
|
||||
def certonly(config: configuration.NamespaceConfig, plugins: plugins_disco.PluginsRegistry) -> None:
|
||||
"""Authenticate & obtain cert, but do not install it.
|
||||
|
||||
This implements the 'certonly' subcommand.
|
||||
@@ -1412,7 +1470,6 @@ def certonly(config, plugins):
|
||||
# SETUP: Select plugins and construct a client instance
|
||||
# installers are used in auth mode to determine domain names
|
||||
installer, auth = plug_sel.choose_configurator_plugins(config, plugins, "certonly")
|
||||
|
||||
le_client = _init_le_client(config, auth, installer)
|
||||
|
||||
if config.csr:
|
||||
@@ -1443,7 +1500,8 @@ def certonly(config, plugins):
|
||||
eff.handle_subscription(config, le_client.account)
|
||||
|
||||
|
||||
def renew(config, unused_plugins):
|
||||
def renew(config: configuration.NamespaceConfig,
|
||||
unused_plugins: plugins_disco.PluginsRegistry) -> None:
|
||||
"""Renew previously-obtained certificates.
|
||||
|
||||
:param config: Configuration object
|
||||
@@ -1462,7 +1520,7 @@ def renew(config, unused_plugins):
|
||||
hooks.run_saved_post_hooks()
|
||||
|
||||
|
||||
def make_or_verify_needed_dirs(config):
|
||||
def make_or_verify_needed_dirs(config: configuration.NamespaceConfig) -> None:
|
||||
"""Create or verify existence of config, work, and hook directories.
|
||||
|
||||
:param config: Configuration object
|
||||
@@ -1514,7 +1572,7 @@ def make_displayer(config: configuration.NamespaceConfig
|
||||
devnull.close()
|
||||
|
||||
|
||||
def main(cli_args=None):
|
||||
def main(cli_args: List[str] = None) -> Optional[Union[str, int]]:
|
||||
"""Run Certbot.
|
||||
|
||||
:param cli_args: command line to Certbot, defaults to ``sys.argv[1:]``
|
||||
|
||||
@@ -7,8 +7,13 @@ import random
|
||||
import sys
|
||||
import time
|
||||
import traceback
|
||||
from typing import Any
|
||||
from typing import Dict
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Mapping
|
||||
from typing import Optional
|
||||
from typing import Union
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
@@ -50,7 +55,8 @@ CONFIG_ITEMS = set(itertools.chain(
|
||||
BOOL_CONFIG_ITEMS, INT_CONFIG_ITEMS, STR_CONFIG_ITEMS, ('pref_challs',)))
|
||||
|
||||
|
||||
def _reconstitute(config, full_path):
|
||||
def _reconstitute(config: configuration.NamespaceConfig,
|
||||
full_path: str) -> Optional[storage.RenewableCert]:
|
||||
"""Try to instantiate a RenewableCert, updating config with relevant items.
|
||||
|
||||
This is specifically for use in renewal and enforces several checks
|
||||
@@ -108,7 +114,8 @@ def _reconstitute(config, full_path):
|
||||
return renewal_candidate
|
||||
|
||||
|
||||
def _restore_webroot_config(config, renewalparams):
|
||||
def _restore_webroot_config(config: configuration.NamespaceConfig,
|
||||
renewalparams: Mapping[str, Any]) -> None:
|
||||
"""
|
||||
webroot_map is, uniquely, a dict, and the general-purpose configuration
|
||||
restoring logic is not able to correctly parse it from the serialized
|
||||
@@ -125,7 +132,8 @@ def _restore_webroot_config(config, renewalparams):
|
||||
config.webroot_path = wp
|
||||
|
||||
|
||||
def _restore_plugin_configs(config, renewalparams):
|
||||
def _restore_plugin_configs(config: configuration.NamespaceConfig,
|
||||
renewalparams: Mapping[str, Any]) -> None:
|
||||
"""Sets plugin specific values in config from renewalparams
|
||||
|
||||
:param configuration.NamespaceConfig config: configuration for the
|
||||
@@ -168,7 +176,8 @@ def _restore_plugin_configs(config, renewalparams):
|
||||
setattr(config, config_item, cast(config_value))
|
||||
|
||||
|
||||
def restore_required_config_elements(config, renewalparams):
|
||||
def restore_required_config_elements(config: configuration.NamespaceConfig,
|
||||
renewalparams: Mapping[str, Any]) -> None:
|
||||
"""Sets non-plugin specific values in config from renewalparams
|
||||
|
||||
:param configuration.NamespaceConfig config: configuration for the
|
||||
@@ -189,7 +198,7 @@ def restore_required_config_elements(config, renewalparams):
|
||||
setattr(config.namespace, item_name, value)
|
||||
|
||||
|
||||
def _remove_deprecated_config_elements(renewalparams):
|
||||
def _remove_deprecated_config_elements(renewalparams: Mapping[str, Any]) -> Dict[str, Any]:
|
||||
"""Removes deprecated config options from the parsed renewalparams.
|
||||
|
||||
:param dict renewalparams: list of parsed renewalparams
|
||||
@@ -202,7 +211,7 @@ def _remove_deprecated_config_elements(renewalparams):
|
||||
if option_name not in cli.DEPRECATED_OPTIONS}
|
||||
|
||||
|
||||
def _restore_pref_challs(unused_name, value):
|
||||
def _restore_pref_challs(unused_name: str, value: Union[List[str], str]) -> List[str]:
|
||||
"""Restores preferred challenges from a renewal config file.
|
||||
|
||||
If value is a `str`, it should be a single challenge type.
|
||||
@@ -224,7 +233,7 @@ def _restore_pref_challs(unused_name, value):
|
||||
return cli.parse_preferred_challenges(value)
|
||||
|
||||
|
||||
def _restore_bool(name, value):
|
||||
def _restore_bool(name: str, value: str) -> bool:
|
||||
"""Restores a boolean key-value pair from a renewal config file.
|
||||
|
||||
:param str name: option name
|
||||
@@ -243,7 +252,7 @@ def _restore_bool(name, value):
|
||||
return lowercase_value == "true"
|
||||
|
||||
|
||||
def _restore_int(name, value):
|
||||
def _restore_int(name: str, value: str) -> int:
|
||||
"""Restores an integer key-value pair from a renewal config file.
|
||||
|
||||
:param str name: option name
|
||||
@@ -265,7 +274,7 @@ def _restore_int(name, value):
|
||||
raise errors.Error("Expected a numeric value for {0}".format(name))
|
||||
|
||||
|
||||
def _restore_str(name, value):
|
||||
def _restore_str(name: str, value: str) -> Optional[str]:
|
||||
"""Restores a string key-value pair from a renewal config file.
|
||||
|
||||
:param str name: option name
|
||||
@@ -290,7 +299,7 @@ def _restore_str(name, value):
|
||||
return None if value == "None" else value
|
||||
|
||||
|
||||
def should_renew(config, lineage):
|
||||
def should_renew(config: configuration.NamespaceConfig, lineage: storage.RenewableCert) -> bool:
|
||||
"""Return true if any of the circumstances for automatic renewal apply."""
|
||||
if config.renew_by_default:
|
||||
logger.debug("Auto-renewal forced with --force-renewal...")
|
||||
@@ -305,7 +314,8 @@ def should_renew(config, lineage):
|
||||
return False
|
||||
|
||||
|
||||
def _avoid_invalidating_lineage(config, lineage, original_server):
|
||||
def _avoid_invalidating_lineage(config: configuration.NamespaceConfig,
|
||||
lineage: storage.RenewableCert, original_server: str) -> None:
|
||||
"""Do not renew a valid cert with one from a staging server!"""
|
||||
if util.is_staging(config.server):
|
||||
if not util.is_staging(original_server):
|
||||
@@ -344,7 +354,7 @@ def renew_cert(config: configuration.NamespaceConfig, domains: Optional[List[str
|
||||
hooks.renew_hook(config, domains, lineage.live_dir)
|
||||
|
||||
|
||||
def report(msgs, category):
|
||||
def report(msgs: Iterable[str], category: str) -> str:
|
||||
"""Format a results report for a category of renewal outcomes"""
|
||||
lines = ("%s (%s)" % (m, category) for m in msgs)
|
||||
return " " + "\n ".join(lines)
|
||||
@@ -398,7 +408,7 @@ def _renew_describe_results(config: configuration.NamespaceConfig, renew_success
|
||||
notify(display_obj.SIDE_FRAME)
|
||||
|
||||
|
||||
def handle_renewal_request(config):
|
||||
def handle_renewal_request(config: configuration.NamespaceConfig) -> None:
|
||||
"""Examine each lineage; renew if due and report results"""
|
||||
|
||||
# This is trivially False if config.domains is empty
|
||||
@@ -448,7 +458,7 @@ def handle_renewal_request(config):
|
||||
continue
|
||||
|
||||
try:
|
||||
if renewal_candidate is None:
|
||||
if not renewal_candidate:
|
||||
parse_failures.append(renewal_file)
|
||||
else:
|
||||
# This call is done only for retro-compatibility purposes.
|
||||
@@ -490,7 +500,8 @@ def handle_renewal_request(config):
|
||||
lineagename, e
|
||||
)
|
||||
logger.debug("Traceback was:\n%s", traceback.format_exc())
|
||||
renew_failures.append(renewal_candidate.fullchain)
|
||||
if renewal_candidate:
|
||||
renew_failures.append(renewal_candidate.fullchain)
|
||||
|
||||
# Describe all the results
|
||||
_renew_describe_results(config, renew_successes, renew_failures,
|
||||
|
||||
@@ -5,6 +5,7 @@ import queue
|
||||
import sys
|
||||
import textwrap
|
||||
|
||||
from certbot import configuration
|
||||
from certbot import util
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -27,11 +28,11 @@ class Reporter:
|
||||
|
||||
_msg_type = collections.namedtuple('_msg_type', 'priority text on_crash')
|
||||
|
||||
def __init__(self, config):
|
||||
def __init__(self, config: configuration.NamespaceConfig) -> None:
|
||||
self.messages: queue.PriorityQueue[Reporter._msg_type] = queue.PriorityQueue()
|
||||
self.config = config
|
||||
|
||||
def add_message(self, msg, priority, on_crash=True):
|
||||
def add_message(self, msg: str, priority: int, on_crash: bool = True) -> None:
|
||||
"""Adds msg to the list of messages to be printed.
|
||||
|
||||
:param str msg: Message to be displayed to the user.
|
||||
@@ -47,7 +48,7 @@ class Reporter:
|
||||
self.messages.put(self._msg_type(priority, msg, on_crash))
|
||||
logger.debug("Reporting to user: %s", msg)
|
||||
|
||||
def print_messages(self):
|
||||
def print_messages(self) -> None:
|
||||
"""Prints messages to the user and clears the message queue.
|
||||
|
||||
If there is an unhandled exception, only messages for which
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
"""Module configuring Certbot in a snap environment"""
|
||||
import logging
|
||||
import socket
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
|
||||
from requests import Session
|
||||
from requests.adapters import HTTPAdapter
|
||||
@@ -79,23 +81,24 @@ def prepare_env(cli_args: List[str]) -> List[str]:
|
||||
|
||||
|
||||
class _SnapdConnection(HTTPConnection):
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
super().__init__("localhost")
|
||||
self.sock = None
|
||||
self.sock: Optional[socket.socket] = None
|
||||
|
||||
def connect(self):
|
||||
def connect(self) -> None:
|
||||
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
self.sock.connect("/run/snapd.socket")
|
||||
|
||||
|
||||
class _SnapdConnectionPool(HTTPConnectionPool):
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
super().__init__("localhost")
|
||||
|
||||
def _new_conn(self):
|
||||
def _new_conn(self) -> _SnapdConnection:
|
||||
return _SnapdConnection()
|
||||
|
||||
|
||||
class _SnapdAdapter(HTTPAdapter):
|
||||
def get_connection(self, url, proxies=None):
|
||||
def get_connection(self, url: str,
|
||||
proxies: Optional[Iterable[str]] = None) -> _SnapdConnectionPool:
|
||||
return _SnapdConnectionPool()
|
||||
|
||||
@@ -5,8 +5,13 @@ import logging
|
||||
import re
|
||||
import shutil
|
||||
import stat
|
||||
from typing import cast
|
||||
from typing import Any
|
||||
from typing import Dict
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Mapping
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
|
||||
import configobj
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
@@ -39,7 +44,7 @@ CURRENT_VERSION = pkg_resources.parse_version(certbot.__version__)
|
||||
BASE_PRIVKEY_MODE = 0o600
|
||||
|
||||
|
||||
def renewal_conf_files(config: configuration.NamespaceConfig):
|
||||
def renewal_conf_files(config: configuration.NamespaceConfig) -> List[str]:
|
||||
"""Build a list of all renewal configuration files.
|
||||
|
||||
:param configuration.NamespaceConfig config: Configuration object
|
||||
@@ -53,7 +58,7 @@ def renewal_conf_files(config: configuration.NamespaceConfig):
|
||||
return result
|
||||
|
||||
|
||||
def renewal_file_for_certname(config, certname):
|
||||
def renewal_file_for_certname(config: configuration.NamespaceConfig, certname: str) -> str:
|
||||
"""Return /path/to/certname.conf in the renewal conf directory"""
|
||||
path = os.path.join(config.renewal_configs_dir, "{0}.conf".format(certname))
|
||||
if not os.path.exists(path):
|
||||
@@ -74,7 +79,8 @@ def cert_path_for_cert_name(config: configuration.NamespaceConfig, cert_name: st
|
||||
cert_name_implied_conf, encoding='utf-8', default_encoding='utf-8')["fullchain"]
|
||||
|
||||
|
||||
def config_with_defaults(config=None):
|
||||
def config_with_defaults(config: Optional[configuration.NamespaceConfig] = None
|
||||
) -> configobj.ConfigObj:
|
||||
"""Merge supplied config, if provided, on top of builtin defaults."""
|
||||
defaults_copy = configobj.ConfigObj(
|
||||
constants.RENEWER_DEFAULTS, encoding='utf-8', default_encoding='utf-8')
|
||||
@@ -83,7 +89,9 @@ def config_with_defaults(config=None):
|
||||
return defaults_copy
|
||||
|
||||
|
||||
def add_time_interval(base_time, interval, textparser=parsedatetime.Calendar()):
|
||||
def add_time_interval(base_time: datetime.datetime, interval: str,
|
||||
textparser: parsedatetime.Calendar = parsedatetime.Calendar()
|
||||
) -> datetime.datetime:
|
||||
"""Parse the time specified time interval, and add it to the base_time
|
||||
|
||||
The interval can be in the English-language format understood by
|
||||
@@ -107,7 +115,9 @@ def add_time_interval(base_time, interval, textparser=parsedatetime.Calendar()):
|
||||
return textparser.parseDT(interval, base_time, tzinfo=tzinfo)[0]
|
||||
|
||||
|
||||
def write_renewal_config(o_filename, n_filename, archive_dir, target, relevant_data):
|
||||
def write_renewal_config(o_filename: str, n_filename: str, archive_dir: str,
|
||||
target: Mapping[str, str],
|
||||
relevant_data: Mapping[str, Any]) -> configobj.ConfigObj:
|
||||
"""Writes a renewal config file with the specified name and values.
|
||||
|
||||
:param str o_filename: Absolute path to the previous version of config file
|
||||
@@ -160,7 +170,8 @@ def write_renewal_config(o_filename, n_filename, archive_dir, target, relevant_d
|
||||
return config
|
||||
|
||||
|
||||
def rename_renewal_config(prev_name, new_name, cli_config):
|
||||
def rename_renewal_config(prev_name: str, new_name: str,
|
||||
cli_config: configuration.NamespaceConfig) -> None:
|
||||
"""Renames cli_config.certname's config to cli_config.new_certname.
|
||||
|
||||
:param .NamespaceConfig cli_config: parsed command line
|
||||
@@ -178,7 +189,8 @@ def rename_renewal_config(prev_name, new_name, cli_config):
|
||||
"for the new certificate name.")
|
||||
|
||||
|
||||
def update_configuration(lineagename, archive_dir, target, cli_config):
|
||||
def update_configuration(lineagename: str, archive_dir: str, target: Mapping[str, str],
|
||||
cli_config: configuration.NamespaceConfig) -> configobj.ConfigObj:
|
||||
"""Modifies lineagename's config to contain the specified values.
|
||||
|
||||
:param str lineagename: Name of the lineage being modified
|
||||
@@ -206,7 +218,7 @@ def update_configuration(lineagename, archive_dir, target, cli_config):
|
||||
return configobj.ConfigObj(config_filename, encoding='utf-8', default_encoding='utf-8')
|
||||
|
||||
|
||||
def get_link_target(link):
|
||||
def get_link_target(link: str) -> str:
|
||||
"""Get an absolute path to the target of link.
|
||||
|
||||
:param str link: Path to a symbolic link
|
||||
@@ -228,7 +240,7 @@ def get_link_target(link):
|
||||
return os.path.abspath(target)
|
||||
|
||||
|
||||
def _write_live_readme_to(readme_path, is_base_dir=False):
|
||||
def _write_live_readme_to(readme_path: str, is_base_dir: bool = False) -> None:
|
||||
prefix = ""
|
||||
if is_base_dir:
|
||||
prefix = "[cert name]/"
|
||||
@@ -249,7 +261,7 @@ def _write_live_readme_to(readme_path, is_base_dir=False):
|
||||
"certificates.\n".format(prefix=prefix))
|
||||
|
||||
|
||||
def _relevant(namespaces, option):
|
||||
def _relevant(namespaces: Iterable[str], option: str) -> bool:
|
||||
"""
|
||||
Is this option one that could be restored for future renewal purposes?
|
||||
|
||||
@@ -265,7 +277,7 @@ def _relevant(namespaces, option):
|
||||
any(option.startswith(namespace) for namespace in namespaces))
|
||||
|
||||
|
||||
def relevant_values(all_values):
|
||||
def relevant_values(all_values: Mapping[str, Any]) -> Dict[str, Any]:
|
||||
"""Return a new dict containing only items relevant for renewal.
|
||||
|
||||
:param dict all_values: The original values.
|
||||
@@ -287,7 +299,8 @@ def relevant_values(all_values):
|
||||
rv["server"] = all_values["server"]
|
||||
return rv
|
||||
|
||||
def lineagename_for_filename(config_filename):
|
||||
|
||||
def lineagename_for_filename(config_filename: str) -> str:
|
||||
"""Returns the lineagename for a configuration filename.
|
||||
"""
|
||||
if not config_filename.endswith(".conf"):
|
||||
@@ -295,16 +308,21 @@ def lineagename_for_filename(config_filename):
|
||||
"renewal config file name must end in .conf")
|
||||
return os.path.basename(config_filename[:-len(".conf")])
|
||||
|
||||
def renewal_filename_for_lineagename(config, lineagename):
|
||||
|
||||
def renewal_filename_for_lineagename(config: configuration.NamespaceConfig,
|
||||
lineagename: str) -> str:
|
||||
"""Returns the lineagename for a configuration filename.
|
||||
"""
|
||||
return os.path.join(config.renewal_configs_dir, lineagename) + ".conf"
|
||||
|
||||
def _relpath_from_file(archive_dir, from_file):
|
||||
|
||||
def _relpath_from_file(archive_dir: str, from_file: str) -> str:
|
||||
"""Path to a directory from a file"""
|
||||
return os.path.relpath(archive_dir, os.path.dirname(from_file))
|
||||
|
||||
def full_archive_path(config_obj, cli_config, lineagename):
|
||||
|
||||
def full_archive_path(config_obj: configobj.ConfigObj, cli_config: configuration.NamespaceConfig,
|
||||
lineagename: str) -> str:
|
||||
"""Returns the full archive path for a lineagename
|
||||
|
||||
Uses cli_config to determine archive path if not available from config_obj.
|
||||
@@ -317,11 +335,13 @@ def full_archive_path(config_obj, cli_config, lineagename):
|
||||
return config_obj["archive_dir"]
|
||||
return os.path.join(cli_config.default_archive_dir, lineagename)
|
||||
|
||||
def _full_live_path(cli_config, lineagename):
|
||||
|
||||
def _full_live_path(cli_config: configuration.NamespaceConfig, lineagename: str) -> str:
|
||||
"""Returns the full default live path for a lineagename"""
|
||||
return os.path.join(cli_config.live_dir, lineagename)
|
||||
|
||||
def delete_files(config, certname):
|
||||
|
||||
def delete_files(config: configuration.NamespaceConfig, certname: str) -> None:
|
||||
"""Delete all files related to the certificate.
|
||||
|
||||
If some files are not found, ignore them and continue.
|
||||
@@ -423,7 +443,8 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
renewal configuration file and/or systemwide defaults.
|
||||
|
||||
"""
|
||||
def __init__(self, config_filename, cli_config, update_symlinks=False):
|
||||
def __init__(self, config_filename: str, cli_config: configuration.NamespaceConfig,
|
||||
update_symlinks: bool = False) -> None:
|
||||
"""Instantiate a RenewableCert object from an existing lineage.
|
||||
|
||||
:param str config_filename: the path to the renewal config file
|
||||
@@ -477,27 +498,27 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
self._check_symlinks()
|
||||
|
||||
@property
|
||||
def key_path(self):
|
||||
def key_path(self) -> str:
|
||||
"""Duck type for self.privkey"""
|
||||
return self.privkey
|
||||
|
||||
@property
|
||||
def cert_path(self):
|
||||
def cert_path(self) -> str:
|
||||
"""Duck type for self.cert"""
|
||||
return self.cert
|
||||
|
||||
@property
|
||||
def chain_path(self):
|
||||
def chain_path(self) -> str:
|
||||
"""Duck type for self.chain"""
|
||||
return self.chain
|
||||
|
||||
@property
|
||||
def fullchain_path(self):
|
||||
def fullchain_path(self) -> str:
|
||||
"""Duck type for self.fullchain"""
|
||||
return self.fullchain
|
||||
|
||||
@property
|
||||
def lineagename(self):
|
||||
def lineagename(self) -> str:
|
||||
"""Name given to the certificate lineage.
|
||||
|
||||
:rtype: str
|
||||
@@ -506,21 +527,24 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
return self._lineagename
|
||||
|
||||
@property
|
||||
def target_expiry(self):
|
||||
def target_expiry(self) -> datetime.datetime:
|
||||
"""The current target certificate's expiration datetime
|
||||
|
||||
:returns: Expiration datetime of the current target certificate
|
||||
:rtype: :class:`datetime.datetime`
|
||||
"""
|
||||
return crypto_util.notAfter(self.current_target("cert"))
|
||||
cert_path = self.current_target("cert")
|
||||
if not cert_path:
|
||||
raise errors.Error("Target certificate does not exist.")
|
||||
return crypto_util.notAfter(cert_path)
|
||||
|
||||
@property
|
||||
def archive_dir(self):
|
||||
def archive_dir(self) -> str:
|
||||
"""Returns the default or specified archive directory"""
|
||||
return full_archive_path(self.configuration,
|
||||
self.cli_config, self.lineagename)
|
||||
|
||||
def relative_archive_dir(self, from_file):
|
||||
def relative_archive_dir(self, from_file: str) -> str:
|
||||
"""Returns the default or specified archive directory as a relative path
|
||||
|
||||
Used for creating symbolic links.
|
||||
@@ -539,7 +563,7 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
return util.is_staging(self.server)
|
||||
return False
|
||||
|
||||
def _check_symlinks(self):
|
||||
def _check_symlinks(self) -> None:
|
||||
"""Raises an exception if a symlink doesn't exist"""
|
||||
for kind in ALL_FOUR:
|
||||
link = getattr(self, kind)
|
||||
@@ -551,7 +575,7 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
raise errors.CertStorageError("target {0} of symlink {1} does "
|
||||
"not exist".format(target, link))
|
||||
|
||||
def _update_symlinks(self):
|
||||
def _update_symlinks(self) -> None:
|
||||
"""Updates symlinks to use archive_dir"""
|
||||
for kind in ALL_FOUR:
|
||||
link = getattr(self, kind)
|
||||
@@ -562,7 +586,7 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
os.unlink(link)
|
||||
os.symlink(new_link, link)
|
||||
|
||||
def _consistent(self):
|
||||
def _consistent(self) -> bool:
|
||||
"""Are the files associated with this lineage self-consistent?
|
||||
|
||||
:returns: Whether the files stored in connection with this
|
||||
@@ -630,7 +654,7 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
# for x in ALL_FOUR))) == 1
|
||||
return True
|
||||
|
||||
def _fix(self):
|
||||
def _fix(self) -> None:
|
||||
"""Attempt to fix defects or inconsistencies in this lineage.
|
||||
|
||||
.. todo:: Currently unimplemented.
|
||||
@@ -648,7 +672,7 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
# happen as a result of random tampering by a sysadmin, or
|
||||
# filesystem errors, or crashes.)
|
||||
|
||||
def _previous_symlinks(self):
|
||||
def _previous_symlinks(self) -> List[Tuple[str, str]]:
|
||||
"""Returns the kind and path of all symlinks used in recovery.
|
||||
|
||||
:returns: list of (kind, symlink) tuples
|
||||
@@ -663,7 +687,7 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
|
||||
return previous_symlinks
|
||||
|
||||
def _fix_symlinks(self):
|
||||
def _fix_symlinks(self) -> None:
|
||||
"""Fixes symlinks in the event of an incomplete version update.
|
||||
|
||||
If there is no problem with the current symlinks, this function
|
||||
@@ -682,7 +706,7 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
if os.path.exists(link):
|
||||
os.unlink(link)
|
||||
|
||||
def current_target(self, kind):
|
||||
def current_target(self, kind: str) -> Optional[str]:
|
||||
"""Returns full path to which the specified item currently points.
|
||||
|
||||
:param str kind: the lineage member item ("cert", "privkey",
|
||||
@@ -702,7 +726,7 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
return None
|
||||
return get_link_target(link)
|
||||
|
||||
def current_version(self, kind):
|
||||
def current_version(self, kind: str) -> Optional[int]:
|
||||
"""Returns numerical version of the specified item.
|
||||
|
||||
For example, if kind is "chain" and the current chain link
|
||||
@@ -729,7 +753,7 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
logger.debug("No matches for target %s.", kind)
|
||||
return None
|
||||
|
||||
def version(self, kind, version):
|
||||
def version(self, kind: str, version: int) -> str:
|
||||
"""The filename that corresponds to the specified version and kind.
|
||||
|
||||
.. warning:: The specified version may not exist in this
|
||||
@@ -746,10 +770,13 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
"""
|
||||
if kind not in ALL_FOUR:
|
||||
raise errors.CertStorageError("unknown kind of item")
|
||||
where = os.path.dirname(self.current_target(kind))
|
||||
link = self.current_target(kind)
|
||||
if not link:
|
||||
raise errors.Error(f"Target {kind} does not exist!")
|
||||
where = os.path.dirname(link)
|
||||
return os.path.join(where, "{0}{1}.pem".format(kind, version))
|
||||
|
||||
def available_versions(self, kind):
|
||||
def available_versions(self, kind: str) -> List[int]:
|
||||
"""Which alternative versions of the specified kind of item exist?
|
||||
|
||||
The archive directory where the current version is stored is
|
||||
@@ -764,13 +791,16 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
"""
|
||||
if kind not in ALL_FOUR:
|
||||
raise errors.CertStorageError("unknown kind of item")
|
||||
where = os.path.dirname(self.current_target(kind))
|
||||
link = self.current_target(kind)
|
||||
if not link:
|
||||
raise errors.Error(f"Target {kind} does not exist!")
|
||||
where = os.path.dirname(link)
|
||||
files = os.listdir(where)
|
||||
pattern = re.compile(r"^{0}([0-9]+)\.pem$".format(kind))
|
||||
matches = [pattern.match(f) for f in files]
|
||||
return sorted([int(m.groups()[0]) for m in matches if m])
|
||||
|
||||
def newest_available_version(self, kind):
|
||||
def newest_available_version(self, kind: str) -> int:
|
||||
"""Newest available version of the specified kind of item?
|
||||
|
||||
:param str kind: the lineage member item (``cert``,
|
||||
@@ -782,7 +812,7 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
"""
|
||||
return max(self.available_versions(kind))
|
||||
|
||||
def latest_common_version(self):
|
||||
def latest_common_version(self) -> int:
|
||||
"""Newest version for which all items are available?
|
||||
|
||||
:returns: the newest available version for which all members
|
||||
@@ -797,7 +827,7 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
versions = [self.available_versions(x) for x in ALL_FOUR]
|
||||
return max(n for n in versions[0] if all(n in v for v in versions[1:]))
|
||||
|
||||
def next_free_version(self):
|
||||
def next_free_version(self) -> int:
|
||||
"""Smallest version newer than all full or partial versions?
|
||||
|
||||
:returns: the smallest version number that is larger than any
|
||||
@@ -811,7 +841,7 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
# for the others.
|
||||
return max(self.newest_available_version(x) for x in ALL_FOUR) + 1
|
||||
|
||||
def ensure_deployed(self):
|
||||
def ensure_deployed(self) -> bool:
|
||||
"""Make sure we've deployed the latest version.
|
||||
|
||||
:returns: False if a change was needed, True otherwise
|
||||
@@ -826,8 +856,7 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def has_pending_deployment(self):
|
||||
def has_pending_deployment(self) -> bool:
|
||||
"""Is there a later version of all of the managed items?
|
||||
|
||||
:returns: ``True`` if there is a complete version of this
|
||||
@@ -836,12 +865,18 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
all_versions: List[int] = []
|
||||
for item in ALL_FOUR:
|
||||
version = self.current_version(item)
|
||||
if version is None:
|
||||
raise errors.Error(f"{item} is required but missing for this certificate.")
|
||||
all_versions.append(version)
|
||||
# TODO: consider whether to assume consistency or treat
|
||||
# inconsistent/consistent versions differently
|
||||
smallest_current = min(self.current_version(x) for x in ALL_FOUR)
|
||||
smallest_current = min(all_versions)
|
||||
return smallest_current < self.latest_common_version()
|
||||
|
||||
def _update_link_to(self, kind, version):
|
||||
def _update_link_to(self, kind: str, version: int) -> None:
|
||||
"""Make the specified item point at the specified version.
|
||||
|
||||
(Note that this method doesn't verify that the specified version
|
||||
@@ -867,7 +902,7 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
os.unlink(link)
|
||||
os.symlink(os.path.join(target_directory, filename), link)
|
||||
|
||||
def update_all_links_to(self, version):
|
||||
def update_all_links_to(self, version: int) -> None:
|
||||
"""Change all member objects to point to the specified version.
|
||||
|
||||
:param int version: the desired version
|
||||
@@ -876,7 +911,10 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
with error_handler.ErrorHandler(self._fix_symlinks):
|
||||
previous_links = self._previous_symlinks()
|
||||
for kind, link in previous_links:
|
||||
os.symlink(self.current_target(kind), link)
|
||||
target = self.current_target(kind)
|
||||
if not target:
|
||||
raise errors.Error(f"Target {kind} does not exist!")
|
||||
os.symlink(target, link)
|
||||
|
||||
for kind in ALL_FOUR:
|
||||
self._update_link_to(kind, version)
|
||||
@@ -884,7 +922,7 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
for _, link in previous_links:
|
||||
os.unlink(link)
|
||||
|
||||
def names(self):
|
||||
def names(self) -> List[str]:
|
||||
"""What are the subject names of this certificate?
|
||||
|
||||
:returns: the subject names
|
||||
@@ -895,11 +933,10 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
target = self.current_target("cert")
|
||||
if target is None:
|
||||
raise errors.CertStorageError("could not find the certificate file")
|
||||
with open(target) as f:
|
||||
# TODO: Remove the cast once certbot package is fully typed
|
||||
return crypto_util.get_names_from_cert(cast(bytes, f.read()))
|
||||
with open(target, "rb") as f:
|
||||
return crypto_util.get_names_from_cert(f.read())
|
||||
|
||||
def ocsp_revoked(self, version):
|
||||
def ocsp_revoked(self, version: int) -> bool:
|
||||
"""Is the specified cert version revoked according to OCSP?
|
||||
|
||||
Also returns True if the cert version is declared as revoked
|
||||
@@ -927,7 +964,7 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
logger.debug(str(e))
|
||||
return False
|
||||
|
||||
def autorenewal_is_enabled(self):
|
||||
def autorenewal_is_enabled(self) -> bool:
|
||||
"""Is automatic renewal enabled for this cert?
|
||||
|
||||
If autorenew is not specified, defaults to True.
|
||||
@@ -939,7 +976,7 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
return ("autorenew" not in self.configuration["renewalparams"] or
|
||||
self.configuration["renewalparams"].as_bool("autorenew"))
|
||||
|
||||
def should_autorenew(self):
|
||||
def should_autorenew(self) -> bool:
|
||||
"""Should we now try to autorenew the most recent cert version?
|
||||
|
||||
This is a policy question and does not only depend on whether
|
||||
@@ -977,7 +1014,8 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def new_lineage(cls, lineagename, cert, privkey, chain, cli_config):
|
||||
def new_lineage(cls, lineagename: str, cert: bytes, privkey: bytes, chain: bytes,
|
||||
cli_config: configuration.NamespaceConfig) -> "RenewableCert":
|
||||
"""Create a new certificate lineage.
|
||||
|
||||
Attempts to create a certificate lineage -- enrolled for
|
||||
@@ -1072,7 +1110,7 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
return cls(new_config.filename, cli_config)
|
||||
|
||||
@property
|
||||
def private_key_type(self):
|
||||
def private_key_type(self) -> str:
|
||||
"""
|
||||
:returns: The type of algorithm for the private, RSA or ECDSA
|
||||
:rtype: str
|
||||
@@ -1088,8 +1126,8 @@ class RenewableCert(interfaces.RenewableCert):
|
||||
else:
|
||||
return "ECDSA"
|
||||
|
||||
def save_successor(self, prior_version, new_cert,
|
||||
new_privkey, new_chain, cli_config):
|
||||
def save_successor(self, prior_version: int, new_cert: bytes, new_privkey: bytes,
|
||||
new_chain: bytes, cli_config: configuration.NamespaceConfig) -> int:
|
||||
"""Save new cert and chain as a successor of a prior version.
|
||||
|
||||
Returns the new version number that was created.
|
||||
|
||||
@@ -1,14 +1,19 @@
|
||||
"""Updaters run at renewal"""
|
||||
import logging
|
||||
|
||||
from certbot import configuration
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
from certbot._internal import storage
|
||||
from certbot._internal.plugins import disco as plugin_disco
|
||||
from certbot._internal.plugins import selection as plug_sel
|
||||
from certbot.plugins import enhancements
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def run_generic_updaters(config, lineage, plugins):
|
||||
|
||||
def run_generic_updaters(config: configuration.NamespaceConfig, lineage: storage.RenewableCert,
|
||||
plugins: plugin_disco.PluginsRegistry) -> None:
|
||||
"""Run updaters that the plugin supports
|
||||
|
||||
:param config: Configuration object
|
||||
@@ -35,7 +40,9 @@ def run_generic_updaters(config, lineage, plugins):
|
||||
_run_updaters(lineage, installer, config)
|
||||
_run_enhancement_updaters(lineage, installer, config)
|
||||
|
||||
def run_renewal_deployer(config, lineage, installer):
|
||||
|
||||
def run_renewal_deployer(config: configuration.NamespaceConfig, lineage: storage.RenewableCert,
|
||||
installer: interfaces.Installer) -> None:
|
||||
"""Helper function to run deployer interface method if supported by the used
|
||||
installer plugin.
|
||||
|
||||
@@ -60,7 +67,9 @@ def run_renewal_deployer(config, lineage, installer):
|
||||
installer.renew_deploy(lineage)
|
||||
_run_enhancement_deployers(lineage, installer, config)
|
||||
|
||||
def _run_updaters(lineage, installer, config):
|
||||
|
||||
def _run_updaters(lineage: storage.RenewableCert, installer: interfaces.Installer,
|
||||
config: configuration.NamespaceConfig) -> None:
|
||||
"""Helper function to run the updater interface methods if supported by the
|
||||
used installer plugin.
|
||||
|
||||
@@ -77,7 +86,9 @@ def _run_updaters(lineage, installer, config):
|
||||
if isinstance(installer, interfaces.GenericUpdater):
|
||||
installer.generic_updates(lineage)
|
||||
|
||||
def _run_enhancement_updaters(lineage, installer, config):
|
||||
|
||||
def _run_enhancement_updaters(lineage: storage.RenewableCert, installer: interfaces.Installer,
|
||||
config: configuration.NamespaceConfig) -> None:
|
||||
"""Iterates through known enhancement interfaces. If the installer implements
|
||||
an enhancement interface and the enhance interface has an updater method, the
|
||||
updater method gets run.
|
||||
@@ -99,7 +110,8 @@ def _run_enhancement_updaters(lineage, installer, config):
|
||||
getattr(installer, enh["updater_function"])(lineage)
|
||||
|
||||
|
||||
def _run_enhancement_deployers(lineage, installer, config):
|
||||
def _run_enhancement_deployers(lineage: storage.RenewableCert, installer: interfaces.Installer,
|
||||
config: configuration.NamespaceConfig) -> None:
|
||||
"""Iterates through known enhancement interfaces. If the installer implements
|
||||
an enhancement interface and the enhance interface has an deployer method, the
|
||||
deployer method gets run.
|
||||
|
||||
Reference in New Issue
Block a user