1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-26 07:41:33 +03:00

Add tests and fix minor bugs in Order support

* delint

* refactor client tests

* Add test for new order and fix identifiers parsing.

* Add poll_and_finalize test

* Test and fix poll_authorizations timeout

* Add test_failed_authorizations

* Add test_poll_authorizations_success

* Test and fix finalize_order success

* add test_finalize_order_timeout

* add test_finalize_order_error

* test sleep code
This commit is contained in:
Brad Warren
2018-02-14 19:40:14 -08:00
parent e48898a8c8
commit 70a75ebe9d
4 changed files with 180 additions and 49 deletions

View File

@@ -586,12 +586,23 @@ class ClientV2(ClientBase):
authorizations.append(self._authzr_from_response(self.net.get(url)))
return messages.OrderResource(
body=body,
uri=response.headers.get('Location', uri),
fullchain_pem=fullchain_pem,
uri=response.headers.get('Location'),
authorizations=authorizations,
csr_pem=csr_pem)
def poll_and_finalize(self, orderr, deadline=None):
"""Poll authorizations and finalize the order.
If no deadline is provided, this method will timeout after 90
seconds.
:param messages.OrderResource orderr: order to finalize
:param datetime.datetime deadline: when to stop polling and timeout
:returns: finalized order
:rtype: messages.OrderResource
"""
if deadline is None:
deadline = datetime.datetime.now() + datetime.timedelta(seconds=90)
orderr = self.poll_authorizations(orderr, deadline)
@@ -609,8 +620,8 @@ class ClientV2(ClientBase):
time.sleep(1)
# If we didn't get a response for every authorization, we fell through
# the bottom of the loop due to hitting the deadline.
if len(responses) > orderr.body.authorizations:
raise TimeoutError()
if len(responses) < len(orderr.body.authorizations):
raise errors.TimeoutError()
failed = []
for authzr in responses:
if authzr.body.status != messages.STATUS_VALID:
@@ -618,24 +629,33 @@ class ClientV2(ClientBase):
if chall.error != None:
failed.append(authzr)
if len(failed) > 0:
raise ValidationError(failed)
raise errors.ValidationError(failed)
return orderr.update(authorizations=responses)
def finalize_order(self, orderr, deadline):
"""Finalize an order and obtain a certificate.
:param messages.OrderResource orderr: order to finalize
:param datetime.datetime deadline: when to stop polling and timeout
:returns: finalized order
:rtype: messages.OrderResource
"""
csr = OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_PEM, orderr.csr_pem)
wrapped_csr = messages.CertificateRequest(csr=jose.ComparableX509(csr))
self.net.post(latest.body.finalize, wrapped_csr)
self.net.post(orderr.body.finalize, wrapped_csr)
while datetime.datetime.now() < deadline:
time.sleep(1)
response = self.net.get(orderr.uri)
body = messages.Order.from_json(response.json())
if body.error is not None:
raise IssuanceError(body.error)
raise errors.IssuanceError(body.error)
if body.certificate is not None:
certificate_response = self.net.get(body.certificate).text
return orderr.update(fullchain_pem=certificate_response)
raise TimeoutError()
return orderr.update(body=body, fullchain_pem=certificate_response)
raise errors.TimeoutError()
class BackwardsCompatibleClientV2(object):

View File

