mirror of
https://github.com/certbot/certbot.git
synced 2026-01-26 07:41:33 +03:00
Fully type certbot-compatibility-test (#9133)
* Finish typing the module * Use cast * Precise type
This commit is contained in:
@@ -1,21 +1,26 @@
|
||||
"""Provides a common base for Apache proxies"""
|
||||
import argparse
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
from typing import Set
|
||||
from typing import Tuple
|
||||
from unittest import mock
|
||||
|
||||
from certbot import errors as le_errors, configuration
|
||||
from certbot import util as certbot_util
|
||||
from certbot_apache._internal import entrypoint
|
||||
from certbot_compatibility_test import errors
|
||||
from certbot_compatibility_test import util
|
||||
from certbot_compatibility_test.configurators import common as configurators_common
|
||||
|
||||
from certbot import configuration
|
||||
from certbot import errors as le_errors
|
||||
from certbot import util as certbot_util
|
||||
|
||||
|
||||
class Proxy(configurators_common.Proxy):
|
||||
"""A common base for Apache test configurators"""
|
||||
|
||||
def __init__(self, args):
|
||||
def __init__(self, args: argparse.Namespace) -> None:
|
||||
"""Initializes the plugin with the given command line args"""
|
||||
super().__init__(args)
|
||||
self.le_config.apache_le_vhost_ext = "-le-ssl.conf"
|
||||
@@ -27,7 +32,7 @@ class Proxy(configurators_common.Proxy):
|
||||
mock_display.side_effect = le_errors.PluginError(
|
||||
"Unable to determine vhost")
|
||||
|
||||
def load_config(self):
|
||||
def load_config(self) -> str:
|
||||
"""Loads the next configuration for the plugin to test"""
|
||||
config = super().load_config()
|
||||
self._all_names, self._test_names = _get_names(config)
|
||||
@@ -47,7 +52,7 @@ class Proxy(configurators_common.Proxy):
|
||||
|
||||
return config
|
||||
|
||||
def _prepare_configurator(self):
|
||||
def _prepare_configurator(self) -> None:
|
||||
"""Prepares the Apache plugin for testing"""
|
||||
for k in entrypoint.ENTRYPOINT.OS_DEFAULTS.__dict__.keys():
|
||||
setattr(self.le_config, "apache_" + k,
|
||||
@@ -58,13 +63,13 @@ class Proxy(configurators_common.Proxy):
|
||||
name="apache")
|
||||
self._configurator.prepare()
|
||||
|
||||
def cleanup_from_tests(self):
|
||||
def cleanup_from_tests(self) -> None:
|
||||
"""Performs any necessary cleanup from running plugin tests"""
|
||||
super().cleanup_from_tests()
|
||||
mock.patch.stopall()
|
||||
|
||||
|
||||
def _get_server_root(config):
|
||||
def _get_server_root(config: str) -> str:
|
||||
"""Returns the server root directory in config"""
|
||||
subdirs = [
|
||||
name for name in os.listdir(config)
|
||||
@@ -76,7 +81,7 @@ def _get_server_root(config):
|
||||
return os.path.join(config, subdirs[0].rstrip())
|
||||
|
||||
|
||||
def _get_names(config):
|
||||
def _get_names(config: str) -> Tuple[Set[str], Set[str]]:
|
||||
"""Returns all and testable domain names in config"""
|
||||
all_names = set()
|
||||
non_ip_names = set()
|
||||
|
||||
@@ -1,15 +1,28 @@
|
||||
"""Provides a common base for configurator proxies"""
|
||||
from abc import abstractmethod
|
||||
import argparse
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import overload
|
||||
from typing import Set
|
||||
from typing import Tuple
|
||||
from typing import Type
|
||||
from typing import Union
|
||||
|
||||
from certbot._internal import constants
|
||||
from certbot_compatibility_test import interfaces
|
||||
from certbot_compatibility_test import errors
|
||||
from certbot_compatibility_test import interfaces
|
||||
from certbot_compatibility_test import util
|
||||
|
||||
from acme import challenges
|
||||
from acme.challenges import Challenge
|
||||
from certbot._internal import constants
|
||||
from certbot.achallenges import AnnotatedChallenge
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -17,10 +30,10 @@ class Proxy(interfaces.ConfiguratorProxy):
|
||||
"""A common base for compatibility test configurators"""
|
||||
|
||||
@classmethod
|
||||
def add_parser_arguments(cls, parser):
|
||||
def add_parser_arguments(cls, parser: argparse.ArgumentParser) -> None:
|
||||
"""Adds command line arguments needed by the plugin"""
|
||||
|
||||
def __init__(self, args):
|
||||
def __init__(self, args: argparse.Namespace) -> None:
|
||||
"""Initializes the plugin with the given command line args"""
|
||||
super().__init__(args)
|
||||
self._temp_dir = tempfile.mkdtemp()
|
||||
@@ -37,25 +50,34 @@ class Proxy(interfaces.ConfiguratorProxy):
|
||||
self.http_port = 80
|
||||
self.https_port = 443
|
||||
self._configurator: interfaces.Configurator
|
||||
self._all_names = None
|
||||
self._test_names = None
|
||||
self._all_names: Optional[Set[str]] = None
|
||||
self._test_names: Optional[Set[str]] = None
|
||||
|
||||
def has_more_configs(self):
|
||||
def has_more_configs(self) -> bool:
|
||||
"""Returns true if there are more configs to test"""
|
||||
return bool(self._configs)
|
||||
|
||||
@abstractmethod
|
||||
def cleanup_from_tests(self):
|
||||
def cleanup_from_tests(self) -> None:
|
||||
"""Performs any necessary cleanup from running plugin tests"""
|
||||
|
||||
def load_config(self):
|
||||
def load_config(self) -> str:
|
||||
"""Returns the next config directory to be tested"""
|
||||
shutil.rmtree(self.le_config.work_dir, ignore_errors=True)
|
||||
backup = os.path.join(self.le_config.work_dir, constants.BACKUP_DIR)
|
||||
os.makedirs(backup)
|
||||
return self._configs.pop()
|
||||
|
||||
def copy_certs_and_keys(self, cert_path, key_path, chain_path=None):
|
||||
@overload
|
||||
def copy_certs_and_keys(self, cert_path: str, key_path: str,
|
||||
chain_path: str) -> Tuple[str, str, str]: ...
|
||||
|
||||
@overload
|
||||
def copy_certs_and_keys(self, cert_path: str, key_path: str,
|
||||
chain_path: Optional[str]) -> Tuple[str, str, Optional[str]]: ...
|
||||
|
||||
def copy_certs_and_keys(self, cert_path: str, key_path: str,
|
||||
chain_path: Optional[str] = None) -> Tuple[str, str, Optional[str]]:
|
||||
"""Copies certs and keys into the temporary directory"""
|
||||
cert_and_key_dir = os.path.join(self._temp_dir, "certs_and_keys")
|
||||
if not os.path.isdir(cert_and_key_dir):
|
||||
@@ -72,68 +94,67 @@ class Proxy(interfaces.ConfiguratorProxy):
|
||||
|
||||
return cert, key, chain
|
||||
|
||||
def get_all_names_answer(self):
|
||||
def get_all_names_answer(self) -> Set[str]:
|
||||
"""Returns the set of domain names that the plugin should find"""
|
||||
if self._all_names:
|
||||
return self._all_names
|
||||
raise errors.Error("No configuration file loaded")
|
||||
|
||||
def get_testable_domain_names(self):
|
||||
def get_testable_domain_names(self) -> Set[str]:
|
||||
"""Returns the set of domain names that can be tested against"""
|
||||
if self._test_names:
|
||||
return self._test_names
|
||||
return {"example.com"}
|
||||
|
||||
def deploy_cert(self, domain, cert_path, key_path, chain_path=None,
|
||||
fullchain_path=None):
|
||||
def deploy_cert(self, domain: str, cert_path: str, key_path: str, chain_path: str,
|
||||
fullchain_path: str) -> None:
|
||||
"""Installs cert"""
|
||||
cert_path, key_path, chain_path = self.copy_certs_and_keys(
|
||||
cert_path, key_path, chain_path)
|
||||
cert_path, key_path, chain_path = self.copy_certs_and_keys(cert_path, key_path, chain_path)
|
||||
if not self._configurator:
|
||||
raise ValueError("Configurator plugin is not set.")
|
||||
self._configurator.deploy_cert(
|
||||
domain, cert_path, key_path, chain_path, fullchain_path)
|
||||
|
||||
|
||||
def cleanup(self, achalls):
|
||||
def cleanup(self, achalls: List[AnnotatedChallenge]) -> None:
|
||||
self._configurator.cleanup(achalls)
|
||||
|
||||
def config_test(self):
|
||||
def config_test(self) -> None:
|
||||
self._configurator.config_test()
|
||||
|
||||
def enhance(self, domain, enhancement, options = None):
|
||||
def enhance(self, domain: str, enhancement: str,
|
||||
options: Optional[Union[List[str], str]] = None) -> None:
|
||||
self._configurator.enhance(domain, enhancement, options)
|
||||
|
||||
def get_all_names(self):
|
||||
def get_all_names(self) -> Iterable[str]:
|
||||
return self._configurator.get_all_names()
|
||||
|
||||
def get_chall_pref(self, domain):
|
||||
def get_chall_pref(self, domain: str) -> Iterable[Type[Challenge]]:
|
||||
return self._configurator.get_chall_pref(domain)
|
||||
|
||||
@classmethod
|
||||
def inject_parser_options(cls, parser, name):
|
||||
def inject_parser_options(cls, parser: argparse.ArgumentParser, name: str) -> None:
|
||||
pass
|
||||
|
||||
def more_info(self):
|
||||
def more_info(self) -> str:
|
||||
return self._configurator.more_info()
|
||||
|
||||
def perform(self, achalls):
|
||||
def perform(self, achalls: List[AnnotatedChallenge]) -> List[challenges.ChallengeResponse]:
|
||||
return self._configurator.perform(achalls)
|
||||
|
||||
def prepare(self):
|
||||
def prepare(self) -> None:
|
||||
self._configurator.prepare()
|
||||
|
||||
def recovery_routine(self):
|
||||
def recovery_routine(self) -> None:
|
||||
self._configurator.recovery_routine()
|
||||
|
||||
def restart(self):
|
||||
def restart(self) -> None:
|
||||
self._configurator.restart()
|
||||
|
||||
def rollback_checkpoints(self, rollback = 1):
|
||||
def rollback_checkpoints(self, rollback: int = 1) -> None:
|
||||
self._configurator.rollback_checkpoints(rollback)
|
||||
|
||||
def save(self, title = None, temporary = False):
|
||||
def save(self, title: Optional[str] = None, temporary: bool = False) -> None:
|
||||
self._configurator.save(title, temporary)
|
||||
|
||||
def supported_enhancements(self):
|
||||
def supported_enhancements(self) -> List[str]:
|
||||
return self._configurator.supported_enhancements()
|
||||
|
||||
@@ -4,8 +4,8 @@ import shutil
|
||||
import subprocess
|
||||
from typing import cast
|
||||
from typing import Set
|
||||
from typing import Tuple
|
||||
|
||||
from certbot import configuration
|
||||
from certbot_compatibility_test import errors
|
||||
from certbot_compatibility_test import interfaces
|
||||
from certbot_compatibility_test import util
|
||||
@@ -13,11 +13,13 @@ from certbot_compatibility_test.configurators import common as configurators_com
|
||||
from certbot_nginx._internal import configurator
|
||||
from certbot_nginx._internal import constants
|
||||
|
||||
from certbot import configuration
|
||||
|
||||
|
||||
class Proxy(configurators_common.Proxy):
|
||||
"""A common base for Nginx test configurators"""
|
||||
|
||||
def load_config(self):
|
||||
def load_config(self) -> str:
|
||||
"""Loads the next configuration for the plugin to test"""
|
||||
config = super().load_config()
|
||||
self._all_names, self._test_names = _get_names(config)
|
||||
@@ -40,7 +42,7 @@ class Proxy(configurators_common.Proxy):
|
||||
|
||||
return config
|
||||
|
||||
def _prepare_configurator(self):
|
||||
def _prepare_configurator(self) -> None:
|
||||
"""Prepares the Nginx plugin for testing"""
|
||||
for k in constants.CLI_DEFAULTS:
|
||||
setattr(self.le_config, "nginx_" + k, constants.os_constant(k))
|
||||
@@ -50,11 +52,11 @@ class Proxy(configurators_common.Proxy):
|
||||
config=conf, name="nginx"))
|
||||
self._configurator.prepare()
|
||||
|
||||
def cleanup_from_tests(self):
|
||||
def cleanup_from_tests(self) -> None:
|
||||
"""Performs any necessary cleanup from running plugin tests"""
|
||||
|
||||
|
||||
def _get_server_root(config):
|
||||
def _get_server_root(config: str) -> str:
|
||||
"""Returns the server root directory in config"""
|
||||
subdirs = [
|
||||
name for name in os.listdir(config)
|
||||
@@ -66,7 +68,7 @@ def _get_server_root(config):
|
||||
return os.path.join(config, subdirs[0].rstrip())
|
||||
|
||||
|
||||
def _get_names(config):
|
||||
def _get_names(config: str) -> Tuple[Set[str], Set[str]]:
|
||||
"""Returns all and testable domain names in config"""
|
||||
all_names: Set[str] = set()
|
||||
for root, _dirs, files in os.walk(config):
|
||||
@@ -77,7 +79,7 @@ def _get_names(config):
|
||||
return all_names, non_ip_names
|
||||
|
||||
|
||||
def _get_server_names(root, filename):
|
||||
def _get_server_names(root: str, filename: str) -> Set[str]:
|
||||
"""Returns all names in a config file path"""
|
||||
all_names = set()
|
||||
with open(os.path.join(root, filename)) as f:
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
"""Certbot compatibility test interfaces"""
|
||||
from abc import ABCMeta
|
||||
from abc import abstractmethod
|
||||
import argparse
|
||||
from typing import cast
|
||||
from typing import Set
|
||||
|
||||
from certbot import interfaces
|
||||
from certbot.configuration import NamespaceConfig
|
||||
|
||||
|
||||
class PluginProxy(interfaces.Plugin, metaclass=ABCMeta):
|
||||
@@ -16,16 +20,16 @@ class PluginProxy(interfaces.Plugin, metaclass=ABCMeta):
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def add_parser_arguments(cls, parser):
|
||||
def add_parser_arguments(cls, parser: argparse.ArgumentParser) -> None:
|
||||
"""Adds command line arguments needed by the parser"""
|
||||
|
||||
@abstractmethod
|
||||
def __init__(self, args):
|
||||
def __init__(self, args: argparse.Namespace) -> None:
|
||||
"""Initializes the plugin with the given command line args"""
|
||||
super().__init__(args, 'proxy')
|
||||
super().__init__(cast(NamespaceConfig, args), 'proxy')
|
||||
|
||||
@abstractmethod
|
||||
def cleanup_from_tests(self):
|
||||
def cleanup_from_tests(self) -> None:
|
||||
"""Performs any necessary cleanup from running plugin tests.
|
||||
|
||||
This is guaranteed to be called before the program exits.
|
||||
@@ -33,15 +37,15 @@ class PluginProxy(interfaces.Plugin, metaclass=ABCMeta):
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def has_more_configs(self):
|
||||
def has_more_configs(self) -> bool:
|
||||
"""Returns True if there are more configs to test"""
|
||||
|
||||
@abstractmethod
|
||||
def load_config(self):
|
||||
def load_config(self) -> str:
|
||||
"""Loads the next config and returns its name"""
|
||||
|
||||
@abstractmethod
|
||||
def get_testable_domain_names(self):
|
||||
def get_testable_domain_names(self) -> Set[str]:
|
||||
"""Returns the domain names that can be used in testing"""
|
||||
|
||||
|
||||
@@ -53,7 +57,7 @@ class InstallerProxy(PluginProxy, interfaces.Installer, metaclass=ABCMeta):
|
||||
"""Wraps a Certbot installer"""
|
||||
|
||||
@abstractmethod
|
||||
def get_all_names_answer(self):
|
||||
def get_all_names_answer(self) -> Set[str]:
|
||||
"""Returns all names that should be found by the installer"""
|
||||
|
||||
|
||||
|
||||
@@ -5,13 +5,26 @@ import filecmp
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import socket
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
from typing import Any
|
||||
from typing import Dict
|
||||
from typing import Generator
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
from typing import Type
|
||||
|
||||
import OpenSSL
|
||||
from certbot_compatibility_test import errors
|
||||
from certbot_compatibility_test import util
|
||||
from certbot_compatibility_test import validator
|
||||
from certbot_compatibility_test.configurators import common
|
||||
from certbot_compatibility_test.configurators.apache import common as a_common
|
||||
from certbot_compatibility_test.configurators.nginx import common as n_common
|
||||
from OpenSSL import crypto
|
||||
from urllib3.util import connection
|
||||
|
||||
from acme import challenges
|
||||
@@ -19,14 +32,9 @@ from acme import crypto_util
|
||||
from acme import messages
|
||||
from certbot import achallenges
|
||||
from certbot import errors as le_errors
|
||||
from certbot.display import util as display_util
|
||||
from certbot._internal.display import obj as display_obj
|
||||
from certbot.display import util as display_util
|
||||
from certbot.tests import acme_util
|
||||
from certbot_compatibility_test import errors
|
||||
from certbot_compatibility_test import util
|
||||
from certbot_compatibility_test import validator
|
||||
from certbot_compatibility_test.configurators.apache import common as a_common
|
||||
from certbot_compatibility_test.configurators.nginx import common as n_common
|
||||
|
||||
DESCRIPTION = """
|
||||
Tests Certbot plugins against different server configurations. It is
|
||||
@@ -35,13 +43,13 @@ tests that the plugin supports are performed.
|
||||
|
||||
"""
|
||||
|
||||
PLUGINS = {"apache": a_common.Proxy, "nginx": n_common.Proxy}
|
||||
PLUGINS: Dict[str, Type[common.Proxy]] = {"apache": a_common.Proxy, "nginx": n_common.Proxy}
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def test_authenticator(plugin, config, temp_dir):
|
||||
def test_authenticator(plugin: common.Proxy, config: str, temp_dir: str) -> bool:
|
||||
"""Tests authenticator, returning True if the tests are successful"""
|
||||
backup = _create_backup(config, temp_dir)
|
||||
|
||||
@@ -96,7 +104,7 @@ def test_authenticator(plugin, config, temp_dir):
|
||||
return success
|
||||
|
||||
|
||||
def _create_achalls(plugin):
|
||||
def _create_achalls(plugin: common.Proxy) -> List[achallenges.AnnotatedChallenge]:
|
||||
"""Returns a list of annotated challenges to test on plugin"""
|
||||
achalls = []
|
||||
names = plugin.get_testable_domain_names()
|
||||
@@ -117,7 +125,8 @@ def _create_achalls(plugin):
|
||||
return achalls
|
||||
|
||||
|
||||
def test_installer(args, plugin, config, temp_dir):
|
||||
def test_installer(args: argparse.Namespace, plugin: common.Proxy, config: str,
|
||||
temp_dir: str) -> bool:
|
||||
"""Tests plugin as an installer"""
|
||||
backup = _create_backup(config, temp_dir)
|
||||
|
||||
@@ -137,13 +146,12 @@ def test_installer(args, plugin, config, temp_dir):
|
||||
return names_match and success and good_rollback
|
||||
|
||||
|
||||
def test_deploy_cert(plugin, temp_dir, domains):
|
||||
def test_deploy_cert(plugin: common.Proxy, temp_dir: str, domains: List[str]) -> bool:
|
||||
"""Tests deploy_cert returning True if the tests are successful"""
|
||||
cert = crypto_util.gen_ss_cert(util.KEY, domains)
|
||||
cert_path = os.path.join(temp_dir, "cert.pem")
|
||||
with open(cert_path, "wb") as f:
|
||||
f.write(OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, cert))
|
||||
f.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
|
||||
|
||||
for domain in domains:
|
||||
try:
|
||||
@@ -171,7 +179,7 @@ def test_deploy_cert(plugin, temp_dir, domains):
|
||||
return success
|
||||
|
||||
|
||||
def test_enhancements(plugin, domains):
|
||||
def test_enhancements(plugin: common.Proxy, domains: Iterable[str]) -> bool:
|
||||
"""Tests supported enhancements returning True if successful"""
|
||||
supported = plugin.supported_enhancements()
|
||||
|
||||
@@ -216,7 +224,7 @@ def test_enhancements(plugin, domains):
|
||||
return success
|
||||
|
||||
|
||||
def _save_and_restart(plugin, title=None):
|
||||
def _save_and_restart(plugin: common.Proxy, title: Optional[str] = None) -> bool:
|
||||
"""Saves and restart the plugin, returning True if no errors occurred"""
|
||||
try:
|
||||
plugin.save(title)
|
||||
@@ -227,7 +235,7 @@ def _save_and_restart(plugin, title=None):
|
||||
return False
|
||||
|
||||
|
||||
def test_rollback(plugin, config, backup):
|
||||
def test_rollback(plugin: common.Proxy, config: str, backup: str) -> bool:
|
||||
"""Tests the rollback checkpoints function"""
|
||||
try:
|
||||
plugin.rollback_checkpoints(1337)
|
||||
@@ -242,7 +250,7 @@ def test_rollback(plugin, config, backup):
|
||||
return True
|
||||
|
||||
|
||||
def _create_backup(config, temp_dir):
|
||||
def _create_backup(config: str, temp_dir: str) -> str:
|
||||
"""Creates a backup of config in temp_dir"""
|
||||
backup = os.path.join(temp_dir, "backup")
|
||||
shutil.rmtree(backup, ignore_errors=True)
|
||||
@@ -251,7 +259,7 @@ def _create_backup(config, temp_dir):
|
||||
return backup
|
||||
|
||||
|
||||
def _dirs_are_unequal(dir1, dir2):
|
||||
def _dirs_are_unequal(dir1: str, dir2: str) -> bool:
|
||||
"""Returns True if dir1 and dir2 are unequal"""
|
||||
dircmps = [filecmp.dircmp(dir1, dir2)]
|
||||
while dircmps:
|
||||
@@ -283,7 +291,7 @@ def _dirs_are_unequal(dir1, dir2):
|
||||
return False
|
||||
|
||||
|
||||
def get_args():
|
||||
def get_args() -> argparse.Namespace:
|
||||
"""Returns parsed command line arguments."""
|
||||
parser = argparse.ArgumentParser(
|
||||
description=DESCRIPTION,
|
||||
@@ -320,7 +328,7 @@ def get_args():
|
||||
return args
|
||||
|
||||
|
||||
def setup_logging(args):
|
||||
def setup_logging(args: argparse.Namespace) -> None:
|
||||
"""Prepares logging for the program"""
|
||||
handler = logging.StreamHandler()
|
||||
|
||||
@@ -329,13 +337,13 @@ def setup_logging(args):
|
||||
root_logger.addHandler(handler)
|
||||
|
||||
|
||||
def setup_display():
|
||||
def setup_display() -> None:
|
||||
""""Prepares a display utility instance for the Certbot plugins """
|
||||
displayer = display_util.NoninteractiveDisplay(sys.stdout)
|
||||
display_obj.set_display(displayer)
|
||||
|
||||
|
||||
def main():
|
||||
def main() -> None:
|
||||
"""Main test script execution."""
|
||||
args = get_args()
|
||||
setup_logging(args)
|
||||
@@ -379,11 +387,12 @@ def main():
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _fake_dns_resolution(resolved_ip):
|
||||
def _fake_dns_resolution(resolved_ip: str) -> Generator[None, None, None]:
|
||||
"""Monkey patch urllib3 to make any hostname be resolved to the provided IP"""
|
||||
_original_create_connection = connection.create_connection
|
||||
|
||||
def _patched_create_connection(address, *args, **kwargs):
|
||||
def _patched_create_connection(address: Tuple[str, str],
|
||||
*args: Any, **kwargs: Any) -> socket.socket:
|
||||
_, port = address
|
||||
return _original_create_connection((resolved_ip, port), *args, **kwargs)
|
||||
|
||||
|
||||
@@ -6,11 +6,11 @@ import re
|
||||
import shutil
|
||||
import tarfile
|
||||
|
||||
from certbot_compatibility_test import errors
|
||||
import josepy as jose
|
||||
|
||||
from certbot._internal import constants
|
||||
from certbot.tests import util as test_util
|
||||
from certbot_compatibility_test import errors
|
||||
|
||||
_KEY_BASE = "rsa2048_key.pem"
|
||||
KEY_PATH = test_util.vector_path(_KEY_BASE)
|
||||
@@ -19,7 +19,7 @@ JWK = jose.JWKRSA(key=test_util.load_rsa_private_key(_KEY_BASE))
|
||||
IP_REGEX = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")
|
||||
|
||||
|
||||
def create_le_config(parent_dir):
|
||||
def create_le_config(parent_dir: str) -> argparse.Namespace:
|
||||
"""Sets up LE dirs in parent_dir and returns the config dict"""
|
||||
config = copy.deepcopy(constants.CLI_DEFAULTS)
|
||||
|
||||
@@ -36,7 +36,7 @@ def create_le_config(parent_dir):
|
||||
return argparse.Namespace(**config)
|
||||
|
||||
|
||||
def extract_configs(configs, parent_dir):
|
||||
def extract_configs(configs: str, parent_dir: str) -> str:
|
||||
"""Extracts configs to a new dir under parent_dir and returns it"""
|
||||
config_dir = os.path.join(parent_dir, "configs")
|
||||
|
||||
|
||||
@@ -2,7 +2,11 @@
|
||||
import logging
|
||||
import socket
|
||||
from typing import cast
|
||||
from typing import Mapping
|
||||
from typing import Optional
|
||||
from typing import Union
|
||||
|
||||
from OpenSSL import crypto
|
||||
import requests
|
||||
|
||||
from acme import crypto_util
|
||||
@@ -14,10 +18,12 @@ logger = logging.getLogger(__name__)
|
||||
class Validator:
|
||||
"""Collection of functions to test a live webserver's configuration"""
|
||||
|
||||
def certificate(self, cert, name, alt_host=None, port=443):
|
||||
def certificate(self, cert: crypto.X509, name: Union[str, bytes],
|
||||
alt_host: Optional[str] = None, port: int = 443) -> bool:
|
||||
"""Verifies the certificate presented at name is cert"""
|
||||
if alt_host is None:
|
||||
host = socket.gethostbyname(name).encode()
|
||||
# In fact, socket.gethostbyname accepts both bytes and str, but types do not know that.
|
||||
host = socket.gethostbyname(cast(str, name)).encode()
|
||||
elif isinstance(alt_host, bytes):
|
||||
host = alt_host
|
||||
else:
|
||||
@@ -31,9 +37,10 @@ class Validator:
|
||||
return False
|
||||
|
||||
# Despite documentation saying that bytes are expected for digest(), we must provide a str.
|
||||
return presented_cert.digest(cast(bytes, "sha256")) == cert.digest("sha256")
|
||||
return presented_cert.digest(cast(bytes, "sha256")) == cert.digest(cast(bytes, "sha256"))
|
||||
|
||||
def redirect(self, name, port=80, headers=None):
|
||||
def redirect(self, name: str, port: int = 80,
|
||||
headers: Optional[Mapping[str, str]] = None) -> bool:
|
||||
"""Test whether webserver redirects to secure connection."""
|
||||
url = "http://{0}:{1}".format(name, port)
|
||||
if headers:
|
||||
@@ -54,7 +61,8 @@ class Validator:
|
||||
|
||||
return True
|
||||
|
||||
def any_redirect(self, name, port=80, headers=None):
|
||||
def any_redirect(self, name: str, port: int = 80,
|
||||
headers: Optional[Mapping[str, str]] = None) -> bool:
|
||||
"""Test whether webserver redirects."""
|
||||
url = "http://{0}:{1}".format(name, port)
|
||||
if headers:
|
||||
@@ -64,7 +72,7 @@ class Validator:
|
||||
|
||||
return response.status_code in range(300, 309)
|
||||
|
||||
def hsts(self, name):
|
||||
def hsts(self, name: str) -> bool:
|
||||
"""Test for HTTP Strict Transport Security header"""
|
||||
headers = requests.get("https://" + name).headers
|
||||
hsts_header = headers.get("strict-transport-security")
|
||||
@@ -93,6 +101,6 @@ class Validator:
|
||||
|
||||
return True
|
||||
|
||||
def ocsp_stapling(self, name):
|
||||
def ocsp_stapling(self, name: str) -> None:
|
||||
"""Verify ocsp stapling for domain."""
|
||||
raise NotImplementedError()
|
||||
|
||||
@@ -1,122 +1,126 @@
|
||||
"""Tests for certbot_compatibility_test.validator."""
|
||||
from typing import cast
|
||||
from typing import Mapping
|
||||
from typing import Optional
|
||||
import unittest
|
||||
from unittest import mock
|
||||
|
||||
import OpenSSL
|
||||
from certbot_compatibility_test import validator
|
||||
from OpenSSL import crypto
|
||||
import requests
|
||||
|
||||
from acme import errors as acme_errors
|
||||
from certbot_compatibility_test import validator
|
||||
|
||||
|
||||
class ValidatorTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
def setUp(self) -> None:
|
||||
self.validator = validator.Validator()
|
||||
|
||||
@mock.patch(
|
||||
"certbot_compatibility_test.validator.crypto_util.probe_sni")
|
||||
def test_certificate_success(self, mock_probe_sni):
|
||||
cert = OpenSSL.crypto.X509()
|
||||
def test_certificate_success(self, mock_probe_sni: mock.MagicMock) -> None:
|
||||
cert = crypto.X509()
|
||||
mock_probe_sni.return_value = cert
|
||||
self.assertTrue(self.validator.certificate(
|
||||
cert, "test.com", "127.0.0.1"))
|
||||
|
||||
@mock.patch(
|
||||
"certbot_compatibility_test.validator.crypto_util.probe_sni")
|
||||
def test_certificate_error(self, mock_probe_sni):
|
||||
cert = OpenSSL.crypto.X509()
|
||||
def test_certificate_error(self, mock_probe_sni: mock.MagicMock) -> None:
|
||||
cert = crypto.X509()
|
||||
mock_probe_sni.side_effect = [acme_errors.Error]
|
||||
self.assertFalse(self.validator.certificate(
|
||||
cert, "test.com", "127.0.0.1"))
|
||||
|
||||
@mock.patch(
|
||||
"certbot_compatibility_test.validator.crypto_util.probe_sni")
|
||||
def test_certificate_failure(self, mock_probe_sni):
|
||||
cert = OpenSSL.crypto.X509()
|
||||
def test_certificate_failure(self, mock_probe_sni: mock.MagicMock) -> None:
|
||||
cert = crypto.X509()
|
||||
cert.set_serial_number(1337)
|
||||
mock_probe_sni.return_value = OpenSSL.crypto.X509()
|
||||
mock_probe_sni.return_value = crypto.X509()
|
||||
self.assertFalse(self.validator.certificate(
|
||||
cert, "test.com", "127.0.0.1"))
|
||||
|
||||
@mock.patch("certbot_compatibility_test.validator.requests.get")
|
||||
def test_successful_redirect(self, mock_get_request):
|
||||
def test_successful_redirect(self, mock_get_request: mock.MagicMock) -> None:
|
||||
mock_get_request.return_value = create_response(
|
||||
301, {"location": "https://test.com"})
|
||||
self.assertTrue(self.validator.redirect("test.com"))
|
||||
|
||||
@mock.patch("certbot_compatibility_test.validator.requests.get")
|
||||
def test_redirect_with_headers(self, mock_get_request):
|
||||
def test_redirect_with_headers(self, mock_get_request: mock.MagicMock) -> None:
|
||||
mock_get_request.return_value = create_response(
|
||||
301, {"location": "https://test.com"})
|
||||
self.assertTrue(self.validator.redirect(
|
||||
"test.com", headers={"Host": "test.com"}))
|
||||
|
||||
@mock.patch("certbot_compatibility_test.validator.requests.get")
|
||||
def test_redirect_missing_location(self, mock_get_request):
|
||||
def test_redirect_missing_location(self, mock_get_request: mock.MagicMock) -> None:
|
||||
mock_get_request.return_value = create_response(301)
|
||||
self.assertFalse(self.validator.redirect("test.com"))
|
||||
|
||||
@mock.patch("certbot_compatibility_test.validator.requests.get")
|
||||
def test_redirect_wrong_status_code(self, mock_get_request):
|
||||
def test_redirect_wrong_status_code(self, mock_get_request: mock.MagicMock) -> None:
|
||||
mock_get_request.return_value = create_response(
|
||||
201, {"location": "https://test.com"})
|
||||
self.assertFalse(self.validator.redirect("test.com"))
|
||||
|
||||
@mock.patch("certbot_compatibility_test.validator.requests.get")
|
||||
def test_redirect_wrong_redirect_code(self, mock_get_request):
|
||||
def test_redirect_wrong_redirect_code(self, mock_get_request: mock.MagicMock) -> None:
|
||||
mock_get_request.return_value = create_response(
|
||||
303, {"location": "https://test.com"})
|
||||
self.assertFalse(self.validator.redirect("test.com"))
|
||||
|
||||
@mock.patch("certbot_compatibility_test.validator.requests.get")
|
||||
def test_hsts_empty(self, mock_get_request):
|
||||
def test_hsts_empty(self, mock_get_request: mock.MagicMock) -> None:
|
||||
mock_get_request.return_value = create_response(
|
||||
headers={"strict-transport-security": ""})
|
||||
self.assertFalse(self.validator.hsts("test.com"))
|
||||
|
||||
@mock.patch("certbot_compatibility_test.validator.requests.get")
|
||||
def test_hsts_malformed(self, mock_get_request):
|
||||
def test_hsts_malformed(self, mock_get_request: mock.MagicMock) -> None:
|
||||
mock_get_request.return_value = create_response(
|
||||
headers={"strict-transport-security": "sdfal"})
|
||||
self.assertFalse(self.validator.hsts("test.com"))
|
||||
|
||||
@mock.patch("certbot_compatibility_test.validator.requests.get")
|
||||
def test_hsts_bad_max_age(self, mock_get_request):
|
||||
def test_hsts_bad_max_age(self, mock_get_request: mock.MagicMock) -> None:
|
||||
mock_get_request.return_value = create_response(
|
||||
headers={"strict-transport-security": "max-age=not-an-int"})
|
||||
self.assertFalse(self.validator.hsts("test.com"))
|
||||
|
||||
@mock.patch("certbot_compatibility_test.validator.requests.get")
|
||||
def test_hsts_expire(self, mock_get_request):
|
||||
def test_hsts_expire(self, mock_get_request: mock.MagicMock) -> None:
|
||||
mock_get_request.return_value = create_response(
|
||||
headers={"strict-transport-security": "max-age=3600"})
|
||||
self.assertFalse(self.validator.hsts("test.com"))
|
||||
|
||||
@mock.patch("certbot_compatibility_test.validator.requests.get")
|
||||
def test_hsts(self, mock_get_request):
|
||||
def test_hsts(self, mock_get_request: mock.MagicMock) -> None:
|
||||
mock_get_request.return_value = create_response(
|
||||
headers={"strict-transport-security": "max-age=31536000"})
|
||||
self.assertTrue(self.validator.hsts("test.com"))
|
||||
|
||||
@mock.patch("certbot_compatibility_test.validator.requests.get")
|
||||
def test_hsts_include_subdomains(self, mock_get_request):
|
||||
def test_hsts_include_subdomains(self, mock_get_request: mock.MagicMock) -> None:
|
||||
mock_get_request.return_value = create_response(
|
||||
headers={"strict-transport-security":
|
||||
"max-age=31536000;includeSubDomains"})
|
||||
self.assertTrue(self.validator.hsts("test.com"))
|
||||
|
||||
def test_ocsp_stapling(self):
|
||||
def test_ocsp_stapling(self) -> None:
|
||||
self.assertRaises(
|
||||
NotImplementedError, self.validator.ocsp_stapling, "test.com")
|
||||
|
||||
|
||||
def create_response(status_code=200, headers=None):
|
||||
def create_response(status_code: int = 200,
|
||||
headers: Optional[Mapping[str, str]] = None) -> requests.Response:
|
||||
"""Creates a requests.Response object for testing"""
|
||||
response = requests.Response()
|
||||
response.status_code = status_code
|
||||
|
||||
if headers:
|
||||
response.headers = headers
|
||||
response.headers = cast(requests.models.CaseInsensitiveDict, headers)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
4
tox.ini
4
tox.ini
@@ -20,8 +20,8 @@ install_and_test = python {toxinidir}/tools/install_and_test.py
|
||||
dns_packages = certbot-dns-cloudflare certbot-dns-cloudxns certbot-dns-digitalocean certbot-dns-dnsimple certbot-dns-dnsmadeeasy certbot-dns-gehirn certbot-dns-google certbot-dns-linode certbot-dns-luadns certbot-dns-nsone certbot-dns-ovh certbot-dns-rfc2136 certbot-dns-route53 certbot-dns-sakuracloud
|
||||
win_all_packages = acme[test] certbot[test] {[base]dns_packages} certbot-nginx
|
||||
all_packages = {[base]win_all_packages} certbot-apache
|
||||
fully_typed_source_paths = acme/acme certbot/certbot certbot-ci/certbot_integration_tests certbot-ci/snap_integration_tests certbot-ci/windows_installer_integration_tests tests/lock_test.py
|
||||
partially_typed_source_paths = certbot-apache/certbot_apache certbot-compatibility-test/certbot_compatibility_test certbot-dns-cloudflare/certbot_dns_cloudflare certbot-dns-cloudxns/certbot_dns_cloudxns certbot-dns-digitalocean/certbot_dns_digitalocean certbot-dns-dnsimple/certbot_dns_dnsimple certbot-dns-dnsmadeeasy/certbot_dns_dnsmadeeasy certbot-dns-gehirn/certbot_dns_gehirn certbot-dns-google/certbot_dns_google certbot-dns-linode/certbot_dns_linode certbot-dns-luadns/certbot_dns_luadns certbot-dns-nsone/certbot_dns_nsone certbot-dns-ovh/certbot_dns_ovh certbot-dns-rfc2136/certbot_dns_rfc2136 certbot-dns-route53/certbot_dns_route53 certbot-dns-sakuracloud/certbot_dns_sakuracloud certbot-nginx/certbot_nginx
|
||||
fully_typed_source_paths = acme/acme certbot/certbot certbot-ci/certbot_integration_tests certbot-ci/snap_integration_tests certbot-ci/windows_installer_integration_tests certbot-compatibility-test/certbot_compatibility_test tests/lock_test.py
|
||||
partially_typed_source_paths = certbot-apache/certbot_apache certbot-dns-cloudflare/certbot_dns_cloudflare certbot-dns-cloudxns/certbot_dns_cloudxns certbot-dns-digitalocean/certbot_dns_digitalocean certbot-dns-dnsimple/certbot_dns_dnsimple certbot-dns-dnsmadeeasy/certbot_dns_dnsmadeeasy certbot-dns-gehirn/certbot_dns_gehirn certbot-dns-google/certbot_dns_google certbot-dns-linode/certbot_dns_linode certbot-dns-luadns/certbot_dns_luadns certbot-dns-nsone/certbot_dns_nsone certbot-dns-ovh/certbot_dns_ovh certbot-dns-rfc2136/certbot_dns_rfc2136 certbot-dns-route53/certbot_dns_route53 certbot-dns-sakuracloud/certbot_dns_sakuracloud certbot-nginx/certbot_nginx
|
||||
|
||||
[testenv]
|
||||
passenv =
|
||||
|
||||
Reference in New Issue
Block a user