1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-26 07:41:33 +03:00

Tests for ACME message factories

This commit is contained in:
Jakub Warmuz
2014-12-09 21:55:27 +01:00
parent 231f3d5c61
commit b5a51de16b
3 changed files with 121 additions and 21 deletions

View File

@@ -64,7 +64,7 @@ def pretty(json_string):
def challenge_request(name):
"""Create ACME "challengeRequest message.
:param unicode name: Domain name
:param str name: Domain name
:returns: ACME "challengeRequest" message.
:rtype: dict
@@ -76,15 +76,17 @@ def challenge_request(name):
}
def authorization_request(req_id, name, server_nonce, responses, key):
def authorization_request(req_id, name, server_nonce, responses, key,
nonce=None):
"""Create ACME "authorizationRequest" message.
:param str req_id: SessionID from the server challenge
:param unicode name: Hostname
:param str name: Hostname
:param str server_nonce: Nonce from the server challenge
:param list responses: List of completed challenges
:param str key: Key in string form. Accepted formats
are the same as for `Crypto.PublicKey.RSA.importKey`.
:param str nonce: Nonce used for signature. Useful for testing.
:returns: ACME "authorizationRequest" message.
:rtype: dict
@@ -96,16 +98,17 @@ def authorization_request(req_id, name, server_nonce, responses, key):
"nonce": server_nonce,
"responses": responses,
"signature": crypto_util.create_sig(
name + le_util.jose_b64decode(server_nonce), key),
name + le_util.jose_b64decode(server_nonce), key, nonce),
}
def certificate_request(csr_der, key):
def certificate_request(csr_der, key, nonce=None):
"""Create ACME "certificateRequest" message.
:param str csr_der: DER encoded CSR.
:param str key: Key in string form. Accepted formats
are the same as for `Crypto.PublicKey.RSA.importKey`.
:param str nonce: Nonce used for signature. Useful for testing.
:returns: ACME "certificateRequest" message.
:rtype: dict
@@ -114,17 +117,17 @@ def certificate_request(csr_der, key):
return {
"type": "certificateRequest",
"csr": le_util.jose_b64encode(csr_der),
"signature": crypto_util.create_sig(csr_der, key),
"signature": crypto_util.create_sig(csr_der, key, nonce),
}
def revocation_request(key_file, cert_der):
def revocation_request(cert_der, key, nonce=None):
"""Create ACME "revocationRequest" message.
:param str key_file: Path to a file containing RSA key. Accepted
formats are the same as for `Crypto.PublicKey.RSA.importKey`.
:param str cert_der: DER encoded certificate.
:param str key: Key in string form. Accepted formats
are the same as for `Crypto.PublicKey.RSA.importKey`.
:param str nonce: Nonce used for signature. Useful for testing.
:returns: ACME "revocationRequest" message.
:rtype: dict
@@ -133,14 +136,14 @@ def revocation_request(key_file, cert_der):
return {
"type": "revocationRequest",
"certificate": le_util.jose_b64encode(cert_der),
"signature": crypto_util.create_sig(cert_der, key_file),
"signature": crypto_util.create_sig(cert_der, key, nonce),
}
def status_request(token):
"""Create ACME "statusRequest" message.
:param str token: Token provided in ACME "defer" message.
:param unicode token: Token provided in ACME "defer" message.
:returns: ACME "statusRequest" message.
:rtype: dict

View File

@@ -54,19 +54,116 @@ class PrettyTest(unittest.TestCase):
'{\n "foo": {\n "bar": "baz"\n }\n}')
class ChallengeRequestTest(unittest.TestCase):
"""Tests for letsencrypt.client.acme.challenge_request"""
class MessageFactoriesTest(unittest.TestCase):
"""Tests for ACME message factories from letsencrypt.client.acme."""
def test_supports_unicode(self):
"""Test support unicode parameter"""
def setUp(self):
self.privkey = """-----BEGIN RSA PRIVATE KEY-----
MIIBOgIBAAJBAKx1c7RR7R/drnBSQ/zfx1vQLHUbFLh1AQQQ5R8DZUXd36efNK79
vukFhN9HFoHZiUvOjm0c+pVE6K+EdE/twuUCAwEAAQJAMbrEnJCrQe8YqAbw1/Bn
elAzIamndfE3U8bTavf9sgFpS4HL83rhd6PDbvx81ucaJAT/5x048fM/nFl4fzAc
mQIhAOF/a9o3EIsDKEmUl+Z1OaOiUxDF3kqWSmALEsmvDhwXAiEAw8ljV5RO/rUp
Zu2YMDFq3MKpyyMgBIJ8CxmGRc6gCmMCIGRQzkcmhfqBrhOFwkmozrqIBRIKJIjj
8TRm2LXWZZ2DAiAqVO7PztdNpynugUy4jtbGKKjBrTSNBRGA7OHlUgm0dQIhALQq
6oGU29Vxlvt3k0vmiRKU4AVfLyNXIGtcWcNG46h/
-----END RSA PRIVATE KEY-----"""
self.nonce = '\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9'
self.b64nonce = '7Nbyb1lI6xPVI3Hg3aKSqQ'
def _validate(self, msg):
from letsencrypt.client.acme import SCHEMATA
jsonschema.validate(msg, SCHEMATA[msg['type']])
def _signature(self, sig):
return {
'nonce': self.b64nonce,
'alg': 'RS256',
'jwk': {
'kty': 'RSA',
'e': 'AQAB',
'n': 'rHVztFHtH92ucFJD_N_HW9AsdRsUuHUBBBDlHwNlRd3fp5'
'80rv2-6QWE30cWgdmJS86ObRz6lUTor4R0T-3C5Q',
},
'sig': sig,
}
def test_challenge_request(self):
from letsencrypt.client.acme import challenge_request
self.assertEqual(
challenge_request(u'unicode'),
msg = challenge_request('example.com')
self.assertEqual(msg, {
'type': 'challengeRequest',
'identifier': 'example.com',
})
self._validate(msg)
def test_authorization_request(self):
from letsencrypt.client.acme import authorization_request
responses = [
{
"type": "challengeRequest",
"identifier": u'unicode',
'type': 'simpleHttps',
'path': 'Hf5GrX4Q7EBax9hc2jJnfw',
},
None, # null
{
'type': 'recoveryToken',
'token': '23029d88d9e123e',
}
]
msg = authorization_request(
'aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
'example.com',
'czpsrF0KMH6dgajig3TGHw',
responses,
self.privkey,
self.nonce,
)
self.assertEqual(msg, {
'type': 'authorizationRequest',
'sessionID': 'aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
'nonce': 'czpsrF0KMH6dgajig3TGHw',
'signature': self._signature(
'VkpReso87ogwGul2MGck96TkYs4QoblIgNthgrm9O7EBGlzCRCnTHnx'
'bj6loqaC4f5bn1rgS927Gp1Kvbqnmqg'),
'responses': responses,
})
self._validate(msg)
def test_certificate_request(self):
from letsencrypt.client.acme import certificate_request
msg = certificate_request(
'TODO: real DER CSR?', self.privkey, self.nonce)
self.assertEqual(msg, {
'type': 'certificateRequest',
'csr': 'VE9ETzogcmVhbCBERVIgQ1NSPw',
'signature': self._signature(
'HEQVN4MU1yDrArP2T7WZQ12XlHCn5DgTPgb5eWT5_vjRPppLSNe6uWE'
'x9SFwG9d9umqn49nZCSW7uskA2lcW6Q'),
})
self._validate(msg)
def test_revocation_request(self):
from letsencrypt.client.acme import revocation_request
msg = revocation_request(
'TODO: real DER cert?', self.privkey, self.nonce)
self.assertEqual(msg, {
'type': 'revocationRequest',
'certificate': 'VE9ETzogcmVhbCBERVIgY2VydD8',
'signature': self._signature(
'ABXA1IsyTalTXIojxmGnIUGyZASmvqEvTQ98jJ5KFs2FTswLEmsoqFX'
'fU6l5_fous-tsbXOfLN-7PjfZ5XWPvg'),
})
self._validate(msg)
def test_status_request(self):
from letsencrypt.client.acme import status_request
msg = status_request(u'O7-s9MNq1siZHlgrMzi9_A')
self.assertEqual(msg, {
'type': 'statusRequest',
'token': u'O7-s9MNq1siZHlgrMzi9_A',
})
self._validate(msg)
if __name__ == '__main__':
unittest.main()

View File

@@ -209,7 +209,7 @@ class Client(object):
cert_der = M2Crypto.X509.load_cert(cert["backup_cert_file"]).as_der()
revocation = self.send_and_receive_expected(
acme.revocation_request(cert["backup_key_file"], cert_der),
acme.revocation_request(cert_der, cert["backup_key_file"]),
"revocation")
display.generic_notification(