diff --git a/CHANGELOG.md b/CHANGELOG.md index 826a59dd2..7806b2ffe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,9 @@ Certbot adheres to [Semantic Versioning](https://semver.org/). * Certbot's `config_changes` subcommand has been removed * `certbot.plugins.common.TLSSNI01` has been removed. +* Deprecated attributes related to the TLS-SNI-01 challenge in + `acme.challenges` and `acme.standalone` + have been removed. * The functions `certbot.client.view_config_changes`, `certbot.main.config_changes`, `certbot.plugins.common.Installer.view_config_changes`, diff --git a/acme/acme/__init__.py b/acme/acme/__init__.py index 7439712b0..8a034470a 100644 --- a/acme/acme/__init__.py +++ b/acme/acme/__init__.py @@ -21,30 +21,3 @@ for mod in list(sys.modules): # preserved (acme.jose.* is josepy.*) if mod == 'josepy' or mod.startswith('josepy.'): sys.modules['acme.' + mod.replace('josepy', 'jose', 1)] = sys.modules[mod] - - -# This class takes a similar approach to the cryptography project to deprecate attributes -# in public modules. See the _ModuleWithDeprecation class here: -# https://github.com/pyca/cryptography/blob/91105952739442a74582d3e62b3d2111365b0dc7/src/cryptography/utils.py#L129 -class _TLSSNI01DeprecationModule(object): - """ - Internal class delegating to a module, and displaying warnings when - attributes related to TLS-SNI-01 are accessed. - """ - def __init__(self, module): - self.__dict__['_module'] = module - - def __getattr__(self, attr): - if 'TLSSNI01' in attr or attr == 'BaseRequestHandlerWithLogging': - warnings.warn('{0} attribute is deprecated, and will be removed soon.'.format(attr), - DeprecationWarning, stacklevel=2) - return getattr(self._module, attr) - - def __setattr__(self, attr, value): # pragma: no cover - setattr(self._module, attr, value) - - def __delattr__(self, attr): # pragma: no cover - delattr(self._module, attr) - - def __dir__(self): # pragma: no cover - return ['_module'] + dir(self._module) diff --git a/acme/acme/challenges.py b/acme/acme/challenges.py index 78991608a..f417bc47c 100644 --- a/acme/acme/challenges.py +++ b/acme/acme/challenges.py @@ -3,19 +3,13 @@ import abc import functools import hashlib import logging -import socket -import sys from cryptography.hazmat.primitives import hashes # type: ignore import josepy as jose -import OpenSSL import requests import six -from acme import errors -from acme import crypto_util from acme import fields -from acme import _TLSSNI01DeprecationModule logger = logging.getLogger(__name__) @@ -367,148 +361,6 @@ class HTTP01(KeyAuthorizationChallenge): return self.key_authorization(account_key) -@ChallengeResponse.register -class TLSSNI01Response(KeyAuthorizationChallengeResponse): - """ACME tls-sni-01 challenge response.""" - typ = "tls-sni-01" - - DOMAIN_SUFFIX = b".acme.invalid" - """Domain name suffix.""" - - PORT = 443 - """Verification port as defined by the protocol. - - You can override it (e.g. for testing) by passing ``port`` to - `simple_verify`. - - """ - - @property - def z(self): # pylint: disable=invalid-name - """``z`` value used for verification. - - :rtype bytes: - - """ - return hashlib.sha256( - self.key_authorization.encode("utf-8")).hexdigest().lower().encode() - - @property - def z_domain(self): - """Domain name used for verification, generated from `z`. - - :rtype bytes: - - """ - return self.z[:32] + b'.' + self.z[32:] + self.DOMAIN_SUFFIX - - def gen_cert(self, key=None, bits=2048): - """Generate tls-sni-01 certificate. - - :param OpenSSL.crypto.PKey key: Optional private key used in - certificate generation. If not provided (``None``), then - fresh key will be generated. - :param int bits: Number of bits for newly generated key. - - :rtype: `tuple` of `OpenSSL.crypto.X509` and `OpenSSL.crypto.PKey` - - """ - if key is None: - key = OpenSSL.crypto.PKey() - key.generate_key(OpenSSL.crypto.TYPE_RSA, bits) - return crypto_util.gen_ss_cert(key, [ - # z_domain is too big to fit into CN, hence first dummy domain - 'dummy', self.z_domain.decode()], force_san=True), key - - def probe_cert(self, domain, **kwargs): - """Probe tls-sni-01 challenge certificate. - - :param unicode domain: - - """ - # TODO: domain is not necessary if host is provided - if "host" not in kwargs: - host = socket.gethostbyname(domain) - logger.debug('%s resolved to %s', domain, host) - kwargs["host"] = host - - kwargs.setdefault("port", self.PORT) - kwargs["name"] = self.z_domain - # TODO: try different methods? - return crypto_util.probe_sni(**kwargs) - - def verify_cert(self, cert): - """Verify tls-sni-01 challenge certificate. - - :param OpensSSL.crypto.X509 cert: Challenge certificate. - - :returns: Whether the certificate was successfully verified. - :rtype: bool - - """ - # pylint: disable=protected-access - sans = crypto_util._pyopenssl_cert_or_req_san(cert) - logger.debug('Certificate %s. SANs: %s', cert.digest('sha256'), sans) - return self.z_domain.decode() in sans - - def simple_verify(self, chall, domain, account_public_key, - cert=None, **kwargs): - """Simple verify. - - Verify ``validation`` using ``account_public_key``, optionally - probe tls-sni-01 certificate and check using `verify_cert`. - - :param .challenges.TLSSNI01 chall: Corresponding challenge. - :param str domain: Domain name being validated. - :param JWK account_public_key: - :param OpenSSL.crypto.X509 cert: Optional certificate. If not - provided (``None``) certificate will be retrieved using - `probe_cert`. - :param int port: Port used to probe the certificate. - - - :returns: ``True`` iff client's control of the domain has been - verified. - :rtype: bool - - """ - if not self.verify(chall, account_public_key): - logger.debug("Verification of key authorization in response failed") - return False - - if cert is None: - try: - cert = self.probe_cert(domain=domain, **kwargs) - except errors.Error as error: - logger.debug(str(error), exc_info=True) - return False - - return self.verify_cert(cert) - - -@Challenge.register # pylint: disable=too-many-ancestors -class TLSSNI01(KeyAuthorizationChallenge): - """ACME tls-sni-01 challenge.""" - response_cls = TLSSNI01Response - typ = response_cls.typ - - # boulder#962, ietf-wg-acme#22 - #n = jose.Field("n", encoder=int, decoder=int) - - def validation(self, account_key, **kwargs): - """Generate validation. - - :param JWK account_key: - :param OpenSSL.crypto.PKey cert_key: Optional private key used - in certificate generation. If not provided (``None``), then - fresh key will be generated. - - :rtype: `tuple` of `OpenSSL.crypto.X509` and `OpenSSL.crypto.PKey` - - """ - return self.response(account_key).gen_cert(key=kwargs.get('cert_key')) - - @ChallengeResponse.register class TLSALPN01Response(KeyAuthorizationChallengeResponse): """ACME TLS-ALPN-01 challenge response. @@ -617,7 +469,3 @@ class DNSResponse(ChallengeResponse): """ return chall.check_validation(self.validation, account_public_key) - - -# Patching ourselves to warn about TLS-SNI challenge deprecation and removal. -sys.modules[__name__] = _TLSSNI01DeprecationModule(sys.modules[__name__]) diff --git a/acme/acme/challenges_test.py b/acme/acme/challenges_test.py index 9d3a92fa5..94641eaac 100644 --- a/acme/acme/challenges_test.py +++ b/acme/acme/challenges_test.py @@ -3,12 +3,10 @@ import unittest import josepy as jose import mock -import OpenSSL import requests from six.moves.urllib import parse as urllib_parse # pylint: disable=relative-import -from acme import errors from acme import test_util CERT = test_util.load_comparable_cert('cert.pem') @@ -259,150 +257,6 @@ class HTTP01Test(unittest.TestCase): self.msg.update(token=b'..').good_token) -class TLSSNI01ResponseTest(unittest.TestCase): - # pylint: disable=too-many-instance-attributes - - def setUp(self): - from acme.challenges import TLSSNI01 - self.chall = TLSSNI01( - token=jose.b64decode(b'a82d5ff8ef740d12881f6d3c2277ab2e')) - - self.response = self.chall.response(KEY) - self.jmsg = { - 'resource': 'challenge', - 'type': 'tls-sni-01', - 'keyAuthorization': self.response.key_authorization, - } - - # pylint: disable=invalid-name - label1 = b'dc38d9c3fa1a4fdcc3a5501f2d38583f' - label2 = b'b7793728f084394f2a1afd459556bb5c' - self.z = label1 + label2 - self.z_domain = label1 + b'.' + label2 + b'.acme.invalid' - self.domain = 'foo.com' - - def test_z_and_domain(self): - self.assertEqual(self.z, self.response.z) - self.assertEqual(self.z_domain, self.response.z_domain) - - def test_to_partial_json(self): - self.assertEqual({k: v for k, v in self.jmsg.items() if k != 'keyAuthorization'}, - self.response.to_partial_json()) - - def test_from_json(self): - from acme.challenges import TLSSNI01Response - self.assertEqual(self.response, TLSSNI01Response.from_json(self.jmsg)) - - def test_from_json_hashable(self): - from acme.challenges import TLSSNI01Response - hash(TLSSNI01Response.from_json(self.jmsg)) - - @mock.patch('acme.challenges.socket.gethostbyname') - @mock.patch('acme.challenges.crypto_util.probe_sni') - def test_probe_cert(self, mock_probe_sni, mock_gethostbyname): - mock_gethostbyname.return_value = '127.0.0.1' - self.response.probe_cert('foo.com') - mock_gethostbyname.assert_called_once_with('foo.com') - mock_probe_sni.assert_called_once_with( - host='127.0.0.1', port=self.response.PORT, - name=self.z_domain) - - self.response.probe_cert('foo.com', host='8.8.8.8') - mock_probe_sni.assert_called_with( - host='8.8.8.8', port=mock.ANY, name=mock.ANY) - - self.response.probe_cert('foo.com', port=1234) - mock_probe_sni.assert_called_with( - host=mock.ANY, port=1234, name=mock.ANY) - - self.response.probe_cert('foo.com', bar='baz') - mock_probe_sni.assert_called_with( - host=mock.ANY, port=mock.ANY, name=mock.ANY, bar='baz') - - self.response.probe_cert('foo.com', name=b'xxx') - mock_probe_sni.assert_called_with( - host=mock.ANY, port=mock.ANY, - name=self.z_domain) - - def test_gen_verify_cert(self): - key1 = test_util.load_pyopenssl_private_key('rsa512_key.pem') - cert, key2 = self.response.gen_cert(key1) - self.assertEqual(key1, key2) - self.assertTrue(self.response.verify_cert(cert)) - - def test_gen_verify_cert_gen_key(self): - cert, key = self.response.gen_cert() - self.assertTrue(isinstance(key, OpenSSL.crypto.PKey)) - self.assertTrue(self.response.verify_cert(cert)) - - def test_verify_bad_cert(self): - self.assertFalse(self.response.verify_cert( - test_util.load_cert('cert.pem'))) - - def test_simple_verify_bad_key_authorization(self): - key2 = jose.JWKRSA.load(test_util.load_vector('rsa256_key.pem')) - self.response.simple_verify(self.chall, "local", key2.public_key()) - - @mock.patch('acme.challenges.TLSSNI01Response.verify_cert', autospec=True) - def test_simple_verify(self, mock_verify_cert): - mock_verify_cert.return_value = mock.sentinel.verification - self.assertEqual( - mock.sentinel.verification, self.response.simple_verify( - self.chall, self.domain, KEY.public_key(), - cert=mock.sentinel.cert)) - mock_verify_cert.assert_called_once_with( - self.response, mock.sentinel.cert) - - @mock.patch('acme.challenges.TLSSNI01Response.probe_cert') - def test_simple_verify_false_on_probe_error(self, mock_probe_cert): - mock_probe_cert.side_effect = errors.Error - self.assertFalse(self.response.simple_verify( - self.chall, self.domain, KEY.public_key())) - - -class TLSSNI01Test(unittest.TestCase): - - def setUp(self): - self.jmsg = { - 'type': 'tls-sni-01', - 'token': 'a82d5ff8ef740d12881f6d3c2277ab2e', - } - from acme.challenges import TLSSNI01 - self.msg = TLSSNI01( - token=jose.b64decode('a82d5ff8ef740d12881f6d3c2277ab2e')) - - def test_to_partial_json(self): - self.assertEqual(self.jmsg, self.msg.to_partial_json()) - - def test_from_json(self): - from acme.challenges import TLSSNI01 - self.assertEqual(self.msg, TLSSNI01.from_json(self.jmsg)) - - def test_from_json_hashable(self): - from acme.challenges import TLSSNI01 - hash(TLSSNI01.from_json(self.jmsg)) - - def test_from_json_invalid_token_length(self): - from acme.challenges import TLSSNI01 - self.jmsg['token'] = jose.encode_b64jose(b'abcd') - self.assertRaises( - jose.DeserializationError, TLSSNI01.from_json, self.jmsg) - - @mock.patch('acme.challenges.TLSSNI01Response.gen_cert') - def test_validation(self, mock_gen_cert): - mock_gen_cert.return_value = ('cert', 'key') - self.assertEqual(('cert', 'key'), self.msg.validation( - KEY, cert_key=mock.sentinel.cert_key)) - mock_gen_cert.assert_called_once_with(key=mock.sentinel.cert_key) - - def test_deprecation_message(self): - with mock.patch('acme.warnings.warn') as mock_warn: - from acme.challenges import TLSSNI01 - assert TLSSNI01 - self.assertEqual(mock_warn.call_count, 1) - self.assertTrue('deprecated' in mock_warn.call_args[0][0]) - - class TLSALPN01ResponseTest(unittest.TestCase): # pylint: disable=too-many-instance-attributes diff --git a/acme/acme/standalone.py b/acme/acme/standalone.py index 69c35fb6f..b23b9b843 100644 --- a/acme/acme/standalone.py +++ b/acme/acme/standalone.py @@ -1,24 +1,17 @@ """Support for standalone client challenge solvers. """ -import argparse import collections import functools import logging -import os import socket -import sys import threading -import warnings from six.moves import BaseHTTPServer # type: ignore # pylint: disable=import-error from six.moves import http_client # pylint: disable=import-error from six.moves import socketserver # type: ignore # pylint: disable=import-error -import OpenSSL - from acme import challenges from acme import crypto_util from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module -from acme import _TLSSNI01DeprecationModule logger = logging.getLogger(__name__) @@ -133,35 +126,6 @@ class BaseDualNetworkedServers(object): self.threads = [] -class TLSSNI01Server(TLSServer, ACMEServerMixin): - """TLSSNI01 Server.""" - - def __init__(self, server_address, certs, ipv6=False): - TLSServer.__init__( - self, server_address, BaseRequestHandlerWithLogging, certs=certs, ipv6=ipv6) - - -class TLSSNI01DualNetworkedServers(BaseDualNetworkedServers): - """TLSSNI01Server Wrapper. Tries everything for both. Failures for one don't - affect the other.""" - - def __init__(self, *args, **kwargs): - BaseDualNetworkedServers.__init__(self, TLSSNI01Server, *args, **kwargs) - - -class BaseRequestHandlerWithLogging(socketserver.BaseRequestHandler): - """BaseRequestHandler with logging.""" - - def log_message(self, format, *args): # pylint: disable=redefined-builtin - """Log arbitrary message.""" - logger.debug("%s - - %s", self.client_address[0], format % args) - - def handle(self): - """Handle request.""" - self.log_message("Incoming request") - socketserver.BaseRequestHandler.handle(self) - - class HTTPServer(BaseHTTPServer.HTTPServer): """Generic HTTP Server.""" @@ -264,42 +228,3 @@ class HTTP01RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler): """ return functools.partial( cls, simple_http_resources=simple_http_resources) - - -def simple_tls_sni_01_server(cli_args, forever=True): - """Run simple standalone TLSSNI01 server.""" - warnings.warn( - 'simple_tls_sni_01_server is deprecated and will be removed soon.', - DeprecationWarning, stacklevel=2) - logging.basicConfig(level=logging.DEBUG) - - parser = argparse.ArgumentParser() - parser.add_argument( - "-p", "--port", default=0, help="Port to serve at. By default " - "picks random free port.") - args = parser.parse_args(cli_args[1:]) - - certs = {} - - _, hosts, _ = next(os.walk('.')) # type: ignore # https://github.com/python/mypy/issues/465 - for host in hosts: - with open(os.path.join(host, "cert.pem")) as cert_file: - cert_contents = cert_file.read() - with open(os.path.join(host, "key.pem")) as key_file: - key_contents = key_file.read() - certs[host.encode()] = ( - OpenSSL.crypto.load_privatekey( - OpenSSL.crypto.FILETYPE_PEM, key_contents), - OpenSSL.crypto.load_certificate( - OpenSSL.crypto.FILETYPE_PEM, cert_contents)) - - server = TLSSNI01Server(('', int(args.port)), certs=certs) - logger.info("Serving at https://%s:%s...", *server.socket.getsockname()[:2]) - if forever: # pragma: no cover - server.serve_forever() - else: - server.handle_request() - - -# Patching ourselves to warn about TLS-SNI challenge deprecation and removal. -sys.modules[__name__] = _TLSSNI01DeprecationModule(sys.modules[__name__]) diff --git a/acme/acme/standalone_test.py b/acme/acme/standalone_test.py index c14f82d39..9f9249b07 100644 --- a/acme/acme/standalone_test.py +++ b/acme/acme/standalone_test.py @@ -1,14 +1,7 @@ """Tests for acme.standalone.""" -import multiprocessing -import os -import shutil import socket import threading -import tempfile import unittest -import warnings -import time -from contextlib import closing from six.moves import http_client # pylint: disable=import-error from six.moves import socketserver # type: ignore # pylint: disable=import-error @@ -18,8 +11,6 @@ import mock import requests from acme import challenges -from acme import crypto_util -from acme import errors from acme import test_util from acme.magic_typing import Set # pylint: disable=unused-import, no-name-in-module @@ -42,44 +33,6 @@ class TLSServerTest(unittest.TestCase): server.server_close() -class TLSSNI01ServerTest(unittest.TestCase): - """Test for acme.standalone.TLSSNI01Server.""" - - - def setUp(self): - self.certs = {b'localhost': ( - test_util.load_pyopenssl_private_key('rsa2048_key.pem'), - test_util.load_cert('rsa2048_cert.pem'), - )} - from acme.standalone import TLSSNI01Server - self.server = TLSSNI01Server(('localhost', 0), certs=self.certs) - self.thread = threading.Thread(target=self.server.serve_forever) - self.thread.start() - - def tearDown(self): - self.server.shutdown() - self.thread.join() - - def test_it(self): - host, port = self.server.socket.getsockname()[:2] - cert = crypto_util.probe_sni( - b'localhost', host=host, port=port, timeout=1) - self.assertEqual(jose.ComparableX509(cert), - jose.ComparableX509(self.certs[b'localhost'][1])) - - -class BaseRequestHandlerWithLoggingTest(unittest.TestCase): - """Test for acme.standalone.BaseRequestHandlerWithLogging.""" - - def test_it(self): - with mock.patch('acme.standalone.warnings.warn') as mock_warn: - # pylint: disable=unused-variable - from acme.standalone import BaseRequestHandlerWithLogging - self.assertTrue(mock_warn.called) - msg = mock_warn.call_args[0][0] - self.assertTrue(msg.startswith('BaseRequestHandlerWithLogging')) - - class HTTP01ServerTest(unittest.TestCase): """Tests for acme.standalone.HTTP01Server.""" @@ -183,33 +136,6 @@ class BaseDualNetworkedServersTest(unittest.TestCase): prev_port = port -class TLSSNI01DualNetworkedServersTest(unittest.TestCase): - """Test for acme.standalone.TLSSNI01DualNetworkedServers.""" - - - def setUp(self): - self.certs = {b'localhost': ( - test_util.load_pyopenssl_private_key('rsa2048_key.pem'), - test_util.load_cert('rsa2048_cert.pem'), - )} - from acme.standalone import TLSSNI01DualNetworkedServers - self.servers = TLSSNI01DualNetworkedServers(('localhost', 0), certs=self.certs) - self.servers.serve_forever() - - def tearDown(self): - self.servers.shutdown_and_server_close() - - def test_connect(self): - socknames = self.servers.getsocknames() - # connect to all addresses - for sockname in socknames: - host, port = sockname[:2] - cert = crypto_util.probe_sni( - b'localhost', host=host, port=port, timeout=1) - self.assertEqual(jose.ComparableX509(cert), - jose.ComparableX509(self.certs[b'localhost'][1])) - - class HTTP01DualNetworkedServersTest(unittest.TestCase): """Tests for acme.standalone.HTTP01DualNetworkedServers.""" @@ -260,66 +186,5 @@ class HTTP01DualNetworkedServersTest(unittest.TestCase): self.assertFalse(self._test_http01(add=False)) -class TestSimpleTLSSNI01Server(unittest.TestCase): - """Tests for acme.standalone.simple_tls_sni_01_server.""" - - - def setUp(self): - # mirror ../examples/standalone - self.test_cwd = tempfile.mkdtemp() - localhost_dir = os.path.join(self.test_cwd, 'localhost') - os.makedirs(localhost_dir) - shutil.copy(test_util.vector_path('rsa2048_cert.pem'), - os.path.join(localhost_dir, 'cert.pem')) - shutil.copy(test_util.vector_path('rsa2048_key.pem'), - os.path.join(localhost_dir, 'key.pem')) - - with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: - sock.bind(('', 0)) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.port = sock.getsockname()[1] - - self.process = multiprocessing.Process(target=_simple_tls_sni_01_server_no_warnings, - args=(['path', '-p', str(self.port)],)) - self.old_cwd = os.getcwd() - os.chdir(self.test_cwd) - - def tearDown(self): - os.chdir(self.old_cwd) - if self.process.is_alive(): - self.process.terminate() - self.process.join(timeout=5) - # Check that we didn't timeout waiting for the process to - # terminate. - self.assertNotEqual(self.process.exitcode, None) - shutil.rmtree(self.test_cwd) - - @mock.patch('acme.standalone.TLSSNI01Server.handle_request') - def test_mock(self, handle): - _simple_tls_sni_01_server_no_warnings(cli_args=['path', '-p', str(self.port)], - forever=False) - self.assertEqual(handle.call_count, 1) - - def test_live(self): - self.process.start() - cert = None - for _ in range(50): - time.sleep(0.1) - try: - cert = crypto_util.probe_sni(b'localhost', b'127.0.0.1', self.port) - break - except errors.Error: # pragma: no cover - pass - self.assertEqual(jose.ComparableX509(cert), - test_util.load_comparable_cert('rsa2048_cert.pem')) - - -def _simple_tls_sni_01_server_no_warnings(*args, **kwargs): - with warnings.catch_warnings(): - warnings.filterwarnings('ignore', 'simple_tls.*') - from acme.standalone import simple_tls_sni_01_server - return simple_tls_sni_01_server(*args, **kwargs) - - if __name__ == "__main__": unittest.main() # pragma: no cover diff --git a/acme/acme/test_util.py b/acme/acme/test_util.py index 6d9cbc8dc..6737bff4e 100644 --- a/acme/acme/test_util.py +++ b/acme/acme/test_util.py @@ -12,12 +12,6 @@ import josepy as jose from OpenSSL import crypto -def vector_path(*names): - """Path to a test vector.""" - return pkg_resources.resource_filename( - __name__, os.path.join('testdata', *names)) - - def load_vector(*names): """Load contents of a test vector.""" # luckily, resource_string opens file in binary mode diff --git a/pytest.ini b/pytest.ini index 54ae4e9d6..6c2404056 100644 --- a/pytest.ini +++ b/pytest.ini @@ -13,7 +13,6 @@ filterwarnings = error ignore:decodestring:DeprecationWarning - ignore:(TLSSNI01|TLS-SNI-01):DeprecationWarning ignore:.*collections\.abc:DeprecationWarning ignore:The `color_scheme` argument is deprecated:DeprecationWarning:IPython.* ignore:.*get_systemd_os_info:DeprecationWarning