1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-23 07:20:55 +03:00

Merge pull request #529 from kuba/acme-resource-json

JSONDeSerializable acme.messages.Resource.
This commit is contained in:
James Kasten
2015-06-22 18:15:12 -04:00
4 changed files with 197 additions and 54 deletions

View File

@@ -466,6 +466,7 @@ class Client(object): # pylint: disable=too-many-instance-attributes
updated_authzr, response = self.poll(updated[authzr])
updated[authzr] = updated_authzr
# pylint: disable=no-member
if updated_authzr.body.status != messages.STATUS_VALID:
# push back to the priority queue, with updated retry_after
heapq.heappush(waiting, (self.retry_after(

View File

@@ -5,7 +5,6 @@ import os
import pkg_resources
import unittest
import M2Crypto
import mock
import requests
@@ -14,16 +13,9 @@ from acme import errors
from acme import jose
from acme import jws as acme_jws
from acme import messages
from acme import messages_test
CERT = jose.ComparableX509(M2Crypto.X509.load_cert_string(
pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'cert.der')),
M2Crypto.X509.FORMAT_DER))
CSR = jose.ComparableX509(M2Crypto.X509.load_request_string(
pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'csr.der')),
M2Crypto.X509.FORMAT_DER))
KEY = jose.JWKRSA.load(pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa512_key.pem')))
KEY2 = jose.JWKRSA.load(pkg_resources.resource_string(
@@ -82,7 +74,7 @@ class ClientTest(unittest.TestCase):
# Request issuance
self.certr = messages.CertificateResource(
body=CERT, authzrs=(self.authzr,),
body=messages_test.CERT, authzrs=(self.authzr,),
uri='https://www.letsencrypt-demo.org/acme/cert/1',
cert_chain_uri='https://www.letsencrypt-demo.org/ca')
@@ -380,27 +372,27 @@ class ClientTest(unittest.TestCase):
self.assertRaises(errors.UnexpectedUpdate, self.net.poll, self.authzr)
def test_request_issuance(self):
self.response.content = CERT.as_der()
self.response.content = messages_test.CERT.as_der()
self.response.headers['Location'] = self.certr.uri
self.response.links['up'] = {'url': self.certr.cert_chain_uri}
self._mock_post_get()
self.assertEqual(
self.certr, self.net.request_issuance(CSR, (self.authzr,)))
self.assertEqual(self.certr, self.net.request_issuance(
messages_test.CSR, (self.authzr,)))
# TODO: check POST args
def test_request_issuance_missing_up(self):
self.response.content = CERT.as_der()
self.response.content = messages_test.CERT.as_der()
self.response.headers['Location'] = self.certr.uri
self._mock_post_get()
self.assertEqual(
self.certr.update(cert_chain_uri=None),
self.net.request_issuance(CSR, (self.authzr,)))
self.net.request_issuance(messages_test.CSR, (self.authzr,)))
def test_request_issuance_missing_location(self):
self._mock_post_get()
self.assertRaises(
errors.ClientError, self.net.request_issuance,
CSR, (self.authzr,))
messages_test.CSR, (self.authzr,))
@mock.patch('acme.client.datetime')
@mock.patch('acme.client.time')
@@ -484,10 +476,10 @@ class ClientTest(unittest.TestCase):
def test_check_cert(self):
self.response.headers['Location'] = self.certr.uri
self.response.content = CERT.as_der()
self.response.content = messages_test.CERT.as_der()
self._mock_post_get()
self.assertEqual(
self.certr.update(body=CERT), self.net.check_cert(self.certr))
self.assertEqual(self.certr.update(body=messages_test.CERT),
self.net.check_cert(self.certr))
# TODO: split here and separate test
self.response.headers['Location'] = 'foo'
@@ -495,7 +487,7 @@ class ClientTest(unittest.TestCase):
errors.UnexpectedUpdate, self.net.check_cert, self.certr)
def test_check_cert_missing_location(self):
self.response.content = CERT.as_der()
self.response.content = messages_test.CERT.as_der()
self._mock_post_get()
self.assertRaises(errors.ClientError, self.net.check_cert, self.certr)

View File