@@ -1,4 +1,5 @@
"""Tests for acme.client."""
import copy
import datetime
import json
import unittest
@@ -18,6 +19,8 @@ from acme import test_util
CERT_DER = test_util.load_vector('cert.der')
CERT_SAN_PEM = test_util.load_vector('cert-san.pem')
CSR_SAN_PEM = test_util.load_vector('csr-san.pem')
KEY = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem'))
KEY2 = jose.JWKRSA.load(test_util.load_vector('rsa256_key.pem'))
@@ -34,7 +37,8 @@ DIRECTORY_V1 = messages.Directory({
DIRECTORY_V2 = messages.Directory({
'newAccount': 'https://www.letsencrypt-demo.org/acme/new-account',
'newNonce': 'https://www.letsencrypt-demo.org/acme/new-nonce'
'newNonce': 'https://www.letsencrypt-demo.org/acme/new-nonce',
'newOrder': 'https://www.letsencrypt-demo.org/acme/new-order',
})
@@ -57,8 +61,7 @@ class ClientTestBase(unittest.TestCase):
contact=self.contact, key=KEY.public_key())
self.new_reg = messages.NewRegistration(**dict(reg))
self.regr = messages.RegistrationResource(
body=reg, uri='https://www.letsencrypt-demo.org/acme/reg/1',
terms_of_service='https://www.letsencrypt-demo.org/tos')
body=reg, uri='https://www.letsencrypt-demo.org/acme/reg/1')
# Authorization
authzr_uri = 'https://www.letsencrypt-demo.org/acme/authz/1'
@@ -75,15 +78,6 @@ class ClientTestBase(unittest.TestCase):
self.authzr = messages.AuthorizationResource(
body=self.authz, uri=authzr_uri)
# Request issuance
self.certr = messages.CertificateResource(
body=messages_test.CERT, authzrs=(self.authzr,),
uri='https://www.letsencrypt-demo.org/acme/cert/1',
cert_chain_uri='https://www.letsencrypt-demo.org/ca')
# Reason code for revocation
self.rsn = 1
class BackwardsCompatibleClientV2Test(ClientTestBase):
"""Tests for acme.client.BackwardsCompatibleClientV2."""
@@ -168,37 +162,29 @@ class BackwardsCompatibleClientV2Test(ClientTestBase):
mock_client().agree_to_tos.assert_not_called()
class ClientV2Test(ClientTestBase):
"""Tests for acme.client.ClientV2."""
# pylint: disable=too-many-instance-attributes,too-many-public-methods
def setUp(self):
super(ClientV2Test, self).setUp()
from acme.client import ClientV2
self.directory = DIRECTORY_V2
self.client = ClientV2(directory=self.directory, net=self.net)
def test_new_account_v2(self):
self.response.status_code = http_client.CREATED
self.response.json.return_value = self.regr.body.to_json()
self.response.headers['Location'] = self.regr.uri
self.regr = messages.RegistrationResource(
body=messages.Registration(
contact=self.contact, key=KEY.public_key()),
uri='https://www.letsencrypt-demo.org/acme/reg/1')
self.assertEqual(self.regr, self.client.new_account(self.regr))
class ClientTest(ClientTestBase):
"""Tests for acme.client.Client."""
# pylint: disable=too-many-instance-attributes,too-many-public-methods
def setUp(self):
super(ClientTest, self).setUp()
from acme.client import Client
self.directory = DIRECTORY_V1
# Registration
self.regr = self.regr.update(
terms_of_service='https://www.letsencrypt-demo.org/tos')
# Request issuance
self.certr = messages.CertificateResource(
body=messages_test.CERT, authzrs=(self.authzr,),
uri='https://www.letsencrypt-demo.org/acme/cert/1',
cert_chain_uri='https://www.letsencrypt-demo.org/ca')
# Reason code for revocation
self.rsn = 1
from acme.client import Client
self.client = Client(
directory=self.directory, key=KEY, alg=jose.RS256, net=self.net)
@@ -554,6 +540,129 @@ class ClientTest(ClientTestBase):
self.certr,
self.rsn)
class ClientV2Test(ClientTestBase):
"""Tests for acme.client.ClientV2."""
def setUp(self):
super(ClientV2Test, self).setUp()
self.directory = DIRECTORY_V2
from acme.client import ClientV2
self.client = ClientV2(self.directory, self.net)
self.new_reg = self.new_reg.update(terms_of_service_agreed=True)
self.authzr_uri2 = 'https://www.letsencrypt-demo.org/acme/authz/2'
self.authz2 = self.authz.update(identifier=messages.Identifier(
typ=messages.IDENTIFIER_FQDN, value='www.example.com'),
status=messages.STATUS_PENDING)
self.authzr2 = messages.AuthorizationResource(
body=self.authz2, uri=self.authzr_uri2)
self.order = messages.Order(
identifiers=(self.authz.identifier, self.authz2.identifier),
status=messages.STATUS_PENDING,
authorizations=(self.authzr.uri, self.authzr_uri2),
finalize='https://www.letsencrypt-demo.org/acme/acct/1/order/1/finalize')
self.orderr = messages.OrderResource(
body=self.order,
uri='https://www.letsencrypt-demo.org/acme/acct/1/order/1',
authorizations=[self.authzr, self.authzr2], csr_pem=CSR_SAN_PEM)
def test_new_account(self):
self.response.status_code = http_client.CREATED
self.response.json.return_value = self.regr.body.to_json()
self.response.headers['Location'] = self.regr.uri
self.assertEqual(self.regr, self.client.new_account(self.new_reg))
def test_new_order(self):
order_response = copy.deepcopy(self.response)
order_response.status_code = http_client.CREATED
order_response.json.return_value = self.order.to_json()
order_response.headers['Location'] = self.orderr.uri
self.net.post.return_value = order_response
authz_response = copy.deepcopy(self.response)
authz_response.json.return_value = self.authz.to_json()
authz_response.headers['Location'] = self.authzr.uri
authz_response2 = self.response
authz_response2.json.return_value = self.authz2.to_json()
authz_response2.headers['Location'] = self.authzr2.uri
self.net.get.side_effect = (authz_response, authz_response2)
self.assertEqual(self.client.new_order(CSR_SAN_PEM), self.orderr)
@mock.patch('acme.client.datetime')
def test_poll_and_finalize(self, mock_datetime):
mock_datetime.datetime.now.return_value = datetime.datetime(2018, 2, 15)
mock_datetime.timedelta = datetime.timedelta
expected_deadline = mock_datetime.datetime.now() + datetime.timedelta(seconds=90)
self.client.poll_authorizations = mock.Mock(return_value=self.orderr)
self.client.finalize_order = mock.Mock(return_value=self.orderr)
self.assertEqual(self.client.poll_and_finalize(self.orderr), self.orderr)
self.client.poll_authorizations.assert_called_once_with(self.orderr, expected_deadline)
self.client.finalize_order.assert_called_once_with(self.orderr, expected_deadline)
@mock.patch('acme.client.datetime')
def test_poll_authorizations_timeout(self, mock_datetime):
now_side_effect = [datetime.datetime(2018, 2, 15),
datetime.datetime(2018, 2, 16),
datetime.datetime(2018, 2, 17)]
mock_datetime.datetime.now.side_effect = now_side_effect
self.response.json.side_effect = [
self.authz.to_json(), self.authz2.to_json(), self.authz2.to_json()]
self.assertRaises(
errors.TimeoutError, self.client.poll_authorizations, self.orderr, now_side_effect[1])
def test_poll_authorizations_failure(self):
deadline = datetime.datetime(9999, 9, 9)
challb = self.challr.body.update(status=messages.STATUS_INVALID,
error=messages.Error.with_code('unauthorized'))
authz = self.authz.update(status=messages.STATUS_INVALID, challenges=(challb,))
self.response.json.return_value = authz.to_json()
self.assertRaises(
errors.ValidationError, self.client.poll_authorizations, self.orderr, deadline)
def test_poll_authorizations_success(self):
deadline = datetime.datetime(9999, 9, 9)
updated_authz2 = self.authz2.update(status=messages.STATUS_VALID)
updated_authzr2 = messages.AuthorizationResource(
body=updated_authz2, uri=self.authzr_uri2)
updated_orderr = self.orderr.update(authorizations=[self.authzr, updated_authzr2])
self.response.json.side_effect = (
self.authz.to_json(), self.authz2.to_json(), updated_authz2.to_json())
self.assertEqual(self.client.poll_authorizations(self.orderr, deadline), updated_orderr)
def test_finalize_order_success(self):
updated_order = self.order.update(
certificate='https://www.letsencrypt-demo.org/acme/cert/')
updated_orderr = self.orderr.update(body=updated_order, fullchain_pem=CERT_SAN_PEM)
self.response.json.return_value = updated_order.to_json()
self.response.text = CERT_SAN_PEM
deadline = datetime.datetime(9999, 9, 9)
self.assertEqual(self.client.finalize_order(self.orderr, deadline), updated_orderr)
def test_finalize_order_error(self):
updated_order = self.order.update(error=messages.Error.with_code('unauthorized'))
self.response.json.return_value = updated_order.to_json()
deadline = datetime.datetime(9999, 9, 9)
self.assertRaises(errors.IssuanceError, self.client.finalize_order, self.orderr, deadline)
def test_finalize_order_timeout(self):
deadline = datetime.datetime.now() - datetime.timedelta(seconds=60)
self.assertRaises(errors.TimeoutError, self.client.finalize_order, self.orderr, deadline)
class MockJSONDeSerializable(jose.JSONDeSerializable):
# pylint: disable=missing-docstring
def __init__(self, value):

