mirror of
https://github.com/certbot/certbot.git
synced 2026-01-26 07:41:33 +03:00
Remove TLS-SNI objects in ACME (#7535)
* fixes #7214 * update changelog * remove unused import
This commit is contained in:
committed by
Adrien Ferrand
parent
d290fe464e
commit
641b60b8f0
@@ -12,6 +12,9 @@ Certbot adheres to [Semantic Versioning](https://semver.org/).
|
||||
|
||||
* Certbot's `config_changes` subcommand has been removed
|
||||
* `certbot.plugins.common.TLSSNI01` has been removed.
|
||||
* Deprecated attributes related to the TLS-SNI-01 challenge in
|
||||
`acme.challenges` and `acme.standalone`
|
||||
have been removed.
|
||||
* The functions `certbot.client.view_config_changes`,
|
||||
`certbot.main.config_changes`,
|
||||
`certbot.plugins.common.Installer.view_config_changes`,
|
||||
|
||||
@@ -21,30 +21,3 @@ for mod in list(sys.modules):
|
||||
# preserved (acme.jose.* is josepy.*)
|
||||
if mod == 'josepy' or mod.startswith('josepy.'):
|
||||
sys.modules['acme.' + mod.replace('josepy', 'jose', 1)] = sys.modules[mod]
|
||||
|
||||
|
||||
# This class takes a similar approach to the cryptography project to deprecate attributes
|
||||
# in public modules. See the _ModuleWithDeprecation class here:
|
||||
# https://github.com/pyca/cryptography/blob/91105952739442a74582d3e62b3d2111365b0dc7/src/cryptography/utils.py#L129
|
||||
class _TLSSNI01DeprecationModule(object):
|
||||
"""
|
||||
Internal class delegating to a module, and displaying warnings when
|
||||
attributes related to TLS-SNI-01 are accessed.
|
||||
"""
|
||||
def __init__(self, module):
|
||||
self.__dict__['_module'] = module
|
||||
|
||||
def __getattr__(self, attr):
|
||||
if 'TLSSNI01' in attr or attr == 'BaseRequestHandlerWithLogging':
|
||||
warnings.warn('{0} attribute is deprecated, and will be removed soon.'.format(attr),
|
||||
DeprecationWarning, stacklevel=2)
|
||||
return getattr(self._module, attr)
|
||||
|
||||
def __setattr__(self, attr, value): # pragma: no cover
|
||||
setattr(self._module, attr, value)
|
||||
|
||||
def __delattr__(self, attr): # pragma: no cover
|
||||
delattr(self._module, attr)
|
||||
|
||||
def __dir__(self): # pragma: no cover
|
||||
return ['_module'] + dir(self._module)
|
||||
|
||||
@@ -3,19 +3,13 @@ import abc
|
||||
import functools
|
||||
import hashlib
|
||||
import logging
|
||||
import socket
|
||||
import sys
|
||||
|
||||
from cryptography.hazmat.primitives import hashes # type: ignore
|
||||
import josepy as jose
|
||||
import OpenSSL
|
||||
import requests
|
||||
import six
|
||||
|
||||
from acme import errors
|
||||
from acme import crypto_util
|
||||
from acme import fields
|
||||
from acme import _TLSSNI01DeprecationModule
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -367,148 +361,6 @@ class HTTP01(KeyAuthorizationChallenge):
|
||||
return self.key_authorization(account_key)
|
||||
|
||||
|
||||
@ChallengeResponse.register
|
||||
class TLSSNI01Response(KeyAuthorizationChallengeResponse):
|
||||
"""ACME tls-sni-01 challenge response."""
|
||||
typ = "tls-sni-01"
|
||||
|
||||
DOMAIN_SUFFIX = b".acme.invalid"
|
||||
"""Domain name suffix."""
|
||||
|
||||
PORT = 443
|
||||
"""Verification port as defined by the protocol.
|
||||
|
||||
You can override it (e.g. for testing) by passing ``port`` to
|
||||
`simple_verify`.
|
||||
|
||||
"""
|
||||
|
||||
@property
|
||||
def z(self): # pylint: disable=invalid-name
|
||||
"""``z`` value used for verification.
|
||||
|
||||
:rtype bytes:
|
||||
|
||||
"""
|
||||
return hashlib.sha256(
|
||||
self.key_authorization.encode("utf-8")).hexdigest().lower().encode()
|
||||
|
||||
@property
|
||||
def z_domain(self):
|
||||
"""Domain name used for verification, generated from `z`.
|
||||
|
||||
:rtype bytes:
|
||||
|
||||
"""
|
||||
return self.z[:32] + b'.' + self.z[32:] + self.DOMAIN_SUFFIX
|
||||
|
||||
def gen_cert(self, key=None, bits=2048):
|
||||
"""Generate tls-sni-01 certificate.
|
||||
|
||||
:param OpenSSL.crypto.PKey key: Optional private key used in
|
||||
certificate generation. If not provided (``None``), then
|
||||
fresh key will be generated.
|
||||
:param int bits: Number of bits for newly generated key.
|
||||
|
||||
:rtype: `tuple` of `OpenSSL.crypto.X509` and `OpenSSL.crypto.PKey`
|
||||
|
||||
"""
|
||||
if key is None:
|
||||
key = OpenSSL.crypto.PKey()
|
||||
key.generate_key(OpenSSL.crypto.TYPE_RSA, bits)
|
||||
return crypto_util.gen_ss_cert(key, [
|
||||
# z_domain is too big to fit into CN, hence first dummy domain
|
||||
'dummy', self.z_domain.decode()], force_san=True), key
|
||||
|
||||
def probe_cert(self, domain, **kwargs):
|
||||
"""Probe tls-sni-01 challenge certificate.
|
||||
|
||||
:param unicode domain:
|
||||
|
||||
"""
|
||||
# TODO: domain is not necessary if host is provided
|
||||
if "host" not in kwargs:
|
||||
host = socket.gethostbyname(domain)
|
||||
logger.debug('%s resolved to %s', domain, host)
|
||||
kwargs["host"] = host
|
||||
|
||||
kwargs.setdefault("port", self.PORT)
|
||||
kwargs["name"] = self.z_domain
|
||||
# TODO: try different methods?
|
||||
return crypto_util.probe_sni(**kwargs)
|
||||
|
||||
def verify_cert(self, cert):
|
||||
"""Verify tls-sni-01 challenge certificate.
|
||||
|
||||
:param OpensSSL.crypto.X509 cert: Challenge certificate.
|
||||
|
||||
:returns: Whether the certificate was successfully verified.
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
# pylint: disable=protected-access
|
||||
sans = crypto_util._pyopenssl_cert_or_req_san(cert)
|
||||
logger.debug('Certificate %s. SANs: %s', cert.digest('sha256'), sans)
|
||||
return self.z_domain.decode() in sans
|
||||
|
||||
def simple_verify(self, chall, domain, account_public_key,
|
||||
cert=None, **kwargs):
|
||||
"""Simple verify.
|
||||
|
||||
Verify ``validation`` using ``account_public_key``, optionally
|
||||
probe tls-sni-01 certificate and check using `verify_cert`.
|
||||
|
||||
:param .challenges.TLSSNI01 chall: Corresponding challenge.
|
||||
:param str domain: Domain name being validated.
|
||||
:param JWK account_public_key:
|
||||
:param OpenSSL.crypto.X509 cert: Optional certificate. If not
|
||||
provided (``None``) certificate will be retrieved using
|
||||
`probe_cert`.
|
||||
:param int port: Port used to probe the certificate.
|
||||
|
||||
|
||||
:returns: ``True`` iff client's control of the domain has been
|
||||
verified.
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
if not self.verify(chall, account_public_key):
|
||||
logger.debug("Verification of key authorization in response failed")
|
||||
return False
|
||||
|
||||
if cert is None:
|
||||
try:
|
||||
cert = self.probe_cert(domain=domain, **kwargs)
|
||||
except errors.Error as error:
|
||||
logger.debug(str(error), exc_info=True)
|
||||
return False
|
||||
|
||||
return self.verify_cert(cert)
|
||||
|
||||
|
||||
@Challenge.register # pylint: disable=too-many-ancestors
|
||||
class TLSSNI01(KeyAuthorizationChallenge):
|
||||
"""ACME tls-sni-01 challenge."""
|
||||
response_cls = TLSSNI01Response
|
||||
typ = response_cls.typ
|
||||
|
||||
# boulder#962, ietf-wg-acme#22
|
||||
#n = jose.Field("n", encoder=int, decoder=int)
|
||||
|
||||
def validation(self, account_key, **kwargs):
|
||||
"""Generate validation.
|
||||
|
||||
:param JWK account_key:
|
||||
:param OpenSSL.crypto.PKey cert_key: Optional private key used
|
||||
in certificate generation. If not provided (``None``), then
|
||||
fresh key will be generated.
|
||||
|
||||
:rtype: `tuple` of `OpenSSL.crypto.X509` and `OpenSSL.crypto.PKey`
|
||||
|
||||
"""
|
||||
return self.response(account_key).gen_cert(key=kwargs.get('cert_key'))
|
||||
|
||||
|
||||
@ChallengeResponse.register
|
||||
class TLSALPN01Response(KeyAuthorizationChallengeResponse):
|
||||
"""ACME TLS-ALPN-01 challenge response.
|
||||
@@ -617,7 +469,3 @@ class DNSResponse(ChallengeResponse):
|
||||
|
||||
"""
|
||||
return chall.check_validation(self.validation, account_public_key)
|
||||
|
||||
|
||||
# Patching ourselves to warn about TLS-SNI challenge deprecation and removal.
|
||||
sys.modules[__name__] = _TLSSNI01DeprecationModule(sys.modules[__name__])
|
||||
|
||||
@@ -3,12 +3,10 @@ import unittest
|
||||
|
||||
import josepy as jose
|
||||
import mock
|
||||
import OpenSSL
|
||||
import requests
|
||||
|
||||
from six.moves.urllib import parse as urllib_parse # pylint: disable=relative-import
|
||||
|
||||
from acme import errors
|
||||
from acme import test_util
|
||||
|
||||
CERT = test_util.load_comparable_cert('cert.pem')
|
||||
@@ -259,150 +257,6 @@ class HTTP01Test(unittest.TestCase):
|
||||
self.msg.update(token=b'..').good_token)
|
||||
|
||||
|
||||
class TLSSNI01ResponseTest(unittest.TestCase):
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
|
||||
def setUp(self):
|
||||
from acme.challenges import TLSSNI01
|
||||
self.chall = TLSSNI01(
|
||||
token=jose.b64decode(b'a82d5ff8ef740d12881f6d3c2277ab2e'))
|
||||
|
||||
self.response = self.chall.response(KEY)
|
||||
self.jmsg = {
|
||||
'resource': 'challenge',
|
||||
'type': 'tls-sni-01',
|
||||
'keyAuthorization': self.response.key_authorization,
|
||||
}
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
label1 = b'dc38d9c3fa1a4fdcc3a5501f2d38583f'
|
||||
label2 = b'b7793728f084394f2a1afd459556bb5c'
|
||||
self.z = label1 + label2
|
||||
self.z_domain = label1 + b'.' + label2 + b'.acme.invalid'
|
||||
self.domain = 'foo.com'
|
||||
|
||||
def test_z_and_domain(self):
|
||||
self.assertEqual(self.z, self.response.z)
|
||||
self.assertEqual(self.z_domain, self.response.z_domain)
|
||||
|
||||
def test_to_partial_json(self):
|
||||
self.assertEqual({k: v for k, v in self.jmsg.items() if k != 'keyAuthorization'},
|
||||
self.response.to_partial_json())
|
||||
|
||||
def test_from_json(self):
|
||||
from acme.challenges import TLSSNI01Response
|
||||
self.assertEqual(self.response, TLSSNI01Response.from_json(self.jmsg))
|
||||
|
||||
def test_from_json_hashable(self):
|
||||
from acme.challenges import TLSSNI01Response
|
||||
hash(TLSSNI01Response.from_json(self.jmsg))
|
||||
|
||||
@mock.patch('acme.challenges.socket.gethostbyname')
|
||||
@mock.patch('acme.challenges.crypto_util.probe_sni')
|
||||
def test_probe_cert(self, mock_probe_sni, mock_gethostbyname):
|
||||
mock_gethostbyname.return_value = '127.0.0.1'
|
||||
self.response.probe_cert('foo.com')
|
||||
mock_gethostbyname.assert_called_once_with('foo.com')
|
||||
mock_probe_sni.assert_called_once_with(
|
||||
host='127.0.0.1', port=self.response.PORT,
|
||||
name=self.z_domain)
|
||||
|
||||
self.response.probe_cert('foo.com', host='8.8.8.8')
|
||||
mock_probe_sni.assert_called_with(
|
||||
host='8.8.8.8', port=mock.ANY, name=mock.ANY)
|
||||
|
||||
self.response.probe_cert('foo.com', port=1234)
|
||||
mock_probe_sni.assert_called_with(
|
||||
host=mock.ANY, port=1234, name=mock.ANY)
|
||||
|
||||
self.response.probe_cert('foo.com', bar='baz')
|
||||
mock_probe_sni.assert_called_with(
|
||||
host=mock.ANY, port=mock.ANY, name=mock.ANY, bar='baz')
|
||||
|
||||
self.response.probe_cert('foo.com', name=b'xxx')
|
||||
mock_probe_sni.assert_called_with(
|
||||
host=mock.ANY, port=mock.ANY,
|
||||
name=self.z_domain)
|
||||
|
||||
def test_gen_verify_cert(self):
|
||||
key1 = test_util.load_pyopenssl_private_key('rsa512_key.pem')
|
||||
cert, key2 = self.response.gen_cert(key1)
|
||||
self.assertEqual(key1, key2)
|
||||
self.assertTrue(self.response.verify_cert(cert))
|
||||
|
||||
def test_gen_verify_cert_gen_key(self):
|
||||
cert, key = self.response.gen_cert()
|
||||
self.assertTrue(isinstance(key, OpenSSL.crypto.PKey))
|
||||
self.assertTrue(self.response.verify_cert(cert))
|
||||
|
||||
def test_verify_bad_cert(self):
|
||||
self.assertFalse(self.response.verify_cert(
|
||||
test_util.load_cert('cert.pem')))
|
||||
|
||||
def test_simple_verify_bad_key_authorization(self):
|
||||
key2 = jose.JWKRSA.load(test_util.load_vector('rsa256_key.pem'))
|
||||
self.response.simple_verify(self.chall, "local", key2.public_key())
|
||||
|
||||
@mock.patch('acme.challenges.TLSSNI01Response.verify_cert', autospec=True)
|
||||
def test_simple_verify(self, mock_verify_cert):
|
||||
mock_verify_cert.return_value = mock.sentinel.verification
|
||||
self.assertEqual(
|
||||
mock.sentinel.verification, self.response.simple_verify(
|
||||
self.chall, self.domain, KEY.public_key(),
|
||||
cert=mock.sentinel.cert))
|
||||
mock_verify_cert.assert_called_once_with(
|
||||
self.response, mock.sentinel.cert)
|
||||
|
||||
@mock.patch('acme.challenges.TLSSNI01Response.probe_cert')
|
||||
def test_simple_verify_false_on_probe_error(self, mock_probe_cert):
|
||||
mock_probe_cert.side_effect = errors.Error
|
||||
self.assertFalse(self.response.simple_verify(
|
||||
self.chall, self.domain, KEY.public_key()))
|
||||
|
||||
|
||||
class TLSSNI01Test(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.jmsg = {
|
||||
'type': 'tls-sni-01',
|
||||
'token': 'a82d5ff8ef740d12881f6d3c2277ab2e',
|
||||
}
|
||||
from acme.challenges import TLSSNI01
|
||||
self.msg = TLSSNI01(
|
||||
token=jose.b64decode('a82d5ff8ef740d12881f6d3c2277ab2e'))
|
||||
|
||||
def test_to_partial_json(self):
|
||||
self.assertEqual(self.jmsg, self.msg.to_partial_json())
|
||||
|
||||
def test_from_json(self):
|
||||
from acme.challenges import TLSSNI01
|
||||
self.assertEqual(self.msg, TLSSNI01.from_json(self.jmsg))
|
||||
|
||||
def test_from_json_hashable(self):
|
||||
from acme.challenges import TLSSNI01
|
||||
hash(TLSSNI01.from_json(self.jmsg))
|
||||
|
||||
def test_from_json_invalid_token_length(self):
|
||||
from acme.challenges import TLSSNI01
|
||||
self.jmsg['token'] = jose.encode_b64jose(b'abcd')
|
||||
self.assertRaises(
|
||||
jose.DeserializationError, TLSSNI01.from_json, self.jmsg)
|
||||
|
||||
@mock.patch('acme.challenges.TLSSNI01Response.gen_cert')
|
||||
def test_validation(self, mock_gen_cert):
|
||||
mock_gen_cert.return_value = ('cert', 'key')
|
||||
self.assertEqual(('cert', 'key'), self.msg.validation(
|
||||
KEY, cert_key=mock.sentinel.cert_key))
|
||||
mock_gen_cert.assert_called_once_with(key=mock.sentinel.cert_key)
|
||||
|
||||
def test_deprecation_message(self):
|
||||
with mock.patch('acme.warnings.warn') as mock_warn:
|
||||
from acme.challenges import TLSSNI01
|
||||
assert TLSSNI01
|
||||
self.assertEqual(mock_warn.call_count, 1)
|
||||
self.assertTrue('deprecated' in mock_warn.call_args[0][0])
|
||||
|
||||
|
||||
class TLSALPN01ResponseTest(unittest.TestCase):
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
|
||||
|
||||
@@ -1,24 +1,17 @@
|
||||
"""Support for standalone client challenge solvers. """
|
||||
import argparse
|
||||
import collections
|
||||
import functools
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
import sys
|
||||
import threading
|
||||
import warnings
|
||||
|
||||
from six.moves import BaseHTTPServer # type: ignore # pylint: disable=import-error
|
||||
from six.moves import http_client # pylint: disable=import-error
|
||||
from six.moves import socketserver # type: ignore # pylint: disable=import-error
|
||||
|
||||
import OpenSSL
|
||||
|
||||
from acme import challenges
|
||||
from acme import crypto_util
|
||||
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
|
||||
from acme import _TLSSNI01DeprecationModule
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -133,35 +126,6 @@ class BaseDualNetworkedServers(object):
|
||||
self.threads = []
|
||||
|
||||
|
||||
class TLSSNI01Server(TLSServer, ACMEServerMixin):
|
||||
"""TLSSNI01 Server."""
|
||||
|
||||
def __init__(self, server_address, certs, ipv6=False):
|
||||
TLSServer.__init__(
|
||||
self, server_address, BaseRequestHandlerWithLogging, certs=certs, ipv6=ipv6)
|
||||
|
||||
|
||||
class TLSSNI01DualNetworkedServers(BaseDualNetworkedServers):
|
||||
"""TLSSNI01Server Wrapper. Tries everything for both. Failures for one don't
|
||||
affect the other."""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
BaseDualNetworkedServers.__init__(self, TLSSNI01Server, *args, **kwargs)
|
||||
|
||||
|
||||
class BaseRequestHandlerWithLogging(socketserver.BaseRequestHandler):
|
||||
"""BaseRequestHandler with logging."""
|
||||
|
||||
def log_message(self, format, *args): # pylint: disable=redefined-builtin
|
||||
"""Log arbitrary message."""
|
||||
logger.debug("%s - - %s", self.client_address[0], format % args)
|
||||
|
||||
def handle(self):
|
||||
"""Handle request."""
|
||||
self.log_message("Incoming request")
|
||||
socketserver.BaseRequestHandler.handle(self)
|
||||
|
||||
|
||||
class HTTPServer(BaseHTTPServer.HTTPServer):
|
||||
"""Generic HTTP Server."""
|
||||
|
||||
@@ -264,42 +228,3 @@ class HTTP01RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
|
||||
"""
|
||||
return functools.partial(
|
||||
cls, simple_http_resources=simple_http_resources)
|
||||
|
||||
|
||||
def simple_tls_sni_01_server(cli_args, forever=True):
|
||||
"""Run simple standalone TLSSNI01 server."""
|
||||
warnings.warn(
|
||||
'simple_tls_sni_01_server is deprecated and will be removed soon.',
|
||||
DeprecationWarning, stacklevel=2)
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
"-p", "--port", default=0, help="Port to serve at. By default "
|
||||
"picks random free port.")
|
||||
args = parser.parse_args(cli_args[1:])
|
||||
|
||||
certs = {}
|
||||
|
||||
_, hosts, _ = next(os.walk('.')) # type: ignore # https://github.com/python/mypy/issues/465
|
||||
for host in hosts:
|
||||
with open(os.path.join(host, "cert.pem")) as cert_file:
|
||||
cert_contents = cert_file.read()
|
||||
with open(os.path.join(host, "key.pem")) as key_file:
|
||||
key_contents = key_file.read()
|
||||
certs[host.encode()] = (
|
||||
OpenSSL.crypto.load_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, key_contents),
|
||||
OpenSSL.crypto.load_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, cert_contents))
|
||||
|
||||
server = TLSSNI01Server(('', int(args.port)), certs=certs)
|
||||
logger.info("Serving at https://%s:%s...", *server.socket.getsockname()[:2])
|
||||
if forever: # pragma: no cover
|
||||
server.serve_forever()
|
||||
else:
|
||||
server.handle_request()
|
||||
|
||||
|
||||
# Patching ourselves to warn about TLS-SNI challenge deprecation and removal.
|
||||
sys.modules[__name__] = _TLSSNI01DeprecationModule(sys.modules[__name__])
|
||||
|
||||
@@ -1,14 +1,7 @@
|
||||
"""Tests for acme.standalone."""
|
||||
import multiprocessing
|
||||
import os
|
||||
import shutil
|
||||
import socket
|
||||
import threading
|
||||
import tempfile
|
||||
import unittest
|
||||
import warnings
|
||||
import time
|
||||
from contextlib import closing
|
||||
|
||||
from six.moves import http_client # pylint: disable=import-error
|
||||
from six.moves import socketserver # type: ignore # pylint: disable=import-error
|
||||
@@ -18,8 +11,6 @@ import mock
|
||||
import requests
|
||||
|
||||
from acme import challenges
|
||||
from acme import crypto_util
|
||||
from acme import errors
|
||||
from acme import test_util
|
||||
from acme.magic_typing import Set # pylint: disable=unused-import, no-name-in-module
|
||||
|
||||
@@ -42,44 +33,6 @@ class TLSServerTest(unittest.TestCase):
|
||||
server.server_close()
|
||||
|
||||
|
||||
class TLSSNI01ServerTest(unittest.TestCase):
|
||||
"""Test for acme.standalone.TLSSNI01Server."""
|
||||
|
||||
|
||||
def setUp(self):
|
||||
self.certs = {b'localhost': (
|
||||
test_util.load_pyopenssl_private_key('rsa2048_key.pem'),
|
||||
test_util.load_cert('rsa2048_cert.pem'),
|
||||
)}
|
||||
from acme.standalone import TLSSNI01Server
|
||||
self.server = TLSSNI01Server(('localhost', 0), certs=self.certs)
|
||||
self.thread = threading.Thread(target=self.server.serve_forever)
|
||||
self.thread.start()
|
||||
|
||||
def tearDown(self):
|
||||
self.server.shutdown()
|
||||
self.thread.join()
|
||||
|
||||
def test_it(self):
|
||||
host, port = self.server.socket.getsockname()[:2]
|
||||
cert = crypto_util.probe_sni(
|
||||
b'localhost', host=host, port=port, timeout=1)
|
||||
self.assertEqual(jose.ComparableX509(cert),
|
||||
jose.ComparableX509(self.certs[b'localhost'][1]))
|
||||
|
||||
|
||||
class BaseRequestHandlerWithLoggingTest(unittest.TestCase):
|
||||
"""Test for acme.standalone.BaseRequestHandlerWithLogging."""
|
||||
|
||||
def test_it(self):
|
||||
with mock.patch('acme.standalone.warnings.warn') as mock_warn:
|
||||
# pylint: disable=unused-variable
|
||||
from acme.standalone import BaseRequestHandlerWithLogging
|
||||
self.assertTrue(mock_warn.called)
|
||||
msg = mock_warn.call_args[0][0]
|
||||
self.assertTrue(msg.startswith('BaseRequestHandlerWithLogging'))
|
||||
|
||||
|
||||
class HTTP01ServerTest(unittest.TestCase):
|
||||
"""Tests for acme.standalone.HTTP01Server."""
|
||||
|
||||
@@ -183,33 +136,6 @@ class BaseDualNetworkedServersTest(unittest.TestCase):
|
||||
prev_port = port
|
||||
|
||||
|
||||
class TLSSNI01DualNetworkedServersTest(unittest.TestCase):
|
||||
"""Test for acme.standalone.TLSSNI01DualNetworkedServers."""
|
||||
|
||||
|
||||
def setUp(self):
|
||||
self.certs = {b'localhost': (
|
||||
test_util.load_pyopenssl_private_key('rsa2048_key.pem'),
|
||||
test_util.load_cert('rsa2048_cert.pem'),
|
||||
)}
|
||||
from acme.standalone import TLSSNI01DualNetworkedServers
|
||||
self.servers = TLSSNI01DualNetworkedServers(('localhost', 0), certs=self.certs)
|
||||
self.servers.serve_forever()
|
||||
|
||||
def tearDown(self):
|
||||
self.servers.shutdown_and_server_close()
|
||||
|
||||
def test_connect(self):
|
||||
socknames = self.servers.getsocknames()
|
||||
# connect to all addresses
|
||||
for sockname in socknames:
|
||||
host, port = sockname[:2]
|
||||
cert = crypto_util.probe_sni(
|
||||
b'localhost', host=host, port=port, timeout=1)
|
||||
self.assertEqual(jose.ComparableX509(cert),
|
||||
jose.ComparableX509(self.certs[b'localhost'][1]))
|
||||
|
||||
|
||||
class HTTP01DualNetworkedServersTest(unittest.TestCase):
|
||||
"""Tests for acme.standalone.HTTP01DualNetworkedServers."""
|
||||
|
||||
@@ -260,66 +186,5 @@ class HTTP01DualNetworkedServersTest(unittest.TestCase):
|
||||
self.assertFalse(self._test_http01(add=False))
|
||||
|
||||
|
||||
class TestSimpleTLSSNI01Server(unittest.TestCase):
|
||||
"""Tests for acme.standalone.simple_tls_sni_01_server."""
|
||||
|
||||
|
||||
def setUp(self):
|
||||
# mirror ../examples/standalone
|
||||
self.test_cwd = tempfile.mkdtemp()
|
||||
localhost_dir = os.path.join(self.test_cwd, 'localhost')
|
||||
os.makedirs(localhost_dir)
|
||||
shutil.copy(test_util.vector_path('rsa2048_cert.pem'),
|
||||
os.path.join(localhost_dir, 'cert.pem'))
|
||||
shutil.copy(test_util.vector_path('rsa2048_key.pem'),
|
||||
os.path.join(localhost_dir, 'key.pem'))
|
||||
|
||||
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
|
||||
sock.bind(('', 0))
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
self.port = sock.getsockname()[1]
|
||||
|
||||
self.process = multiprocessing.Process(target=_simple_tls_sni_01_server_no_warnings,
|
||||
args=(['path', '-p', str(self.port)],))
|
||||
self.old_cwd = os.getcwd()
|
||||
os.chdir(self.test_cwd)
|
||||
|
||||
def tearDown(self):
|
||||
os.chdir(self.old_cwd)
|
||||
if self.process.is_alive():
|
||||
self.process.terminate()
|
||||
self.process.join(timeout=5)
|
||||
# Check that we didn't timeout waiting for the process to
|
||||
# terminate.
|
||||
self.assertNotEqual(self.process.exitcode, None)
|
||||
shutil.rmtree(self.test_cwd)
|
||||
|
||||
@mock.patch('acme.standalone.TLSSNI01Server.handle_request')
|
||||
def test_mock(self, handle):
|
||||
_simple_tls_sni_01_server_no_warnings(cli_args=['path', '-p', str(self.port)],
|
||||
forever=False)
|
||||
self.assertEqual(handle.call_count, 1)
|
||||
|
||||
def test_live(self):
|
||||
self.process.start()
|
||||
cert = None
|
||||
for _ in range(50):
|
||||
time.sleep(0.1)
|
||||
try:
|
||||
cert = crypto_util.probe_sni(b'localhost', b'127.0.0.1', self.port)
|
||||
break
|
||||
except errors.Error: # pragma: no cover
|
||||
pass
|
||||
self.assertEqual(jose.ComparableX509(cert),
|
||||
test_util.load_comparable_cert('rsa2048_cert.pem'))
|
||||
|
||||
|
||||
def _simple_tls_sni_01_server_no_warnings(*args, **kwargs):
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings('ignore', 'simple_tls.*')
|
||||
from acme.standalone import simple_tls_sni_01_server
|
||||
return simple_tls_sni_01_server(*args, **kwargs)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main() # pragma: no cover
|
||||
|
||||
@@ -12,12 +12,6 @@ import josepy as jose
|
||||
from OpenSSL import crypto
|
||||
|
||||
|
||||
def vector_path(*names):
|
||||
"""Path to a test vector."""
|
||||
return pkg_resources.resource_filename(
|
||||
__name__, os.path.join('testdata', *names))
|
||||
|
||||
|
||||
def load_vector(*names):
|
||||
"""Load contents of a test vector."""
|
||||
# luckily, resource_string opens file in binary mode
|
||||
|
||||
@@ -13,7 +13,6 @@
|
||||
filterwarnings =
|
||||
error
|
||||
ignore:decodestring:DeprecationWarning
|
||||
ignore:(TLSSNI01|TLS-SNI-01):DeprecationWarning
|
||||
ignore:.*collections\.abc:DeprecationWarning
|
||||
ignore:The `color_scheme` argument is deprecated:DeprecationWarning:IPython.*
|
||||
ignore:.*get_systemd_os_info:DeprecationWarning
|
||||
|
||||
Reference in New Issue
Block a user