mirror of
https://github.com/certbot/certbot.git
synced 2026-01-19 13:24:57 +03:00
Merge remote-tracking branch 'kuba/update-challenges' into update-challenges
This commit is contained in:
@@ -1,13 +1,9 @@
|
||||
"""ACME Identifier Validation Challenges."""
|
||||
import binascii
|
||||
import functools
|
||||
import hashlib
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography import x509
|
||||
import OpenSSL
|
||||
import requests
|
||||
|
||||
@@ -54,43 +50,45 @@ class SimpleHTTP(DVChallenge):
|
||||
|
||||
"""
|
||||
typ = "simpleHttp"
|
||||
token = jose.Field("token")
|
||||
|
||||
TOKEN_SIZE = 128 / 8 # Based on the entropy value from the spec
|
||||
"""Minimum size of the :attr:`token` in bytes."""
|
||||
|
||||
# TODO: acme-spec doesn't specify token as base64-encoded value
|
||||
token = jose.Field(
|
||||
"token", encoder=jose.encode_b64jose, decoder=functools.partial(
|
||||
jose.decode_b64jose, size=TOKEN_SIZE, minimum=True))
|
||||
|
||||
@property
|
||||
def good_token(self): # XXX: @token.decoder
|
||||
"""Is `token` good?
|
||||
|
||||
.. todo:: acme-spec wants "It MUST NOT contain any non-ASCII
|
||||
characters", but it should also warrant that it doesn't
|
||||
contain ".." or "/"...
|
||||
|
||||
"""
|
||||
# TODO: check that path combined with uri does not go above
|
||||
# URI_ROOT_PATH!
|
||||
return b'..' not in self.token and b'/' not in self.token
|
||||
|
||||
|
||||
@ChallengeResponse.register
|
||||
class SimpleHTTPResponse(ChallengeResponse):
|
||||
"""ACME "simpleHttp" challenge response.
|
||||
|
||||
:ivar unicode path:
|
||||
:ivar unicode tls:
|
||||
:ivar bool tls:
|
||||
|
||||
"""
|
||||
typ = "simpleHttp"
|
||||
path = jose.Field("path")
|
||||
tls = jose.Field("tls", default=True, omitempty=True)
|
||||
|
||||
URI_ROOT_PATH = ".well-known/acme-challenge"
|
||||
"""URI root path for the server provisioned resource."""
|
||||
|
||||
_URI_TEMPLATE = "{scheme}://{domain}/" + URI_ROOT_PATH + "/{path}"
|
||||
_URI_TEMPLATE = "{scheme}://{domain}/" + URI_ROOT_PATH + "/{token}"
|
||||
|
||||
MAX_PATH_LEN = 25
|
||||
"""Maximum allowed `path` length."""
|
||||
|
||||
CONTENT_TYPE = "text/plain"
|
||||
|
||||
@property
|
||||
def good_path(self):
|
||||
"""Is `path` good?
|
||||
|
||||
.. todo:: acme-spec: "The value MUST be comprised entirely of
|
||||
characters from the URL-safe alphabet for Base64 encoding
|
||||
[RFC4648]", base64.b64decode ignores those characters
|
||||
|
||||
"""
|
||||
# TODO: check that path combined with uri does not go above
|
||||
# URI_ROOT_PATH!
|
||||
return len(self.path) <= 25
|
||||
CONTENT_TYPE = "application/jose+json"
|
||||
|
||||
@property
|
||||
def scheme(self):
|
||||
@@ -102,19 +100,73 @@ class SimpleHTTPResponse(ChallengeResponse):
|
||||
"""Port that the ACME client should be listening for validation."""
|
||||
return 443 if self.tls else 80
|
||||
|
||||
def uri(self, domain):
|
||||
def uri(self, domain, chall):
|
||||
"""Create an URI to the provisioned resource.
|
||||
|
||||
Forms an URI to the HTTPS server provisioned resource
|
||||
(containing :attr:`~SimpleHTTP.token`).
|
||||
|
||||
:param unicode domain: Domain name being verified.
|
||||
:param challenges.SimpleHTTP chall:
|
||||
|
||||
"""
|
||||
return self._URI_TEMPLATE.format(
|
||||
scheme=self.scheme, domain=domain, path=self.path)
|
||||
scheme=self.scheme, domain=domain, token=chall.encode("token"))
|
||||
|
||||
def simple_verify(self, chall, domain, port=None):
|
||||
def gen_resource(self, chall):
|
||||
"""Generate provisioned resource.
|
||||
|
||||
:param .SimpleHTTP chall:
|
||||
:rtype: SimpleHTTPProvisionedResource
|
||||
|
||||
"""
|
||||
return SimpleHTTPProvisionedResource(token=chall.token, tls=self.tls)
|
||||
|
||||
def gen_validation(self, chall, account_key, alg=jose.RS256, **kwargs):
|
||||
"""Generate validation.
|
||||
|
||||
:param .SimpleHTTP chall:
|
||||
:param .JWK account_key: Private account key.
|
||||
:param .JWA alg:
|
||||
|
||||
:returns: `.SimpleHTTPProvisionedResource` signed in `.JWS`
|
||||
:rtype: .JWS
|
||||
|
||||
"""
|
||||
return jose.JWS.sign(
|
||||
payload=self.gen_resource(chall).json_dumps(
|
||||
sort_keys=True).encode('utf-8'),
|
||||
key=account_key, alg=alg, **kwargs)
|
||||
|
||||
def check_validation(self, validation, chall, account_public_key):
|
||||
"""Check validation.
|
||||
|
||||
:param .JWS validation:
|
||||
:param .SimpleHTTP chall:
|
||||
:type account_public_key:
|
||||
`~cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey`
|
||||
or
|
||||
`~cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey`
|
||||
or
|
||||
`~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey`
|
||||
wrapped in `.ComparableKey
|
||||
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
if not validation.verify(key=account_public_key):
|
||||
return False
|
||||
|
||||
try:
|
||||
resource = SimpleHTTPProvisionedResource.json_loads(
|
||||
validation.payload.decode('utf-8'))
|
||||
except jose.DeserializationError as error:
|
||||
logger.debug(error)
|
||||
return False
|
||||
|
||||
return resource.token == chall.token and resource.tls == self.tls
|
||||
|
||||
def simple_verify(self, chall, domain, account_public_key, port=None):
|
||||
"""Simple verify.
|
||||
|
||||
According to the ACME specification, "the ACME server MUST
|
||||
@@ -123,6 +175,16 @@ class SimpleHTTPResponse(ChallengeResponse):
|
||||
|
||||
:param .SimpleHTTP chall: Corresponding challenge.
|
||||
:param unicode domain: Domain name being verified.
|
||||
:param account_public_key: Public key for the key pair
|
||||
being authorized. If ``None`` key verification is not
|
||||
performed!
|
||||
:type account_public_key:
|
||||
`~cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey`
|
||||
or
|
||||
`~cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey`
|
||||
or
|
||||
`~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey`
|
||||
wrapped in `.ComparableKey
|
||||
:param int port: Port used in the validation.
|
||||
|
||||
:returns: ``True`` iff validation is successful, ``False``
|
||||
@@ -138,76 +200,67 @@ class SimpleHTTPResponse(ChallengeResponse):
|
||||
"Using non-standard port for SimpleHTTP verification: %s", port)
|
||||
domain += ":{0}".format(port)
|
||||
|
||||
uri = self.uri(domain)
|
||||
uri = self.uri(domain, chall)
|
||||
logger.debug("Verifying %s at %s...", chall.typ, uri)
|
||||
try:
|
||||
http_response = requests.get(uri, verify=False)
|
||||
except requests.exceptions.RequestException as error:
|
||||
logger.error("Unable to reach %s: %s", uri, error)
|
||||
return False
|
||||
logger.debug(
|
||||
"Received %s. Headers: %s", http_response, http_response.headers)
|
||||
logger.debug("Received %s: %s. Headers: %s", http_response,
|
||||
http_response.text, http_response.headers)
|
||||
|
||||
good_token = http_response.text == chall.token
|
||||
if not good_token:
|
||||
logger.error(
|
||||
"Unable to verify %s! Expected: %r, returned: %r.",
|
||||
uri, chall.token, http_response.text)
|
||||
# TODO: spec contradicts itself, c.f.
|
||||
# https://github.com/letsencrypt/acme-spec/pull/156/files#r33136438
|
||||
good_ct = self.CONTENT_TYPE == http_response.headers.get(
|
||||
"Content-Type", self.CONTENT_TYPE)
|
||||
return self.good_path and good_ct and good_token
|
||||
if self.CONTENT_TYPE != http_response.headers.get(
|
||||
"Content-Type", self.CONTENT_TYPE):
|
||||
return False
|
||||
|
||||
try:
|
||||
validation = jose.JWS.json_loads(http_response.text)
|
||||
except jose.DeserializationError as error:
|
||||
logger.debug(error)
|
||||
return False
|
||||
|
||||
return self.check_validation(validation, chall, account_public_key)
|
||||
|
||||
|
||||
class SimpleHTTPProvisionedResource(jose.JSONObjectWithFields):
|
||||
"""SimpleHTTP provisioned resource."""
|
||||
typ = fields.Fixed("type", SimpleHTTP.typ)
|
||||
token = SimpleHTTP._fields["token"]
|
||||
# If the "tls" field is not included in the response, then
|
||||
# validation object MUST have its "tls" field set to "true".
|
||||
tls = jose.Field("tls", omitempty=False)
|
||||
|
||||
|
||||
@Challenge.register
|
||||
class DVSNI(DVChallenge):
|
||||
"""ACME "dvsni" challenge.
|
||||
|
||||
:ivar bytes r: Random data, **not** base64-encoded.
|
||||
:ivar bytes nonce: Random data, **not** hex-encoded.
|
||||
:ivar bytes token: Random data, **not** base64-encoded.
|
||||
|
||||
"""
|
||||
typ = "dvsni"
|
||||
|
||||
DOMAIN_SUFFIX = b".acme.invalid"
|
||||
"""Domain name suffix."""
|
||||
|
||||
R_SIZE = 32
|
||||
"""Required size of the :attr:`r` in bytes."""
|
||||
|
||||
NONCE_SIZE = 16
|
||||
"""Required size of the :attr:`nonce` in bytes."""
|
||||
|
||||
PORT = 443
|
||||
"""Port to perform DVSNI challenge."""
|
||||
|
||||
r = jose.Field("r", encoder=jose.encode_b64jose, # pylint: disable=invalid-name
|
||||
decoder=functools.partial(jose.decode_b64jose, size=R_SIZE))
|
||||
nonce = jose.Field("nonce", encoder=jose.encode_hex16,
|
||||
decoder=functools.partial(functools.partial(
|
||||
jose.decode_hex16, size=NONCE_SIZE)))
|
||||
TOKEN_SIZE = 128 / 8 # Based on the entropy value from the spec
|
||||
"""Minimum size of the :attr:`token` in bytes."""
|
||||
|
||||
@property
|
||||
def nonce_domain(self):
|
||||
"""Domain name used in SNI.
|
||||
token = jose.Field(
|
||||
"token", encoder=jose.encode_b64jose, decoder=functools.partial(
|
||||
jose.decode_b64jose, size=TOKEN_SIZE, minimum=True))
|
||||
|
||||
:rtype: bytes
|
||||
def gen_response(self, account_key, alg=jose.RS256, **kwargs):
|
||||
"""Generate response.
|
||||
|
||||
:param .JWK account_key: Private account key.
|
||||
:rtype: .DVSNIResponse
|
||||
|
||||
"""
|
||||
return binascii.hexlify(self.nonce) + self.DOMAIN_SUFFIX
|
||||
|
||||
def probe_cert(self, domain, **kwargs):
|
||||
"""Probe DVSNI challenge certificate."""
|
||||
host = socket.gethostbyname(domain)
|
||||
logging.debug('%s resolved to %s', domain, host)
|
||||
|
||||
kwargs.setdefault("host", host)
|
||||
kwargs.setdefault("port", self.PORT)
|
||||
kwargs["name"] = self.nonce_domain
|
||||
# TODO: try different methods?
|
||||
# pylint: disable=protected-access
|
||||
return crypto_util._probe_sni(**kwargs)
|
||||
return DVSNIResponse(validation=jose.JWS.sign(
|
||||
payload=self.json_dumps(sort_keys=True).encode('utf-8'),
|
||||
key=account_key, alg=alg, **kwargs))
|
||||
|
||||
|
||||
@ChallengeResponse.register
|
||||
@@ -219,105 +272,137 @@ class DVSNIResponse(ChallengeResponse):
|
||||
"""
|
||||
typ = "dvsni"
|
||||
|
||||
DOMAIN_SUFFIX = DVSNI.DOMAIN_SUFFIX
|
||||
DOMAIN_SUFFIX = b".acme.invalid"
|
||||
"""Domain name suffix."""
|
||||
|
||||
S_SIZE = 32
|
||||
"""Required size of the :attr:`s` in bytes."""
|
||||
PORT = DVSNI.PORT
|
||||
"""Port to perform DVSNI challenge."""
|
||||
|
||||
s = jose.Field("s", encoder=jose.encode_b64jose, # pylint: disable=invalid-name
|
||||
decoder=functools.partial(jose.decode_b64jose, size=S_SIZE))
|
||||
validation = jose.Field("validation", decoder=jose.JWS.from_json)
|
||||
|
||||
def __init__(self, s=None, *args, **kwargs):
|
||||
s = os.urandom(self.S_SIZE) if s is None else s
|
||||
super(DVSNIResponse, self).__init__(s=s, *args, **kwargs)
|
||||
|
||||
def z(self, chall): # pylint: disable=invalid-name
|
||||
"""Compute the parameter ``z``.
|
||||
|
||||
:param challenge: Corresponding challenge.
|
||||
:type challenge: :class:`DVSNI`
|
||||
@property
|
||||
def z(self): # pylint: disable=invalid-name
|
||||
"""The ``z`` parameter.
|
||||
|
||||
:rtype: bytes
|
||||
|
||||
"""
|
||||
z = hashlib.new("sha256") # pylint: disable=invalid-name
|
||||
z.update(chall.r)
|
||||
z.update(self.s)
|
||||
return z.hexdigest().encode()
|
||||
# Instance of 'Field' has no 'signature' member
|
||||
# pylint: disable=no-member
|
||||
return hashlib.sha256(self.validation.signature.encode(
|
||||
"signature").encode("utf-8")).hexdigest().encode()
|
||||
|
||||
def z_domain(self, chall):
|
||||
@property
|
||||
def z_domain(self):
|
||||
"""Domain name for certificate subjectAltName.
|
||||
|
||||
:rtype bytes:
|
||||
:rtype: bytes
|
||||
|
||||
"""
|
||||
return self.z(chall) + self.DOMAIN_SUFFIX
|
||||
z = self.z # pylint: disable=invalid-name
|
||||
return z[:32] + b'.' + z[32:] + self.DOMAIN_SUFFIX
|
||||
|
||||
def gen_cert(self, chall, domain, key):
|
||||
@property
|
||||
def chall(self):
|
||||
"""Get challenge encoded in the `validation` payload.
|
||||
|
||||
:rtype: DVSNI
|
||||
|
||||
"""
|
||||
# pylint: disable=no-member
|
||||
return DVSNI.json_loads(self.validation.payload.decode('utf-8'))
|
||||
|
||||
def gen_cert(self, key=None, bits=2048):
|
||||
"""Generate DVSNI certificate.
|
||||
|
||||
:param .DVSNI chall: Corresponding challenge.
|
||||
:param unicode domain:
|
||||
:param OpenSSL.crypto.PKey
|
||||
:param OpenSSL.crypto.PKey key: Optional private key used in
|
||||
certificate generation. If not provided (``None``), then
|
||||
fresh key will be generated.
|
||||
:param int bits: Number of bits for newly generated key.
|
||||
|
||||
:rtype: `tuple` of `OpenSSL.crypto.X509` and
|
||||
`OpenSSL.crypto.PKey`
|
||||
|
||||
"""
|
||||
if key is None:
|
||||
key = OpenSSL.crypto.PKey()
|
||||
key.generate_key(OpenSSL.crypto.TYPE_RSA, bits)
|
||||
return crypto_util.gen_ss_cert(key, [
|
||||
domain, chall.nonce_domain.decode(), self.z_domain(chall).decode()])
|
||||
# z_domain is too big to fit into CN, hence first dummy domain
|
||||
'dummy', self.z_domain.decode()], force_san=True), key
|
||||
|
||||
def simple_verify(self, chall, domain, public_key, **kwargs):
|
||||
def probe_cert(self, domain, **kwargs):
|
||||
"""Probe DVSNI challenge certificate.
|
||||
|
||||
:param unicode domain:
|
||||
|
||||
"""
|
||||
host = socket.gethostbyname(domain)
|
||||
logging.debug('%s resolved to %s', domain, host)
|
||||
|
||||
kwargs.setdefault("host", host)
|
||||
kwargs.setdefault("port", self.PORT)
|
||||
kwargs["name"] = self.z_domain
|
||||
# TODO: try different methods?
|
||||
# pylint: disable=protected-access
|
||||
return crypto_util._probe_sni(**kwargs)
|
||||
|
||||
def verify_cert(self, cert):
|
||||
"""Verify DVSNI challenge certificate."""
|
||||
# pylint: disable=protected-access
|
||||
sans = crypto_util._pyopenssl_cert_or_req_san(cert)
|
||||
logging.debug('Certificate %s. SANs: %s', cert.digest('sha1'), sans)
|
||||
return self.z_domain.decode() in sans
|
||||
|
||||
def simple_verify(self, chall, domain, account_public_key,
|
||||
cert=None, **kwargs):
|
||||
"""Simple verify.
|
||||
|
||||
Probes DVSNI certificate and checks it using `verify_cert`;
|
||||
hence all arguments documented in `verify_cert`.
|
||||
|
||||
"""
|
||||
try:
|
||||
cert = chall.probe_cert(domain=domain, **kwargs)
|
||||
except errors.Error as error:
|
||||
logger.debug(error, exc_info=True)
|
||||
return False
|
||||
return self.verify_cert(chall, domain, public_key, cert)
|
||||
|
||||
def verify_cert(self, chall, domain, public_key, cert):
|
||||
"""Verify DVSNI certificate.
|
||||
Verify ``validation`` using ``account_public_key``, optionally
|
||||
probe DVSNI certificate and check using `verify_cert`.
|
||||
|
||||
:param .challenges.DVSNI chall: Corresponding challenge.
|
||||
:param str domain: Domain name being validated.
|
||||
:param public_key: Public key for the key pair
|
||||
being authorized. If ``None`` key verification is not
|
||||
performed!
|
||||
:type public_key:
|
||||
:type account_public_key:
|
||||
`~cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey`
|
||||
or
|
||||
`~cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey`
|
||||
or
|
||||
`~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey`
|
||||
wrapped in `.ComparableKey
|
||||
:param OpenSSL.crypto.X509 cert:
|
||||
:param OpenSSL.crypto.X509 cert: Optional certificate. If not
|
||||
provided (``None``) certificate will be retrieved using
|
||||
`probe_cert`.
|
||||
|
||||
:returns: ``True`` iff client's control of the domain has been
|
||||
verified, ``False`` otherwise.
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
# TODO: check "It is a valid self-signed certificate" and
|
||||
# return False if not
|
||||
|
||||
# pylint: disable=protected-access
|
||||
sans = crypto_util._pyopenssl_cert_or_req_san(cert)
|
||||
logging.debug('Certificate %s. SANs: %s', cert.digest('sha1'), sans)
|
||||
|
||||
cert = x509.load_der_x509_certificate(
|
||||
OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert),
|
||||
default_backend())
|
||||
|
||||
if public_key is None:
|
||||
logging.warn('No key verification is performed')
|
||||
elif public_key != jose.ComparableKey(cert.public_key()):
|
||||
# pylint: disable=no-member
|
||||
if not self.validation.verify(key=account_public_key):
|
||||
return False
|
||||
|
||||
return domain in sans and self.z_domain(chall).decode() in sans
|
||||
# TODO: it's not checked that payload has exectly 2 fields!
|
||||
try:
|
||||
decoded_chall = self.chall
|
||||
except jose.DeserializationError as error:
|
||||
logger.debug(error, exc_info=True)
|
||||
return False
|
||||
|
||||
if decoded_chall.token != chall.token:
|
||||
logger.debug("Wrong token: expected %r, found %r",
|
||||
chall.token, decoded_chall.token)
|
||||
return False
|
||||
|
||||
if cert is None:
|
||||
try:
|
||||
cert = self.probe_cert(domain=domain, **kwargs)
|
||||
except errors.Error as error:
|
||||
logger.debug(error, exc_info=True)
|
||||
return False
|
||||
|
||||
return self.verify_cert(cert)
|
||||
|
||||
|
||||
@Challenge.register
|
||||
@@ -347,23 +432,6 @@ class RecoveryContactResponse(ChallengeResponse):
|
||||
token = jose.Field("token", omitempty=True)
|
||||
|
||||
|
||||
@Challenge.register
|
||||
class RecoveryToken(ContinuityChallenge):
|
||||
"""ACME "recoveryToken" challenge."""
|
||||
typ = "recoveryToken"
|
||||
|
||||
|
||||
@ChallengeResponse.register
|
||||
class RecoveryTokenResponse(ChallengeResponse):
|
||||
"""ACME "recoveryToken" challenge response.
|
||||
|
||||
:ivar unicode token:
|
||||
|
||||
"""
|
||||
typ = "recoveryToken"
|
||||
token = jose.Field("token", omitempty=True)
|
||||
|
||||
|
||||
@Challenge.register
|
||||
class ProofOfPossession(ContinuityChallenge):
|
||||
"""ACME "proofOfPossession" challenge.
|
||||
|
||||
@@ -22,10 +22,11 @@ class SimpleHTTPTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
from acme.challenges import SimpleHTTP
|
||||
self.msg = SimpleHTTP(
|
||||
token='evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA')
|
||||
token=jose.decode_b64jose(
|
||||
'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA'))
|
||||
self.jmsg = {
|
||||
'type': 'simpleHttp',
|
||||
'token': 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA',
|
||||
'token': 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA',
|
||||
}
|
||||
|
||||
def test_to_partial_json(self):
|
||||
@@ -39,56 +40,36 @@ class SimpleHTTPTest(unittest.TestCase):
|
||||
from acme.challenges import SimpleHTTP
|
||||
hash(SimpleHTTP.from_json(self.jmsg))
|
||||
|
||||
def test_good_token(self):
|
||||
self.assertTrue(self.msg.good_token)
|
||||
self.assertFalse(
|
||||
self.msg.update(token=b'..').good_token)
|
||||
|
||||
|
||||
class SimpleHTTPResponseTest(unittest.TestCase):
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
|
||||
def setUp(self):
|
||||
from acme.challenges import SimpleHTTPResponse
|
||||
self.msg_http = SimpleHTTPResponse(
|
||||
path='6tbIMBC5Anhl5bOlWT5ZFA', tls=False)
|
||||
self.msg_https = SimpleHTTPResponse(path='6tbIMBC5Anhl5bOlWT5ZFA')
|
||||
self.msg_http = SimpleHTTPResponse(tls=False)
|
||||
self.msg_https = SimpleHTTPResponse(tls=True)
|
||||
self.jmsg_http = {
|
||||
'resource': 'challenge',
|
||||
'type': 'simpleHttp',
|
||||
'path': '6tbIMBC5Anhl5bOlWT5ZFA',
|
||||
'tls': False,
|
||||
}
|
||||
self.jmsg_https = {
|
||||
'resource': 'challenge',
|
||||
'type': 'simpleHttp',
|
||||
'path': '6tbIMBC5Anhl5bOlWT5ZFA',
|
||||
'tls': True,
|
||||
}
|
||||
|
||||
from acme.challenges import SimpleHTTP
|
||||
self.chall = SimpleHTTP(token="foo")
|
||||
self.resp_http = SimpleHTTPResponse(path="bar", tls=False)
|
||||
self.resp_https = SimpleHTTPResponse(path="bar", tls=True)
|
||||
self.chall = SimpleHTTP(token=(b"x" * 16))
|
||||
self.resp_http = SimpleHTTPResponse(tls=False)
|
||||
self.resp_https = SimpleHTTPResponse(tls=True)
|
||||
self.good_headers = {'Content-Type': SimpleHTTPResponse.CONTENT_TYPE}
|
||||
|
||||
def test_good_path(self):
|
||||
self.assertTrue(self.msg_http.good_path)
|
||||
self.assertTrue(self.msg_https.good_path)
|
||||
self.assertFalse(
|
||||
self.msg_http.update(path=(self.msg_http.path * 10)).good_path)
|
||||
|
||||
def test_scheme(self):
|
||||
self.assertEqual('http', self.msg_http.scheme)
|
||||
self.assertEqual('https', self.msg_https.scheme)
|
||||
|
||||
def test_port(self):
|
||||
self.assertEqual(80, self.msg_http.port)
|
||||
self.assertEqual(443, self.msg_https.port)
|
||||
|
||||
def test_uri(self):
|
||||
self.assertEqual(
|
||||
'http://example.com/.well-known/acme-challenge/'
|
||||
'6tbIMBC5Anhl5bOlWT5ZFA', self.msg_http.uri('example.com'))
|
||||
self.assertEqual(
|
||||
'https://example.com/.well-known/acme-challenge/'
|
||||
'6tbIMBC5Anhl5bOlWT5ZFA', self.msg_https.uri('example.com'))
|
||||
|
||||
def test_to_partial_json(self):
|
||||
self.assertEqual(self.jmsg_http, self.msg_http.to_partial_json())
|
||||
self.assertEqual(self.jmsg_https, self.msg_https.to_partial_json())
|
||||
@@ -105,6 +86,63 @@ class SimpleHTTPResponseTest(unittest.TestCase):
|
||||
hash(SimpleHTTPResponse.from_json(self.jmsg_http))
|
||||
hash(SimpleHTTPResponse.from_json(self.jmsg_https))
|
||||
|
||||
def test_scheme(self):
|
||||
self.assertEqual('http', self.msg_http.scheme)
|
||||
self.assertEqual('https', self.msg_https.scheme)
|
||||
|
||||
def test_port(self):
|
||||
self.assertEqual(80, self.msg_http.port)
|
||||
self.assertEqual(443, self.msg_https.port)
|
||||
|
||||
def test_uri(self):
|
||||
self.assertEqual(
|
||||
'http://example.com/.well-known/acme-challenge/'
|
||||
'eHh4eHh4eHh4eHh4eHh4eA', self.msg_http.uri(
|
||||
'example.com', self.chall))
|
||||
self.assertEqual(
|
||||
'https://example.com/.well-known/acme-challenge/'
|
||||
'eHh4eHh4eHh4eHh4eHh4eA', self.msg_https.uri(
|
||||
'example.com', self.chall))
|
||||
|
||||
def test_gen_check_validation(self):
|
||||
account_key = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem'))
|
||||
self.assertTrue(self.resp_http.check_validation(
|
||||
validation=self.resp_http.gen_validation(self.chall, account_key),
|
||||
chall=self.chall, account_public_key=account_key.public_key()))
|
||||
|
||||
def test_gen_check_validation_wrong_key(self):
|
||||
key1 = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem'))
|
||||
key2 = jose.JWKRSA.load(test_util.load_vector('rsa1024_key.pem'))
|
||||
self.assertFalse(self.resp_http.check_validation(
|
||||
validation=self.resp_http.gen_validation(self.chall, key1),
|
||||
chall=self.chall, account_public_key=key2.public_key()))
|
||||
|
||||
def test_check_validation_wrong_payload(self):
|
||||
account_key = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem'))
|
||||
validations = tuple(
|
||||
jose.JWS.sign(payload=payload, alg=jose.RS256, key=account_key)
|
||||
for payload in (b'', b'{}', self.chall.json_dumps().encode('utf-8'),
|
||||
self.resp_http.json_dumps().encode('utf-8'))
|
||||
)
|
||||
for validation in validations:
|
||||
self.assertFalse(self.resp_http.check_validation(
|
||||
validation=validation, chall=self.chall,
|
||||
account_public_key=account_key.public_key()))
|
||||
|
||||
def test_check_validation_wrong_fields(self):
|
||||
resource = self.resp_http.gen_resource(self.chall)
|
||||
account_key = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem'))
|
||||
validations = tuple(
|
||||
jose.JWS.sign(payload=bad_resource.json_dumps().encode('utf-8'),
|
||||
alg=jose.RS256, key=account_key)
|
||||
for bad_resource in (resource.update(tls=True),
|
||||
resource.update(token=b'x'*20))
|
||||
)
|
||||
for validation in validations:
|
||||
self.assertFalse(self.resp_http.check_validation(
|
||||
validation=validation, chall=self.chall,
|
||||
account_public_key=account_key.public_key()))
|
||||
|
||||
@mock.patch("acme.challenges.requests.get")
|
||||
def test_simple_verify_good_token(self, mock_get):
|
||||
for resp in self.resp_http, self.resp_https:
|
||||
@@ -132,7 +170,8 @@ class SimpleHTTPResponseTest(unittest.TestCase):
|
||||
|
||||
@mock.patch("acme.challenges.requests.get")
|
||||
def test_simple_verify_port(self, mock_get):
|
||||
self.resp_http.simple_verify(self.chall, "local", 4430)
|
||||
self.resp_http.simple_verify(
|
||||
self.chall, domain="local", account_public_key=None, port=4430)
|
||||
self.assertEqual("local:4430", urllib_parse.urlparse(
|
||||
mock_get.mock_calls[0][1][0]).netloc)
|
||||
|
||||
@@ -142,19 +181,12 @@ class DVSNITest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
from acme.challenges import DVSNI
|
||||
self.msg = DVSNI(
|
||||
r=b"O*\xb4-\xad\xec\x95>\xed\xa9\r0\x94\xe8\x97\x9c&6"
|
||||
b"\xbf'\xb3\xed\x9a9nX\x0f'\\m\xe7\x12",
|
||||
nonce=b'\xa8-_\xf8\xeft\r\x12\x88\x1fm<"w\xab.')
|
||||
token=jose.b64decode('a82d5ff8ef740d12881f6d3c2277ab2e'))
|
||||
self.jmsg = {
|
||||
'type': 'dvsni',
|
||||
'r': 'Tyq0La3slT7tqQ0wlOiXnCY2vyez7Zo5blgPJ1xt5xI',
|
||||
'nonce': 'a82d5ff8ef740d12881f6d3c2277ab2e',
|
||||
'token': 'a82d5ff8ef740d12881f6d3c2277ab2e',
|
||||
}
|
||||
|
||||
def test_nonce_domain(self):
|
||||
self.assertEqual(b'a82d5ff8ef740d12881f6d3c2277ab2e.acme.invalid',
|
||||
self.msg.nonce_domain)
|
||||
|
||||
def test_to_partial_json(self):
|
||||
self.assertEqual(self.jmsg, self.msg.to_partial_json())
|
||||
|
||||
@@ -166,17 +198,66 @@ class DVSNITest(unittest.TestCase):
|
||||
from acme.challenges import DVSNI
|
||||
hash(DVSNI.from_json(self.jmsg))
|
||||
|
||||
def test_from_json_invalid_r_length(self):
|
||||
def test_from_json_invalid_token_length(self):
|
||||
from acme.challenges import DVSNI
|
||||
self.jmsg['r'] = 'abcd'
|
||||
self.jmsg['token'] = jose.encode_b64jose(b'abcd')
|
||||
self.assertRaises(
|
||||
jose.DeserializationError, DVSNI.from_json, self.jmsg)
|
||||
|
||||
def test_from_json_invalid_nonce_length(self):
|
||||
def test_gen_response(self):
|
||||
key = jose.JWKRSA(key=KEY)
|
||||
from acme.challenges import DVSNI
|
||||
self.jmsg['nonce'] = 'abcd'
|
||||
self.assertRaises(
|
||||
jose.DeserializationError, DVSNI.from_json, self.jmsg)
|
||||
self.assertEqual(self.msg, DVSNI.json_loads(
|
||||
self.msg.gen_response(key).validation.payload.decode()))
|
||||
|
||||
|
||||
class DVSNIResponseTest(unittest.TestCase):
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
|
||||
def setUp(self):
|
||||
self.key = jose.JWKRSA(key=KEY)
|
||||
|
||||
from acme.challenges import DVSNI
|
||||
self.chall = DVSNI(
|
||||
token=jose.b64decode(b'a82d5ff8ef740d12881f6d3c2277ab2e'))
|
||||
|
||||
from acme.challenges import DVSNIResponse
|
||||
self.validation = jose.JWS.sign(
|
||||
payload=self.chall.json_dumps(sort_keys=True).encode(),
|
||||
key=self.key, alg=jose.RS256)
|
||||
self.msg = DVSNIResponse(validation=self.validation)
|
||||
self.jmsg_to = {
|
||||
'resource': 'challenge',
|
||||
'type': 'dvsni',
|
||||
'validation': self.validation,
|
||||
}
|
||||
self.jmsg_from = {
|
||||
'resource': 'challenge',
|
||||
'type': 'dvsni',
|
||||
'validation': self.validation.to_json(),
|
||||
}
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
label1 = b'e2df3498860637c667fedadc5a8494ec'
|
||||
label2 = b'09dcc75553c9b3bd73662b50e71b1e42'
|
||||
self.z = label1 + label2
|
||||
self.z_domain = label1 + b'.' + label2 + b'.acme.invalid'
|
||||
self.domain = 'foo.com'
|
||||
|
||||
def test_z_and_domain(self):
|
||||
self.assertEqual(self.z, self.msg.z)
|
||||
self.assertEqual(self.z_domain, self.msg.z_domain)
|
||||
|
||||
def test_to_partial_json(self):
|
||||
self.assertEqual(self.jmsg_to, self.msg.to_partial_json())
|
||||
|
||||
def test_from_json(self):
|
||||
from acme.challenges import DVSNIResponse
|
||||
self.assertEqual(self.msg, DVSNIResponse.from_json(self.jmsg_from))
|
||||
|
||||
def test_from_json_hashable(self):
|
||||
from acme.challenges import DVSNIResponse
|
||||
hash(DVSNIResponse.from_json(self.jmsg_from))
|
||||
|
||||
@mock.patch('acme.challenges.socket.gethostbyname')
|
||||
@mock.patch('acme.challenges.crypto_util._probe_sni')
|
||||
@@ -186,7 +267,7 @@ class DVSNITest(unittest.TestCase):
|
||||
mock_gethostbyname.assert_called_once_with('foo.com')
|
||||
mock_probe_sni.assert_called_once_with(
|
||||
host='127.0.0.1', port=self.msg.PORT,
|
||||
name=b'a82d5ff8ef740d12881f6d3c2277ab2e.acme.invalid')
|
||||
name=self.z_domain)
|
||||
|
||||
self.msg.probe_cert('foo.com', host='8.8.8.8')
|
||||
mock_probe_sni.assert_called_with(
|
||||
@@ -203,88 +284,54 @@ class DVSNITest(unittest.TestCase):
|
||||
self.msg.probe_cert('foo.com', name=b'xxx')
|
||||
mock_probe_sni.assert_called_with(
|
||||
host=mock.ANY, port=mock.ANY,
|
||||
name=b'a82d5ff8ef740d12881f6d3c2277ab2e.acme.invalid')
|
||||
name=self.z_domain)
|
||||
|
||||
def test_gen_verify_cert(self):
|
||||
key1 = test_util.load_pyopenssl_private_key('rsa512_key.pem')
|
||||
cert, key2 = self.msg.gen_cert(key1)
|
||||
self.assertEqual(key1, key2)
|
||||
self.assertTrue(self.msg.verify_cert(cert))
|
||||
|
||||
class DVSNIResponseTest(unittest.TestCase):
|
||||
def test_gen_verify_cert_gen_key(self):
|
||||
cert, key = self.msg.gen_cert()
|
||||
self.assertTrue(isinstance(key, OpenSSL.crypto.PKey))
|
||||
self.assertTrue(self.msg.verify_cert(cert))
|
||||
|
||||
def setUp(self):
|
||||
from acme.challenges import DVSNIResponse
|
||||
# pylint: disable=invalid-name
|
||||
s = '9dbjsl3gTAtOnEtKFEmhS6Mj-ajNjDcOmRkp3Lfzm3c'
|
||||
self.msg = DVSNIResponse(s=jose.decode_b64jose(s))
|
||||
self.jmsg = {
|
||||
'resource': 'challenge',
|
||||
'type': 'dvsni',
|
||||
's': s,
|
||||
}
|
||||
def test_verify_bad_cert(self):
|
||||
self.assertFalse(self.msg.verify_cert(test_util.load_cert('cert.pem')))
|
||||
|
||||
from acme.challenges import DVSNI
|
||||
self.chall = DVSNI(
|
||||
r=jose.decode_b64jose('Tyq0La3slT7tqQ0wlOiXnCY2vyez7Zo5blgPJ1xt5xI'),
|
||||
nonce=jose.decode_b64jose('a82d5ff8ef740d12881f6d3c2277ab2e'))
|
||||
self.z = (b'38e612b0397cc2624a07d351d7ef50e4'
|
||||
b'6134c0213d9ed52f7d7c611acaeed41b')
|
||||
self.domain = 'foo.com'
|
||||
self.key = test_util.load_pyopenssl_private_key('rsa512_key.pem')
|
||||
self.public_key = test_util.load_rsa_private_key(
|
||||
'rsa512_key.pem').public_key()
|
||||
def test_simple_verify_wrong_account_key(self):
|
||||
self.assertFalse(self.msg.simple_verify(
|
||||
self.chall, self.domain, jose.JWKRSA.load(
|
||||
test_util.load_vector('rsa256_key.pem')).public_key()))
|
||||
|
||||
def test_z_and_domain(self):
|
||||
# pylint: disable=invalid-name
|
||||
self.assertEqual(self.z, self.msg.z(self.chall))
|
||||
self.assertEqual(
|
||||
self.z + b'.acme.invalid', self.msg.z_domain(self.chall))
|
||||
def test_simple_verify_wrong_payload(self):
|
||||
for payload in b'', b'{}':
|
||||
msg = self.msg.update(validation=jose.JWS.sign(
|
||||
payload=payload, key=self.key, alg=jose.RS256))
|
||||
self.assertFalse(msg.simple_verify(
|
||||
self.chall, self.domain, self.key.public_key()))
|
||||
|
||||
def test_to_partial_json(self):
|
||||
self.assertEqual(self.jmsg, self.msg.to_partial_json())
|
||||
def test_simple_verify_wrong_token(self):
|
||||
msg = self.msg.update(validation=jose.JWS.sign(
|
||||
payload=self.chall.update(token=b'b'*20).json_dumps().encode(),
|
||||
key=self.key, alg=jose.RS256))
|
||||
self.assertFalse(msg.simple_verify(
|
||||
self.chall, self.domain, self.key.public_key()))
|
||||
|
||||
def test_from_json(self):
|
||||
from acme.challenges import DVSNIResponse
|
||||
self.assertEqual(self.msg, DVSNIResponse.from_json(self.jmsg))
|
||||
|
||||
def test_from_json_hashable(self):
|
||||
from acme.challenges import DVSNIResponse
|
||||
hash(DVSNIResponse.from_json(self.jmsg))
|
||||
|
||||
@mock.patch('acme.challenges.DVSNIResponse.verify_cert')
|
||||
@mock.patch('acme.challenges.DVSNIResponse.verify_cert', autospec=True)
|
||||
def test_simple_verify(self, mock_verify_cert):
|
||||
chall = mock.Mock()
|
||||
chall.probe_cert.return_value = mock.sentinel.cert
|
||||
mock_verify_cert.return_value = 'x'
|
||||
self.assertEqual('x', self.msg.simple_verify(
|
||||
chall, mock.sentinel.domain, mock.sentinel.key))
|
||||
chall.probe_cert.assert_called_once_with(domain=mock.sentinel.domain)
|
||||
self.msg.verify_cert.assert_called_once_with(
|
||||
chall, mock.sentinel.domain, mock.sentinel.key,
|
||||
mock.sentinel.cert)
|
||||
mock_verify_cert.return_value = mock.sentinel.verification
|
||||
self.assertEqual(mock.sentinel.verification, self.msg.simple_verify(
|
||||
self.chall, self.domain, self.key.public_key(),
|
||||
cert=mock.sentinel.cert))
|
||||
mock_verify_cert.assert_called_once_with(self.msg, mock.sentinel.cert)
|
||||
|
||||
def test_simple_verify_false_on_probe_error(self):
|
||||
chall = mock.Mock()
|
||||
chall.probe_cert.side_effect = errors.Error
|
||||
self.assertFalse(self.msg.simple_verify(
|
||||
chall=chall, domain=None, public_key=None))
|
||||
|
||||
def test_gen_verify_cert_postive_no_key(self):
|
||||
cert = self.msg.gen_cert(self.chall, self.domain, self.key)
|
||||
self.assertTrue(self.msg.verify_cert(
|
||||
self.chall, self.domain, public_key=None, cert=cert))
|
||||
|
||||
def test_gen_verify_cert_postive_with_key(self):
|
||||
cert = self.msg.gen_cert(self.chall, self.domain, self.key)
|
||||
self.assertTrue(self.msg.verify_cert(
|
||||
self.chall, self.domain, public_key=self.public_key, cert=cert))
|
||||
|
||||
def test_gen_verify_cert_negative_with_wrong_key(self):
|
||||
cert = self.msg.gen_cert(self.chall, self.domain, self.key)
|
||||
key = test_util.load_rsa_private_key('rsa256_key.pem').public_key()
|
||||
self.assertFalse(self.msg.verify_cert(
|
||||
self.chall, self.domain, public_key=key, cert=cert))
|
||||
|
||||
def test_gen_verify_cert_negative(self):
|
||||
cert = self.msg.gen_cert(self.chall, self.domain + 'x', self.key)
|
||||
self.assertFalse(self.msg.verify_cert(
|
||||
self.chall, self.domain, public_key=None, cert=cert))
|
||||
self.chall, self.domain, self.key.public_key()))
|
||||
|
||||
|
||||
class RecoveryContactTest(unittest.TestCase):
|
||||
@@ -360,58 +407,6 @@ class RecoveryContactResponseTest(unittest.TestCase):
|
||||
self.assertEqual(self.jmsg, msg.to_partial_json())
|
||||
|
||||
|
||||
class RecoveryTokenTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
from acme.challenges import RecoveryToken
|
||||
self.msg = RecoveryToken()
|
||||
self.jmsg = {'type': 'recoveryToken'}
|
||||
|
||||
def test_to_partial_json(self):
|
||||
self.assertEqual(self.jmsg, self.msg.to_partial_json())
|
||||
|
||||
def test_from_json(self):
|
||||
from acme.challenges import RecoveryToken
|
||||
self.assertEqual(self.msg, RecoveryToken.from_json(self.jmsg))
|
||||
|
||||
def test_from_json_hashable(self):
|
||||
from acme.challenges import RecoveryToken
|
||||
hash(RecoveryToken.from_json(self.jmsg))
|
||||
|
||||
|
||||
class RecoveryTokenResponseTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
from acme.challenges import RecoveryTokenResponse
|
||||
self.msg = RecoveryTokenResponse(token='23029d88d9e123e')
|
||||
self.jmsg = {
|
||||
'resource': 'challenge',
|
||||
'type': 'recoveryToken',
|
||||
'token': '23029d88d9e123e'
|
||||
}
|
||||
|
||||
def test_to_partial_json(self):
|
||||
self.assertEqual(self.jmsg, self.msg.to_partial_json())
|
||||
|
||||
def test_from_json(self):
|
||||
from acme.challenges import RecoveryTokenResponse
|
||||
self.assertEqual(
|
||||
self.msg, RecoveryTokenResponse.from_json(self.jmsg))
|
||||
|
||||
def test_from_json_hashable(self):
|
||||
from acme.challenges import RecoveryTokenResponse
|
||||
hash(RecoveryTokenResponse.from_json(self.jmsg))
|
||||
|
||||
def test_json_without_optionals(self):
|
||||
del self.jmsg['token']
|
||||
|
||||
from acme.challenges import RecoveryTokenResponse
|
||||
msg = RecoveryTokenResponse.from_json(self.jmsg)
|
||||
|
||||
self.assertTrue(msg.token is None)
|
||||
self.assertEqual(self.jmsg, msg.to_partial_json())
|
||||
|
||||
|
||||
class ProofOfPossessionHintsTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
||||
@@ -44,7 +44,7 @@ class ClientTest(unittest.TestCase):
|
||||
# Registration
|
||||
self.contact = ('mailto:cert-admin@example.com', 'tel:+12025551212')
|
||||
reg = messages.Registration(
|
||||
contact=self.contact, key=KEY.public_key(), recovery_token='t')
|
||||
contact=self.contact, key=KEY.public_key())
|
||||
self.new_reg = messages.NewRegistration(**dict(reg))
|
||||
self.regr = messages.RegistrationResource(
|
||||
body=reg, uri='https://www.letsencrypt-demo.org/acme/reg/1',
|
||||
|
||||
@@ -155,13 +155,18 @@ def _pyopenssl_cert_or_req_san(cert_or_req):
|
||||
for part in parts if part.startswith(prefix)]
|
||||
|
||||
|
||||
def gen_ss_cert(key, domains, not_before=None, validity=(7 * 24 * 60 * 60)):
|
||||
def gen_ss_cert(key, domains, not_before=None,
|
||||
validity=(7 * 24 * 60 * 60), force_san=True):
|
||||
"""Generate new self-signed certificate.
|
||||
|
||||
:type domains: `list` of `unicode`
|
||||
:param OpenSSL.crypto.PKey key:
|
||||
:param bool force_san:
|
||||
|
||||
Uses key and contains all domains.
|
||||
If more than one domain is provided, all of the domains are put into
|
||||
``subjectAltName`` X.509 extension and first domain is set as the
|
||||
subject CN. If only one domain is provided no ``subjectAltName``
|
||||
extension is used, unless `force_san` is ``True``.
|
||||
|
||||
"""
|
||||
assert domains, "Must provide one or more hostnames for the cert."
|
||||
@@ -178,7 +183,7 @@ def gen_ss_cert(key, domains, not_before=None, validity=(7 * 24 * 60 * 60)):
|
||||
# TODO: what to put into cert.get_subject()?
|
||||
cert.set_issuer(cert.get_subject())
|
||||
|
||||
if len(domains) > 1:
|
||||
if force_san or len(domains) > 1:
|
||||
extensions.append(OpenSSL.crypto.X509Extension(
|
||||
b"subjectAltName",
|
||||
critical=False,
|
||||
|
||||
@@ -1,9 +1,34 @@
|
||||
"""ACME JSON fields."""
|
||||
import logging
|
||||
|
||||
import pyrfc3339
|
||||
|
||||
from acme import jose
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Fixed(jose.Field):
|
||||
"""Fixed field."""
|
||||
|
||||
def __init__(self, json_name, value):
|
||||
self.value = value
|
||||
super(Fixed, self).__init__(
|
||||
json_name=json_name, default=value, omitempty=False)
|
||||
|
||||
def decode(self, value):
|
||||
if value != self.value:
|
||||
raise jose.DeserializationError('Expected {0!r}'.format(self.value))
|
||||
return self.value
|
||||
|
||||
def encode(self, value):
|
||||
if value != self.value:
|
||||
logger.warn('Overriding fixed field ({0}) with {1}'.format(
|
||||
self.json_name, value))
|
||||
return value
|
||||
|
||||
|
||||
class RFC3339Field(jose.Field):
|
||||
"""RFC3339 field encoder/decoder.
|
||||
|
||||
@@ -31,8 +56,6 @@ class Resource(jose.Field):
|
||||
def __init__(self, resource_type, *args, **kwargs):
|
||||
self.resource_type = resource_type
|
||||
super(Resource, self).__init__(
|
||||
# TODO: omitempty used only to trick
|
||||
# JSONObjectWithFieldsMeta._defaults..., server implementation
|
||||
'resource', default=resource_type, *args, **kwargs)
|
||||
|
||||
def decode(self, value):
|
||||
|
||||
@@ -7,6 +7,26 @@ import pytz
|
||||
from acme import jose
|
||||
|
||||
|
||||
class FixedTest(unittest.TestCase):
|
||||
"""Tests for acme.fields.Fixed."""
|
||||
|
||||
def setUp(self):
|
||||
from acme.fields import Fixed
|
||||
self.field = Fixed('name', 'x')
|
||||
|
||||
def test_decode(self):
|
||||
self.assertEqual('x', self.field.decode('x'))
|
||||
|
||||
def test_decode_bad(self):
|
||||
self.assertRaises(jose.DeserializationError, self.field.decode, 'y')
|
||||
|
||||
def test_encode(self):
|
||||
self.assertEqual('x', self.field.encode('x'))
|
||||
|
||||
def test_encode_override(self):
|
||||
self.assertEqual('y', self.field.encode('y'))
|
||||
|
||||
|
||||
class RFC3339FieldTest(unittest.TestCase):
|
||||
"""Tests for acme.fields.RFC3339Field."""
|
||||
|
||||
|
||||
@@ -8,6 +8,10 @@ class Error(Exception):
|
||||
class DeserializationError(Error):
|
||||
"""JSON deserialization error."""
|
||||
|
||||
def __str__(self):
|
||||
return "Deserialization error: {0}".format(
|
||||
super(DeserializationError, self).__str__())
|
||||
|
||||
|
||||
class SerializationError(Error):
|
||||
"""JSON serialization error."""
|
||||
|
||||
@@ -5,6 +5,7 @@ import json
|
||||
|
||||
import six
|
||||
|
||||
from acme.jose import errors
|
||||
from acme.jose import util
|
||||
|
||||
# pylint: disable=no-self-argument,no-method-argument,no-init,inherit-non-class
|
||||
@@ -172,7 +173,11 @@ class JSONDeSerializable(object):
|
||||
@classmethod
|
||||
def json_loads(cls, json_string):
|
||||
"""Deserialize from JSON document string."""
|
||||
return cls.from_json(json.loads(json_string))
|
||||
try:
|
||||
loads = json.loads(json_string)
|
||||
except ValueError as error:
|
||||
raise errors.DeserializationError(error)
|
||||
return cls.from_json(loads)
|
||||
|
||||
def json_dumps(self, **kwargs):
|
||||
"""Dump to JSON string using proper serializer.
|
||||
|
||||
@@ -221,6 +221,22 @@ class JSONObjectWithFields(util.ImmutableMap, interfaces.JSONDeSerializable):
|
||||
super(JSONObjectWithFields, self).__init__(
|
||||
**(dict(self._defaults(), **kwargs)))
|
||||
|
||||
def encode(self, name):
|
||||
"""Encode a single field.
|
||||
|
||||
:param str name: Name of the field to be encoded.
|
||||
|
||||
:raises erors.SerializationError: if field cannot be serialized
|
||||
:raises errors.Error: if field could not be found
|
||||
|
||||
"""
|
||||
try:
|
||||
field = self._fields[name]
|
||||
except KeyError:
|
||||
raise errors.Error("Field not found: {0}".format(name))
|
||||
|
||||
return field.encode(getattr(self, name))
|
||||
|
||||
def fields_to_partial_json(self):
|
||||
"""Serialize fields to JSON."""
|
||||
jobj = {}
|
||||
@@ -310,7 +326,8 @@ def decode_b64jose(data, size=None, minimum=False):
|
||||
|
||||
if size is not None and ((not minimum and len(decoded) != size)
|
||||
or (minimum and len(decoded) < size)):
|
||||
raise errors.DeserializationError()
|
||||
raise errors.DeserializationError(
|
||||
"Expected at least or exactly {0} bytes".format(size))
|
||||
|
||||
return decoded
|
||||
|
||||
@@ -418,7 +435,9 @@ class TypedJSONObjectWithFields(JSONObjectWithFields):
|
||||
def get_type_cls(cls, jobj):
|
||||
"""Get the registered class for ``jobj``."""
|
||||
if cls in six.itervalues(cls.TYPES):
|
||||
assert jobj[cls.type_field_name]
|
||||
if cls.type_field_name not in jobj:
|
||||
raise errors.DeserializationError(
|
||||
"Missing type field ({0})".format(cls.type_field_name))
|
||||
# cls is already registered type_cls, force to use it
|
||||
# so that, e.g Revocation.from_json(jobj) fails if
|
||||
# jobj["type"] != "revocation".
|
||||
|
||||
@@ -160,6 +160,18 @@ class JSONObjectWithFieldsTest(unittest.TestCase):
|
||||
def test_init_defaults(self):
|
||||
self.assertEqual(self.mock, self.MockJSONObjectWithFields(y=2, z=3))
|
||||
|
||||
def test_encode(self):
|
||||
self.assertEqual(10, self.MockJSONObjectWithFields(
|
||||
x=5, y=0, z=0).encode("x"))
|
||||
|
||||
def test_encode_wrong_field(self):
|
||||
self.assertRaises(errors.Error, self.mock.encode, 'foo')
|
||||
|
||||
def test_encode_serialization_error_passthrough(self):
|
||||
self.assertRaises(
|
||||
errors.SerializationError,
|
||||
self.MockJSONObjectWithFields(y=500, z=None).encode, "y")
|
||||
|
||||
def test_fields_to_partial_json_omits_empty(self):
|
||||
self.assertEqual(self.mock.fields_to_partial_json(), {'y': 2, 'Z': 3})
|
||||
|
||||
|
||||
@@ -156,7 +156,6 @@ class Registration(ResourceBody):
|
||||
:ivar acme.jose.jwk.JWK key: Public key.
|
||||
:ivar tuple contact: Contact information following ACME spec,
|
||||
`tuple` of `unicode`.
|
||||
:ivar unicode recovery_token:
|
||||
:ivar unicode agreement:
|
||||
|
||||
"""
|
||||
@@ -164,7 +163,6 @@ class Registration(ResourceBody):
|
||||
# JWS.signature.combined.jwk
|
||||
key = jose.Field('key', omitempty=True, decoder=jose.JWK.from_json)
|
||||
contact = jose.Field('contact', omitempty=True, default=())
|
||||
recovery_token = jose.Field('recoveryToken', omitempty=True)
|
||||
agreement = jose.Field('agreement', omitempty=True)
|
||||
|
||||
phone_prefix = 'tel:'
|
||||
|
||||
@@ -101,18 +101,14 @@ class RegistrationTest(unittest.TestCase):
|
||||
'mailto:admin@foo.com',
|
||||
'tel:1234',
|
||||
)
|
||||
recovery_token = 'XYZ'
|
||||
agreement = 'https://letsencrypt.org/terms'
|
||||
|
||||
from acme.messages import Registration
|
||||
self.reg = Registration(
|
||||
key=key, contact=contact, recovery_token=recovery_token,
|
||||
agreement=agreement)
|
||||
self.reg = Registration(key=key, contact=contact, agreement=agreement)
|
||||
self.reg_none = Registration()
|
||||
|
||||
self.jobj_to = {
|
||||
'contact': contact,
|
||||
'recoveryToken': recovery_token,
|
||||
'agreement': agreement,
|
||||
'key': key,
|
||||
}
|
||||
@@ -228,11 +224,12 @@ class AuthorizationTest(unittest.TestCase):
|
||||
self.challbs = (
|
||||
ChallengeBody(
|
||||
uri='http://challb1', status=STATUS_VALID,
|
||||
chall=challenges.SimpleHTTP(token='IlirfxKKXAsHtmzK29Pj8A')),
|
||||
chall=challenges.SimpleHTTP(token=b'IlirfxKKXAsHtmzK29Pj8A')),
|
||||
ChallengeBody(uri='http://challb2', status=STATUS_VALID,
|
||||
chall=challenges.DNS(token='DGyRejmCefe7v4NfDGDKfA')),
|
||||
chall=challenges.DNS(
|
||||
token=b'DGyRejmCefe7v4NfDGDKfA')),
|
||||
ChallengeBody(uri='http://challb3', status=STATUS_VALID,
|
||||
chall=challenges.RecoveryToken()),
|
||||
chall=challenges.RecoveryContact()),
|
||||
)
|
||||
combinations = ((0, 2), (1, 2))
|
||||
|
||||
|
||||
@@ -94,15 +94,11 @@ def report_new_account(acc, config):
|
||||
config.config_dir),
|
||||
reporter.MEDIUM_PRIORITY, True)
|
||||
|
||||
assert acc.regr.body.recovery_token is not None
|
||||
recovery_msg = ("If you lose your account credentials, you can recover "
|
||||
"them using the token \"{0}\". You must write that down "
|
||||
"and put it in a safe place.".format(
|
||||
acc.regr.body.recovery_token))
|
||||
if acc.regr.body.emails:
|
||||
recovery_msg += (" Another recovery method will be e-mails sent to "
|
||||
"{0}.".format(", ".join(acc.regr.body.emails)))
|
||||
reporter.add_message(recovery_msg, reporter.HIGH_PRIORITY, True)
|
||||
recovery_msg = ("If you lose your account credentials, you can "
|
||||
"recover through e-mails sent to {0}.".format(
|
||||
", ".join(acc.regr.body.emails)))
|
||||
reporter.add_message(recovery_msg, reporter.HIGH_PRIORITY, True)
|
||||
|
||||
|
||||
class AccountMemoryStorage(interfaces.AccountStorage):
|
||||
|
||||
@@ -17,18 +17,22 @@ Note, that all annotated challenges act as a proxy objects::
|
||||
achall.token == challb.token
|
||||
|
||||
"""
|
||||
import logging
|
||||
import os
|
||||
|
||||
import OpenSSL
|
||||
|
||||
from acme import challenges
|
||||
from acme.jose import util as jose_util
|
||||
from acme import jose
|
||||
|
||||
from letsencrypt import crypto_util
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# pylint: disable=too-few-public-methods
|
||||
|
||||
|
||||
class AnnotatedChallenge(jose_util.ImmutableMap):
|
||||
class AnnotatedChallenge(jose.ImmutableMap):
|
||||
"""Client annotated challenge.
|
||||
|
||||
Wraps around server provided challenge and annotates with data
|
||||
@@ -45,33 +49,56 @@ class AnnotatedChallenge(jose_util.ImmutableMap):
|
||||
|
||||
|
||||
class DVSNI(AnnotatedChallenge):
|
||||
"""Client annotated "dvsni" ACME challenge."""
|
||||
__slots__ = ('challb', 'domain', 'key')
|
||||
"""Client annotated "dvsni" ACME challenge.
|
||||
|
||||
:ivar .Account account:
|
||||
|
||||
"""
|
||||
__slots__ = ('challb', 'domain', 'account')
|
||||
acme_type = challenges.DVSNI
|
||||
|
||||
def gen_cert_and_response(self, s=None): # pylint: disable=invalid-name
|
||||
def gen_cert_and_response(self, key_pem=None, bits=2048, alg=jose.RS256):
|
||||
"""Generate a DVSNI cert and response.
|
||||
|
||||
:returns: ``(cert_pem, response)`` tuple, where ``cert_pem`` is the PEM
|
||||
encoded certificate and ``response`` is an instance
|
||||
:class:`acme.challenges.DVSNIResponse`.
|
||||
:param bytes key_pem: Private PEM-encoded key used for
|
||||
certificate generation. If none provided, a fresh key will
|
||||
be generated.
|
||||
:param int bits: Number of bits for fresh key generation.
|
||||
:param .JWAAlgorithm alg:
|
||||
|
||||
:returns: ``(response, cert_pem, key_pem)`` tuple, where
|
||||
``response`` is an instance of
|
||||
`acme.challenges.DVSNIResponse`, ``cert_pem`` is the
|
||||
PEM-encoded certificate and ``key_pem`` is PEM-encoded
|
||||
private key.
|
||||
:rtype: tuple
|
||||
|
||||
"""
|
||||
key = crypto_util.private_jwk_to_pyopenssl(self.key)
|
||||
response = challenges.DVSNIResponse(s=s)
|
||||
cert = response.gen_cert(self.challb.chall, self.domain, key)
|
||||
key = None if key_pem is None else OpenSSL.crypto.load_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, key_pem)
|
||||
response = self.challb.chall.gen_response(self.account.key, alg=alg)
|
||||
cert, key = response.gen_cert(key=key, bits=bits)
|
||||
|
||||
cert_pem = OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, cert)
|
||||
key_pem = OpenSSL.crypto.dump_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, key)
|
||||
|
||||
return cert_pem, response
|
||||
return response, cert_pem, key_pem
|
||||
|
||||
|
||||
class SimpleHTTP(AnnotatedChallenge):
|
||||
"""Client annotated "simpleHttp" ACME challenge."""
|
||||
__slots__ = ('challb', 'domain', 'key')
|
||||
__slots__ = ('challb', 'domain', 'account')
|
||||
acme_type = challenges.SimpleHTTP
|
||||
|
||||
def gen_response_and_validation(self, tls):
|
||||
response = challenges.SimpleHTTPResponse(tls=tls)
|
||||
|
||||
validation = response.gen_validation(self.chall, self.account.key)
|
||||
logger.debug("Simple HTTP validation payload: %s", validation.payload)
|
||||
return response, validation
|
||||
|
||||
|
||||
class DNS(AnnotatedChallenge):
|
||||
"""Client annotated "dns" ACME challenge."""
|
||||
@@ -85,12 +112,6 @@ class RecoveryContact(AnnotatedChallenge):
|
||||
acme_type = challenges.RecoveryContact
|
||||
|
||||
|
||||
class RecoveryToken(AnnotatedChallenge):
|
||||
"""Client annotated "recoveryToken" ACME challenge."""
|
||||
__slots__ = ('challb', 'domain')
|
||||
acme_type = challenges.RecoveryToken
|
||||
|
||||
|
||||
class ProofOfPossession(AnnotatedChallenge):
|
||||
"""Client annotated "proofOfPossession" ACME challenge."""
|
||||
__slots__ = ('challb', 'domain')
|
||||
|
||||
@@ -322,7 +322,7 @@ class AuthHandler(object):
|
||||
challb = self.authzr[domain].body.challenges[index]
|
||||
chall = challb.chall
|
||||
|
||||
achall = challb_to_achall(challb, self.account.key, domain)
|
||||
achall = challb_to_achall(challb, self.account, domain)
|
||||
|
||||
if isinstance(chall, challenges.ContinuityChallenge):
|
||||
cont_chall.append(achall)
|
||||
@@ -332,15 +332,11 @@ class AuthHandler(object):
|
||||
return cont_chall, dv_chall
|
||||
|
||||
|
||||
def challb_to_achall(challb, key, domain):
|
||||
def challb_to_achall(challb, account, domain):
|
||||
"""Converts a ChallengeBody object to an AnnotatedChallenge.
|
||||
|
||||
:param challb: ChallengeBody
|
||||
:type challb: :class:`acme.messages.ChallengeBody`
|
||||
|
||||
:param key: Key
|
||||
:type key: :class:`letsencrypt.le_util.Key`
|
||||
|
||||
:param .ChallengeBody challb: ChallengeBody
|
||||
:param .Account account:
|
||||
:param str domain: Domain of the challb
|
||||
|
||||
:returns: Appropriate AnnotatedChallenge
|
||||
@@ -352,14 +348,12 @@ def challb_to_achall(challb, key, domain):
|
||||
|
||||
if isinstance(chall, challenges.DVSNI):
|
||||
return achallenges.DVSNI(
|
||||
challb=challb, domain=domain, key=key)
|
||||
challb=challb, domain=domain, account=account)
|
||||
elif isinstance(chall, challenges.SimpleHTTP):
|
||||
return achallenges.SimpleHTTP(
|
||||
challb=challb, domain=domain, key=key)
|
||||
challb=challb, domain=domain, account=account)
|
||||
elif isinstance(chall, challenges.DNS):
|
||||
return achallenges.DNS(challb=challb, domain=domain)
|
||||
elif isinstance(chall, challenges.RecoveryToken):
|
||||
return achallenges.RecoveryToken(challb=challb, domain=domain)
|
||||
elif isinstance(chall, challenges.RecoveryContact):
|
||||
return achallenges.RecoveryContact(
|
||||
challb=challb, domain=domain)
|
||||
|
||||
@@ -22,7 +22,6 @@ class NamespaceConfig(object):
|
||||
- `cert_key_backup`
|
||||
- `in_progress_dir`
|
||||
- `key_dir`
|
||||
- `rec_token_dir`
|
||||
- `renewer_config_file`
|
||||
- `temp_checkpoint_dir`
|
||||
|
||||
@@ -71,11 +70,6 @@ class NamespaceConfig(object):
|
||||
def key_dir(self): # pylint: disable=missing-docstring
|
||||
return os.path.join(self.namespace.config_dir, constants.KEY_DIR)
|
||||
|
||||
# TODO: This should probably include the server name
|
||||
@property
|
||||
def rec_token_dir(self): # pylint: disable=missing-docstring
|
||||
return os.path.join(self.namespace.work_dir, constants.REC_TOKEN_DIR)
|
||||
|
||||
@property
|
||||
def temp_checkpoint_dir(self): # pylint: disable=missing-docstring
|
||||
return os.path.join(
|
||||
|
||||
@@ -88,10 +88,6 @@ LIVE_DIR = "live"
|
||||
TEMP_CHECKPOINT_DIR = "temp_checkpoint"
|
||||
"""Temporary checkpoint directory (relative to `IConfig.work_dir`)."""
|
||||
|
||||
REC_TOKEN_DIR = "recovery_tokens"
|
||||
"""Directory where all recovery tokens are saved (relative to
|
||||
`IConfig.work_dir`)."""
|
||||
|
||||
RENEWAL_CONFIGS_DIR = "configs"
|
||||
"""Renewal configs directory, relative to `IConfig.config_dir`."""
|
||||
|
||||
|
||||
@@ -7,16 +7,12 @@ from letsencrypt import achallenges
|
||||
from letsencrypt import errors
|
||||
from letsencrypt import interfaces
|
||||
from letsencrypt import proof_of_possession
|
||||
from letsencrypt import recovery_token
|
||||
|
||||
|
||||
class ContinuityAuthenticator(object):
|
||||
"""IAuthenticator for
|
||||
:const:`~acme.challenges.ContinuityChallenge` class challenges.
|
||||
|
||||
:ivar rec_token: Performs "recoveryToken" challenges.
|
||||
:type rec_token: :class:`letsencrypt.recovery_token.RecoveryToken`
|
||||
|
||||
:ivar proof_of_pos: Performs "proofOfPossession" challenges.
|
||||
:type proof_of_pos:
|
||||
:class:`letsencrypt.proof_of_possession.Proof_of_Possession`
|
||||
@@ -25,7 +21,7 @@ class ContinuityAuthenticator(object):
|
||||
zope.interface.implements(interfaces.IAuthenticator)
|
||||
|
||||
# This will have an installer soon for get_key/cert purposes
|
||||
def __init__(self, config, installer):
|
||||
def __init__(self, config, installer): # pylint: disable=unused-argument
|
||||
"""Initialize Client Authenticator.
|
||||
|
||||
:param config: Configuration.
|
||||
@@ -35,13 +31,11 @@ class ContinuityAuthenticator(object):
|
||||
:type installer: :class:`letsencrypt.interfaces.IInstaller`
|
||||
|
||||
"""
|
||||
self.rec_token = recovery_token.RecoveryToken(
|
||||
config.server, config.rec_token_dir)
|
||||
self.proof_of_pos = proof_of_possession.ProofOfPossession(installer)
|
||||
|
||||
def get_chall_pref(self, unused_domain): # pylint: disable=no-self-use
|
||||
"""Return list of challenge preferences."""
|
||||
return [challenges.ProofOfPossession, challenges.RecoveryToken]
|
||||
return [challenges.ProofOfPossession]
|
||||
|
||||
def perform(self, achalls):
|
||||
"""Perform client specific challenges for IAuthenticator"""
|
||||
@@ -49,16 +43,12 @@ class ContinuityAuthenticator(object):
|
||||
for achall in achalls:
|
||||
if isinstance(achall, achallenges.ProofOfPossession):
|
||||
responses.append(self.proof_of_pos.perform(achall))
|
||||
elif isinstance(achall, achallenges.RecoveryToken):
|
||||
responses.append(self.rec_token.perform(achall))
|
||||
else:
|
||||
raise errors.ContAuthError("Unexpected Challenge")
|
||||
return responses
|
||||
|
||||
def cleanup(self, achalls):
|
||||
def cleanup(self, achalls): # pylint: disable=no-self-use
|
||||
"""Cleanup call for IAuthenticator."""
|
||||
for achall in achalls:
|
||||
if isinstance(achall, achallenges.RecoveryToken):
|
||||
self.rec_token.cleanup(achall)
|
||||
elif not isinstance(achall, achallenges.ProofOfPossession):
|
||||
if not isinstance(achall, achallenges.ProofOfPossession):
|
||||
raise errors.ContAuthError("Unexpected Challenge")
|
||||
|
||||
@@ -8,7 +8,6 @@ import datetime
|
||||
import logging
|
||||
import os
|
||||
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
import OpenSSL
|
||||
|
||||
from acme import crypto_util as acme_crypto_util
|
||||
@@ -215,15 +214,6 @@ def pyopenssl_load_certificate(data):
|
||||
return _pyopenssl_load(data, OpenSSL.crypto.load_certificate)
|
||||
|
||||
|
||||
def private_jwk_to_pyopenssl(jwk):
|
||||
"""Convert private JWK to pyOpenSSL key."""
|
||||
key_pem = jwk.key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption())
|
||||
return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, key_pem)
|
||||
|
||||
|
||||
def _get_sans_from_cert_or_req(
|
||||
cert_or_req_str, load_func, typ=OpenSSL.crypto.FILETYPE_PEM):
|
||||
try:
|
||||
@@ -238,7 +228,7 @@ def _get_sans_from_cert_or_req(
|
||||
def get_sans_from_cert(cert, typ=OpenSSL.crypto.FILETYPE_PEM):
|
||||
"""Get a list of Subject Alternative Names from a certificate.
|
||||
|
||||
:param str csr: Certificate (encoded).
|
||||
:param str cert: Certificate (encoded).
|
||||
:param typ: `OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1`
|
||||
|
||||
:returns: A list of Subject Alternative Names.
|
||||
|
||||
@@ -215,8 +215,6 @@ class IConfig(zope.interface.Interface):
|
||||
in_progress_dir = zope.interface.Attribute(
|
||||
"Directory used before a permanent checkpoint is finalized.")
|
||||
key_dir = zope.interface.Attribute("Keys storage.")
|
||||
rec_token_dir = zope.interface.Attribute(
|
||||
"Directory where all recovery tokens are saved.")
|
||||
temp_checkpoint_dir = zope.interface.Attribute(
|
||||
"Temporary checkpoint directory.")
|
||||
|
||||
|
||||
@@ -5,7 +5,6 @@ import re
|
||||
import shutil
|
||||
import tempfile
|
||||
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
import zope.interface
|
||||
|
||||
from acme.jose import util as jose_util
|
||||
@@ -163,13 +162,13 @@ class Dvsni(object):
|
||||
:rtype: str
|
||||
|
||||
"""
|
||||
return os.path.join(
|
||||
self.configurator.config.work_dir, achall.nonce_domain + ".crt")
|
||||
return os.path.join(self.configurator.config.work_dir,
|
||||
achall.chall.encode("token") + ".crt")
|
||||
|
||||
def get_key_path(self, achall):
|
||||
"""Get standardized path to challenge key."""
|
||||
return os.path.join(
|
||||
self.configurator.config.work_dir, achall.nonce_domain + '.pem')
|
||||
return os.path.join(self.configurator.config.work_dir,
|
||||
achall.chall.encode("token") + '.pem')
|
||||
|
||||
def _setup_challenge_cert(self, achall, s=None):
|
||||
# pylint: disable=invalid-name
|
||||
@@ -180,17 +179,11 @@ class Dvsni(object):
|
||||
self.configurator.reverter.register_file_creation(True, key_path)
|
||||
self.configurator.reverter.register_file_creation(True, cert_path)
|
||||
|
||||
cert_pem, response = achall.gen_cert_and_response(s)
|
||||
response, cert_pem, key_pem = achall.gen_cert_and_response(s)
|
||||
|
||||
# Write out challenge cert
|
||||
# Write out challenge cert and key
|
||||
with open(cert_path, "wb") as cert_chall_fd:
|
||||
cert_chall_fd.write(cert_pem)
|
||||
|
||||
# Write out challenge key
|
||||
key_pem = achall.key.key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.PKCS8,
|
||||
encryption_algorithm=serialization.NoEncryption())
|
||||
with le_util.safe_open(key_path, 'wb', chmod=0o400) as key_file:
|
||||
key_file.write(key_pem)
|
||||
|
||||
|
||||
@@ -120,21 +120,12 @@ class DvsniTest(unittest.TestCase):
|
||||
achalls = [
|
||||
achallenges.DVSNI(
|
||||
challb=acme_util.chall_to_challb(
|
||||
challenges.DVSNI(
|
||||
r="\x8c\x8a\xbf_-f\\cw\xee\xd6\xf8/\xa5\xe3\xfd\xeb9"
|
||||
"\xf1\xf5\xb9\xefVM\xc9w\xa4u\x9c\xe1\x87\xb4",
|
||||
nonce="7\xbc^\xb7]>\x00\xa1\x9bOcU\x84^Z\x18",
|
||||
), "pending"),
|
||||
domain="encryption-example.demo", key=auth_key),
|
||||
challenges.DVSNI(token=b'dvsni1'), "pending"),
|
||||
domain="encryption-example.demo", account=mock.Mock(key=auth_key)),
|
||||
achallenges.DVSNI(
|
||||
challb=acme_util.chall_to_challb(
|
||||
challenges.DVSNI(
|
||||
r="\xba\xa9\xda?<m\xaewmx\xea\xad\xadv\xf4\x02\xc9y\x80"
|
||||
"\xe2_X\t\xe7\xc7\xa4\t\xca\xf7&\x945",
|
||||
nonce="Y\xed\x01L\xac\x95\xf7pW\xb1\xd7\xa1\xb2\xc5"
|
||||
"\x96\xba",
|
||||
), "pending"),
|
||||
domain="letsencrypt.demo", key=auth_key),
|
||||
challenges.DVSNI(token=b'dvsni2'), "pending"),
|
||||
domain="letsencrypt.demo", account=mock.Mock(key=auth_key)),
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
@@ -153,10 +144,9 @@ class DvsniTest(unittest.TestCase):
|
||||
# http://www.voidspace.org.uk/python/mock/helpers.html#mock.mock_open
|
||||
mock_open, mock_safe_open = mock.mock_open(), mock.mock_open()
|
||||
|
||||
response = challenges.DVSNIResponse(s="randomS1")
|
||||
achall = mock.MagicMock(nonce=self.achalls[0].nonce,
|
||||
nonce_domain=self.achalls[0].nonce_domain)
|
||||
achall.gen_cert_and_response.return_value = ("pem", response)
|
||||
response = challenges.DVSNIResponse(validation=mock.Mock())
|
||||
achall = mock.MagicMock()
|
||||
achall.gen_cert_and_response.return_value = (response, "cert", "key")
|
||||
|
||||
with mock.patch("letsencrypt.plugins.common.open",
|
||||
mock_open, create=True):
|
||||
@@ -168,11 +158,10 @@ class DvsniTest(unittest.TestCase):
|
||||
|
||||
# pylint: disable=no-member
|
||||
mock_open.assert_called_once_with(self.sni.get_cert_path(achall), "wb")
|
||||
mock_open.return_value.write.assert_called_once_with("pem")
|
||||
mock_open.return_value.write.assert_called_once_with("cert")
|
||||
mock_safe_open.assert_called_once_with(
|
||||
self.sni.get_key_path(achall), "wb", chmod=0o400)
|
||||
mock_safe_open.return_value.write.assert_called_once_with(
|
||||
achall.key.key.private_bytes())
|
||||
mock_safe_open.return_value.write.assert_called_once_with("key")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
"""Manual plugin."""
|
||||
import os
|
||||
import logging
|
||||
import pipes
|
||||
import shutil
|
||||
import signal
|
||||
import subprocess
|
||||
@@ -55,7 +56,7 @@ command on the target server (as root):
|
||||
HTTP_TEMPLATE = """\
|
||||
mkdir -p {root}/public_html/{response.URI_ROOT_PATH}
|
||||
cd {root}/public_html
|
||||
echo -n {achall.token} > {response.URI_ROOT_PATH}/{response.path}
|
||||
echo -n {validation} > {response.URI_ROOT_PATH}/{encoded_token}
|
||||
# run only once per server:
|
||||
python -c "import BaseHTTPServer, SimpleHTTPServer; \\
|
||||
SimpleHTTPServer.SimpleHTTPRequestHandler.extensions_map = {{'': '{ct}'}}; \\
|
||||
@@ -67,7 +68,7 @@ s.serve_forever()" """
|
||||
HTTPS_TEMPLATE = """\
|
||||
mkdir -p {root}/public_html/{response.URI_ROOT_PATH}
|
||||
cd {root}/public_html
|
||||
echo -n {achall.token} > {response.URI_ROOT_PATH}/{response.path}
|
||||
echo -n {validation} > {response.URI_ROOT_PATH}/{encoded_token}
|
||||
# run only once per server:
|
||||
openssl req -new -newkey rsa:4096 -subj "/" -days 1 -nodes -x509 -keyout ../key.pem -out ../cert.pem
|
||||
python -c "import BaseHTTPServer, SimpleHTTPServer, ssl; \\
|
||||
@@ -124,13 +125,13 @@ binary for temporary key/certificate generation.""".replace("\n", "")
|
||||
# same path for each challenge response would be easier for
|
||||
# users, but will not work if multiple domains point at the
|
||||
# same server: default command doesn't support virtual hosts
|
||||
response = challenges.SimpleHTTPResponse(
|
||||
path=jose.b64encode(os.urandom(18)),
|
||||
response, validation = achall.gen_response_and_validation(
|
||||
tls=(not self.config.no_simple_http_tls))
|
||||
assert response.good_path # is encoded os.urandom(18) good?
|
||||
|
||||
command = self.template.format(
|
||||
root=self._root, achall=achall, response=response,
|
||||
validation=pipes.quote(validation.json_dumps()),
|
||||
encoded_token=achall.chall.encode("token"),
|
||||
ct=response.CONTENT_TYPE, port=(
|
||||
response.port if self.config.simple_http_port is None
|
||||
else self.config.simple_http_port))
|
||||
@@ -161,7 +162,8 @@ binary for temporary key/certificate generation.""".replace("\n", "")
|
||||
command=command))
|
||||
|
||||
if response.simple_verify(
|
||||
achall.challb, achall.domain, self.config.simple_http_port):
|
||||
achall.chall, achall.domain,
|
||||
achall.account.key.public_key(), self.config.simple_http_port):
|
||||
return response
|
||||
else:
|
||||
if self.conf("test-mode") and self._httpd.poll() is not None:
|
||||
|
||||
@@ -6,7 +6,6 @@ import socket
|
||||
import sys
|
||||
import time
|
||||
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
import OpenSSL
|
||||
import zope.component
|
||||
import zope.interface
|
||||
@@ -14,6 +13,7 @@ import zope.interface
|
||||
from acme import challenges
|
||||
|
||||
from letsencrypt import achallenges
|
||||
from letsencrypt import crypto_util
|
||||
from letsencrypt import interfaces
|
||||
|
||||
from letsencrypt.plugins import common
|
||||
@@ -28,6 +28,11 @@ class StandaloneAuthenticator(common.Plugin):
|
||||
the certificate authority. Therefore, it does not rely on any
|
||||
existing server program.
|
||||
|
||||
:param OpenSSL.crypto.PKey private_key: DVSNI challenge certificate
|
||||
key.
|
||||
:param sni_names: Mapping from z_domain (`bytes`) to PEM-encoded
|
||||
certificate (`bytes`).
|
||||
|
||||
"""
|
||||
zope.interface.implements(interfaces.IAuthenticator)
|
||||
zope.interface.classProvides(interfaces.IPluginFactory)
|
||||
@@ -40,9 +45,12 @@ class StandaloneAuthenticator(common.Plugin):
|
||||
self.parent_pid = os.getpid()
|
||||
self.subproc_state = None
|
||||
self.tasks = {}
|
||||
self.sni_names = {}
|
||||
self.sock = None
|
||||
self.connection = None
|
||||
self.private_key = None
|
||||
self.key_pem = crypto_util.make_key(bits=2048)
|
||||
self.private_key = OpenSSL.crypto.load_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, self.key_pem)
|
||||
self.ssl_conn = None
|
||||
|
||||
def prepare(self):
|
||||
@@ -121,12 +129,12 @@ class StandaloneAuthenticator(common.Plugin):
|
||||
|
||||
"""
|
||||
sni_name = connection.get_servername()
|
||||
if sni_name in self.tasks:
|
||||
pem_cert = self.tasks[sni_name]
|
||||
if sni_name in self.sni_names:
|
||||
pem_cert = self.sni_names[sni_name]
|
||||
else:
|
||||
# TODO: Should we really present a certificate if we get an
|
||||
# unexpected SNI name? Or should we just disconnect?
|
||||
pem_cert = self.tasks.values()[0]
|
||||
pem_cert = next(self.sni_names.itervalues())
|
||||
cert = OpenSSL.crypto.load_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, pem_cert)
|
||||
new_ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_METHOD)
|
||||
@@ -179,7 +187,7 @@ class StandaloneAuthenticator(common.Plugin):
|
||||
|
||||
return False
|
||||
|
||||
def do_child_process(self, port, key):
|
||||
def do_child_process(self, port):
|
||||
"""Perform the child process side of the TCP listener task.
|
||||
|
||||
This should only be called by :meth:`start_listener`.
|
||||
@@ -189,9 +197,6 @@ class StandaloneAuthenticator(common.Plugin):
|
||||
handler.
|
||||
|
||||
:param int port: Which TCP port to bind.
|
||||
:param key: The private key to use to respond to DVSNI challenge
|
||||
requests.
|
||||
:type key: `letsencrypt.le_util.Key`
|
||||
|
||||
"""
|
||||
signal.signal(signal.SIGINT, self.subproc_signal_handler)
|
||||
@@ -218,11 +223,6 @@ class StandaloneAuthenticator(common.Plugin):
|
||||
self.sock.listen(1)
|
||||
# Signal that we've successfully bound TCP port
|
||||
os.kill(self.parent_pid, signal.SIGIO)
|
||||
self.private_key = OpenSSL.crypto.load_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, key.key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption()))
|
||||
|
||||
while True:
|
||||
self.connection, _ = self.sock.accept()
|
||||
@@ -245,16 +245,13 @@ class StandaloneAuthenticator(common.Plugin):
|
||||
self.ssl_conn.shutdown()
|
||||
self.ssl_conn.close()
|
||||
|
||||
def start_listener(self, port, key):
|
||||
def start_listener(self, port):
|
||||
"""Start listener.
|
||||
|
||||
Create a child process which will start a TCP listener on the
|
||||
specified port to perform the specified DVSNI challenges.
|
||||
|
||||
:param int port: The TCP port to bind.
|
||||
:param key: The private key to use to respond to DVSNI challenge
|
||||
requests.
|
||||
:type key: :class:`letsencrypt.le_util.Key`
|
||||
|
||||
:returns: ``True`` or ``False`` to indicate success or failure creating
|
||||
the subprocess.
|
||||
@@ -290,7 +287,7 @@ class StandaloneAuthenticator(common.Plugin):
|
||||
self.child_pid = os.getpid()
|
||||
# do_child_process() is normally not expected to return but
|
||||
# should terminate via sys.exit().
|
||||
return self.do_child_process(port, key)
|
||||
return self.do_child_process(port)
|
||||
|
||||
def already_listening(self, port): # pylint: disable=no-self-use
|
||||
"""Check if a process is already listening on the port.
|
||||
@@ -368,12 +365,14 @@ class StandaloneAuthenticator(common.Plugin):
|
||||
results_if_failure = []
|
||||
if not achalls or not isinstance(achalls, list):
|
||||
raise ValueError(".perform() was called without challenge list")
|
||||
# TODO: "bits" should be user-configurable
|
||||
for achall in achalls:
|
||||
if isinstance(achall, achallenges.DVSNI):
|
||||
# We will attempt to do it
|
||||
key = achall.key # TODO: bug; one key per start_listener
|
||||
cert_pem, response = achall.gen_cert_and_response()
|
||||
self.tasks[achall.nonce_domain] = cert_pem
|
||||
response, cert_pem, _ = achall.gen_cert_and_response(
|
||||
key_pem=self.key_pem)
|
||||
self.sni_names[response.z_domain] = cert_pem
|
||||
self.tasks[achall.token] = cert_pem
|
||||
results_if_success.append(response)
|
||||
results_if_failure.append(None)
|
||||
else:
|
||||
@@ -392,7 +391,7 @@ class StandaloneAuthenticator(common.Plugin):
|
||||
return results_if_failure
|
||||
# Try to do the authentication; note that this creates
|
||||
# the listener subprocess via os.fork()
|
||||
if self.start_listener(self.config.dvsni_port, key):
|
||||
if self.start_listener(self.config.dvsni_port):
|
||||
return results_if_success
|
||||
else:
|
||||
# TODO: This should probably raise a DVAuthError exception
|
||||
@@ -411,8 +410,8 @@ class StandaloneAuthenticator(common.Plugin):
|
||||
# Remove this from pending tasks list
|
||||
for achall in achalls:
|
||||
assert isinstance(achall, achallenges.DVSNI)
|
||||
if achall.nonce_domain in self.tasks:
|
||||
del self.tasks[achall.nonce_domain]
|
||||
if achall.token in self.tasks:
|
||||
del self.tasks[achall.token]
|
||||
else:
|
||||
# Could not find the challenge to remove!
|
||||
raise ValueError("could not find the challenge to remove")
|
||||
|
||||
@@ -1,13 +1,10 @@
|
||||
"""Tests for letsencrypt.plugins.standalone.authenticator."""
|
||||
import os
|
||||
import pkg_resources
|
||||
import psutil
|
||||
import signal
|
||||
import socket
|
||||
import unittest
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
import mock
|
||||
import OpenSSL
|
||||
|
||||
@@ -17,16 +14,14 @@ from acme import jose
|
||||
from letsencrypt import achallenges
|
||||
|
||||
from letsencrypt.tests import acme_util
|
||||
from letsencrypt.tests import test_util
|
||||
|
||||
|
||||
KEY_PATH = pkg_resources.resource_filename(
|
||||
"letsencrypt.tests", os.path.join("testdata", "rsa512_key.pem"))
|
||||
KEY_DATA = pkg_resources.resource_string(
|
||||
"letsencrypt.tests", os.path.join("testdata", "rsa512_key.pem"))
|
||||
KEY = jose.JWKRSA(key=jose.ComparableRSAKey(serialization.load_pem_private_key(
|
||||
KEY_DATA, password=None, backend=default_backend())))
|
||||
PRIVATE_KEY = OpenSSL.crypto.load_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, KEY_DATA)
|
||||
ACCOUNT = mock.Mock(key=jose.JWKRSA.load(
|
||||
test_util.load_vector("rsa512_key.pem")))
|
||||
CHALL_KEY_PEM = test_util.load_vector("rsa512_key_2.pem")
|
||||
CHALL_KEY = OpenSSL.crypto.load_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, CHALL_KEY_PEM)
|
||||
CONFIG = mock.Mock(dvsni_port=5001)
|
||||
|
||||
|
||||
@@ -80,9 +75,10 @@ class SNICallbackTest(unittest.TestCase):
|
||||
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
|
||||
self.cert = achallenges.DVSNI(
|
||||
challb=acme_util.DVSNI_P,
|
||||
domain="example.com", key=KEY).gen_cert_and_response()[0]
|
||||
self.authenticator.private_key = PRIVATE_KEY
|
||||
self.authenticator.tasks = {"abcdef.acme.invalid": self.cert}
|
||||
domain="example.com",
|
||||
account=ACCOUNT).gen_cert_and_response(key_pem=CHALL_KEY_PEM)[1]
|
||||
self.authenticator.private_key = CHALL_KEY
|
||||
self.authenticator.sni_names = {"abcdef.acme.invalid": self.cert}
|
||||
self.authenticator.child_pid = 12345
|
||||
|
||||
def test_real_servername(self):
|
||||
@@ -116,7 +112,7 @@ class ClientSignalHandlerTest(unittest.TestCase):
|
||||
from letsencrypt.plugins.standalone.authenticator import \
|
||||
StandaloneAuthenticator
|
||||
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
|
||||
self.authenticator.tasks = {"foononce.acme.invalid": "stuff"}
|
||||
self.authenticator.tasks = {"footoken.acme.invalid": "stuff"}
|
||||
self.authenticator.child_pid = 12345
|
||||
|
||||
def test_client_signal_handler(self):
|
||||
@@ -145,7 +141,7 @@ class SubprocSignalHandlerTest(unittest.TestCase):
|
||||
from letsencrypt.plugins.standalone.authenticator import \
|
||||
StandaloneAuthenticator
|
||||
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
|
||||
self.authenticator.tasks = {"foononce.acme.invalid": "stuff"}
|
||||
self.authenticator.tasks = {"footoken.acme.invalid": "stuff"}
|
||||
self.authenticator.child_pid = 12345
|
||||
self.authenticator.parent_pid = 23456
|
||||
|
||||
@@ -303,12 +299,12 @@ class PerformTest(unittest.TestCase):
|
||||
|
||||
self.achall1 = achallenges.DVSNI(
|
||||
challb=acme_util.chall_to_challb(
|
||||
challenges.DVSNI(r="whee", nonce="foo"), "pending"),
|
||||
domain="foo.example.com", key=KEY)
|
||||
challenges.DVSNI(token=b"foo"), "pending"),
|
||||
domain="foo.example.com", account=ACCOUNT)
|
||||
self.achall2 = achallenges.DVSNI(
|
||||
challb=acme_util.chall_to_challb(
|
||||
challenges.DVSNI(r="whee", nonce="bar"), "pending"),
|
||||
domain="bar.example.com", key=KEY)
|
||||
challenges.DVSNI(token=b"bar"), "pending"),
|
||||
domain="bar.example.com", account=ACCOUNT)
|
||||
bad_achall = ("This", "Represents", "A Non-DVSNI", "Challenge")
|
||||
self.achalls = [self.achall1, self.achall2, bad_achall]
|
||||
|
||||
@@ -326,16 +322,16 @@ class PerformTest(unittest.TestCase):
|
||||
result = self.authenticator.perform(self.achalls)
|
||||
self.assertEqual(len(self.authenticator.tasks), 2)
|
||||
self.assertTrue(
|
||||
self.authenticator.tasks.has_key(self.achall1.nonce_domain))
|
||||
self.authenticator.tasks.has_key(self.achall1.token))
|
||||
self.assertTrue(
|
||||
self.authenticator.tasks.has_key(self.achall2.nonce_domain))
|
||||
self.authenticator.tasks.has_key(self.achall2.token))
|
||||
self.assertTrue(isinstance(result, list))
|
||||
self.assertEqual(len(result), 3)
|
||||
self.assertTrue(isinstance(result[0], challenges.ChallengeResponse))
|
||||
self.assertTrue(isinstance(result[1], challenges.ChallengeResponse))
|
||||
self.assertFalse(result[2])
|
||||
self.authenticator.start_listener.assert_called_once_with(
|
||||
CONFIG.dvsni_port, KEY)
|
||||
CONFIG.dvsni_port)
|
||||
|
||||
def test_cannot_perform(self):
|
||||
"""What happens if start_listener() returns False."""
|
||||
@@ -345,17 +341,17 @@ class PerformTest(unittest.TestCase):
|
||||
result = self.authenticator.perform(self.achalls)
|
||||
self.assertEqual(len(self.authenticator.tasks), 2)
|
||||
self.assertTrue(
|
||||
self.authenticator.tasks.has_key(self.achall1.nonce_domain))
|
||||
self.authenticator.tasks.has_key(self.achall1.token))
|
||||
self.assertTrue(
|
||||
self.authenticator.tasks.has_key(self.achall2.nonce_domain))
|
||||
self.authenticator.tasks.has_key(self.achall2.token))
|
||||
self.assertTrue(isinstance(result, list))
|
||||
self.assertEqual(len(result), 3)
|
||||
self.assertEqual(result, [None, None, False])
|
||||
self.authenticator.start_listener.assert_called_once_with(
|
||||
CONFIG.dvsni_port, KEY)
|
||||
CONFIG.dvsni_port)
|
||||
|
||||
def test_perform_with_pending_tasks(self):
|
||||
self.authenticator.tasks = {"foononce.acme.invalid": "cert_data"}
|
||||
self.authenticator.tasks = {"footoken.acme.invalid": "cert_data"}
|
||||
extra_achall = acme_util.DVSNI_P
|
||||
self.assertRaises(
|
||||
ValueError, self.authenticator.perform, [extra_achall])
|
||||
@@ -384,7 +380,7 @@ class StartListenerTest(unittest.TestCase):
|
||||
self.authenticator.do_parent_process = mock.Mock()
|
||||
self.authenticator.do_parent_process.return_value = True
|
||||
mock_fork.return_value = 22222
|
||||
result = self.authenticator.start_listener(1717, "key")
|
||||
result = self.authenticator.start_listener(1717)
|
||||
# start_listener is expected to return the True or False return
|
||||
# value from do_parent_process.
|
||||
self.assertTrue(result)
|
||||
@@ -396,10 +392,9 @@ class StartListenerTest(unittest.TestCase):
|
||||
self.authenticator.do_parent_process = mock.Mock()
|
||||
self.authenticator.do_child_process = mock.Mock()
|
||||
mock_fork.return_value = 0
|
||||
self.authenticator.start_listener(1717, "key")
|
||||
self.authenticator.start_listener(1717)
|
||||
self.assertEqual(self.authenticator.child_pid, os.getpid())
|
||||
self.authenticator.do_child_process.assert_called_once_with(
|
||||
1717, "key")
|
||||
self.authenticator.do_child_process.assert_called_once_with(1717)
|
||||
|
||||
|
||||
class DoParentProcessTest(unittest.TestCase):
|
||||
@@ -452,9 +447,10 @@ class DoChildProcessTest(unittest.TestCase):
|
||||
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
|
||||
self.cert = achallenges.DVSNI(
|
||||
challb=acme_util.chall_to_challb(
|
||||
challenges.DVSNI(r=("x" * 32), nonce="abcdef"), "pending"),
|
||||
domain="example.com", key=KEY).gen_cert_and_response()[0]
|
||||
self.authenticator.private_key = PRIVATE_KEY
|
||||
challenges.DVSNI(token=b"abcdef"), "pending"),
|
||||
domain="example.com", account=ACCOUNT).gen_cert_and_response(
|
||||
key_pem=CHALL_KEY_PEM)[1]
|
||||
self.authenticator.private_key = CHALL_KEY
|
||||
self.authenticator.tasks = {"abcdef.acme.invalid": self.cert}
|
||||
self.authenticator.parent_pid = 12345
|
||||
|
||||
@@ -475,7 +471,7 @@ class DoChildProcessTest(unittest.TestCase):
|
||||
# do_child_process code assumes that calling sys.exit() will
|
||||
# cause subsequent code not to be executed.)
|
||||
self.assertRaises(
|
||||
IndentationError, self.authenticator.do_child_process, 1717, KEY)
|
||||
IndentationError, self.authenticator.do_child_process, 1717)
|
||||
mock_exit.assert_called_once_with(1)
|
||||
mock_kill.assert_called_once_with(12345, signal.SIGUSR2)
|
||||
|
||||
@@ -490,7 +486,7 @@ class DoChildProcessTest(unittest.TestCase):
|
||||
sample_socket.bind.side_effect = eaccess
|
||||
mock_socket.return_value = sample_socket
|
||||
self.assertRaises(
|
||||
IndentationError, self.authenticator.do_child_process, 1717, KEY)
|
||||
IndentationError, self.authenticator.do_child_process, 1717)
|
||||
mock_exit.assert_called_once_with(1)
|
||||
mock_kill.assert_called_once_with(12345, signal.SIGUSR1)
|
||||
|
||||
@@ -506,7 +502,7 @@ class DoChildProcessTest(unittest.TestCase):
|
||||
sample_socket.bind.side_effect = eio
|
||||
mock_socket.return_value = sample_socket
|
||||
self.assertRaises(
|
||||
socket.error, self.authenticator.do_child_process, 1717, KEY)
|
||||
socket.error, self.authenticator.do_child_process, 1717)
|
||||
|
||||
@mock.patch("letsencrypt.plugins.standalone.authenticator."
|
||||
"OpenSSL.SSL.Connection")
|
||||
@@ -519,7 +515,7 @@ class DoChildProcessTest(unittest.TestCase):
|
||||
mock_socket.return_value = sample_socket
|
||||
mock_connection.return_value = mock.MagicMock()
|
||||
self.assertRaises(
|
||||
CallableExhausted, self.authenticator.do_child_process, 1717, KEY)
|
||||
CallableExhausted, self.authenticator.do_child_process, 1717)
|
||||
mock_socket.assert_called_once_with()
|
||||
sample_socket.bind.assert_called_once_with(("0.0.0.0", 1717))
|
||||
sample_socket.listen.assert_called_once_with(1)
|
||||
@@ -538,9 +534,9 @@ class CleanupTest(unittest.TestCase):
|
||||
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
|
||||
self.achall = achallenges.DVSNI(
|
||||
challb=acme_util.chall_to_challb(
|
||||
challenges.DVSNI(r="whee", nonce="foononce"), "pending"),
|
||||
domain="foo.example.com", key="key")
|
||||
self.authenticator.tasks = {self.achall.nonce_domain: "stuff"}
|
||||
challenges.DVSNI(token=b"footoken"), "pending"),
|
||||
domain="foo.example.com", account=mock.Mock(key="key"))
|
||||
self.authenticator.tasks = {self.achall.token: "stuff"}
|
||||
self.authenticator.child_pid = 12345
|
||||
|
||||
@mock.patch("letsencrypt.plugins.standalone.authenticator.os.kill")
|
||||
@@ -558,8 +554,8 @@ class CleanupTest(unittest.TestCase):
|
||||
self.assertRaises(
|
||||
ValueError, self.authenticator.cleanup, [achallenges.DVSNI(
|
||||
challb=acme_util.chall_to_challb(
|
||||
challenges.DVSNI(r="whee", nonce="badnonce"), "pending"),
|
||||
domain="bad.example.com", key="key")])
|
||||
challenges.DVSNI(token=b"badtoken"), "pending"),
|
||||
domain="bad.example.com", account=mock.Mock(key="key"))])
|
||||
|
||||
|
||||
class MoreInfoTest(unittest.TestCase):
|
||||
|
||||
@@ -1,72 +0,0 @@
|
||||
"""Recovery Token Identifier Validation Challenge."""
|
||||
import errno
|
||||
import os
|
||||
|
||||
import zope.component
|
||||
|
||||
from acme import challenges
|
||||
|
||||
from letsencrypt import le_util
|
||||
from letsencrypt import interfaces
|
||||
|
||||
|
||||
class RecoveryToken(object):
|
||||
"""Recovery Token Identifier Validation Challenge.
|
||||
|
||||
Based on draft-barnes-acme, section 6.4.
|
||||
|
||||
"""
|
||||
def __init__(self, server, direc):
|
||||
self.token_dir = os.path.join(direc, server)
|
||||
|
||||
def perform(self, chall):
|
||||
"""Perform the Recovery Token Challenge.
|
||||
|
||||
:param chall: Recovery Token Challenge
|
||||
:type chall: :class:`letsencrypt.achallenges.RecoveryToken`
|
||||
|
||||
:returns: response
|
||||
:rtype: dict
|
||||
|
||||
"""
|
||||
token_fp = os.path.join(self.token_dir, chall.domain)
|
||||
if os.path.isfile(token_fp):
|
||||
with open(token_fp) as token_fd:
|
||||
return challenges.RecoveryTokenResponse(token=token_fd.read())
|
||||
|
||||
cancel, token = zope.component.getUtility(
|
||||
interfaces.IDisplay).input(
|
||||
"%s - Input Recovery Token: " % chall.domain)
|
||||
if cancel != 1:
|
||||
return challenges.RecoveryTokenResponse(token=token)
|
||||
|
||||
return None
|
||||
|
||||
def cleanup(self, chall):
|
||||
"""Cleanup the saved recovery token if it exists.
|
||||
|
||||
:param chall: Recovery Token Challenge
|
||||
:type chall: :class:`letsencrypt.achallenges.RecoveryToken`
|
||||
|
||||
"""
|
||||
try:
|
||||
le_util.safely_remove(os.path.join(self.token_dir, chall.domain))
|
||||
except OSError as err:
|
||||
if err.errno != errno.ENOENT:
|
||||
raise
|
||||
|
||||
def requires_human(self, domain):
|
||||
"""Indicates whether or not domain can be auto solved."""
|
||||
return not os.path.isfile(os.path.join(self.token_dir, domain))
|
||||
|
||||
def store_token(self, domain, token):
|
||||
"""Store token for later automatic use.
|
||||
|
||||
:param str domain: domain associated with the token
|
||||
:param str token: token from authorization
|
||||
|
||||
"""
|
||||
le_util.make_or_verify_dir(self.token_dir, 0o700, os.geteuid())
|
||||
|
||||
with open(os.path.join(self.token_dir, domain), "w") as token_fd:
|
||||
token_fd.write(str(token))
|
||||
@@ -63,7 +63,6 @@ class ReportNewAccountTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.config = mock.MagicMock(config_dir="/etc/letsencrypt")
|
||||
reg = messages.Registration.from_data(email="rhino@jungle.io")
|
||||
reg = reg.update(recovery_token="ECCENTRIC INVISIBILITY RHINOCEROS")
|
||||
self.acc = mock.MagicMock(regr=messages.RegistrationResource(
|
||||
uri=None, new_authzr_uri=None, body=reg))
|
||||
|
||||
@@ -81,7 +80,6 @@ class ReportNewAccountTest(unittest.TestCase):
|
||||
self._call()
|
||||
call_list = mock_zope().add_message.call_args_list
|
||||
self.assertTrue(self.config.config_dir in call_list[0][0][0])
|
||||
self.assertTrue(self.acc.regr.body.recovery_token in call_list[1][0][0])
|
||||
self.assertTrue(
|
||||
", ".join(self.acc.regr.body.emails) in call_list[1][0][0])
|
||||
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
"""Tests for letsencrypt.achallenges."""
|
||||
import unittest
|
||||
|
||||
import OpenSSL
|
||||
import mock
|
||||
|
||||
from acme import challenges
|
||||
from acme import crypto_util as acme_crypto_util
|
||||
from acme import jose
|
||||
|
||||
from letsencrypt.tests import acme_util
|
||||
@@ -15,28 +14,21 @@ class DVSNITest(unittest.TestCase):
|
||||
"""Tests for letsencrypt.achallenges.DVSNI."""
|
||||
|
||||
def setUp(self):
|
||||
self.chall = acme_util.chall_to_challb(
|
||||
challenges.DVSNI(r="r_value", nonce="12345ABCDE"), "pending")
|
||||
self.response = challenges.DVSNIResponse()
|
||||
key = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))
|
||||
|
||||
self.challb = acme_util.chall_to_challb(acme_util.DVSNI, "pending")
|
||||
account = mock.Mock(key=jose.JWKRSA.load(
|
||||
test_util.load_vector("rsa512_key.pem")))
|
||||
from letsencrypt.achallenges import DVSNI
|
||||
self.achall = DVSNI(challb=self.chall, domain="example.com", key=key)
|
||||
self.achall = DVSNI(
|
||||
challb=self.challb, domain="example.com", account=account)
|
||||
|
||||
def test_proxy(self):
|
||||
self.assertEqual(self.chall.r, self.achall.r)
|
||||
self.assertEqual(self.chall.nonce, self.achall.nonce)
|
||||
self.assertEqual(self.challb.token, self.achall.token)
|
||||
|
||||
def test_gen_cert_and_response(self):
|
||||
cert_pem, _ = self.achall.gen_cert_and_response(s=self.response.s)
|
||||
|
||||
cert = OpenSSL.crypto.load_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, cert_pem)
|
||||
self.assertEqual(cert.get_subject().CN, "example.com")
|
||||
# pylint: disable=protected-access
|
||||
self.assertEqual(acme_crypto_util._pyopenssl_cert_or_req_san(cert), [
|
||||
"example.com", self.chall.nonce_domain,
|
||||
self.response.z_domain(self.chall)])
|
||||
response, cert_pem, key_pem = self.achall.gen_cert_and_response()
|
||||
self.assertTrue(isinstance(response, challenges.DVSNIResponse))
|
||||
self.assertTrue(isinstance(cert_pem, bytes))
|
||||
self.assertTrue(isinstance(key_pem, bytes))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -15,14 +15,12 @@ KEY = test_util.load_rsa_private_key('rsa512_key.pem')
|
||||
SIMPLE_HTTP = challenges.SimpleHTTP(
|
||||
token="evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA")
|
||||
DVSNI = challenges.DVSNI(
|
||||
r=jose.b64decode("Tyq0La3slT7tqQ0wlOiXnCY2vyez7Zo5blgPJ1xt5xI"),
|
||||
nonce=jose.b64decode("a82d5ff8ef740d12881f6d3c2277ab2e"))
|
||||
token=jose.b64decode(b"evaGxfADs6pSRb2LAv9IZf17Dt3juxGJyPCt92wrDoA"))
|
||||
DNS = challenges.DNS(token="17817c66b60ce2e4012dfad92657527a")
|
||||
RECOVERY_CONTACT = challenges.RecoveryContact(
|
||||
activation_url="https://example.ca/sendrecovery/a5bd99383fb0",
|
||||
success_url="https://example.ca/confirmrecovery/bb1b9928932",
|
||||
contact="c********n@example.com")
|
||||
RECOVERY_TOKEN = challenges.RecoveryToken()
|
||||
POP = challenges.ProofOfPossession(
|
||||
alg="RS256", nonce=jose.b64decode("eET5udtV7aoX8Xl8gYiZIA"),
|
||||
hints=challenges.ProofOfPossession.Hints(
|
||||
@@ -43,7 +41,7 @@ POP = challenges.ProofOfPossession(
|
||||
)
|
||||
)
|
||||
|
||||
CHALLENGES = [SIMPLE_HTTP, DVSNI, DNS, RECOVERY_CONTACT, RECOVERY_TOKEN, POP]
|
||||
CHALLENGES = [SIMPLE_HTTP, DVSNI, DNS, RECOVERY_CONTACT, POP]
|
||||
DV_CHALLENGES = [chall for chall in CHALLENGES
|
||||
if isinstance(chall, challenges.DVChallenge)]
|
||||
CONT_CHALLENGES = [chall for chall in CHALLENGES
|
||||
@@ -85,11 +83,9 @@ DVSNI_P = chall_to_challb(DVSNI, messages.STATUS_PENDING)
|
||||
SIMPLE_HTTP_P = chall_to_challb(SIMPLE_HTTP, messages.STATUS_PENDING)
|
||||
DNS_P = chall_to_challb(DNS, messages.STATUS_PENDING)
|
||||
RECOVERY_CONTACT_P = chall_to_challb(RECOVERY_CONTACT, messages.STATUS_PENDING)
|
||||
RECOVERY_TOKEN_P = chall_to_challb(RECOVERY_TOKEN, messages.STATUS_PENDING)
|
||||
POP_P = chall_to_challb(POP, messages.STATUS_PENDING)
|
||||
|
||||
CHALLENGES_P = [SIMPLE_HTTP_P, DVSNI_P, DNS_P,
|
||||
RECOVERY_CONTACT_P, RECOVERY_TOKEN_P, POP_P]
|
||||
CHALLENGES_P = [SIMPLE_HTTP_P, DVSNI_P, DNS_P, RECOVERY_CONTACT_P, POP_P]
|
||||
DV_CHALLENGES_P = [challb for challb in CHALLENGES_P
|
||||
if isinstance(challb.chall, challenges.DVChallenge)]
|
||||
CONT_CHALLENGES_P = [
|
||||
|
||||
@@ -19,7 +19,6 @@ TRANSLATE = {
|
||||
"dvsni": "DVSNI",
|
||||
"simpleHttp": "SimpleHTTP",
|
||||
"dns": "DNS",
|
||||
"recoveryToken": "RecoveryToken",
|
||||
"recoveryContact": "RecoveryContact",
|
||||
"proofOfPossession": "ProofOfPossession",
|
||||
}
|
||||
@@ -41,7 +40,8 @@ class ChallengeFactoryTest(unittest.TestCase):
|
||||
[messages.STATUS_PENDING]*6, False)
|
||||
|
||||
def test_all(self):
|
||||
cont_c, dv_c = self.handler._challenge_factory(self.dom, range(0, 6))
|
||||
cont_c, dv_c = self.handler._challenge_factory(
|
||||
self.dom, range(0, len(acme_util.CHALLENGES)))
|
||||
|
||||
self.assertEqual(
|
||||
[achall.chall for achall in cont_c], acme_util.CONT_CHALLENGES)
|
||||
@@ -49,10 +49,10 @@ class ChallengeFactoryTest(unittest.TestCase):
|
||||
[achall.chall for achall in dv_c], acme_util.DV_CHALLENGES)
|
||||
|
||||
def test_one_dv_one_cont(self):
|
||||
cont_c, dv_c = self.handler._challenge_factory(self.dom, [1, 4])
|
||||
cont_c, dv_c = self.handler._challenge_factory(self.dom, [1, 3])
|
||||
|
||||
self.assertEqual(
|
||||
[achall.chall for achall in cont_c], [acme_util.RECOVERY_TOKEN])
|
||||
[achall.chall for achall in cont_c], [acme_util.RECOVERY_CONTACT])
|
||||
self.assertEqual([achall.chall for achall in dv_c], [acme_util.DVSNI])
|
||||
|
||||
def test_unrecognized(self):
|
||||
@@ -80,7 +80,7 @@ class GetAuthorizationsTest(unittest.TestCase):
|
||||
|
||||
self.mock_dv_auth.get_chall_pref.return_value = [challenges.DVSNI]
|
||||
self.mock_cont_auth.get_chall_pref.return_value = [
|
||||
challenges.RecoveryToken]
|
||||
challenges.RecoveryContact]
|
||||
|
||||
self.mock_cont_auth.perform.side_effect = gen_auth_resp
|
||||
self.mock_dv_auth.perform.side_effect = gen_auth_resp
|
||||
@@ -196,7 +196,7 @@ class PollChallengesTest(unittest.TestCase):
|
||||
self.chall_update = {}
|
||||
for dom in self.doms:
|
||||
self.chall_update[dom] = [
|
||||
challb_to_achall(challb, "dummy_key", dom)
|
||||
challb_to_achall(challb, mock.Mock(key="dummy_key"), dom)
|
||||
for challb in self.handler.authzr[dom].body.challenges]
|
||||
|
||||
@mock.patch("letsencrypt.auth_handler.time")
|
||||
@@ -313,11 +313,11 @@ class GenChallengePathTest(unittest.TestCase):
|
||||
self.assertTrue(self._call(challbs[::-1], prefs, None))
|
||||
|
||||
def test_common_case_with_continuity(self):
|
||||
challbs = (acme_util.RECOVERY_TOKEN_P,
|
||||
challbs = (acme_util.POP_P,
|
||||
acme_util.RECOVERY_CONTACT_P,
|
||||
acme_util.DVSNI_P,
|
||||
acme_util.SIMPLE_HTTP_P)
|
||||
prefs = [challenges.RecoveryToken, challenges.DVSNI]
|
||||
prefs = [challenges.ProofOfPossession, challenges.DVSNI]
|
||||
combos = acme_util.gen_combos(challbs)
|
||||
self.assertEqual(self._call(challbs, prefs, combos), (0, 2))
|
||||
|
||||
@@ -325,21 +325,19 @@ class GenChallengePathTest(unittest.TestCase):
|
||||
self.assertTrue(self._call(challbs, prefs, None))
|
||||
|
||||
def test_full_cont_server(self):
|
||||
challbs = (acme_util.RECOVERY_TOKEN_P,
|
||||
acme_util.RECOVERY_CONTACT_P,
|
||||
challbs = (acme_util.RECOVERY_CONTACT_P,
|
||||
acme_util.POP_P,
|
||||
acme_util.DVSNI_P,
|
||||
acme_util.SIMPLE_HTTP_P,
|
||||
acme_util.DNS_P)
|
||||
# Typical webserver client that can do everything except DNS
|
||||
# Attempted to make the order realistic
|
||||
prefs = [challenges.RecoveryToken,
|
||||
challenges.ProofOfPossession,
|
||||
prefs = [challenges.ProofOfPossession,
|
||||
challenges.SimpleHTTP,
|
||||
challenges.DVSNI,
|
||||
challenges.RecoveryContact]
|
||||
combos = acme_util.gen_combos(challbs)
|
||||
self.assertEqual(self._call(challbs, prefs, combos), (0, 4))
|
||||
self.assertEqual(self._call(challbs, prefs, combos), (1, 3))
|
||||
|
||||
# Dumb path trivial test
|
||||
self.assertTrue(self._call(challbs, prefs, None))
|
||||
@@ -444,13 +442,13 @@ class ReportFailedChallsTest(unittest.TestCase):
|
||||
self.dvsni_same = achallenges.DVSNI(
|
||||
challb=messages.ChallengeBody(**kwargs),# pylint: disable=star-args
|
||||
domain="example.com",
|
||||
key=acme_util.KEY)
|
||||
account=mock.Mock(key=acme_util.KEY))
|
||||
|
||||
kwargs["error"] = messages.Error(typ="dnssec", detail="detail")
|
||||
self.dvsni_diff = achallenges.DVSNI(
|
||||
challb=messages.ChallengeBody(**kwargs),# pylint: disable=star-args
|
||||
domain="foo.bar",
|
||||
key=acme_util.KEY)
|
||||
account=mock.Mock(key=acme_util.KEY))
|
||||
|
||||
@mock.patch("letsencrypt.auth_handler.zope.component.getUtility")
|
||||
def test_same_error_and_domain(self, mock_zope):
|
||||
|
||||
@@ -36,7 +36,6 @@ class NamespaceConfigTest(unittest.TestCase):
|
||||
constants.CERT_DIR = 'certs'
|
||||
constants.IN_PROGRESS_DIR = '../p'
|
||||
constants.KEY_DIR = 'keys'
|
||||
constants.REC_TOKEN_DIR = '/r'
|
||||
constants.TEMP_CHECKPOINT_DIR = 't'
|
||||
|
||||
self.assertEqual(
|
||||
@@ -47,7 +46,6 @@ class NamespaceConfigTest(unittest.TestCase):
|
||||
self.config.cert_key_backup, '/tmp/foo/c/acme-server.org:443/new')
|
||||
self.assertEqual(self.config.in_progress_dir, '/tmp/foo/../p')
|
||||
self.assertEqual(self.config.key_dir, '/tmp/config/keys')
|
||||
self.assertEqual(self.config.rec_token_dir, '/r')
|
||||
self.assertEqual(self.config.temp_checkpoint_dir, '/tmp/foo/t')
|
||||
|
||||
|
||||
|
||||
@@ -17,54 +17,30 @@ class PerformTest(unittest.TestCase):
|
||||
|
||||
self.auth = ContinuityAuthenticator(
|
||||
mock.MagicMock(server="demo_server.org"), None)
|
||||
self.auth.rec_token.perform = mock.MagicMock(
|
||||
name="rec_token_perform", side_effect=gen_client_resp)
|
||||
self.auth.proof_of_pos.perform = mock.MagicMock(
|
||||
name="proof_of_pos_perform", side_effect=gen_client_resp)
|
||||
|
||||
def test_rec_token1(self):
|
||||
token = achallenges.RecoveryToken(challb=None, domain="0")
|
||||
responses = self.auth.perform([token])
|
||||
self.assertEqual(responses, ["RecoveryToken0"])
|
||||
|
||||
def test_rec_token5(self):
|
||||
tokens = []
|
||||
for i in xrange(5):
|
||||
tokens.append(achallenges.RecoveryToken(challb=None, domain=str(i)))
|
||||
|
||||
responses = self.auth.perform(tokens)
|
||||
|
||||
self.assertEqual(len(responses), 5)
|
||||
for i in xrange(5):
|
||||
self.assertEqual(responses[i], "RecoveryToken%d" % i)
|
||||
|
||||
def test_pop_and_rec_token(self):
|
||||
def test_pop(self):
|
||||
achalls = []
|
||||
for i in xrange(4):
|
||||
if i % 2 == 0:
|
||||
achalls.append(achallenges.RecoveryToken(challb=None,
|
||||
domain=str(i)))
|
||||
else:
|
||||
achalls.append(achallenges.ProofOfPossession(challb=None,
|
||||
domain=str(i)))
|
||||
achalls.append(achallenges.ProofOfPossession(
|
||||
challb=None, domain=str(i)))
|
||||
responses = self.auth.perform(achalls)
|
||||
|
||||
self.assertEqual(len(responses), 4)
|
||||
for i in xrange(4):
|
||||
if i % 2 == 0:
|
||||
self.assertEqual(responses[i], "RecoveryToken%d" % i)
|
||||
else:
|
||||
self.assertEqual(responses[i], "ProofOfPossession%d" % i)
|
||||
self.assertEqual(responses[i], "ProofOfPossession%d" % i)
|
||||
|
||||
def test_unexpected(self):
|
||||
self.assertRaises(
|
||||
errors.ContAuthError, self.auth.perform, [
|
||||
achallenges.DVSNI(challb=None, domain="0", key="invalid_key")])
|
||||
achallenges.DVSNI(challb=None, domain="0",
|
||||
account=mock.Mock(key="invalid_key"))])
|
||||
|
||||
def test_chall_pref(self):
|
||||
self.assertEqual(
|
||||
self.auth.get_chall_pref("example.com"),
|
||||
[challenges.ProofOfPossession, challenges.RecoveryToken])
|
||||
[challenges.ProofOfPossession])
|
||||
|
||||
|
||||
class CleanupTest(unittest.TestCase):
|
||||
@@ -75,24 +51,11 @@ class CleanupTest(unittest.TestCase):
|
||||
|
||||
self.auth = ContinuityAuthenticator(
|
||||
mock.MagicMock(server="demo_server.org"), None)
|
||||
self.mock_cleanup = mock.MagicMock(name="rec_token_cleanup")
|
||||
self.auth.rec_token.cleanup = self.mock_cleanup
|
||||
|
||||
def test_rec_token2(self):
|
||||
token1 = achallenges.RecoveryToken(challb=None, domain="0")
|
||||
token2 = achallenges.RecoveryToken(challb=None, domain="1")
|
||||
|
||||
self.auth.cleanup([token1, token2])
|
||||
|
||||
self.assertEqual(self.mock_cleanup.call_args_list,
|
||||
[mock.call(token1), mock.call(token2)])
|
||||
|
||||
def test_unexpected(self):
|
||||
token = achallenges.RecoveryToken(challb=None, domain="0")
|
||||
unexpected = achallenges.DVSNI(challb=None, domain="0", key="dummy_key")
|
||||
|
||||
self.assertRaises(
|
||||
errors.ContAuthError, self.auth.cleanup, [token, unexpected])
|
||||
unexpected = achallenges.DVSNI(
|
||||
challb=None, domain="0", account=mock.Mock("dummy_key"))
|
||||
self.assertRaises(errors.ContAuthError, self.auth.cleanup, [unexpected])
|
||||
|
||||
|
||||
def gen_client_resp(chall):
|
||||
|
||||
@@ -227,6 +227,37 @@ class UniqueLineageNameTest(unittest.TestCase):
|
||||
self.assertRaises(OSError, self._call, "wow")
|
||||
|
||||
|
||||
class SafelyRemoveTest(unittest.TestCase):
|
||||
"""Tests for letsencrypt.le_util.safely_remove."""
|
||||
|
||||
def setUp(self):
|
||||
self.tmp = tempfile.mkdtemp()
|
||||
self.path = os.path.join(self.tmp, "foo")
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tmp)
|
||||
|
||||
def _call(self):
|
||||
from letsencrypt.le_util import safely_remove
|
||||
return safely_remove(self.path)
|
||||
|
||||
def test_exists(self):
|
||||
with open(self.path, "w"):
|
||||
pass # just create the file
|
||||
self._call()
|
||||
self.assertFalse(os.path.exists(self.path))
|
||||
|
||||
def test_missing(self):
|
||||
self._call()
|
||||
# no error, yay!
|
||||
self.assertFalse(os.path.exists(self.path))
|
||||
|
||||
@mock.patch("letsencrypt.le_util.os.remove")
|
||||
def test_other_error_passthrough(self, mock_remove):
|
||||
mock_remove.side_effect = OSError
|
||||
self.assertRaises(OSError, self._call)
|
||||
|
||||
|
||||
class SafeEmailTest(unittest.TestCase):
|
||||
"""Test safe_email."""
|
||||
@classmethod
|
||||
|
||||
@@ -1,80 +0,0 @@
|
||||
"""Tests for recovery_token.py."""
|
||||
import os
|
||||
import unittest
|
||||
import shutil
|
||||
import tempfile
|
||||
|
||||
import mock
|
||||
|
||||
from acme import challenges
|
||||
|
||||
from letsencrypt import achallenges
|
||||
|
||||
|
||||
class RecoveryTokenTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
from letsencrypt.recovery_token import RecoveryToken
|
||||
server = "demo_server"
|
||||
self.base_dir = tempfile.mkdtemp("tokens")
|
||||
self.token_dir = os.path.join(self.base_dir, server)
|
||||
self.rec_token = RecoveryToken(server, self.base_dir)
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.base_dir)
|
||||
|
||||
def test_store_token(self):
|
||||
self.rec_token.store_token("example.com", 111)
|
||||
path = os.path.join(self.token_dir, "example.com")
|
||||
self.assertTrue(os.path.isfile(path))
|
||||
with open(path) as token_fd:
|
||||
self.assertEqual(token_fd.read(), "111")
|
||||
|
||||
def test_requires_human(self):
|
||||
self.rec_token.store_token("example2.com", 222)
|
||||
self.assertFalse(self.rec_token.requires_human("example2.com"))
|
||||
self.assertTrue(self.rec_token.requires_human("example3.com"))
|
||||
|
||||
def test_cleanup(self):
|
||||
self.rec_token.store_token("example3.com", 333)
|
||||
self.assertFalse(self.rec_token.requires_human("example3.com"))
|
||||
|
||||
self.rec_token.cleanup(achallenges.RecoveryToken(
|
||||
challb=challenges.RecoveryToken(), domain="example3.com"))
|
||||
self.assertTrue(self.rec_token.requires_human("example3.com"))
|
||||
|
||||
# Shouldn't throw an error
|
||||
self.rec_token.cleanup(achallenges.RecoveryToken(
|
||||
challb=None, domain="example4.com"))
|
||||
|
||||
# SHOULD throw an error (OSError other than nonexistent file)
|
||||
self.assertRaises(
|
||||
OSError, self.rec_token.cleanup,
|
||||
achallenges.RecoveryToken(
|
||||
challb=None, domain=("a" + "r" * 10000 + ".com")))
|
||||
|
||||
def test_perform_stored(self):
|
||||
self.rec_token.store_token("example4.com", 444)
|
||||
response = self.rec_token.perform(
|
||||
achallenges.RecoveryToken(
|
||||
challb=challenges.RecoveryToken(), domain="example4.com"))
|
||||
|
||||
self.assertEqual(
|
||||
response, challenges.RecoveryTokenResponse(token="444"))
|
||||
|
||||
@mock.patch("letsencrypt.recovery_token.zope.component.getUtility")
|
||||
def test_perform_not_stored(self, mock_input):
|
||||
mock_input().input.side_effect = [(0, "555"), (1, "000")]
|
||||
response = self.rec_token.perform(
|
||||
achallenges.RecoveryToken(
|
||||
challb=challenges.RecoveryToken(), domain="example5.com"))
|
||||
self.assertEqual(
|
||||
response, challenges.RecoveryTokenResponse(token="555"))
|
||||
|
||||
response = self.rec_token.perform(
|
||||
achallenges.RecoveryToken(
|
||||
challb=challenges.RecoveryToken(), domain="example6.com"))
|
||||
self.assertTrue(response is None)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main() # pragma: no cover
|
||||
Reference in New Issue
Block a user