@@ -53,6 +53,7 @@ class Error(jose.JSONObjectWithFields, Exception):
else:
return str(self.detail)
class _Constant(jose.JSONDeSerializable):
"""ACME constant."""
__slots__ = ('name',)
@@ -109,31 +110,29 @@ class Identifier(jose.JSONObjectWithFields):
value = jose.Field('value')
class Resource(jose.ImmutableMap):
class Resource(jose.JSONObjectWithFields):
"""ACME Resource.
:ivar str uri: Location of the resource.
:ivar acme.messages.ResourceBody body: Resource body.
"""
body = jose.Field('body')
class ResourceWithURI(Resource):
"""ACME Resource with URI.
:ivar str uri: Location of the resource.
"""
__slots__ = ('body', 'uri')
uri = jose.Field('uri') # no ChallengeResource.uri
class ResourceBody(jose.JSONObjectWithFields):
"""ACME Resource Body."""
class RegistrationResource(Resource):
"""Registration Resource.
:ivar acme.messages.Registration body:
:ivar str new_authzr_uri: URI found in the 'next' ``Link`` header
:ivar str terms_of_service: URL for the CA TOS.
"""
__slots__ = ('body', 'uri', 'new_authzr_uri', 'terms_of_service')
class Registration(ResourceBody):
"""Registration Resource Body.
@@ -148,21 +147,59 @@ class Registration(ResourceBody):
recovery_token = jose.Field('recoveryToken', omitempty=True)
agreement = jose.Field('agreement', omitempty=True)
phone_prefix = 'tel:'
email_prefix = 'mailto:'
class ChallengeResource(Resource, jose.JSONObjectWithFields):
"""Challenge Resource.
@classmethod
def from_data(cls, phone=None, email=None, **kwargs):
"""Create registration resource from contact details."""
details = list(kwargs.pop('contact', ()))
if phone is not None:
details.append(cls.phone_prefix + phone)
if email is not None:
details.append(cls.email_prefix + email)
kwargs['contact'] = tuple(details)
return cls(**kwargs)
:ivar acme.messages.ChallengeBody body:
:ivar str authzr_uri: URI found in the 'up' ``Link`` header.
"""
__slots__ = ('body', 'authzr_uri')
def _filter_contact(self, prefix):
return tuple(
detail[len(prefix):] for detail in self.contact
if detail.startswith(prefix))
@property
def uri(self): # pylint: disable=missing-docstring,no-self-argument
# bug? 'method already defined line None'
# pylint: disable=function-redefined
return self.body.uri
def phones(self):
"""All phones found in the ``contact`` field."""
return self._filter_contact(self.phone_prefix)
@property
def emails(self):
"""All emails found in the ``contact`` field."""
return self._filter_contact(self.email_prefix)
@property
def phone(self):
"""Phone."""
assert len(self.phones) == 1
return self.phones[0]
@property
def email(self):
"""Email."""
assert len(self.emails) == 1
return self.emails[0]
class RegistrationResource(ResourceWithURI):
"""Registration Resource.
:ivar acme.messages.Registration body:
:ivar str new_authzr_uri: URI found in the 'next' ``Link`` header
:ivar str terms_of_service: URL for the CA TOS.
"""
body = jose.Field('body', decoder=Registration.from_json)
new_authzr_uri = jose.Field('new_authzr_uri')
terms_of_service = jose.Field('terms_of_service', omitempty=True)
class ChallengeBody(ResourceBody):
@@ -201,14 +238,21 @@ class ChallengeBody(ResourceBody):
return getattr(self.chall, name)
class AuthorizationResource(Resource):
"""Authorization Resource.
class ChallengeResource(Resource):
"""Challenge Resource.
:ivar acme.messages.Authorization body:
:ivar str new_cert_uri: URI found in the 'next' ``Link`` header
:ivar acme.messages.ChallengeBody body:
:ivar str authzr_uri: URI found in the 'up' ``Link`` header.
"""
__slots__ = ('body', 'uri', 'new_cert_uri')
body = jose.Field('body', decoder=ChallengeBody.from_json)
authzr_uri = jose.Field('authzr_uri')
@property
def uri(self): # pylint: disable=missing-docstring,no-self-argument
# bug? 'method already defined line None'
# pylint: disable=function-redefined
return self.body.uri # pylint: disable=no-member
class Authorization(ResourceBody):
@@ -246,6 +290,17 @@ class Authorization(ResourceBody):
for combo in self.combinations)
class AuthorizationResource(ResourceWithURI):
"""Authorization Resource.
:ivar acme.messages.Authorization body:
:ivar str new_cert_uri: URI found in the 'next' ``Link`` header
"""
body = jose.Field('body', decoder=Authorization.from_json)
new_cert_uri = jose.Field('new_cert_uri')
class CertificateRequest(jose.JSONObjectWithFields):
"""ACME new-cert request.
@@ -258,7 +313,7 @@ class CertificateRequest(jose.JSONObjectWithFields):
authorizations = jose.Field('authorizations', decoder=tuple)
class CertificateResource(Resource):
class CertificateResource(ResourceWithURI):
"""Certificate Resource.
:ivar acme.jose.util.ComparableX509 body:
@@ -267,7 +322,8 @@ class CertificateResource(Resource):
:ivar tuple authzrs: `tuple` of `AuthorizationResource`.
"""
__slots__ = ('body', 'uri', 'cert_chain_uri', 'authzrs')
cert_chain_uri = jose.Field('cert_chain_uri')
authzrs = jose.Field('authzrs')
class Revocation(jose.JSONObjectWithFields):

View File

@@ -3,14 +3,22 @@ import os
import pkg_resources
import unittest
import M2Crypto.X509
import mock
from Crypto.PublicKey import RSA
import M2Crypto
import mock
from acme import challenges
from acme import jose
CERT = jose.ComparableX509(M2Crypto.X509.load_cert_string(
pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'cert.der')),
M2Crypto.X509.FORMAT_DER))
CSR = jose.ComparableX509(M2Crypto.X509.load_request_string(
pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'csr.der')),
M2Crypto.X509.FORMAT_DER))
KEY = jose.util.HashableRSAKey(RSA.importKey(pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa512_key.pem'))))
CERT = jose.ComparableX509(M2Crypto.X509.load_cert(
@@ -96,12 +104,16 @@ class ConstantTest(unittest.TestCase):
self.assertTrue(self.const_a != self.const_b)
self.assertFalse(self.const_a != const_a_prime)
class RegistrationTest(unittest.TestCase):
"""Tests for acme.messages.Registration."""
def setUp(self):
key = jose.jwk.JWKRSA(key=KEY.publickey())
contact = ('mailto:letsencrypt-client@letsencrypt.org',)
contact = (
'mailto:admin@foo.com',
'tel:1234',
)
recovery_token = 'XYZ'
agreement = 'https://letsencrypt.org/terms'
@@ -119,6 +131,26 @@ class RegistrationTest(unittest.TestCase):
self.jobj_from = self.jobj_to.copy()
self.jobj_from['key'] = key.to_json()
def test_from_data(self):
from acme.messages import Registration
reg = Registration.from_data(phone='1234', email='admin@foo.com')
self.assertEqual(reg.contact, (
'tel:1234',
'mailto:admin@foo.com',
))
def test_phones(self):
self.assertEqual(('1234',), self.reg.phones)
def test_emails(self):
self.assertEqual(('admin@foo.com',), self.reg.emails)
def test_phone(self):
self.assertEqual('1234', self.reg.phone)
def test_email(self):
self.assertEqual('admin@foo.com', self.reg.email)
def test_to_partial_json(self):
self.assertEqual(self.jobj_to, self.reg.to_partial_json())
@@ -131,6 +163,25 @@ class RegistrationTest(unittest.TestCase):
hash(Registration.from_json(self.jobj_from))
class RegistrationResourceTest(unittest.TestCase):
"""Tests for acme.messages.RegistrationResource."""
def setUp(self):
from acme.messages import RegistrationResource
self.regr = RegistrationResource(
body=mock.sentinel.body, uri=mock.sentinel.uri,
new_authzr_uri=mock.sentinel.new_authzr_uri,
terms_of_service=mock.sentinel.terms_of_service)
def test_to_partial_json(self):
self.assertEqual(self.regr.to_json(), {
'body': mock.sentinel.body,
'uri': mock.sentinel.uri,
'new_authzr_uri': mock.sentinel.new_authzr_uri,
'terms_of_service': mock.sentinel.terms_of_service,
})
class ChallengeResourceTest(unittest.TestCase):
"""Tests for acme.messages.ChallengeResource."""
@@ -222,6 +273,49 @@ class AuthorizationTest(unittest.TestCase):
))
class AuthorizationResourceTest(unittest.TestCase):
"""Tests for acme.messages.AuthorizationResource."""
def test_json_de_serializable(self):
from acme.messages import AuthorizationResource
authzr = AuthorizationResource(
uri=mock.sentinel.uri,
body=mock.sentinel.body,
new_cert_uri=mock.sentinel.new_cert_uri,
)
self.assertTrue(isinstance(authzr, jose.JSONDeSerializable))
class CertificateRequestTest(unittest.TestCase):
"""Tests for acme.messages.CertificateRequest."""
def setUp(self):
from acme.messages import CertificateRequest
self.req = CertificateRequest(csr=CSR, authorizations=('foo',))
def test_json_de_serializable(self):
self.assertTrue(isinstance(self.req, jose.JSONDeSerializable))
from acme.messages import CertificateRequest
self.assertEqual(
self.req, CertificateRequest.from_json(self.req.to_json()))
class CertificateResourceTest(unittest.TestCase):
"""Tests for acme.messages.CertificateResourceTest."""
def setUp(self):
from acme.messages import CertificateResource
self.certr = CertificateResource(
body=CERT, uri=mock.sentinel.uri, authzrs=(),
cert_chain_uri=mock.sentinel.cert_chain_uri)
def test_json_de_serializable(self):
self.assertTrue(isinstance(self.certr, jose.JSONDeSerializable))
from acme.messages import CertificateResource
self.assertEqual(
self.certr, CertificateResource.from_json(self.certr.to_json()))
class RevocationTest(unittest.TestCase):
"""Tests for acme.messages.RevocationTest."""