View File

@@ -89,7 +89,7 @@ class ValidationError(Error):
"""
def __init__(self, failed_authzrs):
self.failed_authzrs = failed_authzrs
super(ClientError, self).__init__()
super(ValidationError, self).__init__()
class TimeoutError(Error):
"""Error for when polling an authorization or an order times out."""
@@ -103,6 +103,7 @@ class IssuanceError(Error):
:param messages.Error error: The error provided by the server.
"""
self.error = error
super(IssuanceError, self).__init__()
class ConflictError(ClientError):
"""Error for when the server returns a 409 (Conflict) HTTP status.

View File

@@ -509,9 +509,6 @@ class Revocation(jose.JSONObjectWithFields):
class Order(ResourceBody):
"""Order Resource Body.
.. note:: Parsing of identifiers on response doesn't work right now; to make
it work we would need to set up the equivalent of Identifier.from_json, but
for a list.
:ivar list of .Identifier: List of identifiers for the certificate.
:ivar acme.messages.Status status:
:ivar list of str authorizations: URLs of authorizations.
@@ -530,6 +527,10 @@ class Order(ResourceBody):
expires = fields.RFC3339Field('expires', omitempty=True)
error = jose.Field('error', omitempty=True, decoder=Error.from_json)
@identifiers.decoder
def identifiers(value): # pylint: disable=missing-docstring,no-self-argument
return tuple(Identifier.from_json(identifier) for identifier in value)
class OrderResource(ResourceWithURI):
"""Order Resource.