diff --git a/acme/acme/challenges.py b/acme/acme/challenges.py index 89bc68966..f32783830 100644 --- a/acme/acme/challenges.py +++ b/acme/acme/challenges.py @@ -1,13 +1,9 @@ """ACME Identifier Validation Challenges.""" -import binascii import functools import hashlib import logging -import os import socket -from cryptography.hazmat.backends import default_backend -from cryptography import x509 import OpenSSL import requests @@ -54,43 +50,45 @@ class SimpleHTTP(DVChallenge): """ typ = "simpleHttp" - token = jose.Field("token") + + TOKEN_SIZE = 128 / 8 # Based on the entropy value from the spec + """Minimum size of the :attr:`token` in bytes.""" + + # TODO: acme-spec doesn't specify token as base64-encoded value + token = jose.Field( + "token", encoder=jose.encode_b64jose, decoder=functools.partial( + jose.decode_b64jose, size=TOKEN_SIZE, minimum=True)) + + @property + def good_token(self): # XXX: @token.decoder + """Is `token` good? + + .. todo:: acme-spec wants "It MUST NOT contain any non-ASCII + characters", but it should also warrant that it doesn't + contain ".." or "/"... + + """ + # TODO: check that path combined with uri does not go above + # URI_ROOT_PATH! + return b'..' not in self.token and b'/' not in self.token @ChallengeResponse.register class SimpleHTTPResponse(ChallengeResponse): """ACME "simpleHttp" challenge response. - :ivar unicode path: - :ivar unicode tls: + :ivar bool tls: """ typ = "simpleHttp" - path = jose.Field("path") tls = jose.Field("tls", default=True, omitempty=True) URI_ROOT_PATH = ".well-known/acme-challenge" """URI root path for the server provisioned resource.""" - _URI_TEMPLATE = "{scheme}://{domain}/" + URI_ROOT_PATH + "/{path}" + _URI_TEMPLATE = "{scheme}://{domain}/" + URI_ROOT_PATH + "/{token}" - MAX_PATH_LEN = 25 - """Maximum allowed `path` length.""" - - CONTENT_TYPE = "text/plain" - - @property - def good_path(self): - """Is `path` good? - - .. todo:: acme-spec: "The value MUST be comprised entirely of - characters from the URL-safe alphabet for Base64 encoding - [RFC4648]", base64.b64decode ignores those characters - - """ - # TODO: check that path combined with uri does not go above - # URI_ROOT_PATH! - return len(self.path) <= 25 + CONTENT_TYPE = "application/jose+json" @property def scheme(self): @@ -102,19 +100,73 @@ class SimpleHTTPResponse(ChallengeResponse): """Port that the ACME client should be listening for validation.""" return 443 if self.tls else 80 - def uri(self, domain): + def uri(self, domain, chall): """Create an URI to the provisioned resource. Forms an URI to the HTTPS server provisioned resource (containing :attr:`~SimpleHTTP.token`). :param unicode domain: Domain name being verified. + :param challenges.SimpleHTTP chall: """ return self._URI_TEMPLATE.format( - scheme=self.scheme, domain=domain, path=self.path) + scheme=self.scheme, domain=domain, token=chall.encode("token")) - def simple_verify(self, chall, domain, port=None): + def gen_resource(self, chall): + """Generate provisioned resource. + + :param .SimpleHTTP chall: + :rtype: SimpleHTTPProvisionedResource + + """ + return SimpleHTTPProvisionedResource(token=chall.token, tls=self.tls) + + def gen_validation(self, chall, account_key, alg=jose.RS256, **kwargs): + """Generate validation. + + :param .SimpleHTTP chall: + :param .JWK account_key: Private account key. + :param .JWA alg: + + :returns: `.SimpleHTTPProvisionedResource` signed in `.JWS` + :rtype: .JWS + + """ + return jose.JWS.sign( + payload=self.gen_resource(chall).json_dumps( + sort_keys=True).encode('utf-8'), + key=account_key, alg=alg, **kwargs) + + def check_validation(self, validation, chall, account_public_key): + """Check validation. + + :param .JWS validation: + :param .SimpleHTTP chall: + :type account_public_key: + `~cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey` + or + `~cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey` + or + `~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey` + wrapped in `.ComparableKey + + :rtype: bool + + """ + if not validation.verify(key=account_public_key): + return False + + try: + resource = SimpleHTTPProvisionedResource.json_loads( + validation.payload.decode('utf-8')) + except jose.DeserializationError as error: + logger.debug(error) + return False + + return resource.token == chall.token and resource.tls == self.tls + + def simple_verify(self, chall, domain, account_public_key, port=None): """Simple verify. According to the ACME specification, "the ACME server MUST @@ -123,6 +175,16 @@ class SimpleHTTPResponse(ChallengeResponse): :param .SimpleHTTP chall: Corresponding challenge. :param unicode domain: Domain name being verified. + :param account_public_key: Public key for the key pair + being authorized. If ``None`` key verification is not + performed! + :type account_public_key: + `~cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey` + or + `~cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey` + or + `~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey` + wrapped in `.ComparableKey :param int port: Port used in the validation. :returns: ``True`` iff validation is successful, ``False`` @@ -138,76 +200,67 @@ class SimpleHTTPResponse(ChallengeResponse): "Using non-standard port for SimpleHTTP verification: %s", port) domain += ":{0}".format(port) - uri = self.uri(domain) + uri = self.uri(domain, chall) logger.debug("Verifying %s at %s...", chall.typ, uri) try: http_response = requests.get(uri, verify=False) except requests.exceptions.RequestException as error: logger.error("Unable to reach %s: %s", uri, error) return False - logger.debug( - "Received %s. Headers: %s", http_response, http_response.headers) + logger.debug("Received %s: %s. Headers: %s", http_response, + http_response.text, http_response.headers) - good_token = http_response.text == chall.token - if not good_token: - logger.error( - "Unable to verify %s! Expected: %r, returned: %r.", - uri, chall.token, http_response.text) - # TODO: spec contradicts itself, c.f. - # https://github.com/letsencrypt/acme-spec/pull/156/files#r33136438 - good_ct = self.CONTENT_TYPE == http_response.headers.get( - "Content-Type", self.CONTENT_TYPE) - return self.good_path and good_ct and good_token + if self.CONTENT_TYPE != http_response.headers.get( + "Content-Type", self.CONTENT_TYPE): + return False + + try: + validation = jose.JWS.json_loads(http_response.text) + except jose.DeserializationError as error: + logger.debug(error) + return False + + return self.check_validation(validation, chall, account_public_key) + + +class SimpleHTTPProvisionedResource(jose.JSONObjectWithFields): + """SimpleHTTP provisioned resource.""" + typ = fields.Fixed("type", SimpleHTTP.typ) + token = SimpleHTTP._fields["token"] + # If the "tls" field is not included in the response, then + # validation object MUST have its "tls" field set to "true". + tls = jose.Field("tls", omitempty=False) @Challenge.register class DVSNI(DVChallenge): """ACME "dvsni" challenge. - :ivar bytes r: Random data, **not** base64-encoded. - :ivar bytes nonce: Random data, **not** hex-encoded. + :ivar bytes token: Random data, **not** base64-encoded. """ typ = "dvsni" - DOMAIN_SUFFIX = b".acme.invalid" - """Domain name suffix.""" - - R_SIZE = 32 - """Required size of the :attr:`r` in bytes.""" - - NONCE_SIZE = 16 - """Required size of the :attr:`nonce` in bytes.""" - PORT = 443 """Port to perform DVSNI challenge.""" - r = jose.Field("r", encoder=jose.encode_b64jose, # pylint: disable=invalid-name - decoder=functools.partial(jose.decode_b64jose, size=R_SIZE)) - nonce = jose.Field("nonce", encoder=jose.encode_hex16, - decoder=functools.partial(functools.partial( - jose.decode_hex16, size=NONCE_SIZE))) + TOKEN_SIZE = 128 / 8 # Based on the entropy value from the spec + """Minimum size of the :attr:`token` in bytes.""" - @property - def nonce_domain(self): - """Domain name used in SNI. + token = jose.Field( + "token", encoder=jose.encode_b64jose, decoder=functools.partial( + jose.decode_b64jose, size=TOKEN_SIZE, minimum=True)) - :rtype: bytes + def gen_response(self, account_key, alg=jose.RS256, **kwargs): + """Generate response. + + :param .JWK account_key: Private account key. + :rtype: .DVSNIResponse """ - return binascii.hexlify(self.nonce) + self.DOMAIN_SUFFIX - - def probe_cert(self, domain, **kwargs): - """Probe DVSNI challenge certificate.""" - host = socket.gethostbyname(domain) - logging.debug('%s resolved to %s', domain, host) - - kwargs.setdefault("host", host) - kwargs.setdefault("port", self.PORT) - kwargs["name"] = self.nonce_domain - # TODO: try different methods? - # pylint: disable=protected-access - return crypto_util._probe_sni(**kwargs) + return DVSNIResponse(validation=jose.JWS.sign( + payload=self.json_dumps(sort_keys=True).encode('utf-8'), + key=account_key, alg=alg, **kwargs)) @ChallengeResponse.register @@ -219,105 +272,137 @@ class DVSNIResponse(ChallengeResponse): """ typ = "dvsni" - DOMAIN_SUFFIX = DVSNI.DOMAIN_SUFFIX + DOMAIN_SUFFIX = b".acme.invalid" """Domain name suffix.""" - S_SIZE = 32 - """Required size of the :attr:`s` in bytes.""" + PORT = DVSNI.PORT + """Port to perform DVSNI challenge.""" - s = jose.Field("s", encoder=jose.encode_b64jose, # pylint: disable=invalid-name - decoder=functools.partial(jose.decode_b64jose, size=S_SIZE)) + validation = jose.Field("validation", decoder=jose.JWS.from_json) - def __init__(self, s=None, *args, **kwargs): - s = os.urandom(self.S_SIZE) if s is None else s - super(DVSNIResponse, self).__init__(s=s, *args, **kwargs) - - def z(self, chall): # pylint: disable=invalid-name - """Compute the parameter ``z``. - - :param challenge: Corresponding challenge. - :type challenge: :class:`DVSNI` + @property + def z(self): # pylint: disable=invalid-name + """The ``z`` parameter. :rtype: bytes """ - z = hashlib.new("sha256") # pylint: disable=invalid-name - z.update(chall.r) - z.update(self.s) - return z.hexdigest().encode() + # Instance of 'Field' has no 'signature' member + # pylint: disable=no-member + return hashlib.sha256(self.validation.signature.encode( + "signature").encode("utf-8")).hexdigest().encode() - def z_domain(self, chall): + @property + def z_domain(self): """Domain name for certificate subjectAltName. - :rtype bytes: + :rtype: bytes """ - return self.z(chall) + self.DOMAIN_SUFFIX + z = self.z # pylint: disable=invalid-name + return z[:32] + b'.' + z[32:] + self.DOMAIN_SUFFIX - def gen_cert(self, chall, domain, key): + @property + def chall(self): + """Get challenge encoded in the `validation` payload. + + :rtype: DVSNI + + """ + # pylint: disable=no-member + return DVSNI.json_loads(self.validation.payload.decode('utf-8')) + + def gen_cert(self, key=None, bits=2048): """Generate DVSNI certificate. - :param .DVSNI chall: Corresponding challenge. - :param unicode domain: - :param OpenSSL.crypto.PKey + :param OpenSSL.crypto.PKey key: Optional private key used in + certificate generation. If not provided (``None``), then + fresh key will be generated. + :param int bits: Number of bits for newly generated key. + + :rtype: `tuple` of `OpenSSL.crypto.X509` and + `OpenSSL.crypto.PKey` """ + if key is None: + key = OpenSSL.crypto.PKey() + key.generate_key(OpenSSL.crypto.TYPE_RSA, bits) return crypto_util.gen_ss_cert(key, [ - domain, chall.nonce_domain.decode(), self.z_domain(chall).decode()]) + # z_domain is too big to fit into CN, hence first dummy domain + 'dummy', self.z_domain.decode()], force_san=True), key - def simple_verify(self, chall, domain, public_key, **kwargs): + def probe_cert(self, domain, **kwargs): + """Probe DVSNI challenge certificate. + + :param unicode domain: + + """ + host = socket.gethostbyname(domain) + logging.debug('%s resolved to %s', domain, host) + + kwargs.setdefault("host", host) + kwargs.setdefault("port", self.PORT) + kwargs["name"] = self.z_domain + # TODO: try different methods? + # pylint: disable=protected-access + return crypto_util._probe_sni(**kwargs) + + def verify_cert(self, cert): + """Verify DVSNI challenge certificate.""" + # pylint: disable=protected-access + sans = crypto_util._pyopenssl_cert_or_req_san(cert) + logging.debug('Certificate %s. SANs: %s', cert.digest('sha1'), sans) + return self.z_domain.decode() in sans + + def simple_verify(self, chall, domain, account_public_key, + cert=None, **kwargs): """Simple verify. - Probes DVSNI certificate and checks it using `verify_cert`; - hence all arguments documented in `verify_cert`. - - """ - try: - cert = chall.probe_cert(domain=domain, **kwargs) - except errors.Error as error: - logger.debug(error, exc_info=True) - return False - return self.verify_cert(chall, domain, public_key, cert) - - def verify_cert(self, chall, domain, public_key, cert): - """Verify DVSNI certificate. + Verify ``validation`` using ``account_public_key``, optionally + probe DVSNI certificate and check using `verify_cert`. :param .challenges.DVSNI chall: Corresponding challenge. :param str domain: Domain name being validated. - :param public_key: Public key for the key pair - being authorized. If ``None`` key verification is not - performed! - :type public_key: + :type account_public_key: `~cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey` or `~cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey` or `~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey` wrapped in `.ComparableKey - :param OpenSSL.crypto.X509 cert: + :param OpenSSL.crypto.X509 cert: Optional certificate. If not + provided (``None``) certificate will be retrieved using + `probe_cert`. :returns: ``True`` iff client's control of the domain has been verified, ``False`` otherwise. :rtype: bool """ - # TODO: check "It is a valid self-signed certificate" and - # return False if not - - # pylint: disable=protected-access - sans = crypto_util._pyopenssl_cert_or_req_san(cert) - logging.debug('Certificate %s. SANs: %s', cert.digest('sha1'), sans) - - cert = x509.load_der_x509_certificate( - OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert), - default_backend()) - - if public_key is None: - logging.warn('No key verification is performed') - elif public_key != jose.ComparableKey(cert.public_key()): + # pylint: disable=no-member + if not self.validation.verify(key=account_public_key): return False - return domain in sans and self.z_domain(chall).decode() in sans + # TODO: it's not checked that payload has exectly 2 fields! + try: + decoded_chall = self.chall + except jose.DeserializationError as error: + logger.debug(error, exc_info=True) + return False + + if decoded_chall.token != chall.token: + logger.debug("Wrong token: expected %r, found %r", + chall.token, decoded_chall.token) + return False + + if cert is None: + try: + cert = self.probe_cert(domain=domain, **kwargs) + except errors.Error as error: + logger.debug(error, exc_info=True) + return False + + return self.verify_cert(cert) @Challenge.register @@ -347,23 +432,6 @@ class RecoveryContactResponse(ChallengeResponse): token = jose.Field("token", omitempty=True) -@Challenge.register -class RecoveryToken(ContinuityChallenge): - """ACME "recoveryToken" challenge.""" - typ = "recoveryToken" - - -@ChallengeResponse.register -class RecoveryTokenResponse(ChallengeResponse): - """ACME "recoveryToken" challenge response. - - :ivar unicode token: - - """ - typ = "recoveryToken" - token = jose.Field("token", omitempty=True) - - @Challenge.register class ProofOfPossession(ContinuityChallenge): """ACME "proofOfPossession" challenge. diff --git a/acme/acme/challenges_test.py b/acme/acme/challenges_test.py index e4ec37362..61cca498c 100644 --- a/acme/acme/challenges_test.py +++ b/acme/acme/challenges_test.py @@ -22,10 +22,11 @@ class SimpleHTTPTest(unittest.TestCase): def setUp(self): from acme.challenges import SimpleHTTP self.msg = SimpleHTTP( - token='evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA') + token=jose.decode_b64jose( + 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA')) self.jmsg = { 'type': 'simpleHttp', - 'token': 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA', + 'token': 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA', } def test_to_partial_json(self): @@ -39,56 +40,36 @@ class SimpleHTTPTest(unittest.TestCase): from acme.challenges import SimpleHTTP hash(SimpleHTTP.from_json(self.jmsg)) + def test_good_token(self): + self.assertTrue(self.msg.good_token) + self.assertFalse( + self.msg.update(token=b'..').good_token) + class SimpleHTTPResponseTest(unittest.TestCase): # pylint: disable=too-many-instance-attributes def setUp(self): from acme.challenges import SimpleHTTPResponse - self.msg_http = SimpleHTTPResponse( - path='6tbIMBC5Anhl5bOlWT5ZFA', tls=False) - self.msg_https = SimpleHTTPResponse(path='6tbIMBC5Anhl5bOlWT5ZFA') + self.msg_http = SimpleHTTPResponse(tls=False) + self.msg_https = SimpleHTTPResponse(tls=True) self.jmsg_http = { 'resource': 'challenge', 'type': 'simpleHttp', - 'path': '6tbIMBC5Anhl5bOlWT5ZFA', 'tls': False, } self.jmsg_https = { 'resource': 'challenge', 'type': 'simpleHttp', - 'path': '6tbIMBC5Anhl5bOlWT5ZFA', 'tls': True, } from acme.challenges import SimpleHTTP - self.chall = SimpleHTTP(token="foo") - self.resp_http = SimpleHTTPResponse(path="bar", tls=False) - self.resp_https = SimpleHTTPResponse(path="bar", tls=True) + self.chall = SimpleHTTP(token=(b"x" * 16)) + self.resp_http = SimpleHTTPResponse(tls=False) + self.resp_https = SimpleHTTPResponse(tls=True) self.good_headers = {'Content-Type': SimpleHTTPResponse.CONTENT_TYPE} - def test_good_path(self): - self.assertTrue(self.msg_http.good_path) - self.assertTrue(self.msg_https.good_path) - self.assertFalse( - self.msg_http.update(path=(self.msg_http.path * 10)).good_path) - - def test_scheme(self): - self.assertEqual('http', self.msg_http.scheme) - self.assertEqual('https', self.msg_https.scheme) - - def test_port(self): - self.assertEqual(80, self.msg_http.port) - self.assertEqual(443, self.msg_https.port) - - def test_uri(self): - self.assertEqual( - 'http://example.com/.well-known/acme-challenge/' - '6tbIMBC5Anhl5bOlWT5ZFA', self.msg_http.uri('example.com')) - self.assertEqual( - 'https://example.com/.well-known/acme-challenge/' - '6tbIMBC5Anhl5bOlWT5ZFA', self.msg_https.uri('example.com')) - def test_to_partial_json(self): self.assertEqual(self.jmsg_http, self.msg_http.to_partial_json()) self.assertEqual(self.jmsg_https, self.msg_https.to_partial_json()) @@ -105,6 +86,63 @@ class SimpleHTTPResponseTest(unittest.TestCase): hash(SimpleHTTPResponse.from_json(self.jmsg_http)) hash(SimpleHTTPResponse.from_json(self.jmsg_https)) + def test_scheme(self): + self.assertEqual('http', self.msg_http.scheme) + self.assertEqual('https', self.msg_https.scheme) + + def test_port(self): + self.assertEqual(80, self.msg_http.port) + self.assertEqual(443, self.msg_https.port) + + def test_uri(self): + self.assertEqual( + 'http://example.com/.well-known/acme-challenge/' + 'eHh4eHh4eHh4eHh4eHh4eA', self.msg_http.uri( + 'example.com', self.chall)) + self.assertEqual( + 'https://example.com/.well-known/acme-challenge/' + 'eHh4eHh4eHh4eHh4eHh4eA', self.msg_https.uri( + 'example.com', self.chall)) + + def test_gen_check_validation(self): + account_key = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem')) + self.assertTrue(self.resp_http.check_validation( + validation=self.resp_http.gen_validation(self.chall, account_key), + chall=self.chall, account_public_key=account_key.public_key())) + + def test_gen_check_validation_wrong_key(self): + key1 = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem')) + key2 = jose.JWKRSA.load(test_util.load_vector('rsa1024_key.pem')) + self.assertFalse(self.resp_http.check_validation( + validation=self.resp_http.gen_validation(self.chall, key1), + chall=self.chall, account_public_key=key2.public_key())) + + def test_check_validation_wrong_payload(self): + account_key = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem')) + validations = tuple( + jose.JWS.sign(payload=payload, alg=jose.RS256, key=account_key) + for payload in (b'', b'{}', self.chall.json_dumps().encode('utf-8'), + self.resp_http.json_dumps().encode('utf-8')) + ) + for validation in validations: + self.assertFalse(self.resp_http.check_validation( + validation=validation, chall=self.chall, + account_public_key=account_key.public_key())) + + def test_check_validation_wrong_fields(self): + resource = self.resp_http.gen_resource(self.chall) + account_key = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem')) + validations = tuple( + jose.JWS.sign(payload=bad_resource.json_dumps().encode('utf-8'), + alg=jose.RS256, key=account_key) + for bad_resource in (resource.update(tls=True), + resource.update(token=b'x'*20)) + ) + for validation in validations: + self.assertFalse(self.resp_http.check_validation( + validation=validation, chall=self.chall, + account_public_key=account_key.public_key())) + @mock.patch("acme.challenges.requests.get") def test_simple_verify_good_token(self, mock_get): for resp in self.resp_http, self.resp_https: @@ -132,7 +170,8 @@ class SimpleHTTPResponseTest(unittest.TestCase): @mock.patch("acme.challenges.requests.get") def test_simple_verify_port(self, mock_get): - self.resp_http.simple_verify(self.chall, "local", 4430) + self.resp_http.simple_verify( + self.chall, domain="local", account_public_key=None, port=4430) self.assertEqual("local:4430", urllib_parse.urlparse( mock_get.mock_calls[0][1][0]).netloc) @@ -142,19 +181,12 @@ class DVSNITest(unittest.TestCase): def setUp(self): from acme.challenges import DVSNI self.msg = DVSNI( - r=b"O*\xb4-\xad\xec\x95>\xed\xa9\r0\x94\xe8\x97\x9c&6" - b"\xbf'\xb3\xed\x9a9nX\x0f'\\m\xe7\x12", - nonce=b'\xa8-_\xf8\xeft\r\x12\x88\x1fm<"w\xab.') + token=jose.b64decode('a82d5ff8ef740d12881f6d3c2277ab2e')) self.jmsg = { 'type': 'dvsni', - 'r': 'Tyq0La3slT7tqQ0wlOiXnCY2vyez7Zo5blgPJ1xt5xI', - 'nonce': 'a82d5ff8ef740d12881f6d3c2277ab2e', + 'token': 'a82d5ff8ef740d12881f6d3c2277ab2e', } - def test_nonce_domain(self): - self.assertEqual(b'a82d5ff8ef740d12881f6d3c2277ab2e.acme.invalid', - self.msg.nonce_domain) - def test_to_partial_json(self): self.assertEqual(self.jmsg, self.msg.to_partial_json()) @@ -166,17 +198,66 @@ class DVSNITest(unittest.TestCase): from acme.challenges import DVSNI hash(DVSNI.from_json(self.jmsg)) - def test_from_json_invalid_r_length(self): + def test_from_json_invalid_token_length(self): from acme.challenges import DVSNI - self.jmsg['r'] = 'abcd' + self.jmsg['token'] = jose.encode_b64jose(b'abcd') self.assertRaises( jose.DeserializationError, DVSNI.from_json, self.jmsg) - def test_from_json_invalid_nonce_length(self): + def test_gen_response(self): + key = jose.JWKRSA(key=KEY) from acme.challenges import DVSNI - self.jmsg['nonce'] = 'abcd' - self.assertRaises( - jose.DeserializationError, DVSNI.from_json, self.jmsg) + self.assertEqual(self.msg, DVSNI.json_loads( + self.msg.gen_response(key).validation.payload.decode())) + + +class DVSNIResponseTest(unittest.TestCase): + # pylint: disable=too-many-instance-attributes + + def setUp(self): + self.key = jose.JWKRSA(key=KEY) + + from acme.challenges import DVSNI + self.chall = DVSNI( + token=jose.b64decode(b'a82d5ff8ef740d12881f6d3c2277ab2e')) + + from acme.challenges import DVSNIResponse + self.validation = jose.JWS.sign( + payload=self.chall.json_dumps(sort_keys=True).encode(), + key=self.key, alg=jose.RS256) + self.msg = DVSNIResponse(validation=self.validation) + self.jmsg_to = { + 'resource': 'challenge', + 'type': 'dvsni', + 'validation': self.validation, + } + self.jmsg_from = { + 'resource': 'challenge', + 'type': 'dvsni', + 'validation': self.validation.to_json(), + } + + # pylint: disable=invalid-name + label1 = b'e2df3498860637c667fedadc5a8494ec' + label2 = b'09dcc75553c9b3bd73662b50e71b1e42' + self.z = label1 + label2 + self.z_domain = label1 + b'.' + label2 + b'.acme.invalid' + self.domain = 'foo.com' + + def test_z_and_domain(self): + self.assertEqual(self.z, self.msg.z) + self.assertEqual(self.z_domain, self.msg.z_domain) + + def test_to_partial_json(self): + self.assertEqual(self.jmsg_to, self.msg.to_partial_json()) + + def test_from_json(self): + from acme.challenges import DVSNIResponse + self.assertEqual(self.msg, DVSNIResponse.from_json(self.jmsg_from)) + + def test_from_json_hashable(self): + from acme.challenges import DVSNIResponse + hash(DVSNIResponse.from_json(self.jmsg_from)) @mock.patch('acme.challenges.socket.gethostbyname') @mock.patch('acme.challenges.crypto_util._probe_sni') @@ -186,7 +267,7 @@ class DVSNITest(unittest.TestCase): mock_gethostbyname.assert_called_once_with('foo.com') mock_probe_sni.assert_called_once_with( host='127.0.0.1', port=self.msg.PORT, - name=b'a82d5ff8ef740d12881f6d3c2277ab2e.acme.invalid') + name=self.z_domain) self.msg.probe_cert('foo.com', host='8.8.8.8') mock_probe_sni.assert_called_with( @@ -203,88 +284,54 @@ class DVSNITest(unittest.TestCase): self.msg.probe_cert('foo.com', name=b'xxx') mock_probe_sni.assert_called_with( host=mock.ANY, port=mock.ANY, - name=b'a82d5ff8ef740d12881f6d3c2277ab2e.acme.invalid') + name=self.z_domain) + def test_gen_verify_cert(self): + key1 = test_util.load_pyopenssl_private_key('rsa512_key.pem') + cert, key2 = self.msg.gen_cert(key1) + self.assertEqual(key1, key2) + self.assertTrue(self.msg.verify_cert(cert)) -class DVSNIResponseTest(unittest.TestCase): + def test_gen_verify_cert_gen_key(self): + cert, key = self.msg.gen_cert() + self.assertTrue(isinstance(key, OpenSSL.crypto.PKey)) + self.assertTrue(self.msg.verify_cert(cert)) - def setUp(self): - from acme.challenges import DVSNIResponse - # pylint: disable=invalid-name - s = '9dbjsl3gTAtOnEtKFEmhS6Mj-ajNjDcOmRkp3Lfzm3c' - self.msg = DVSNIResponse(s=jose.decode_b64jose(s)) - self.jmsg = { - 'resource': 'challenge', - 'type': 'dvsni', - 's': s, - } + def test_verify_bad_cert(self): + self.assertFalse(self.msg.verify_cert(test_util.load_cert('cert.pem'))) - from acme.challenges import DVSNI - self.chall = DVSNI( - r=jose.decode_b64jose('Tyq0La3slT7tqQ0wlOiXnCY2vyez7Zo5blgPJ1xt5xI'), - nonce=jose.decode_b64jose('a82d5ff8ef740d12881f6d3c2277ab2e')) - self.z = (b'38e612b0397cc2624a07d351d7ef50e4' - b'6134c0213d9ed52f7d7c611acaeed41b') - self.domain = 'foo.com' - self.key = test_util.load_pyopenssl_private_key('rsa512_key.pem') - self.public_key = test_util.load_rsa_private_key( - 'rsa512_key.pem').public_key() + def test_simple_verify_wrong_account_key(self): + self.assertFalse(self.msg.simple_verify( + self.chall, self.domain, jose.JWKRSA.load( + test_util.load_vector('rsa256_key.pem')).public_key())) - def test_z_and_domain(self): - # pylint: disable=invalid-name - self.assertEqual(self.z, self.msg.z(self.chall)) - self.assertEqual( - self.z + b'.acme.invalid', self.msg.z_domain(self.chall)) + def test_simple_verify_wrong_payload(self): + for payload in b'', b'{}': + msg = self.msg.update(validation=jose.JWS.sign( + payload=payload, key=self.key, alg=jose.RS256)) + self.assertFalse(msg.simple_verify( + self.chall, self.domain, self.key.public_key())) - def test_to_partial_json(self): - self.assertEqual(self.jmsg, self.msg.to_partial_json()) + def test_simple_verify_wrong_token(self): + msg = self.msg.update(validation=jose.JWS.sign( + payload=self.chall.update(token=b'b'*20).json_dumps().encode(), + key=self.key, alg=jose.RS256)) + self.assertFalse(msg.simple_verify( + self.chall, self.domain, self.key.public_key())) - def test_from_json(self): - from acme.challenges import DVSNIResponse - self.assertEqual(self.msg, DVSNIResponse.from_json(self.jmsg)) - - def test_from_json_hashable(self): - from acme.challenges import DVSNIResponse - hash(DVSNIResponse.from_json(self.jmsg)) - - @mock.patch('acme.challenges.DVSNIResponse.verify_cert') + @mock.patch('acme.challenges.DVSNIResponse.verify_cert', autospec=True) def test_simple_verify(self, mock_verify_cert): - chall = mock.Mock() - chall.probe_cert.return_value = mock.sentinel.cert - mock_verify_cert.return_value = 'x' - self.assertEqual('x', self.msg.simple_verify( - chall, mock.sentinel.domain, mock.sentinel.key)) - chall.probe_cert.assert_called_once_with(domain=mock.sentinel.domain) - self.msg.verify_cert.assert_called_once_with( - chall, mock.sentinel.domain, mock.sentinel.key, - mock.sentinel.cert) + mock_verify_cert.return_value = mock.sentinel.verification + self.assertEqual(mock.sentinel.verification, self.msg.simple_verify( + self.chall, self.domain, self.key.public_key(), + cert=mock.sentinel.cert)) + mock_verify_cert.assert_called_once_with(self.msg, mock.sentinel.cert) def test_simple_verify_false_on_probe_error(self): chall = mock.Mock() chall.probe_cert.side_effect = errors.Error self.assertFalse(self.msg.simple_verify( - chall=chall, domain=None, public_key=None)) - - def test_gen_verify_cert_postive_no_key(self): - cert = self.msg.gen_cert(self.chall, self.domain, self.key) - self.assertTrue(self.msg.verify_cert( - self.chall, self.domain, public_key=None, cert=cert)) - - def test_gen_verify_cert_postive_with_key(self): - cert = self.msg.gen_cert(self.chall, self.domain, self.key) - self.assertTrue(self.msg.verify_cert( - self.chall, self.domain, public_key=self.public_key, cert=cert)) - - def test_gen_verify_cert_negative_with_wrong_key(self): - cert = self.msg.gen_cert(self.chall, self.domain, self.key) - key = test_util.load_rsa_private_key('rsa256_key.pem').public_key() - self.assertFalse(self.msg.verify_cert( - self.chall, self.domain, public_key=key, cert=cert)) - - def test_gen_verify_cert_negative(self): - cert = self.msg.gen_cert(self.chall, self.domain + 'x', self.key) - self.assertFalse(self.msg.verify_cert( - self.chall, self.domain, public_key=None, cert=cert)) + self.chall, self.domain, self.key.public_key())) class RecoveryContactTest(unittest.TestCase): @@ -360,58 +407,6 @@ class RecoveryContactResponseTest(unittest.TestCase): self.assertEqual(self.jmsg, msg.to_partial_json()) -class RecoveryTokenTest(unittest.TestCase): - - def setUp(self): - from acme.challenges import RecoveryToken - self.msg = RecoveryToken() - self.jmsg = {'type': 'recoveryToken'} - - def test_to_partial_json(self): - self.assertEqual(self.jmsg, self.msg.to_partial_json()) - - def test_from_json(self): - from acme.challenges import RecoveryToken - self.assertEqual(self.msg, RecoveryToken.from_json(self.jmsg)) - - def test_from_json_hashable(self): - from acme.challenges import RecoveryToken - hash(RecoveryToken.from_json(self.jmsg)) - - -class RecoveryTokenResponseTest(unittest.TestCase): - - def setUp(self): - from acme.challenges import RecoveryTokenResponse - self.msg = RecoveryTokenResponse(token='23029d88d9e123e') - self.jmsg = { - 'resource': 'challenge', - 'type': 'recoveryToken', - 'token': '23029d88d9e123e' - } - - def test_to_partial_json(self): - self.assertEqual(self.jmsg, self.msg.to_partial_json()) - - def test_from_json(self): - from acme.challenges import RecoveryTokenResponse - self.assertEqual( - self.msg, RecoveryTokenResponse.from_json(self.jmsg)) - - def test_from_json_hashable(self): - from acme.challenges import RecoveryTokenResponse - hash(RecoveryTokenResponse.from_json(self.jmsg)) - - def test_json_without_optionals(self): - del self.jmsg['token'] - - from acme.challenges import RecoveryTokenResponse - msg = RecoveryTokenResponse.from_json(self.jmsg) - - self.assertTrue(msg.token is None) - self.assertEqual(self.jmsg, msg.to_partial_json()) - - class ProofOfPossessionHintsTest(unittest.TestCase): def setUp(self): diff --git a/acme/acme/client_test.py b/acme/acme/client_test.py index 97e0b9662..28226fe5a 100644 --- a/acme/acme/client_test.py +++ b/acme/acme/client_test.py @@ -44,7 +44,7 @@ class ClientTest(unittest.TestCase): # Registration self.contact = ('mailto:cert-admin@example.com', 'tel:+12025551212') reg = messages.Registration( - contact=self.contact, key=KEY.public_key(), recovery_token='t') + contact=self.contact, key=KEY.public_key()) self.new_reg = messages.NewRegistration(**dict(reg)) self.regr = messages.RegistrationResource( body=reg, uri='https://www.letsencrypt-demo.org/acme/reg/1', diff --git a/acme/acme/crypto_util.py b/acme/acme/crypto_util.py index cb796cb88..1ab8e87a1 100644 --- a/acme/acme/crypto_util.py +++ b/acme/acme/crypto_util.py @@ -155,13 +155,18 @@ def _pyopenssl_cert_or_req_san(cert_or_req): for part in parts if part.startswith(prefix)] -def gen_ss_cert(key, domains, not_before=None, validity=(7 * 24 * 60 * 60)): +def gen_ss_cert(key, domains, not_before=None, + validity=(7 * 24 * 60 * 60), force_san=True): """Generate new self-signed certificate. :type domains: `list` of `unicode` :param OpenSSL.crypto.PKey key: + :param bool force_san: - Uses key and contains all domains. + If more than one domain is provided, all of the domains are put into + ``subjectAltName`` X.509 extension and first domain is set as the + subject CN. If only one domain is provided no ``subjectAltName`` + extension is used, unless `force_san` is ``True``. """ assert domains, "Must provide one or more hostnames for the cert." @@ -178,7 +183,7 @@ def gen_ss_cert(key, domains, not_before=None, validity=(7 * 24 * 60 * 60)): # TODO: what to put into cert.get_subject()? cert.set_issuer(cert.get_subject()) - if len(domains) > 1: + if force_san or len(domains) > 1: extensions.append(OpenSSL.crypto.X509Extension( b"subjectAltName", critical=False, diff --git a/acme/acme/fields.py b/acme/acme/fields.py index 6943563d4..1e4d3a822 100644 --- a/acme/acme/fields.py +++ b/acme/acme/fields.py @@ -1,9 +1,34 @@ """ACME JSON fields.""" +import logging + import pyrfc3339 from acme import jose +logger = logging.getLogger(__name__) + + +class Fixed(jose.Field): + """Fixed field.""" + + def __init__(self, json_name, value): + self.value = value + super(Fixed, self).__init__( + json_name=json_name, default=value, omitempty=False) + + def decode(self, value): + if value != self.value: + raise jose.DeserializationError('Expected {0!r}'.format(self.value)) + return self.value + + def encode(self, value): + if value != self.value: + logger.warn('Overriding fixed field ({0}) with {1}'.format( + self.json_name, value)) + return value + + class RFC3339Field(jose.Field): """RFC3339 field encoder/decoder. @@ -31,8 +56,6 @@ class Resource(jose.Field): def __init__(self, resource_type, *args, **kwargs): self.resource_type = resource_type super(Resource, self).__init__( - # TODO: omitempty used only to trick - # JSONObjectWithFieldsMeta._defaults..., server implementation 'resource', default=resource_type, *args, **kwargs) def decode(self, value): diff --git a/acme/acme/fields_test.py b/acme/acme/fields_test.py index e95c450ce..de852b6fa 100644 --- a/acme/acme/fields_test.py +++ b/acme/acme/fields_test.py @@ -7,6 +7,26 @@ import pytz from acme import jose +class FixedTest(unittest.TestCase): + """Tests for acme.fields.Fixed.""" + + def setUp(self): + from acme.fields import Fixed + self.field = Fixed('name', 'x') + + def test_decode(self): + self.assertEqual('x', self.field.decode('x')) + + def test_decode_bad(self): + self.assertRaises(jose.DeserializationError, self.field.decode, 'y') + + def test_encode(self): + self.assertEqual('x', self.field.encode('x')) + + def test_encode_override(self): + self.assertEqual('y', self.field.encode('y')) + + class RFC3339FieldTest(unittest.TestCase): """Tests for acme.fields.RFC3339Field.""" diff --git a/acme/acme/jose/errors.py b/acme/acme/jose/errors.py index 74708c4a4..74c9443e1 100644 --- a/acme/acme/jose/errors.py +++ b/acme/acme/jose/errors.py @@ -8,6 +8,10 @@ class Error(Exception): class DeserializationError(Error): """JSON deserialization error.""" + def __str__(self): + return "Deserialization error: {0}".format( + super(DeserializationError, self).__str__()) + class SerializationError(Error): """JSON serialization error.""" diff --git a/acme/acme/jose/interfaces.py b/acme/acme/jose/interfaces.py index 96dae6bae..a714fee51 100644 --- a/acme/acme/jose/interfaces.py +++ b/acme/acme/jose/interfaces.py @@ -5,6 +5,7 @@ import json import six +from acme.jose import errors from acme.jose import util # pylint: disable=no-self-argument,no-method-argument,no-init,inherit-non-class @@ -172,7 +173,11 @@ class JSONDeSerializable(object): @classmethod def json_loads(cls, json_string): """Deserialize from JSON document string.""" - return cls.from_json(json.loads(json_string)) + try: + loads = json.loads(json_string) + except ValueError as error: + raise errors.DeserializationError(error) + return cls.from_json(loads) def json_dumps(self, **kwargs): """Dump to JSON string using proper serializer. diff --git a/acme/acme/jose/json_util.py b/acme/acme/jose/json_util.py index fec13aa26..51d55ebd9 100644 --- a/acme/acme/jose/json_util.py +++ b/acme/acme/jose/json_util.py @@ -221,6 +221,22 @@ class JSONObjectWithFields(util.ImmutableMap, interfaces.JSONDeSerializable): super(JSONObjectWithFields, self).__init__( **(dict(self._defaults(), **kwargs))) + def encode(self, name): + """Encode a single field. + + :param str name: Name of the field to be encoded. + + :raises erors.SerializationError: if field cannot be serialized + :raises errors.Error: if field could not be found + + """ + try: + field = self._fields[name] + except KeyError: + raise errors.Error("Field not found: {0}".format(name)) + + return field.encode(getattr(self, name)) + def fields_to_partial_json(self): """Serialize fields to JSON.""" jobj = {} @@ -310,7 +326,8 @@ def decode_b64jose(data, size=None, minimum=False): if size is not None and ((not minimum and len(decoded) != size) or (minimum and len(decoded) < size)): - raise errors.DeserializationError() + raise errors.DeserializationError( + "Expected at least or exactly {0} bytes".format(size)) return decoded @@ -418,7 +435,9 @@ class TypedJSONObjectWithFields(JSONObjectWithFields): def get_type_cls(cls, jobj): """Get the registered class for ``jobj``.""" if cls in six.itervalues(cls.TYPES): - assert jobj[cls.type_field_name] + if cls.type_field_name not in jobj: + raise errors.DeserializationError( + "Missing type field ({0})".format(cls.type_field_name)) # cls is already registered type_cls, force to use it # so that, e.g Revocation.from_json(jobj) fails if # jobj["type"] != "revocation". diff --git a/acme/acme/jose/json_util_test.py b/acme/acme/jose/json_util_test.py index 2225267ee..313282e67 100644 --- a/acme/acme/jose/json_util_test.py +++ b/acme/acme/jose/json_util_test.py @@ -160,6 +160,18 @@ class JSONObjectWithFieldsTest(unittest.TestCase): def test_init_defaults(self): self.assertEqual(self.mock, self.MockJSONObjectWithFields(y=2, z=3)) + def test_encode(self): + self.assertEqual(10, self.MockJSONObjectWithFields( + x=5, y=0, z=0).encode("x")) + + def test_encode_wrong_field(self): + self.assertRaises(errors.Error, self.mock.encode, 'foo') + + def test_encode_serialization_error_passthrough(self): + self.assertRaises( + errors.SerializationError, + self.MockJSONObjectWithFields(y=500, z=None).encode, "y") + def test_fields_to_partial_json_omits_empty(self): self.assertEqual(self.mock.fields_to_partial_json(), {'y': 2, 'Z': 3}) diff --git a/acme/acme/messages.py b/acme/acme/messages.py index 39594cbda..33157899e 100644 --- a/acme/acme/messages.py +++ b/acme/acme/messages.py @@ -156,7 +156,6 @@ class Registration(ResourceBody): :ivar acme.jose.jwk.JWK key: Public key. :ivar tuple contact: Contact information following ACME spec, `tuple` of `unicode`. - :ivar unicode recovery_token: :ivar unicode agreement: """ @@ -164,7 +163,6 @@ class Registration(ResourceBody): # JWS.signature.combined.jwk key = jose.Field('key', omitempty=True, decoder=jose.JWK.from_json) contact = jose.Field('contact', omitempty=True, default=()) - recovery_token = jose.Field('recoveryToken', omitempty=True) agreement = jose.Field('agreement', omitempty=True) phone_prefix = 'tel:' diff --git a/acme/acme/messages_test.py b/acme/acme/messages_test.py index 2ed0dd669..051db9ae9 100644 --- a/acme/acme/messages_test.py +++ b/acme/acme/messages_test.py @@ -101,18 +101,14 @@ class RegistrationTest(unittest.TestCase): 'mailto:admin@foo.com', 'tel:1234', ) - recovery_token = 'XYZ' agreement = 'https://letsencrypt.org/terms' from acme.messages import Registration - self.reg = Registration( - key=key, contact=contact, recovery_token=recovery_token, - agreement=agreement) + self.reg = Registration(key=key, contact=contact, agreement=agreement) self.reg_none = Registration() self.jobj_to = { 'contact': contact, - 'recoveryToken': recovery_token, 'agreement': agreement, 'key': key, } @@ -228,11 +224,12 @@ class AuthorizationTest(unittest.TestCase): self.challbs = ( ChallengeBody( uri='http://challb1', status=STATUS_VALID, - chall=challenges.SimpleHTTP(token='IlirfxKKXAsHtmzK29Pj8A')), + chall=challenges.SimpleHTTP(token=b'IlirfxKKXAsHtmzK29Pj8A')), ChallengeBody(uri='http://challb2', status=STATUS_VALID, - chall=challenges.DNS(token='DGyRejmCefe7v4NfDGDKfA')), + chall=challenges.DNS( + token=b'DGyRejmCefe7v4NfDGDKfA')), ChallengeBody(uri='http://challb3', status=STATUS_VALID, - chall=challenges.RecoveryToken()), + chall=challenges.RecoveryContact()), ) combinations = ((0, 2), (1, 2)) diff --git a/letsencrypt/account.py b/letsencrypt/account.py index e962c993d..22f625bca 100644 --- a/letsencrypt/account.py +++ b/letsencrypt/account.py @@ -94,15 +94,11 @@ def report_new_account(acc, config): config.config_dir), reporter.MEDIUM_PRIORITY, True) - assert acc.regr.body.recovery_token is not None - recovery_msg = ("If you lose your account credentials, you can recover " - "them using the token \"{0}\". You must write that down " - "and put it in a safe place.".format( - acc.regr.body.recovery_token)) if acc.regr.body.emails: - recovery_msg += (" Another recovery method will be e-mails sent to " - "{0}.".format(", ".join(acc.regr.body.emails))) - reporter.add_message(recovery_msg, reporter.HIGH_PRIORITY, True) + recovery_msg = ("If you lose your account credentials, you can " + "recover through e-mails sent to {0}.".format( + ", ".join(acc.regr.body.emails))) + reporter.add_message(recovery_msg, reporter.HIGH_PRIORITY, True) class AccountMemoryStorage(interfaces.AccountStorage): diff --git a/letsencrypt/achallenges.py b/letsencrypt/achallenges.py index df7f96dd6..25874d4c1 100644 --- a/letsencrypt/achallenges.py +++ b/letsencrypt/achallenges.py @@ -17,18 +17,22 @@ Note, that all annotated challenges act as a proxy objects:: achall.token == challb.token """ +import logging +import os + import OpenSSL from acme import challenges -from acme.jose import util as jose_util +from acme import jose -from letsencrypt import crypto_util + +logger = logging.getLogger(__name__) # pylint: disable=too-few-public-methods -class AnnotatedChallenge(jose_util.ImmutableMap): +class AnnotatedChallenge(jose.ImmutableMap): """Client annotated challenge. Wraps around server provided challenge and annotates with data @@ -45,33 +49,56 @@ class AnnotatedChallenge(jose_util.ImmutableMap): class DVSNI(AnnotatedChallenge): - """Client annotated "dvsni" ACME challenge.""" - __slots__ = ('challb', 'domain', 'key') + """Client annotated "dvsni" ACME challenge. + + :ivar .Account account: + + """ + __slots__ = ('challb', 'domain', 'account') acme_type = challenges.DVSNI - def gen_cert_and_response(self, s=None): # pylint: disable=invalid-name + def gen_cert_and_response(self, key_pem=None, bits=2048, alg=jose.RS256): """Generate a DVSNI cert and response. - :returns: ``(cert_pem, response)`` tuple, where ``cert_pem`` is the PEM - encoded certificate and ``response`` is an instance - :class:`acme.challenges.DVSNIResponse`. + :param bytes key_pem: Private PEM-encoded key used for + certificate generation. If none provided, a fresh key will + be generated. + :param int bits: Number of bits for fresh key generation. + :param .JWAAlgorithm alg: + + :returns: ``(response, cert_pem, key_pem)`` tuple, where + ``response`` is an instance of + `acme.challenges.DVSNIResponse`, ``cert_pem`` is the + PEM-encoded certificate and ``key_pem`` is PEM-encoded + private key. :rtype: tuple """ - key = crypto_util.private_jwk_to_pyopenssl(self.key) - response = challenges.DVSNIResponse(s=s) - cert = response.gen_cert(self.challb.chall, self.domain, key) + key = None if key_pem is None else OpenSSL.crypto.load_privatekey( + OpenSSL.crypto.FILETYPE_PEM, key_pem) + response = self.challb.chall.gen_response(self.account.key, alg=alg) + cert, key = response.gen_cert(key=key, bits=bits) + cert_pem = OpenSSL.crypto.dump_certificate( OpenSSL.crypto.FILETYPE_PEM, cert) + key_pem = OpenSSL.crypto.dump_privatekey( + OpenSSL.crypto.FILETYPE_PEM, key) - return cert_pem, response + return response, cert_pem, key_pem class SimpleHTTP(AnnotatedChallenge): """Client annotated "simpleHttp" ACME challenge.""" - __slots__ = ('challb', 'domain', 'key') + __slots__ = ('challb', 'domain', 'account') acme_type = challenges.SimpleHTTP + def gen_response_and_validation(self, tls): + response = challenges.SimpleHTTPResponse(tls=tls) + + validation = response.gen_validation(self.chall, self.account.key) + logger.debug("Simple HTTP validation payload: %s", validation.payload) + return response, validation + class DNS(AnnotatedChallenge): """Client annotated "dns" ACME challenge.""" @@ -85,12 +112,6 @@ class RecoveryContact(AnnotatedChallenge): acme_type = challenges.RecoveryContact -class RecoveryToken(AnnotatedChallenge): - """Client annotated "recoveryToken" ACME challenge.""" - __slots__ = ('challb', 'domain') - acme_type = challenges.RecoveryToken - - class ProofOfPossession(AnnotatedChallenge): """Client annotated "proofOfPossession" ACME challenge.""" __slots__ = ('challb', 'domain') diff --git a/letsencrypt/auth_handler.py b/letsencrypt/auth_handler.py index 5969dc36f..a8e720ce3 100644 --- a/letsencrypt/auth_handler.py +++ b/letsencrypt/auth_handler.py @@ -322,7 +322,7 @@ class AuthHandler(object): challb = self.authzr[domain].body.challenges[index] chall = challb.chall - achall = challb_to_achall(challb, self.account.key, domain) + achall = challb_to_achall(challb, self.account, domain) if isinstance(chall, challenges.ContinuityChallenge): cont_chall.append(achall) @@ -332,15 +332,11 @@ class AuthHandler(object): return cont_chall, dv_chall -def challb_to_achall(challb, key, domain): +def challb_to_achall(challb, account, domain): """Converts a ChallengeBody object to an AnnotatedChallenge. - :param challb: ChallengeBody - :type challb: :class:`acme.messages.ChallengeBody` - - :param key: Key - :type key: :class:`letsencrypt.le_util.Key` - + :param .ChallengeBody challb: ChallengeBody + :param .Account account: :param str domain: Domain of the challb :returns: Appropriate AnnotatedChallenge @@ -352,14 +348,12 @@ def challb_to_achall(challb, key, domain): if isinstance(chall, challenges.DVSNI): return achallenges.DVSNI( - challb=challb, domain=domain, key=key) + challb=challb, domain=domain, account=account) elif isinstance(chall, challenges.SimpleHTTP): return achallenges.SimpleHTTP( - challb=challb, domain=domain, key=key) + challb=challb, domain=domain, account=account) elif isinstance(chall, challenges.DNS): return achallenges.DNS(challb=challb, domain=domain) - elif isinstance(chall, challenges.RecoveryToken): - return achallenges.RecoveryToken(challb=challb, domain=domain) elif isinstance(chall, challenges.RecoveryContact): return achallenges.RecoveryContact( challb=challb, domain=domain) diff --git a/letsencrypt/configuration.py b/letsencrypt/configuration.py index d68b46e52..c7c780535 100644 --- a/letsencrypt/configuration.py +++ b/letsencrypt/configuration.py @@ -22,7 +22,6 @@ class NamespaceConfig(object): - `cert_key_backup` - `in_progress_dir` - `key_dir` - - `rec_token_dir` - `renewer_config_file` - `temp_checkpoint_dir` @@ -71,11 +70,6 @@ class NamespaceConfig(object): def key_dir(self): # pylint: disable=missing-docstring return os.path.join(self.namespace.config_dir, constants.KEY_DIR) - # TODO: This should probably include the server name - @property - def rec_token_dir(self): # pylint: disable=missing-docstring - return os.path.join(self.namespace.work_dir, constants.REC_TOKEN_DIR) - @property def temp_checkpoint_dir(self): # pylint: disable=missing-docstring return os.path.join( diff --git a/letsencrypt/constants.py b/letsencrypt/constants.py index 38f3454dd..230860762 100644 --- a/letsencrypt/constants.py +++ b/letsencrypt/constants.py @@ -88,10 +88,6 @@ LIVE_DIR = "live" TEMP_CHECKPOINT_DIR = "temp_checkpoint" """Temporary checkpoint directory (relative to `IConfig.work_dir`).""" -REC_TOKEN_DIR = "recovery_tokens" -"""Directory where all recovery tokens are saved (relative to -`IConfig.work_dir`).""" - RENEWAL_CONFIGS_DIR = "configs" """Renewal configs directory, relative to `IConfig.config_dir`.""" diff --git a/letsencrypt/continuity_auth.py b/letsencrypt/continuity_auth.py index 2eb1c22bf..52d0cee8e 100644 --- a/letsencrypt/continuity_auth.py +++ b/letsencrypt/continuity_auth.py @@ -7,16 +7,12 @@ from letsencrypt import achallenges from letsencrypt import errors from letsencrypt import interfaces from letsencrypt import proof_of_possession -from letsencrypt import recovery_token class ContinuityAuthenticator(object): """IAuthenticator for :const:`~acme.challenges.ContinuityChallenge` class challenges. - :ivar rec_token: Performs "recoveryToken" challenges. - :type rec_token: :class:`letsencrypt.recovery_token.RecoveryToken` - :ivar proof_of_pos: Performs "proofOfPossession" challenges. :type proof_of_pos: :class:`letsencrypt.proof_of_possession.Proof_of_Possession` @@ -25,7 +21,7 @@ class ContinuityAuthenticator(object): zope.interface.implements(interfaces.IAuthenticator) # This will have an installer soon for get_key/cert purposes - def __init__(self, config, installer): + def __init__(self, config, installer): # pylint: disable=unused-argument """Initialize Client Authenticator. :param config: Configuration. @@ -35,13 +31,11 @@ class ContinuityAuthenticator(object): :type installer: :class:`letsencrypt.interfaces.IInstaller` """ - self.rec_token = recovery_token.RecoveryToken( - config.server, config.rec_token_dir) self.proof_of_pos = proof_of_possession.ProofOfPossession(installer) def get_chall_pref(self, unused_domain): # pylint: disable=no-self-use """Return list of challenge preferences.""" - return [challenges.ProofOfPossession, challenges.RecoveryToken] + return [challenges.ProofOfPossession] def perform(self, achalls): """Perform client specific challenges for IAuthenticator""" @@ -49,16 +43,12 @@ class ContinuityAuthenticator(object): for achall in achalls: if isinstance(achall, achallenges.ProofOfPossession): responses.append(self.proof_of_pos.perform(achall)) - elif isinstance(achall, achallenges.RecoveryToken): - responses.append(self.rec_token.perform(achall)) else: raise errors.ContAuthError("Unexpected Challenge") return responses - def cleanup(self, achalls): + def cleanup(self, achalls): # pylint: disable=no-self-use """Cleanup call for IAuthenticator.""" for achall in achalls: - if isinstance(achall, achallenges.RecoveryToken): - self.rec_token.cleanup(achall) - elif not isinstance(achall, achallenges.ProofOfPossession): + if not isinstance(achall, achallenges.ProofOfPossession): raise errors.ContAuthError("Unexpected Challenge") diff --git a/letsencrypt/crypto_util.py b/letsencrypt/crypto_util.py index 12c449261..b7d9987fc 100644 --- a/letsencrypt/crypto_util.py +++ b/letsencrypt/crypto_util.py @@ -8,7 +8,6 @@ import datetime import logging import os -from cryptography.hazmat.primitives import serialization import OpenSSL from acme import crypto_util as acme_crypto_util @@ -215,15 +214,6 @@ def pyopenssl_load_certificate(data): return _pyopenssl_load(data, OpenSSL.crypto.load_certificate) -def private_jwk_to_pyopenssl(jwk): - """Convert private JWK to pyOpenSSL key.""" - key_pem = jwk.key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption()) - return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, key_pem) - - def _get_sans_from_cert_or_req( cert_or_req_str, load_func, typ=OpenSSL.crypto.FILETYPE_PEM): try: @@ -238,7 +228,7 @@ def _get_sans_from_cert_or_req( def get_sans_from_cert(cert, typ=OpenSSL.crypto.FILETYPE_PEM): """Get a list of Subject Alternative Names from a certificate. - :param str csr: Certificate (encoded). + :param str cert: Certificate (encoded). :param typ: `OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1` :returns: A list of Subject Alternative Names. diff --git a/letsencrypt/interfaces.py b/letsencrypt/interfaces.py index 3cb7270b4..d2a474cbd 100644 --- a/letsencrypt/interfaces.py +++ b/letsencrypt/interfaces.py @@ -215,8 +215,6 @@ class IConfig(zope.interface.Interface): in_progress_dir = zope.interface.Attribute( "Directory used before a permanent checkpoint is finalized.") key_dir = zope.interface.Attribute("Keys storage.") - rec_token_dir = zope.interface.Attribute( - "Directory where all recovery tokens are saved.") temp_checkpoint_dir = zope.interface.Attribute( "Temporary checkpoint directory.") diff --git a/letsencrypt/plugins/common.py b/letsencrypt/plugins/common.py index 3e7596c1f..39956fc90 100644 --- a/letsencrypt/plugins/common.py +++ b/letsencrypt/plugins/common.py @@ -5,7 +5,6 @@ import re import shutil import tempfile -from cryptography.hazmat.primitives import serialization import zope.interface from acme.jose import util as jose_util @@ -163,13 +162,13 @@ class Dvsni(object): :rtype: str """ - return os.path.join( - self.configurator.config.work_dir, achall.nonce_domain + ".crt") + return os.path.join(self.configurator.config.work_dir, + achall.chall.encode("token") + ".crt") def get_key_path(self, achall): """Get standardized path to challenge key.""" - return os.path.join( - self.configurator.config.work_dir, achall.nonce_domain + '.pem') + return os.path.join(self.configurator.config.work_dir, + achall.chall.encode("token") + '.pem') def _setup_challenge_cert(self, achall, s=None): # pylint: disable=invalid-name @@ -180,17 +179,11 @@ class Dvsni(object): self.configurator.reverter.register_file_creation(True, key_path) self.configurator.reverter.register_file_creation(True, cert_path) - cert_pem, response = achall.gen_cert_and_response(s) + response, cert_pem, key_pem = achall.gen_cert_and_response(s) - # Write out challenge cert + # Write out challenge cert and key with open(cert_path, "wb") as cert_chall_fd: cert_chall_fd.write(cert_pem) - - # Write out challenge key - key_pem = achall.key.key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption()) with le_util.safe_open(key_path, 'wb', chmod=0o400) as key_file: key_file.write(key_pem) diff --git a/letsencrypt/plugins/common_test.py b/letsencrypt/plugins/common_test.py index b68ab8369..4ecf638dd 100644 --- a/letsencrypt/plugins/common_test.py +++ b/letsencrypt/plugins/common_test.py @@ -120,21 +120,12 @@ class DvsniTest(unittest.TestCase): achalls = [ achallenges.DVSNI( challb=acme_util.chall_to_challb( - challenges.DVSNI( - r="\x8c\x8a\xbf_-f\\cw\xee\xd6\xf8/\xa5\xe3\xfd\xeb9" - "\xf1\xf5\xb9\xefVM\xc9w\xa4u\x9c\xe1\x87\xb4", - nonce="7\xbc^\xb7]>\x00\xa1\x9bOcU\x84^Z\x18", - ), "pending"), - domain="encryption-example.demo", key=auth_key), + challenges.DVSNI(token=b'dvsni1'), "pending"), + domain="encryption-example.demo", account=mock.Mock(key=auth_key)), achallenges.DVSNI( challb=acme_util.chall_to_challb( - challenges.DVSNI( - r="\xba\xa9\xda? {response.URI_ROOT_PATH}/{response.path} +echo -n {validation} > {response.URI_ROOT_PATH}/{encoded_token} # run only once per server: python -c "import BaseHTTPServer, SimpleHTTPServer; \\ SimpleHTTPServer.SimpleHTTPRequestHandler.extensions_map = {{'': '{ct}'}}; \\ @@ -67,7 +68,7 @@ s.serve_forever()" """ HTTPS_TEMPLATE = """\ mkdir -p {root}/public_html/{response.URI_ROOT_PATH} cd {root}/public_html -echo -n {achall.token} > {response.URI_ROOT_PATH}/{response.path} +echo -n {validation} > {response.URI_ROOT_PATH}/{encoded_token} # run only once per server: openssl req -new -newkey rsa:4096 -subj "/" -days 1 -nodes -x509 -keyout ../key.pem -out ../cert.pem python -c "import BaseHTTPServer, SimpleHTTPServer, ssl; \\ @@ -124,13 +125,13 @@ binary for temporary key/certificate generation.""".replace("\n", "") # same path for each challenge response would be easier for # users, but will not work if multiple domains point at the # same server: default command doesn't support virtual hosts - response = challenges.SimpleHTTPResponse( - path=jose.b64encode(os.urandom(18)), + response, validation = achall.gen_response_and_validation( tls=(not self.config.no_simple_http_tls)) - assert response.good_path # is encoded os.urandom(18) good? command = self.template.format( root=self._root, achall=achall, response=response, + validation=pipes.quote(validation.json_dumps()), + encoded_token=achall.chall.encode("token"), ct=response.CONTENT_TYPE, port=( response.port if self.config.simple_http_port is None else self.config.simple_http_port)) @@ -161,7 +162,8 @@ binary for temporary key/certificate generation.""".replace("\n", "") command=command)) if response.simple_verify( - achall.challb, achall.domain, self.config.simple_http_port): + achall.chall, achall.domain, + achall.account.key.public_key(), self.config.simple_http_port): return response else: if self.conf("test-mode") and self._httpd.poll() is not None: diff --git a/letsencrypt/plugins/standalone/authenticator.py b/letsencrypt/plugins/standalone/authenticator.py index cc25d2c1d..968063781 100644 --- a/letsencrypt/plugins/standalone/authenticator.py +++ b/letsencrypt/plugins/standalone/authenticator.py @@ -6,7 +6,6 @@ import socket import sys import time -from cryptography.hazmat.primitives import serialization import OpenSSL import zope.component import zope.interface @@ -14,6 +13,7 @@ import zope.interface from acme import challenges from letsencrypt import achallenges +from letsencrypt import crypto_util from letsencrypt import interfaces from letsencrypt.plugins import common @@ -28,6 +28,11 @@ class StandaloneAuthenticator(common.Plugin): the certificate authority. Therefore, it does not rely on any existing server program. + :param OpenSSL.crypto.PKey private_key: DVSNI challenge certificate + key. + :param sni_names: Mapping from z_domain (`bytes`) to PEM-encoded + certificate (`bytes`). + """ zope.interface.implements(interfaces.IAuthenticator) zope.interface.classProvides(interfaces.IPluginFactory) @@ -40,9 +45,12 @@ class StandaloneAuthenticator(common.Plugin): self.parent_pid = os.getpid() self.subproc_state = None self.tasks = {} + self.sni_names = {} self.sock = None self.connection = None - self.private_key = None + self.key_pem = crypto_util.make_key(bits=2048) + self.private_key = OpenSSL.crypto.load_privatekey( + OpenSSL.crypto.FILETYPE_PEM, self.key_pem) self.ssl_conn = None def prepare(self): @@ -121,12 +129,12 @@ class StandaloneAuthenticator(common.Plugin): """ sni_name = connection.get_servername() - if sni_name in self.tasks: - pem_cert = self.tasks[sni_name] + if sni_name in self.sni_names: + pem_cert = self.sni_names[sni_name] else: # TODO: Should we really present a certificate if we get an # unexpected SNI name? Or should we just disconnect? - pem_cert = self.tasks.values()[0] + pem_cert = next(self.sni_names.itervalues()) cert = OpenSSL.crypto.load_certificate( OpenSSL.crypto.FILETYPE_PEM, pem_cert) new_ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_METHOD) @@ -179,7 +187,7 @@ class StandaloneAuthenticator(common.Plugin): return False - def do_child_process(self, port, key): + def do_child_process(self, port): """Perform the child process side of the TCP listener task. This should only be called by :meth:`start_listener`. @@ -189,9 +197,6 @@ class StandaloneAuthenticator(common.Plugin): handler. :param int port: Which TCP port to bind. - :param key: The private key to use to respond to DVSNI challenge - requests. - :type key: `letsencrypt.le_util.Key` """ signal.signal(signal.SIGINT, self.subproc_signal_handler) @@ -218,11 +223,6 @@ class StandaloneAuthenticator(common.Plugin): self.sock.listen(1) # Signal that we've successfully bound TCP port os.kill(self.parent_pid, signal.SIGIO) - self.private_key = OpenSSL.crypto.load_privatekey( - OpenSSL.crypto.FILETYPE_PEM, key.key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption())) while True: self.connection, _ = self.sock.accept() @@ -245,16 +245,13 @@ class StandaloneAuthenticator(common.Plugin): self.ssl_conn.shutdown() self.ssl_conn.close() - def start_listener(self, port, key): + def start_listener(self, port): """Start listener. Create a child process which will start a TCP listener on the specified port to perform the specified DVSNI challenges. :param int port: The TCP port to bind. - :param key: The private key to use to respond to DVSNI challenge - requests. - :type key: :class:`letsencrypt.le_util.Key` :returns: ``True`` or ``False`` to indicate success or failure creating the subprocess. @@ -290,7 +287,7 @@ class StandaloneAuthenticator(common.Plugin): self.child_pid = os.getpid() # do_child_process() is normally not expected to return but # should terminate via sys.exit(). - return self.do_child_process(port, key) + return self.do_child_process(port) def already_listening(self, port): # pylint: disable=no-self-use """Check if a process is already listening on the port. @@ -368,12 +365,14 @@ class StandaloneAuthenticator(common.Plugin): results_if_failure = [] if not achalls or not isinstance(achalls, list): raise ValueError(".perform() was called without challenge list") + # TODO: "bits" should be user-configurable for achall in achalls: if isinstance(achall, achallenges.DVSNI): # We will attempt to do it - key = achall.key # TODO: bug; one key per start_listener - cert_pem, response = achall.gen_cert_and_response() - self.tasks[achall.nonce_domain] = cert_pem + response, cert_pem, _ = achall.gen_cert_and_response( + key_pem=self.key_pem) + self.sni_names[response.z_domain] = cert_pem + self.tasks[achall.token] = cert_pem results_if_success.append(response) results_if_failure.append(None) else: @@ -392,7 +391,7 @@ class StandaloneAuthenticator(common.Plugin): return results_if_failure # Try to do the authentication; note that this creates # the listener subprocess via os.fork() - if self.start_listener(self.config.dvsni_port, key): + if self.start_listener(self.config.dvsni_port): return results_if_success else: # TODO: This should probably raise a DVAuthError exception @@ -411,8 +410,8 @@ class StandaloneAuthenticator(common.Plugin): # Remove this from pending tasks list for achall in achalls: assert isinstance(achall, achallenges.DVSNI) - if achall.nonce_domain in self.tasks: - del self.tasks[achall.nonce_domain] + if achall.token in self.tasks: + del self.tasks[achall.token] else: # Could not find the challenge to remove! raise ValueError("could not find the challenge to remove") diff --git a/letsencrypt/plugins/standalone/tests/authenticator_test.py b/letsencrypt/plugins/standalone/tests/authenticator_test.py index 45c485b5d..e43c81315 100644 --- a/letsencrypt/plugins/standalone/tests/authenticator_test.py +++ b/letsencrypt/plugins/standalone/tests/authenticator_test.py @@ -1,13 +1,10 @@ """Tests for letsencrypt.plugins.standalone.authenticator.""" import os -import pkg_resources import psutil import signal import socket import unittest -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import serialization import mock import OpenSSL @@ -17,16 +14,14 @@ from acme import jose from letsencrypt import achallenges from letsencrypt.tests import acme_util +from letsencrypt.tests import test_util -KEY_PATH = pkg_resources.resource_filename( - "letsencrypt.tests", os.path.join("testdata", "rsa512_key.pem")) -KEY_DATA = pkg_resources.resource_string( - "letsencrypt.tests", os.path.join("testdata", "rsa512_key.pem")) -KEY = jose.JWKRSA(key=jose.ComparableRSAKey(serialization.load_pem_private_key( - KEY_DATA, password=None, backend=default_backend()))) -PRIVATE_KEY = OpenSSL.crypto.load_privatekey( - OpenSSL.crypto.FILETYPE_PEM, KEY_DATA) +ACCOUNT = mock.Mock(key=jose.JWKRSA.load( + test_util.load_vector("rsa512_key.pem"))) +CHALL_KEY_PEM = test_util.load_vector("rsa512_key_2.pem") +CHALL_KEY = OpenSSL.crypto.load_privatekey( + OpenSSL.crypto.FILETYPE_PEM, CHALL_KEY_PEM) CONFIG = mock.Mock(dvsni_port=5001) @@ -80,9 +75,10 @@ class SNICallbackTest(unittest.TestCase): self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None) self.cert = achallenges.DVSNI( challb=acme_util.DVSNI_P, - domain="example.com", key=KEY).gen_cert_and_response()[0] - self.authenticator.private_key = PRIVATE_KEY - self.authenticator.tasks = {"abcdef.acme.invalid": self.cert} + domain="example.com", + account=ACCOUNT).gen_cert_and_response(key_pem=CHALL_KEY_PEM)[1] + self.authenticator.private_key = CHALL_KEY + self.authenticator.sni_names = {"abcdef.acme.invalid": self.cert} self.authenticator.child_pid = 12345 def test_real_servername(self): @@ -116,7 +112,7 @@ class ClientSignalHandlerTest(unittest.TestCase): from letsencrypt.plugins.standalone.authenticator import \ StandaloneAuthenticator self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None) - self.authenticator.tasks = {"foononce.acme.invalid": "stuff"} + self.authenticator.tasks = {"footoken.acme.invalid": "stuff"} self.authenticator.child_pid = 12345 def test_client_signal_handler(self): @@ -145,7 +141,7 @@ class SubprocSignalHandlerTest(unittest.TestCase): from letsencrypt.plugins.standalone.authenticator import \ StandaloneAuthenticator self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None) - self.authenticator.tasks = {"foononce.acme.invalid": "stuff"} + self.authenticator.tasks = {"footoken.acme.invalid": "stuff"} self.authenticator.child_pid = 12345 self.authenticator.parent_pid = 23456 @@ -303,12 +299,12 @@ class PerformTest(unittest.TestCase): self.achall1 = achallenges.DVSNI( challb=acme_util.chall_to_challb( - challenges.DVSNI(r="whee", nonce="foo"), "pending"), - domain="foo.example.com", key=KEY) + challenges.DVSNI(token=b"foo"), "pending"), + domain="foo.example.com", account=ACCOUNT) self.achall2 = achallenges.DVSNI( challb=acme_util.chall_to_challb( - challenges.DVSNI(r="whee", nonce="bar"), "pending"), - domain="bar.example.com", key=KEY) + challenges.DVSNI(token=b"bar"), "pending"), + domain="bar.example.com", account=ACCOUNT) bad_achall = ("This", "Represents", "A Non-DVSNI", "Challenge") self.achalls = [self.achall1, self.achall2, bad_achall] @@ -326,16 +322,16 @@ class PerformTest(unittest.TestCase): result = self.authenticator.perform(self.achalls) self.assertEqual(len(self.authenticator.tasks), 2) self.assertTrue( - self.authenticator.tasks.has_key(self.achall1.nonce_domain)) + self.authenticator.tasks.has_key(self.achall1.token)) self.assertTrue( - self.authenticator.tasks.has_key(self.achall2.nonce_domain)) + self.authenticator.tasks.has_key(self.achall2.token)) self.assertTrue(isinstance(result, list)) self.assertEqual(len(result), 3) self.assertTrue(isinstance(result[0], challenges.ChallengeResponse)) self.assertTrue(isinstance(result[1], challenges.ChallengeResponse)) self.assertFalse(result[2]) self.authenticator.start_listener.assert_called_once_with( - CONFIG.dvsni_port, KEY) + CONFIG.dvsni_port) def test_cannot_perform(self): """What happens if start_listener() returns False.""" @@ -345,17 +341,17 @@ class PerformTest(unittest.TestCase): result = self.authenticator.perform(self.achalls) self.assertEqual(len(self.authenticator.tasks), 2) self.assertTrue( - self.authenticator.tasks.has_key(self.achall1.nonce_domain)) + self.authenticator.tasks.has_key(self.achall1.token)) self.assertTrue( - self.authenticator.tasks.has_key(self.achall2.nonce_domain)) + self.authenticator.tasks.has_key(self.achall2.token)) self.assertTrue(isinstance(result, list)) self.assertEqual(len(result), 3) self.assertEqual(result, [None, None, False]) self.authenticator.start_listener.assert_called_once_with( - CONFIG.dvsni_port, KEY) + CONFIG.dvsni_port) def test_perform_with_pending_tasks(self): - self.authenticator.tasks = {"foononce.acme.invalid": "cert_data"} + self.authenticator.tasks = {"footoken.acme.invalid": "cert_data"} extra_achall = acme_util.DVSNI_P self.assertRaises( ValueError, self.authenticator.perform, [extra_achall]) @@ -384,7 +380,7 @@ class StartListenerTest(unittest.TestCase): self.authenticator.do_parent_process = mock.Mock() self.authenticator.do_parent_process.return_value = True mock_fork.return_value = 22222 - result = self.authenticator.start_listener(1717, "key") + result = self.authenticator.start_listener(1717) # start_listener is expected to return the True or False return # value from do_parent_process. self.assertTrue(result) @@ -396,10 +392,9 @@ class StartListenerTest(unittest.TestCase): self.authenticator.do_parent_process = mock.Mock() self.authenticator.do_child_process = mock.Mock() mock_fork.return_value = 0 - self.authenticator.start_listener(1717, "key") + self.authenticator.start_listener(1717) self.assertEqual(self.authenticator.child_pid, os.getpid()) - self.authenticator.do_child_process.assert_called_once_with( - 1717, "key") + self.authenticator.do_child_process.assert_called_once_with(1717) class DoParentProcessTest(unittest.TestCase): @@ -452,9 +447,10 @@ class DoChildProcessTest(unittest.TestCase): self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None) self.cert = achallenges.DVSNI( challb=acme_util.chall_to_challb( - challenges.DVSNI(r=("x" * 32), nonce="abcdef"), "pending"), - domain="example.com", key=KEY).gen_cert_and_response()[0] - self.authenticator.private_key = PRIVATE_KEY + challenges.DVSNI(token=b"abcdef"), "pending"), + domain="example.com", account=ACCOUNT).gen_cert_and_response( + key_pem=CHALL_KEY_PEM)[1] + self.authenticator.private_key = CHALL_KEY self.authenticator.tasks = {"abcdef.acme.invalid": self.cert} self.authenticator.parent_pid = 12345 @@ -475,7 +471,7 @@ class DoChildProcessTest(unittest.TestCase): # do_child_process code assumes that calling sys.exit() will # cause subsequent code not to be executed.) self.assertRaises( - IndentationError, self.authenticator.do_child_process, 1717, KEY) + IndentationError, self.authenticator.do_child_process, 1717) mock_exit.assert_called_once_with(1) mock_kill.assert_called_once_with(12345, signal.SIGUSR2) @@ -490,7 +486,7 @@ class DoChildProcessTest(unittest.TestCase): sample_socket.bind.side_effect = eaccess mock_socket.return_value = sample_socket self.assertRaises( - IndentationError, self.authenticator.do_child_process, 1717, KEY) + IndentationError, self.authenticator.do_child_process, 1717) mock_exit.assert_called_once_with(1) mock_kill.assert_called_once_with(12345, signal.SIGUSR1) @@ -506,7 +502,7 @@ class DoChildProcessTest(unittest.TestCase): sample_socket.bind.side_effect = eio mock_socket.return_value = sample_socket self.assertRaises( - socket.error, self.authenticator.do_child_process, 1717, KEY) + socket.error, self.authenticator.do_child_process, 1717) @mock.patch("letsencrypt.plugins.standalone.authenticator." "OpenSSL.SSL.Connection") @@ -519,7 +515,7 @@ class DoChildProcessTest(unittest.TestCase): mock_socket.return_value = sample_socket mock_connection.return_value = mock.MagicMock() self.assertRaises( - CallableExhausted, self.authenticator.do_child_process, 1717, KEY) + CallableExhausted, self.authenticator.do_child_process, 1717) mock_socket.assert_called_once_with() sample_socket.bind.assert_called_once_with(("0.0.0.0", 1717)) sample_socket.listen.assert_called_once_with(1) @@ -538,9 +534,9 @@ class CleanupTest(unittest.TestCase): self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None) self.achall = achallenges.DVSNI( challb=acme_util.chall_to_challb( - challenges.DVSNI(r="whee", nonce="foononce"), "pending"), - domain="foo.example.com", key="key") - self.authenticator.tasks = {self.achall.nonce_domain: "stuff"} + challenges.DVSNI(token=b"footoken"), "pending"), + domain="foo.example.com", account=mock.Mock(key="key")) + self.authenticator.tasks = {self.achall.token: "stuff"} self.authenticator.child_pid = 12345 @mock.patch("letsencrypt.plugins.standalone.authenticator.os.kill") @@ -558,8 +554,8 @@ class CleanupTest(unittest.TestCase): self.assertRaises( ValueError, self.authenticator.cleanup, [achallenges.DVSNI( challb=acme_util.chall_to_challb( - challenges.DVSNI(r="whee", nonce="badnonce"), "pending"), - domain="bad.example.com", key="key")]) + challenges.DVSNI(token=b"badtoken"), "pending"), + domain="bad.example.com", account=mock.Mock(key="key"))]) class MoreInfoTest(unittest.TestCase): diff --git a/letsencrypt/recovery_token.py b/letsencrypt/recovery_token.py deleted file mode 100644 index c5796d581..000000000 --- a/letsencrypt/recovery_token.py +++ /dev/null @@ -1,72 +0,0 @@ -"""Recovery Token Identifier Validation Challenge.""" -import errno -import os - -import zope.component - -from acme import challenges - -from letsencrypt import le_util -from letsencrypt import interfaces - - -class RecoveryToken(object): - """Recovery Token Identifier Validation Challenge. - - Based on draft-barnes-acme, section 6.4. - - """ - def __init__(self, server, direc): - self.token_dir = os.path.join(direc, server) - - def perform(self, chall): - """Perform the Recovery Token Challenge. - - :param chall: Recovery Token Challenge - :type chall: :class:`letsencrypt.achallenges.RecoveryToken` - - :returns: response - :rtype: dict - - """ - token_fp = os.path.join(self.token_dir, chall.domain) - if os.path.isfile(token_fp): - with open(token_fp) as token_fd: - return challenges.RecoveryTokenResponse(token=token_fd.read()) - - cancel, token = zope.component.getUtility( - interfaces.IDisplay).input( - "%s - Input Recovery Token: " % chall.domain) - if cancel != 1: - return challenges.RecoveryTokenResponse(token=token) - - return None - - def cleanup(self, chall): - """Cleanup the saved recovery token if it exists. - - :param chall: Recovery Token Challenge - :type chall: :class:`letsencrypt.achallenges.RecoveryToken` - - """ - try: - le_util.safely_remove(os.path.join(self.token_dir, chall.domain)) - except OSError as err: - if err.errno != errno.ENOENT: - raise - - def requires_human(self, domain): - """Indicates whether or not domain can be auto solved.""" - return not os.path.isfile(os.path.join(self.token_dir, domain)) - - def store_token(self, domain, token): - """Store token for later automatic use. - - :param str domain: domain associated with the token - :param str token: token from authorization - - """ - le_util.make_or_verify_dir(self.token_dir, 0o700, os.geteuid()) - - with open(os.path.join(self.token_dir, domain), "w") as token_fd: - token_fd.write(str(token)) diff --git a/letsencrypt/tests/account_test.py b/letsencrypt/tests/account_test.py index e19940fe8..cd98e1e20 100644 --- a/letsencrypt/tests/account_test.py +++ b/letsencrypt/tests/account_test.py @@ -63,7 +63,6 @@ class ReportNewAccountTest(unittest.TestCase): def setUp(self): self.config = mock.MagicMock(config_dir="/etc/letsencrypt") reg = messages.Registration.from_data(email="rhino@jungle.io") - reg = reg.update(recovery_token="ECCENTRIC INVISIBILITY RHINOCEROS") self.acc = mock.MagicMock(regr=messages.RegistrationResource( uri=None, new_authzr_uri=None, body=reg)) @@ -81,7 +80,6 @@ class ReportNewAccountTest(unittest.TestCase): self._call() call_list = mock_zope().add_message.call_args_list self.assertTrue(self.config.config_dir in call_list[0][0][0]) - self.assertTrue(self.acc.regr.body.recovery_token in call_list[1][0][0]) self.assertTrue( ", ".join(self.acc.regr.body.emails) in call_list[1][0][0]) diff --git a/letsencrypt/tests/achallenges_test.py b/letsencrypt/tests/achallenges_test.py index 3d46c781a..ebf948604 100644 --- a/letsencrypt/tests/achallenges_test.py +++ b/letsencrypt/tests/achallenges_test.py @@ -1,10 +1,9 @@ """Tests for letsencrypt.achallenges.""" import unittest -import OpenSSL +import mock from acme import challenges -from acme import crypto_util as acme_crypto_util from acme import jose from letsencrypt.tests import acme_util @@ -15,28 +14,21 @@ class DVSNITest(unittest.TestCase): """Tests for letsencrypt.achallenges.DVSNI.""" def setUp(self): - self.chall = acme_util.chall_to_challb( - challenges.DVSNI(r="r_value", nonce="12345ABCDE"), "pending") - self.response = challenges.DVSNIResponse() - key = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem")) - + self.challb = acme_util.chall_to_challb(acme_util.DVSNI, "pending") + account = mock.Mock(key=jose.JWKRSA.load( + test_util.load_vector("rsa512_key.pem"))) from letsencrypt.achallenges import DVSNI - self.achall = DVSNI(challb=self.chall, domain="example.com", key=key) + self.achall = DVSNI( + challb=self.challb, domain="example.com", account=account) def test_proxy(self): - self.assertEqual(self.chall.r, self.achall.r) - self.assertEqual(self.chall.nonce, self.achall.nonce) + self.assertEqual(self.challb.token, self.achall.token) def test_gen_cert_and_response(self): - cert_pem, _ = self.achall.gen_cert_and_response(s=self.response.s) - - cert = OpenSSL.crypto.load_certificate( - OpenSSL.crypto.FILETYPE_PEM, cert_pem) - self.assertEqual(cert.get_subject().CN, "example.com") - # pylint: disable=protected-access - self.assertEqual(acme_crypto_util._pyopenssl_cert_or_req_san(cert), [ - "example.com", self.chall.nonce_domain, - self.response.z_domain(self.chall)]) + response, cert_pem, key_pem = self.achall.gen_cert_and_response() + self.assertTrue(isinstance(response, challenges.DVSNIResponse)) + self.assertTrue(isinstance(cert_pem, bytes)) + self.assertTrue(isinstance(key_pem, bytes)) if __name__ == "__main__": diff --git a/letsencrypt/tests/acme_util.py b/letsencrypt/tests/acme_util.py index 0f504fde3..33bf605e0 100644 --- a/letsencrypt/tests/acme_util.py +++ b/letsencrypt/tests/acme_util.py @@ -15,14 +15,12 @@ KEY = test_util.load_rsa_private_key('rsa512_key.pem') SIMPLE_HTTP = challenges.SimpleHTTP( token="evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA") DVSNI = challenges.DVSNI( - r=jose.b64decode("Tyq0La3slT7tqQ0wlOiXnCY2vyez7Zo5blgPJ1xt5xI"), - nonce=jose.b64decode("a82d5ff8ef740d12881f6d3c2277ab2e")) + token=jose.b64decode(b"evaGxfADs6pSRb2LAv9IZf17Dt3juxGJyPCt92wrDoA")) DNS = challenges.DNS(token="17817c66b60ce2e4012dfad92657527a") RECOVERY_CONTACT = challenges.RecoveryContact( activation_url="https://example.ca/sendrecovery/a5bd99383fb0", success_url="https://example.ca/confirmrecovery/bb1b9928932", contact="c********n@example.com") -RECOVERY_TOKEN = challenges.RecoveryToken() POP = challenges.ProofOfPossession( alg="RS256", nonce=jose.b64decode("eET5udtV7aoX8Xl8gYiZIA"), hints=challenges.ProofOfPossession.Hints( @@ -43,7 +41,7 @@ POP = challenges.ProofOfPossession( ) ) -CHALLENGES = [SIMPLE_HTTP, DVSNI, DNS, RECOVERY_CONTACT, RECOVERY_TOKEN, POP] +CHALLENGES = [SIMPLE_HTTP, DVSNI, DNS, RECOVERY_CONTACT, POP] DV_CHALLENGES = [chall for chall in CHALLENGES if isinstance(chall, challenges.DVChallenge)] CONT_CHALLENGES = [chall for chall in CHALLENGES @@ -85,11 +83,9 @@ DVSNI_P = chall_to_challb(DVSNI, messages.STATUS_PENDING) SIMPLE_HTTP_P = chall_to_challb(SIMPLE_HTTP, messages.STATUS_PENDING) DNS_P = chall_to_challb(DNS, messages.STATUS_PENDING) RECOVERY_CONTACT_P = chall_to_challb(RECOVERY_CONTACT, messages.STATUS_PENDING) -RECOVERY_TOKEN_P = chall_to_challb(RECOVERY_TOKEN, messages.STATUS_PENDING) POP_P = chall_to_challb(POP, messages.STATUS_PENDING) -CHALLENGES_P = [SIMPLE_HTTP_P, DVSNI_P, DNS_P, - RECOVERY_CONTACT_P, RECOVERY_TOKEN_P, POP_P] +CHALLENGES_P = [SIMPLE_HTTP_P, DVSNI_P, DNS_P, RECOVERY_CONTACT_P, POP_P] DV_CHALLENGES_P = [challb for challb in CHALLENGES_P if isinstance(challb.chall, challenges.DVChallenge)] CONT_CHALLENGES_P = [ diff --git a/letsencrypt/tests/auth_handler_test.py b/letsencrypt/tests/auth_handler_test.py index 1cb40d5d9..4e1ee85e6 100644 --- a/letsencrypt/tests/auth_handler_test.py +++ b/letsencrypt/tests/auth_handler_test.py @@ -19,7 +19,6 @@ TRANSLATE = { "dvsni": "DVSNI", "simpleHttp": "SimpleHTTP", "dns": "DNS", - "recoveryToken": "RecoveryToken", "recoveryContact": "RecoveryContact", "proofOfPossession": "ProofOfPossession", } @@ -41,7 +40,8 @@ class ChallengeFactoryTest(unittest.TestCase): [messages.STATUS_PENDING]*6, False) def test_all(self): - cont_c, dv_c = self.handler._challenge_factory(self.dom, range(0, 6)) + cont_c, dv_c = self.handler._challenge_factory( + self.dom, range(0, len(acme_util.CHALLENGES))) self.assertEqual( [achall.chall for achall in cont_c], acme_util.CONT_CHALLENGES) @@ -49,10 +49,10 @@ class ChallengeFactoryTest(unittest.TestCase): [achall.chall for achall in dv_c], acme_util.DV_CHALLENGES) def test_one_dv_one_cont(self): - cont_c, dv_c = self.handler._challenge_factory(self.dom, [1, 4]) + cont_c, dv_c = self.handler._challenge_factory(self.dom, [1, 3]) self.assertEqual( - [achall.chall for achall in cont_c], [acme_util.RECOVERY_TOKEN]) + [achall.chall for achall in cont_c], [acme_util.RECOVERY_CONTACT]) self.assertEqual([achall.chall for achall in dv_c], [acme_util.DVSNI]) def test_unrecognized(self): @@ -80,7 +80,7 @@ class GetAuthorizationsTest(unittest.TestCase): self.mock_dv_auth.get_chall_pref.return_value = [challenges.DVSNI] self.mock_cont_auth.get_chall_pref.return_value = [ - challenges.RecoveryToken] + challenges.RecoveryContact] self.mock_cont_auth.perform.side_effect = gen_auth_resp self.mock_dv_auth.perform.side_effect = gen_auth_resp @@ -196,7 +196,7 @@ class PollChallengesTest(unittest.TestCase): self.chall_update = {} for dom in self.doms: self.chall_update[dom] = [ - challb_to_achall(challb, "dummy_key", dom) + challb_to_achall(challb, mock.Mock(key="dummy_key"), dom) for challb in self.handler.authzr[dom].body.challenges] @mock.patch("letsencrypt.auth_handler.time") @@ -313,11 +313,11 @@ class GenChallengePathTest(unittest.TestCase): self.assertTrue(self._call(challbs[::-1], prefs, None)) def test_common_case_with_continuity(self): - challbs = (acme_util.RECOVERY_TOKEN_P, + challbs = (acme_util.POP_P, acme_util.RECOVERY_CONTACT_P, acme_util.DVSNI_P, acme_util.SIMPLE_HTTP_P) - prefs = [challenges.RecoveryToken, challenges.DVSNI] + prefs = [challenges.ProofOfPossession, challenges.DVSNI] combos = acme_util.gen_combos(challbs) self.assertEqual(self._call(challbs, prefs, combos), (0, 2)) @@ -325,21 +325,19 @@ class GenChallengePathTest(unittest.TestCase): self.assertTrue(self._call(challbs, prefs, None)) def test_full_cont_server(self): - challbs = (acme_util.RECOVERY_TOKEN_P, - acme_util.RECOVERY_CONTACT_P, + challbs = (acme_util.RECOVERY_CONTACT_P, acme_util.POP_P, acme_util.DVSNI_P, acme_util.SIMPLE_HTTP_P, acme_util.DNS_P) # Typical webserver client that can do everything except DNS # Attempted to make the order realistic - prefs = [challenges.RecoveryToken, - challenges.ProofOfPossession, + prefs = [challenges.ProofOfPossession, challenges.SimpleHTTP, challenges.DVSNI, challenges.RecoveryContact] combos = acme_util.gen_combos(challbs) - self.assertEqual(self._call(challbs, prefs, combos), (0, 4)) + self.assertEqual(self._call(challbs, prefs, combos), (1, 3)) # Dumb path trivial test self.assertTrue(self._call(challbs, prefs, None)) @@ -444,13 +442,13 @@ class ReportFailedChallsTest(unittest.TestCase): self.dvsni_same = achallenges.DVSNI( challb=messages.ChallengeBody(**kwargs),# pylint: disable=star-args domain="example.com", - key=acme_util.KEY) + account=mock.Mock(key=acme_util.KEY)) kwargs["error"] = messages.Error(typ="dnssec", detail="detail") self.dvsni_diff = achallenges.DVSNI( challb=messages.ChallengeBody(**kwargs),# pylint: disable=star-args domain="foo.bar", - key=acme_util.KEY) + account=mock.Mock(key=acme_util.KEY)) @mock.patch("letsencrypt.auth_handler.zope.component.getUtility") def test_same_error_and_domain(self, mock_zope): diff --git a/letsencrypt/tests/configuration_test.py b/letsencrypt/tests/configuration_test.py index 82e82d520..498147c6d 100644 --- a/letsencrypt/tests/configuration_test.py +++ b/letsencrypt/tests/configuration_test.py @@ -36,7 +36,6 @@ class NamespaceConfigTest(unittest.TestCase): constants.CERT_DIR = 'certs' constants.IN_PROGRESS_DIR = '../p' constants.KEY_DIR = 'keys' - constants.REC_TOKEN_DIR = '/r' constants.TEMP_CHECKPOINT_DIR = 't' self.assertEqual( @@ -47,7 +46,6 @@ class NamespaceConfigTest(unittest.TestCase): self.config.cert_key_backup, '/tmp/foo/c/acme-server.org:443/new') self.assertEqual(self.config.in_progress_dir, '/tmp/foo/../p') self.assertEqual(self.config.key_dir, '/tmp/config/keys') - self.assertEqual(self.config.rec_token_dir, '/r') self.assertEqual(self.config.temp_checkpoint_dir, '/tmp/foo/t') diff --git a/letsencrypt/tests/continuity_auth_test.py b/letsencrypt/tests/continuity_auth_test.py index 509dc8bdf..67d27d04d 100644 --- a/letsencrypt/tests/continuity_auth_test.py +++ b/letsencrypt/tests/continuity_auth_test.py @@ -17,54 +17,30 @@ class PerformTest(unittest.TestCase): self.auth = ContinuityAuthenticator( mock.MagicMock(server="demo_server.org"), None) - self.auth.rec_token.perform = mock.MagicMock( - name="rec_token_perform", side_effect=gen_client_resp) self.auth.proof_of_pos.perform = mock.MagicMock( name="proof_of_pos_perform", side_effect=gen_client_resp) - def test_rec_token1(self): - token = achallenges.RecoveryToken(challb=None, domain="0") - responses = self.auth.perform([token]) - self.assertEqual(responses, ["RecoveryToken0"]) - - def test_rec_token5(self): - tokens = [] - for i in xrange(5): - tokens.append(achallenges.RecoveryToken(challb=None, domain=str(i))) - - responses = self.auth.perform(tokens) - - self.assertEqual(len(responses), 5) - for i in xrange(5): - self.assertEqual(responses[i], "RecoveryToken%d" % i) - - def test_pop_and_rec_token(self): + def test_pop(self): achalls = [] for i in xrange(4): - if i % 2 == 0: - achalls.append(achallenges.RecoveryToken(challb=None, - domain=str(i))) - else: - achalls.append(achallenges.ProofOfPossession(challb=None, - domain=str(i))) + achalls.append(achallenges.ProofOfPossession( + challb=None, domain=str(i))) responses = self.auth.perform(achalls) self.assertEqual(len(responses), 4) for i in xrange(4): - if i % 2 == 0: - self.assertEqual(responses[i], "RecoveryToken%d" % i) - else: - self.assertEqual(responses[i], "ProofOfPossession%d" % i) + self.assertEqual(responses[i], "ProofOfPossession%d" % i) def test_unexpected(self): self.assertRaises( errors.ContAuthError, self.auth.perform, [ - achallenges.DVSNI(challb=None, domain="0", key="invalid_key")]) + achallenges.DVSNI(challb=None, domain="0", + account=mock.Mock(key="invalid_key"))]) def test_chall_pref(self): self.assertEqual( self.auth.get_chall_pref("example.com"), - [challenges.ProofOfPossession, challenges.RecoveryToken]) + [challenges.ProofOfPossession]) class CleanupTest(unittest.TestCase): @@ -75,24 +51,11 @@ class CleanupTest(unittest.TestCase): self.auth = ContinuityAuthenticator( mock.MagicMock(server="demo_server.org"), None) - self.mock_cleanup = mock.MagicMock(name="rec_token_cleanup") - self.auth.rec_token.cleanup = self.mock_cleanup - - def test_rec_token2(self): - token1 = achallenges.RecoveryToken(challb=None, domain="0") - token2 = achallenges.RecoveryToken(challb=None, domain="1") - - self.auth.cleanup([token1, token2]) - - self.assertEqual(self.mock_cleanup.call_args_list, - [mock.call(token1), mock.call(token2)]) def test_unexpected(self): - token = achallenges.RecoveryToken(challb=None, domain="0") - unexpected = achallenges.DVSNI(challb=None, domain="0", key="dummy_key") - - self.assertRaises( - errors.ContAuthError, self.auth.cleanup, [token, unexpected]) + unexpected = achallenges.DVSNI( + challb=None, domain="0", account=mock.Mock("dummy_key")) + self.assertRaises(errors.ContAuthError, self.auth.cleanup, [unexpected]) def gen_client_resp(chall): diff --git a/letsencrypt/tests/le_util_test.py b/letsencrypt/tests/le_util_test.py index 6a6ad3a54..98b7eb803 100644 --- a/letsencrypt/tests/le_util_test.py +++ b/letsencrypt/tests/le_util_test.py @@ -227,6 +227,37 @@ class UniqueLineageNameTest(unittest.TestCase): self.assertRaises(OSError, self._call, "wow") +class SafelyRemoveTest(unittest.TestCase): + """Tests for letsencrypt.le_util.safely_remove.""" + + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.path = os.path.join(self.tmp, "foo") + + def tearDown(self): + shutil.rmtree(self.tmp) + + def _call(self): + from letsencrypt.le_util import safely_remove + return safely_remove(self.path) + + def test_exists(self): + with open(self.path, "w"): + pass # just create the file + self._call() + self.assertFalse(os.path.exists(self.path)) + + def test_missing(self): + self._call() + # no error, yay! + self.assertFalse(os.path.exists(self.path)) + + @mock.patch("letsencrypt.le_util.os.remove") + def test_other_error_passthrough(self, mock_remove): + mock_remove.side_effect = OSError + self.assertRaises(OSError, self._call) + + class SafeEmailTest(unittest.TestCase): """Test safe_email.""" @classmethod diff --git a/letsencrypt/tests/recovery_token_test.py b/letsencrypt/tests/recovery_token_test.py deleted file mode 100644 index 8e767d3cf..000000000 --- a/letsencrypt/tests/recovery_token_test.py +++ /dev/null @@ -1,80 +0,0 @@ -"""Tests for recovery_token.py.""" -import os -import unittest -import shutil -import tempfile - -import mock - -from acme import challenges - -from letsencrypt import achallenges - - -class RecoveryTokenTest(unittest.TestCase): - def setUp(self): - from letsencrypt.recovery_token import RecoveryToken - server = "demo_server" - self.base_dir = tempfile.mkdtemp("tokens") - self.token_dir = os.path.join(self.base_dir, server) - self.rec_token = RecoveryToken(server, self.base_dir) - - def tearDown(self): - shutil.rmtree(self.base_dir) - - def test_store_token(self): - self.rec_token.store_token("example.com", 111) - path = os.path.join(self.token_dir, "example.com") - self.assertTrue(os.path.isfile(path)) - with open(path) as token_fd: - self.assertEqual(token_fd.read(), "111") - - def test_requires_human(self): - self.rec_token.store_token("example2.com", 222) - self.assertFalse(self.rec_token.requires_human("example2.com")) - self.assertTrue(self.rec_token.requires_human("example3.com")) - - def test_cleanup(self): - self.rec_token.store_token("example3.com", 333) - self.assertFalse(self.rec_token.requires_human("example3.com")) - - self.rec_token.cleanup(achallenges.RecoveryToken( - challb=challenges.RecoveryToken(), domain="example3.com")) - self.assertTrue(self.rec_token.requires_human("example3.com")) - - # Shouldn't throw an error - self.rec_token.cleanup(achallenges.RecoveryToken( - challb=None, domain="example4.com")) - - # SHOULD throw an error (OSError other than nonexistent file) - self.assertRaises( - OSError, self.rec_token.cleanup, - achallenges.RecoveryToken( - challb=None, domain=("a" + "r" * 10000 + ".com"))) - - def test_perform_stored(self): - self.rec_token.store_token("example4.com", 444) - response = self.rec_token.perform( - achallenges.RecoveryToken( - challb=challenges.RecoveryToken(), domain="example4.com")) - - self.assertEqual( - response, challenges.RecoveryTokenResponse(token="444")) - - @mock.patch("letsencrypt.recovery_token.zope.component.getUtility") - def test_perform_not_stored(self, mock_input): - mock_input().input.side_effect = [(0, "555"), (1, "000")] - response = self.rec_token.perform( - achallenges.RecoveryToken( - challb=challenges.RecoveryToken(), domain="example5.com")) - self.assertEqual( - response, challenges.RecoveryTokenResponse(token="555")) - - response = self.rec_token.perform( - achallenges.RecoveryToken( - challb=challenges.RecoveryToken(), domain="example6.com")) - self.assertTrue(response is None) - - -if __name__ == "__main__": - unittest.main() # pragma: no cover