1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-23 07:20:55 +03:00

acme.util.ComparableX509

This commit is contained in:
Jakub Warmuz
2015-02-12 14:40:19 +00:00
parent a57574cbba
commit a2e807debf
4 changed files with 47 additions and 25 deletions

View File

@@ -262,17 +262,10 @@ class Certificate(Message):
fields["refresh"] = self.refresh
return fields
def __eq__(self, other):
# pylint: disable=redefined-outer-name
# M2Crypto.X509 does not implement __eq__, do it manually
return isinstance(other, Certificate) and self.certificate.as_der(
) == other.certificate.as_der() and [
cert.as_der() for cert in self.chain] == [
cert.as_der() for cert in other.chain]
@classmethod
def _decode_cert(cls, b64der):
return M2Crypto.X509.load_cert_der_string(jose.b64decode(b64der))
return util.ComparableX509(M2Crypto.X509.load_cert_der_string(
jose.b64decode(b64der)))
@classmethod
def _encode_cert(cls, cert):
@@ -290,7 +283,7 @@ class Certificate(Message):
class CertificateRequest(Message):
"""ACME "certificateRequest" message.
:ivar str csr: DER encoded CSR.
:ivar str csr: CSR.
:ivar signature: Signature.
:type signature: :class:`letsencrypt.acme.other.Signature`
@@ -313,7 +306,7 @@ class CertificateRequest(Message):
"""
return cls(signature=other.Signature.from_msg(
kwargs["csr"], key, sig_nonce), **kwargs)
kwargs["csr"].as_der(), key, sig_nonce), **kwargs)
def verify(self):
"""Verify signature.
@@ -324,17 +317,26 @@ class CertificateRequest(Message):
"""
# TODO: must also check that the public key encoded in the JWK object
# is the correct key for a given context.
return self.signature.verify(self.csr)
return self.signature.verify(self.csr.as_der())
@classmethod
def _decode_csr(cls, b64der):
return util.ComparableX509(M2Crypto.X509.load_request_der_string(
jose.b64decode(b64der)))
@classmethod
def _encode_csr(cls, csr):
return jose.b64encode(csr.as_der())
def _fields_to_json(self):
return {
"csr": jose.b64encode(self.csr),
"csr": self._encode_csr(self.csr),
"signature": self.signature,
}
@classmethod
def _from_valid_json(cls, jobj):
return cls(csr=jose.b64decode(jobj["csr"]),
return cls(csr=cls._decode_csr(jobj["csr"]),
signature=other.Signature.from_json(
jobj["signature"], validate=False))

View File

@@ -13,8 +13,10 @@ from letsencrypt.acme import other
KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
'letsencrypt.client.tests', 'testdata/rsa256_key.pem'))
CERT = M2Crypto.X509.load_cert_string(pkg_resources.resource_string(
CERT = M2Crypto.X509.load_cert(pkg_resources.resource_filename(
'letsencrypt.client.tests', 'testdata/cert.pem'))
CSR = M2Crypto.X509.load_request(pkg_resources.resource_filename(
'letsencrypt.client.tests', 'testdata/csr.pem'))
class MessageTest(unittest.TestCase):
@@ -294,28 +296,27 @@ class CertificateTest(unittest.TestCase):
class CertificateRequestTest(unittest.TestCase):
def setUp(self):
self.csr = 'TODO: real DER CSR?'
signature = other.Signature(
alg='RS256', jwk=jose.JWK(key=KEY.publickey()),
sig='\x1cD\x157\x83\x14\xd7 \xeb\x02\xb3\xf6O\xb5\x99C]\x97'
'\x94p\xa7\xe48\x13>\x06\xf9yd\xf9\xfe\xf8\xd1>\x9aKH'
'\xd7\xba\xb9a1\xf5!p\x1b\xd7}\xbaj\xa7\xe3\xd9\xd9\t%'
'\xbb\xba\xc9\x00\xdaW\x16\xe9',
sig='\x15\xed\x84\xaa:\xf2DO\x0e9 \xbcg\xf8\xc0\xcf\x87\x9a'
'\x95\xeb\xffT[\x84[\xec\x85\x7f\x8eK\xe9\xc2\x12\xc8Q'
'\xafo\xc6h\x07\xba\xa6\xdf\xd1\xa7"$\xba=Z\x13n\x14\x0b'
'k\xfe\xee\xb4\xe4\xc8\x05\x9a\x08\xa7',
nonce='\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9')
from letsencrypt.acme.messages import CertificateRequest
self.msg = CertificateRequest(csr=self.csr, signature=signature)
self.msg = CertificateRequest(csr=CSR, signature=signature)
self.jmsg = {
'type': 'certificateRequest',
'csr': 'VE9ETzogcmVhbCBERVIgQ1NSPw',
'csr': jose.b64encode(CSR.as_der()),
'signature': signature,
}
def test_create(self):
from letsencrypt.acme.messages import CertificateRequest
self.assertEqual(self.msg, CertificateRequest.create(
csr=self.csr, key=KEY,
csr=CSR, key=KEY,
sig_nonce='\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9'))
def test_verify(self):

View File

@@ -9,6 +9,25 @@ from letsencrypt.acme import errors
from letsencrypt.acme import interfaces
class ComparableX509(object): # pylint: disable=too-few-public-methods
"""Wrapper for M2Crypto.X509.* objects that supports __eq__.
Wraps around:
- :class:`M2Crypto.X509.X509`
- :class:`M2Crypto.X509.Request`
"""
def __init__(self, wrapped):
self._wrapped = wrapped
def __getattr__(self, name):
return getattr(self._wrapped, name)
def __eq__(self, other):
return self.as_der() == other.as_der()
def load_schema(name):
"""Load JSON schema from distribution."""
return json.load(open(pkg_resources.resource_filename(

View File

@@ -130,8 +130,8 @@ class Client(object):
logging.info("Preparing and sending CSR...")
return self.network.send_and_receive_expected(
acme.messages.CertificateRequest.create(
csr=csr_der, key=Crypto.PublicKey.RSA.importKey(
self.authkey.pem)),
csr=M2Crypto.X509.load_request_der_string(csr_der),
key=Crypto.PublicKey.RSA.importKey(self.authkey.pem)),
acme.messages.Certificate)
def save_certificate(self, certificate_msg, cert_path, chain_path):