1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-21 19:01:07 +03:00

Merge pull request #605 from kuba/py3

Python 3 support in acme.
This commit is contained in:
James Kasten
2015-07-13 11:39:17 -07:00
31 changed files with 534 additions and 337 deletions

View File

@@ -1,11 +1,12 @@
# this file uses slightly different syntax than .gitignore,
# e.g. "tox.cover/" will not ignore tox.cover directory
# e.g. ".tox/" will not ignore .tox directory
# well, official docker build should be done on clean git checkout
# anyway, so .tox should be empty... But I'm sure people will try to
# test docker on their git working directories.
.git
tox.cover
.tox
venv
venv3
docs

2
.gitignore vendored
View File

@@ -4,7 +4,7 @@
build/
dist/
/venv/
/tox.venv/
/venv3/
letsencrypt.log
# coverage

View File

@@ -240,7 +240,9 @@ ignore-mixin-members=yes
# List of module names for which member attributes should not be checked
# (useful for modules/projects where namespaces are manipulated during runtime
# and thus existing member attributes cannot be deduced by static analysis
ignored-modules=pkg_resources,confargparse,argparse
ignored-modules=pkg_resources,confargparse,argparse,six.moves,six.moves.urllib
# import errors ignored only in 1.4.4
# https://bitbucket.org/logilab/pylint/commits/cd000904c9e2
# List of classes names for which member attributes should not be checked
# (useful for classes with attributes dynamically set).

View File

@@ -12,15 +12,16 @@ env:
global:
- GOPATH=/tmp/go
matrix:
- TOXENV=py26
- TOXENV=py27
- TOXENV=py26 BOULDER_INTEGRATION=1
- TOXENV=py27 BOULDER_INTEGRATION=1
- TOXENV=py33
- TOXENV=py34
- TOXENV=lint
- TOXENV=cover
install: "travis_retry pip install tox coveralls"
before_script: '[ "${TOXENV:0:2}" != "py" ] || ./tests/boulder-start.sh'
# TODO: eliminate substring slice bashism
script: 'travis_retry tox && ([ "${TOXENV:0:2}" != "py" ] || (source tox.venv/bin/activate && ./tests/boulder-integration.sh))'
before_script: '[ "xxx$BOULDER_INTEGRATION" = "xxx" ] || ./tests/boulder-start.sh'
script: 'travis_retry tox && ([ "xxx$BOULDER_INTEGRATION" = "xxx" ] || (source .tox/$TOXENV/bin/activate && ./tests/boulder-integration.sh))'
after_success: '[ "$TOXENV" == "cover" ] && coveralls'

View File

@@ -35,12 +35,7 @@ class DVChallenge(Challenge): # pylint: disable=abstract-method
class ChallengeResponse(interfaces.ClientRequestableResource,
jose.TypedJSONObjectWithFields):
# _fields_to_partial_json | pylint: disable=abstract-method
"""ACME challenge response.
:ivar str mitm_resource: ACME resource identifier used in client
HTTPS requests in order to protect against MITM.
"""
"""ACME challenge response."""
TYPES = {}
resource_type = 'challenge'
@@ -56,14 +51,23 @@ class ChallengeResponse(interfaces.ClientRequestableResource,
@Challenge.register
class SimpleHTTP(DVChallenge):
"""ACME "simpleHttp" challenge."""
"""ACME "simpleHttp" challenge.
:ivar unicode token:
"""
typ = "simpleHttp"
token = jose.Field("token")
@ChallengeResponse.register
class SimpleHTTPResponse(ChallengeResponse):
"""ACME "simpleHttp" challenge response."""
"""ACME "simpleHttp" challenge response.
:ivar unicode path:
:ivar unicode tls:
"""
typ = "simpleHttp"
path = jose.Field("path")
tls = jose.Field("tls", default=True, omitempty=True)
@@ -107,7 +111,7 @@ class SimpleHTTPResponse(ChallengeResponse):
Forms an URI to the HTTPS server provisioned resource
(containing :attr:`~SimpleHTTP.token`).
:param str domain: Domain name being verified.
:param unicode domain: Domain name being verified.
"""
return self._URI_TEMPLATE.format(
@@ -121,7 +125,7 @@ class SimpleHTTPResponse(ChallengeResponse):
``requests.get`` is called with ``verify=False``.
:param .SimpleHTTP chall: Corresponding challenge.
:param str domain: Domain name being verified.
:param unicode domain: Domain name being verified.
:param int port: Port used in the validation.
:returns: ``True`` iff validation is successful, ``False``
@@ -163,13 +167,13 @@ class SimpleHTTPResponse(ChallengeResponse):
class DVSNI(DVChallenge):
"""ACME "dvsni" challenge.
:ivar str r: Random data, **not** base64-encoded.
:ivar str nonce: Random data, **not** hex-encoded.
:ivar bytes r: Random data, **not** base64-encoded.
:ivar bytes nonce: Random data, **not** hex-encoded.
"""
typ = "dvsni"
DOMAIN_SUFFIX = ".acme.invalid"
DOMAIN_SUFFIX = b".acme.invalid"
"""Domain name suffix."""
R_SIZE = 32
@@ -181,15 +185,19 @@ class DVSNI(DVChallenge):
PORT = 443
"""Port to perform DVSNI challenge."""
r = jose.Field("r", encoder=jose.b64encode, # pylint: disable=invalid-name
r = jose.Field("r", encoder=jose.encode_b64jose, # pylint: disable=invalid-name
decoder=functools.partial(jose.decode_b64jose, size=R_SIZE))
nonce = jose.Field("nonce", encoder=binascii.hexlify,
nonce = jose.Field("nonce", encoder=jose.encode_hex16,
decoder=functools.partial(functools.partial(
jose.decode_hex16, size=NONCE_SIZE)))
@property
def nonce_domain(self):
"""Domain name used in SNI."""
"""Domain name used in SNI.
:rtype: bytes
"""
return binascii.hexlify(self.nonce) + self.DOMAIN_SUFFIX
@@ -197,7 +205,7 @@ class DVSNI(DVChallenge):
class DVSNIResponse(ChallengeResponse):
"""ACME "dvsni" challenge response.
:param str s: Random data, **not** base64-encoded.
:param bytes s: Random data, **not** base64-encoded.
"""
typ = "dvsni"
@@ -208,7 +216,7 @@ class DVSNIResponse(ChallengeResponse):
S_SIZE = 32
"""Required size of the :attr:`s` in bytes."""
s = jose.Field("s", encoder=jose.b64encode, # pylint: disable=invalid-name
s = jose.Field("s", encoder=jose.encode_b64jose, # pylint: disable=invalid-name
decoder=functools.partial(jose.decode_b64jose, size=S_SIZE))
def __init__(self, s=None, *args, **kwargs):
@@ -221,11 +229,13 @@ class DVSNIResponse(ChallengeResponse):
:param challenge: Corresponding challenge.
:type challenge: :class:`DVSNI`
:rtype: bytes
"""
z = hashlib.new("sha256") # pylint: disable=invalid-name
z.update(chall.r)
z.update(self.s)
return z.hexdigest()
return z.hexdigest().encode()
def z_domain(self, chall):
"""Domain name for certificate subjectAltName."""
@@ -233,7 +243,13 @@ class DVSNIResponse(ChallengeResponse):
@Challenge.register
class RecoveryContact(ContinuityChallenge):
"""ACME "recoveryContact" challenge."""
"""ACME "recoveryContact" challenge.
:ivar unicode activation_url:
:ivar unicode success_url:
:ivar unicode contact:
"""
typ = "recoveryContact"
activation_url = jose.Field("activationURL", omitempty=True)
@@ -243,7 +259,11 @@ class RecoveryContact(ContinuityChallenge):
@ChallengeResponse.register
class RecoveryContactResponse(ChallengeResponse):
"""ACME "recoveryContact" challenge response."""
"""ACME "recoveryContact" challenge response.
:ivar unicode token:
"""
typ = "recoveryContact"
token = jose.Field("token", omitempty=True)
@@ -256,7 +276,11 @@ class RecoveryToken(ContinuityChallenge):
@ChallengeResponse.register
class RecoveryTokenResponse(ChallengeResponse):
"""ACME "recoveryToken" challenge response."""
"""ACME "recoveryToken" challenge response.
:ivar unicode token:
"""
typ = "recoveryToken"
token = jose.Field("token", omitempty=True)
@@ -265,7 +289,8 @@ class RecoveryTokenResponse(ChallengeResponse):
class ProofOfPossession(ContinuityChallenge):
"""ACME "proofOfPossession" challenge.
:ivar str nonce: Random data, **not** base64-encoded.
:ivar .JWAAlgorithm alg:
:ivar bytes nonce: Random data, **not** base64-encoded.
:ivar hints: Various clues for the client (:class:`Hints`).
"""
@@ -277,8 +302,12 @@ class ProofOfPossession(ContinuityChallenge):
"""Hints for "proofOfPossession" challenge.
:ivar jwk: JSON Web Key (:class:`acme.jose.JWK`)
:ivar list certs: List of :class:`acme.jose.ComparableX509`
:ivar tuple cert_fingerprints: `tuple` of `unicode`
:ivar tuple certs: Sequence of :class:`acme.jose.ComparableX509`
certificates.
:ivar tuple subject_key_identifiers: `tuple` of `unicode`
:ivar tuple issuers: `tuple` of `unicode`
:ivar tuple authorized_for: `tuple` of `unicode`
"""
jwk = jose.Field("jwk", decoder=jose.JWK.from_json)
@@ -301,7 +330,7 @@ class ProofOfPossession(ContinuityChallenge):
alg = jose.Field("alg", decoder=jose.JWASignature.from_json)
nonce = jose.Field(
"nonce", encoder=jose.b64encode, decoder=functools.partial(
"nonce", encoder=jose.encode_b64jose, decoder=functools.partial(
jose.decode_b64jose, size=NONCE_SIZE))
hints = jose.Field("hints", decoder=Hints.from_json)
@@ -310,8 +339,8 @@ class ProofOfPossession(ContinuityChallenge):
class ProofOfPossessionResponse(ChallengeResponse):
"""ACME "proofOfPossession" challenge response.
:ivar str nonce: Random data, **not** base64-encoded.
:ivar signature: :class:`~acme.other.Signature` of this message.
:ivar bytes nonce: Random data, **not** base64-encoded.
:ivar acme.other.Signature signature: Sugnature of this message.
"""
typ = "proofOfPossession"
@@ -319,7 +348,7 @@ class ProofOfPossessionResponse(ChallengeResponse):
NONCE_SIZE = ProofOfPossession.NONCE_SIZE
nonce = jose.Field(
"nonce", encoder=jose.b64encode, decoder=functools.partial(
"nonce", encoder=jose.encode_b64jose, decoder=functools.partial(
jose.decode_b64jose, size=NONCE_SIZE))
signature = jose.Field("signature", decoder=other.Signature.from_json)
@@ -331,7 +360,11 @@ class ProofOfPossessionResponse(ChallengeResponse):
@Challenge.register
class DNS(DVChallenge):
"""ACME "dns" challenge."""
"""ACME "dns" challenge.
:ivar unicode token:
"""
typ = "dns"
token = jose.Field("token")

View File

@@ -4,7 +4,8 @@ import unittest
import mock
import OpenSSL
import requests
import urlparse
from six.moves.urllib import parse as urllib_parse # pylint: disable=import-error
from acme import jose
from acme import other
@@ -136,7 +137,7 @@ class SimpleHTTPResponseTest(unittest.TestCase):
@mock.patch("acme.challenges.requests.get")
def test_simple_verify_port(self, mock_get):
self.resp_http.simple_verify(self.chall, "local", 4430)
self.assertEqual("local:4430", urlparse.urlparse(
self.assertEqual("local:4430", urllib_parse.urlparse(
mock_get.mock_calls[0][1][0]).netloc)
@@ -145,9 +146,9 @@ class DVSNITest(unittest.TestCase):
def setUp(self):
from acme.challenges import DVSNI
self.msg = DVSNI(
r="O*\xb4-\xad\xec\x95>\xed\xa9\r0\x94\xe8\x97\x9c&6"
"\xbf'\xb3\xed\x9a9nX\x0f'\\m\xe7\x12",
nonce='\xa8-_\xf8\xeft\r\x12\x88\x1fm<"w\xab.')
r=b"O*\xb4-\xad\xec\x95>\xed\xa9\r0\x94\xe8\x97\x9c&6"
b"\xbf'\xb3\xed\x9a9nX\x0f'\\m\xe7\x12",
nonce=b'\xa8-_\xf8\xeft\r\x12\x88\x1fm<"w\xab.')
self.jmsg = {
'type': 'dvsni',
'r': 'Tyq0La3slT7tqQ0wlOiXnCY2vyez7Zo5blgPJ1xt5xI',
@@ -155,7 +156,7 @@ class DVSNITest(unittest.TestCase):
}
def test_nonce_domain(self):
self.assertEqual('a82d5ff8ef740d12881f6d3c2277ab2e.acme.invalid',
self.assertEqual(b'a82d5ff8ef740d12881f6d3c2277ab2e.acme.invalid',
self.msg.nonce_domain)
def test_to_partial_json(self):
@@ -187,8 +188,8 @@ class DVSNIResponseTest(unittest.TestCase):
def setUp(self):
from acme.challenges import DVSNIResponse
self.msg = DVSNIResponse(
s='\xf5\xd6\xe3\xb2]\xe0L\x0bN\x9cKJ\x14I\xa1K\xa3#\xf9\xa8'
'\xcd\x8c7\x0e\x99\x19)\xdc\xb7\xf3\x9bw')
s=b'\xf5\xd6\xe3\xb2]\xe0L\x0bN\x9cKJ\x14I\xa1K\xa3#\xf9\xa8'
b'\xcd\x8c7\x0e\x99\x19)\xdc\xb7\xf3\x9bw')
self.jmsg = {
'type': 'dvsni',
's': '9dbjsl3gTAtOnEtKFEmhS6Mj-ajNjDcOmRkp3Lfzm3c',
@@ -197,15 +198,14 @@ class DVSNIResponseTest(unittest.TestCase):
def test_z_and_domain(self):
from acme.challenges import DVSNI
challenge = DVSNI(
r="O*\xb4-\xad\xec\x95>\xed\xa9\r0\x94\xe8\x97\x9c&6"
"\xbf'\xb3\xed\x9a9nX\x0f'\\m\xe7\x12",
nonce=long('439736375371401115242521957580409149254868992063'
'44333654741504362774620418661L'))
r=b"O*\xb4-\xad\xec\x95>\xed\xa9\r0\x94\xe8\x97\x9c&6"
b"\xbf'\xb3\xed\x9a9nX\x0f'\\m\xe7\x12",
nonce=int('439736375371401115242521957580409149254868992063'
'44333654741504362774620418661'))
# pylint: disable=invalid-name
z = '38e612b0397cc2624a07d351d7ef50e46134c0213d9ed52f7d7c611acaeed41b'
z = b'38e612b0397cc2624a07d351d7ef50e46134c0213d9ed52f7d7c611acaeed41b'
self.assertEqual(z, self.msg.z(challenge))
self.assertEqual(
'{0}.acme.invalid'.format(z), self.msg.z_domain(challenge))
self.assertEqual(z + b'.acme.invalid', self.msg.z_domain(challenge))
def test_to_partial_json(self):
self.assertEqual(self.jmsg, self.msg.to_partial_json())
@@ -362,7 +362,7 @@ class ProofOfPossessionHintsTest(unittest.TestCase):
self.jmsg_to = {
'jwk': jwk,
'certFingerprints': cert_fingerprints,
'certs': (jose.b64encode(OpenSSL.crypto.dump_certificate(
'certs': (jose.encode_b64jose(OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_ASN1, CERT)),),
'subjectKeyIdentifiers': subject_key_identifiers,
'serialNumbers': serial_numbers,
@@ -413,7 +413,7 @@ class ProofOfPossessionTest(unittest.TestCase):
issuers=(), authorized_for=())
self.msg = ProofOfPossession(
alg=jose.RS256, hints=hints,
nonce='xD\xf9\xb9\xdbU\xed\xaa\x17\xf1y|\x81\x88\x99 ')
nonce=b'xD\xf9\xb9\xdbU\xed\xaa\x17\xf1y|\x81\x88\x99 ')
self.jmsg_to = {
'type': 'proofOfPossession',
@@ -449,16 +449,16 @@ class ProofOfPossessionResponseTest(unittest.TestCase):
# mistake here...
signature = other.Signature(
alg=jose.RS256, jwk=jose.JWKRSA(key=KEY.public_key()),
sig='\xa7\xc1\xe7\xe82o\xbc\xcd\xd0\x1e\x010#Z|\xaf\x15\x83'
'\x94\x8f#\x9b\nQo(\x80\x15,\x08\xfcz\x1d\xfd\xfd.\xaap'
'\xfa\x06\xd1\xa2f\x8d8X2>%d\xbd%\xe1T\xdd\xaa0\x18\xde'
'\x99\x08\xf0\x0e{',
nonce='\x99\xc7Q\xb3f2\xbc\xdci\xfe\xd6\x98k\xc67\xdf',
sig=b'\xa7\xc1\xe7\xe82o\xbc\xcd\xd0\x1e\x010#Z|\xaf\x15\x83'
b'\x94\x8f#\x9b\nQo(\x80\x15,\x08\xfcz\x1d\xfd\xfd.\xaap'
b'\xfa\x06\xd1\xa2f\x8d8X2>%d\xbd%\xe1T\xdd\xaa0\x18\xde'
b'\x99\x08\xf0\x0e{',
nonce=b'\x99\xc7Q\xb3f2\xbc\xdci\xfe\xd6\x98k\xc67\xdf',
)
from acme.challenges import ProofOfPossessionResponse
self.msg = ProofOfPossessionResponse(
nonce='xD\xf9\xb9\xdbU\xed\xaa\x17\xf1y|\x81\x88\x99 ',
nonce=b'xD\xf9\xb9\xdbU\xed\xaa\x17\xf1y|\x81\x88\x99 ',
signature=signature)
self.jmsg_to = {

View File

@@ -1,13 +1,15 @@
"""ACME client API."""
import datetime
import heapq
import httplib
import json
import logging
import time
from six.moves import http_client # pylint: disable=import-error
import OpenSSL
import requests
import six
import werkzeug
from acme import errors
@@ -19,7 +21,8 @@ from acme import messages
logger = logging.getLogger(__name__)
# https://urllib3.readthedocs.org/en/latest/security.html#insecureplatformwarning
requests.packages.urllib3.contrib.pyopenssl.inject_into_urllib3()
if six.PY2:
requests.packages.urllib3.contrib.pyopenssl.inject_into_urllib3()
class Client(object): # pylint: disable=too-many-instance-attributes
@@ -80,7 +83,8 @@ class Client(object): # pylint: disable=too-many-instance-attributes
new_reg = messages.Registration() if new_reg is None else new_reg
response = self.net.post(self.new_reg_uri, new_reg)
assert response.status_code == httplib.CREATED # TODO: handle errors
# TODO: handle errors
assert response.status_code == http_client.CREATED
# "Instance of 'Field' has no key/contact member" bug:
# pylint: disable=no-member
@@ -162,7 +166,8 @@ class Client(object): # pylint: disable=too-many-instance-attributes
"""
new_authz = messages.Authorization(identifier=identifier)
response = self.net.post(new_authzr_uri, new_authz)
assert response.status_code == httplib.CREATED # TODO: handle errors
# TODO: handle errors
assert response.status_code == http_client.CREATED
return self._authzr_from_response(response, identifier)
def request_domain_challenges(self, domain, new_authz_uri):
@@ -424,7 +429,7 @@ class Client(object): # pylint: disable=too-many-instance-attributes
"""
response = self.net.post(messages.Revocation.url(self.new_reg_uri),
messages.Revocation(certificate=cert))
if response.status_code != httplib.OK:
if response.status_code != http_client.OK:
raise errors.ClientError(
'Successful revocation must return HTTP OK status')
@@ -447,12 +452,13 @@ class ClientNetwork(object):
.. todo:: Implement ``acmePath``.
:param .ClientRequestableResource obj:
:param bytes nonce:
:rtype: `.JWS`
"""
jobj = obj.to_json()
jobj['resource'] = obj.resource_type
dumps = json.dumps(jobj)
dumps = json.dumps(jobj).encode()
logger.debug('Serialized JSON: %s', dumps)
return jws.JWS.sign(
payload=dumps, key=self.key, alg=self.alg, nonce=nonce).json_dumps()
@@ -555,12 +561,12 @@ class ClientNetwork(object):
def _add_nonce(self, response):
if self.REPLAY_NONCE_HEADER in response.headers:
nonce = response.headers[self.REPLAY_NONCE_HEADER]
error = jws.Header.validate_nonce(nonce)
if error is None:
logger.debug('Storing nonce: %r', nonce)
self._nonces.add(nonce)
else:
try:
decoded_nonce = jws.Header._fields['nonce'].decode(nonce)
except jose.DeserializationError as error:
raise errors.BadNonce(nonce, error)
logger.debug('Storing nonce: %r', decoded_nonce)
self._nonces.add(decoded_nonce)
else:
raise errors.MissingNonce(response)

View File

@@ -1,9 +1,10 @@
"""Tests for acme.client."""
import datetime
import httplib
import json
import unittest
from six.moves import http_client # pylint: disable=import-error
import mock
import requests
@@ -27,7 +28,7 @@ class ClientTest(unittest.TestCase):
def setUp(self):
self.response = mock.MagicMock(
ok=True, status_code=httplib.OK, headers={}, links={})
ok=True, status_code=http_client.OK, headers={}, links={})
self.net = mock.MagicMock()
self.net.post.return_value = self.response
self.net.get.return_value = self.response
@@ -73,7 +74,7 @@ class ClientTest(unittest.TestCase):
def test_register(self):
# "Instance of 'Field' has no to_json/update member" bug:
# pylint: disable=no-member
self.response.status_code = httplib.CREATED
self.response.status_code = http_client.CREATED
self.response.json.return_value = self.regr.body.to_json()
self.response.headers['Location'] = self.regr.uri
self.response.links.update({
@@ -91,7 +92,7 @@ class ClientTest(unittest.TestCase):
errors.UnexpectedUpdate, self.client.register, self.regr.body)
def test_register_missing_next(self):
self.response.status_code = httplib.CREATED
self.response.status_code = http_client.CREATED
self.assertRaises(
errors.ClientError, self.client.register, self.regr.body)
@@ -115,7 +116,7 @@ class ClientTest(unittest.TestCase):
self.assertEqual(self.regr.terms_of_service, regr.body.agreement)
def test_request_challenges(self):
self.response.status_code = httplib.CREATED
self.response.status_code = http_client.CREATED
self.response.headers['Location'] = self.authzr.uri
self.response.json.return_value = self.authz.to_json()
self.response.links = {
@@ -133,7 +134,7 @@ class ClientTest(unittest.TestCase):
self.identifier, self.authzr.uri)
def test_request_challenges_missing_next(self):
self.response.status_code = httplib.CREATED
self.response.status_code = http_client.CREATED
self.assertRaises(
errors.ClientError, self.client.request_challenges,
self.identifier, self.regr)
@@ -345,7 +346,7 @@ class ClientTest(unittest.TestCase):
self.client.new_reg_uri), mock.ANY)
def test_revoke_bad_status_raises_error(self):
self.response.status_code = httplib.METHOD_NOT_ALLOWED
self.response.status_code = http_client.METHOD_NOT_ALLOWED
self.assertRaises(errors.ClientError, self.client.revoke, self.certr)
@@ -360,7 +361,7 @@ class ClientNetworkTest(unittest.TestCase):
self.net = ClientNetwork(
key=KEY, alg=jose.RS256, verify_ssl=self.verify_ssl)
self.response = mock.MagicMock(ok=True, status_code=httplib.OK)
self.response = mock.MagicMock(ok=True, status_code=http_client.OK)
self.response.headers = {}
self.response.links = {}
@@ -380,12 +381,11 @@ class ClientNetworkTest(unittest.TestCase):
pass # pragma: no cover
# pylint: disable=protected-access
jws_dump = self.net._wrap_in_jws(
MockClientRequestableResource('foo'), nonce='Tg')
MockClientRequestableResource('foo'), nonce=b'Tg')
jws = acme_jws.JWS.json_loads(jws_dump)
self.assertEqual(json.loads(jws.payload),
self.assertEqual(json.loads(jws.payload.decode()),
{'foo': 'foo', 'resource': 'mock'})
self.assertEqual(jws.signature.combined.nonce, 'Tg')
# TODO: check that nonce is in protected header
self.assertEqual(jws.signature.combined.nonce, b'Tg')
def test_check_response_not_ok_jobj_no_error(self):
self.response.ok = False
@@ -473,7 +473,7 @@ class ClientNetworkWithMockedResponseTest(unittest.TestCase):
from acme.client import ClientNetwork
self.net = ClientNetwork(key=None, alg=None)
self.response = mock.MagicMock(ok=True, status_code=httplib.OK)
self.response = mock.MagicMock(ok=True, status_code=http_client.OK)
self.response.headers = {}
self.response.links = {}
self.checked_response = mock.MagicMock()
@@ -481,13 +481,14 @@ class ClientNetworkWithMockedResponseTest(unittest.TestCase):
self.wrapped_obj = mock.MagicMock()
self.content_type = mock.sentinel.content_type
self.all_nonces = [jose.b64encode('Nonce'), jose.b64encode('Nonce2')]
self.all_nonces = [jose.b64encode(b'Nonce'), jose.b64encode(b'Nonce2')]
self.available_nonces = self.all_nonces[:]
def send_request(*args, **kwargs):
# pylint: disable=unused-argument,missing-docstring
if self.available_nonces:
self.response.headers = {
self.net.REPLAY_NONCE_HEADER: self.available_nonces.pop()}
self.net.REPLAY_NONCE_HEADER:
self.available_nonces.pop().decode()}
else:
self.response.headers = {}
return self.response
@@ -519,21 +520,21 @@ class ClientNetworkWithMockedResponseTest(unittest.TestCase):
self.assertEqual(self.checked_response, self.net.post(
'uri', self.obj, content_type=self.content_type))
self.net._wrap_in_jws.assert_called_once_with(
self.obj, self.all_nonces.pop())
self.obj, jose.b64decode(self.all_nonces.pop()))
assert not self.available_nonces
self.assertRaises(errors.MissingNonce, self.net.post,
'uri', self.obj, content_type=self.content_type)
self.net._wrap_in_jws.assert_called_with(
self.obj, self.all_nonces.pop())
self.obj, jose.b64decode(self.all_nonces.pop()))
def test_post_wrong_initial_nonce(self): # HEAD
self.available_nonces = ['f', jose.b64encode('good')]
self.available_nonces = [b'f', jose.b64encode(b'good')]
self.assertRaises(errors.BadNonce, self.net.post, 'uri',
self.obj, content_type=self.content_type)
def test_post_wrong_post_response_nonce(self):
self.available_nonces = [jose.b64encode('good'), 'f']
self.available_nonces = [jose.b64encode(b'good'), b'f']
self.assertRaises(errors.BadNonce, self.net.post, 'uri',
self.obj, content_type=self.content_type)

View File

@@ -5,7 +5,7 @@ from acme import jose
class ClientRequestableResource(jose.JSONDeSerializable):
"""Resource that can be requested by client.
:ivar str resource_type: ACME resource identifier used in client
:ivar unicode resource_type: ACME resource identifier used in client
HTTPS requests in order to protect against MITM.
"""

View File

@@ -44,8 +44,10 @@ from acme.jose.json_util import (
decode_cert,
decode_csr,
decode_hex16,
encode_b64jose,
encode_cert,
encode_csr,
encode_hex16,
)
from acme.jose.jwa import (

View File

@@ -9,28 +9,30 @@
.. _`JOSE Base64`:
https://tools.ietf.org/html/draft-ietf-jose-json-web-signature-37#appendix-C
.. warning:: Do NOT try to call this module "base64",
as it will "shadow" the standard library.
.. Do NOT try to call this module "base64", as it will "shadow" the
standard library.
"""
import base64
import six
def b64encode(data):
"""JOSE Base64 encode.
:param data: Data to be encoded.
:type data: str or bytearray
:type data: `bytes`
:returns: JOSE Base64 string.
:rtype: str
:rtype: bytes
:raises TypeError: if `data` is of incorrect type
"""
if not isinstance(data, str):
raise TypeError('argument should be str or bytearray')
return base64.urlsafe_b64encode(data).rstrip('=')
if not isinstance(data, six.binary_type):
raise TypeError('argument should be {0}'.format(six.binary_type))
return base64.urlsafe_b64encode(data).rstrip(b'=')
def b64decode(data):
@@ -38,21 +40,22 @@ def b64decode(data):
:param data: Base64 string to be decoded. If it's unicode, then
only ASCII characters are allowed.
:type data: str or unicode
:type data: `bytes` or `unicode`
:returns: Decoded data.
:rtype: bytes
:raises TypeError: if input is of incorrect type
:raises ValueError: if input is unicode with non-ASCII characters
"""
if isinstance(data, unicode):
if isinstance(data, six.string_types):
try:
data = data.encode('ascii')
except UnicodeEncodeError:
raise ValueError(
'unicode argument should contain only ASCII characters')
elif not isinstance(data, str):
elif not isinstance(data, six.binary_type):
raise TypeError('argument should be a str or unicode')
return base64.urlsafe_b64decode(data + '=' * (4 - (len(data) % 4)))
return base64.urlsafe_b64decode(data + b'=' * (4 - (len(data) % 4)))

View File

@@ -1,20 +1,22 @@
"""Tests for acme.jose.b64."""
import unittest
import six
# https://en.wikipedia.org/wiki/Base64#Examples
B64_PADDING_EXAMPLES = {
'any carnal pleasure.': ('YW55IGNhcm5hbCBwbGVhc3VyZS4', '='),
'any carnal pleasure': ('YW55IGNhcm5hbCBwbGVhc3VyZQ', '=='),
'any carnal pleasur': ('YW55IGNhcm5hbCBwbGVhc3Vy', ''),
'any carnal pleasu': ('YW55IGNhcm5hbCBwbGVhc3U', '='),
'any carnal pleas': ('YW55IGNhcm5hbCBwbGVhcw', '=='),
b'any carnal pleasure.': (b'YW55IGNhcm5hbCBwbGVhc3VyZS4', b'='),
b'any carnal pleasure': (b'YW55IGNhcm5hbCBwbGVhc3VyZQ', b'=='),
b'any carnal pleasur': (b'YW55IGNhcm5hbCBwbGVhc3Vy', b''),
b'any carnal pleasu': (b'YW55IGNhcm5hbCBwbGVhc3U', b'='),
b'any carnal pleas': (b'YW55IGNhcm5hbCBwbGVhcw', b'=='),
}
B64_URL_UNSAFE_EXAMPLES = {
chr(251) + chr(239): '--8',
chr(255) * 2: '__8',
six.int2byte(251) + six.int2byte(239): b'--8',
six.int2byte(255) * 2: b'__8',
}
@@ -26,12 +28,15 @@ class B64EncodeTest(unittest.TestCase):
from acme.jose.b64 import b64encode
return b64encode(data)
def test_empty(self):
self.assertEqual(self._call(b''), b'')
def test_unsafe_url(self):
for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems():
for text, b64 in six.iteritems(B64_URL_UNSAFE_EXAMPLES):
self.assertEqual(self._call(text), b64)
def test_different_paddings(self):
for text, (b64, _) in B64_PADDING_EXAMPLES.iteritems():
for text, (b64, _) in six.iteritems(B64_PADDING_EXAMPLES):
self.assertEqual(self._call(text), b64)
def test_unicode_fails_with_type_error(self):
@@ -47,24 +52,24 @@ class B64DecodeTest(unittest.TestCase):
return b64decode(data)
def test_unsafe_url(self):
for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems():
for text, b64 in six.iteritems(B64_URL_UNSAFE_EXAMPLES):
self.assertEqual(self._call(b64), text)
def test_input_without_padding(self):
for text, (b64, _) in B64_PADDING_EXAMPLES.iteritems():
for text, (b64, _) in six.iteritems(B64_PADDING_EXAMPLES):
self.assertEqual(self._call(b64), text)
def test_input_with_padding(self):
for text, (b64, pad) in B64_PADDING_EXAMPLES.iteritems():
for text, (b64, pad) in six.iteritems(B64_PADDING_EXAMPLES):
self.assertEqual(self._call(b64 + pad), text)
def test_unicode_with_ascii(self):
self.assertEqual(self._call(u'YQ'), 'a')
self.assertEqual(self._call(u'YQ'), b'a')
def test_non_ascii_unicode_fails(self):
self.assertRaises(ValueError, self._call, u'\u0105')
def test_type_error_no_unicode_or_str(self):
def test_type_error_no_unicode_or_bytes(self):
self.assertRaises(TypeError, self._call, object())

View File

@@ -3,12 +3,15 @@ import abc
import collections
import json
import six
from acme.jose import util
# pylint: disable=no-self-argument,no-method-argument,no-init,inherit-non-class
# pylint: disable=too-few-public-methods
@six.add_metaclass(abc.ABCMeta)
class JSONDeSerializable(object):
# pylint: disable=too-few-public-methods
"""Interface for (de)serializable JSON objects.
@@ -96,7 +99,6 @@ class JSONDeSerializable(object):
return Bar()
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def to_partial_json(self): # pragma: no cover
@@ -133,7 +135,7 @@ class JSONDeSerializable(object):
def _serialize(obj):
if isinstance(obj, JSONDeSerializable):
return _serialize(obj.to_partial_json())
if isinstance(obj, basestring): # strings are sequence
if isinstance(obj, six.string_types): # strings are Sequence
return obj
elif isinstance(obj, list):
return [_serialize(subobj) for subobj in obj]
@@ -143,14 +145,14 @@ class JSONDeSerializable(object):
return tuple(_serialize(subobj) for subobj in obj)
elif isinstance(obj, collections.Mapping):
return dict((_serialize(key), _serialize(value))
for key, value in obj.iteritems())
for key, value in six.iteritems(obj))
else:
return obj
return _serialize(self)
@util.abstractclassmethod
def from_json(cls, unused_jobj):
def from_json(cls, jobj): # pylint: disable=unused-argument
"""Deserialize a decoded JSON document.
:param jobj: Python object, composed of only other basic data
@@ -182,7 +184,11 @@ class JSONDeSerializable(object):
return json.dumps(self, default=self.json_dump_default, **kwargs)
def json_dumps_pretty(self):
"""Dump the object to pretty JSON document string."""
"""Dump the object to pretty JSON document string.
:rtype: str
"""
return self.json_dumps(sort_keys=True, indent=4, separators=(',', ': '))
@classmethod
@@ -190,7 +196,7 @@ class JSONDeSerializable(object):
"""Serialize Python object.
This function is meant to be passed as ``default`` to
:func:`json.load` or :func:`json.loads`. They call
:func:`json.dump` or :func:`json.dumps`. They call
``default(python_object)`` only for non-basic Python types, so
this function necessarily raises :class:`TypeError` if
``python_object`` is not an instance of

View File

@@ -11,6 +11,7 @@ import binascii
import logging
import OpenSSL
import six
from acme.jose import b64
from acme.jose import errors
@@ -109,7 +110,7 @@ class Field(object):
elif isinstance(value, dict):
return util.frozendict(
dict((cls.default_decoder(key), cls.default_decoder(value))
for key, value in value.iteritems()))
for key, value in six.iteritems(value)))
else: # integer or string
return value
@@ -167,17 +168,20 @@ class JSONObjectWithFieldsMeta(abc.ABCMeta):
for base in bases:
fields.update(getattr(base, '_fields', {}))
# Do not reorder, this class might override fields from base classes!
for key, value in dikt.items(): # not iterkeys() (in-place edit!)
for key, value in tuple(six.iteritems(dikt)):
# not six.iterkeys() (in-place edit!)
if isinstance(value, Field):
fields[key] = dikt.pop(key)
dikt['_orig_slots'] = dikt.get('__slots__', ())
dikt['__slots__'] = tuple(list(dikt['_orig_slots']) + fields.keys())
dikt['__slots__'] = tuple(
list(dikt['_orig_slots']) + list(six.iterkeys(fields)))
dikt['_fields'] = fields
return abc.ABCMeta.__new__(mcs, name, bases, dikt)
@six.add_metaclass(JSONObjectWithFieldsMeta)
class JSONObjectWithFields(util.ImmutableMap, interfaces.JSONDeSerializable):
# pylint: disable=too-few-public-methods
"""JSON object with fields.
@@ -205,13 +209,12 @@ class JSONObjectWithFields(util.ImmutableMap, interfaces.JSONDeSerializable):
assert Foo(bar='baz').bar == 'baz'
"""
__metaclass__ = JSONObjectWithFieldsMeta
@classmethod
def _defaults(cls):
"""Get default fields values."""
return dict([(slot, field.default) for slot, field
in cls._fields.iteritems() if field.omitempty])
in six.iteritems(cls._fields) if field.omitempty])
def __init__(self, **kwargs):
# pylint: disable=star-args
@@ -222,7 +225,7 @@ class JSONObjectWithFields(util.ImmutableMap, interfaces.JSONDeSerializable):
"""Serialize fields to JSON."""
jobj = {}
omitted = set()
for slot, field in self._fields.iteritems():
for slot, field in six.iteritems(self._fields):
value = getattr(self, slot)
if field.omit(value):
@@ -246,7 +249,7 @@ class JSONObjectWithFields(util.ImmutableMap, interfaces.JSONDeSerializable):
@classmethod
def _check_required(cls, jobj):
missing = set()
for _, field in cls._fields.iteritems():
for _, field in six.iteritems(cls._fields):
if not field.omitempty and field.json_name not in jobj:
missing.add(field.json_name)
@@ -260,7 +263,7 @@ class JSONObjectWithFields(util.ImmutableMap, interfaces.JSONDeSerializable):
"""Deserialize fields from JSON."""
cls._check_required(jobj)
fields = {}
for slot, field in cls._fields.iteritems():
for slot, field in six.iteritems(cls._fields):
if field.json_name not in jobj and field.omitempty:
fields[slot] = field.default
else:
@@ -278,17 +281,31 @@ class JSONObjectWithFields(util.ImmutableMap, interfaces.JSONDeSerializable):
return cls(**cls.fields_from_json(jobj))
def encode_b64jose(data):
"""Encode JOSE Base-64 field.
:param bytes data:
:rtype: `unicode`
"""
# b64encode produces ASCII characters only
return b64.b64encode(data).decode('ascii')
def decode_b64jose(data, size=None, minimum=False):
"""Decode JOSE Base-64 field.
:param unicode data:
:param int size: Required length (after decoding).
:param bool minimum: If ``True``, then `size` will be treated as
minimum required length, as opposed to exact equality.
:rtype: bytes
"""
error_cls = TypeError if six.PY2 else binascii.Error
try:
decoded = b64.b64decode(data)
except TypeError as error:
decoded = b64.b64decode(data.encode())
except error_cls as error:
raise errors.DeserializationError(error)
if size is not None and ((not minimum and len(decoded) != size)
@@ -297,35 +314,53 @@ def decode_b64jose(data, size=None, minimum=False):
return decoded
def encode_hex16(value):
"""Hexlify.
:param bytes value:
:rtype: unicode
"""
return binascii.hexlify(value).decode()
def decode_hex16(value, size=None, minimum=False):
"""Decode hexlified field.
:param unicode value:
:param int size: Required length (after decoding).
:param bool minimum: If ``True``, then `size` will be treated as
minimum required length, as opposed to exact equality.
:rtype: bytes
"""
value = value.encode()
if size is not None and ((not minimum and len(value) != size * 2)
or (minimum and len(value) < size * 2)):
raise errors.DeserializationError()
error_cls = TypeError if six.PY2 else binascii.Error
try:
return binascii.unhexlify(value)
except TypeError as error:
except error_cls as error:
raise errors.DeserializationError(error)
def encode_cert(cert):
"""Encode certificate as JOSE Base-64 DER.
:param cert: Certificate.
:type cert: :class:`acme.jose.util.ComparableX509`
:type cert: `OpenSSL.crypto.X509` wrapped in `.ComparableX509`
:rtype: unicode
"""
return b64.b64encode(OpenSSL.crypto.dump_certificate(
return encode_b64jose(OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_ASN1, cert))
def decode_cert(b64der):
"""Decode JOSE Base-64 DER-encoded certificate."""
"""Decode JOSE Base-64 DER-encoded certificate.
:param unicode b64der:
:rtype: `OpenSSL.crypto.X509` wrapped in `.ComparableX509`
"""
try:
return util.ComparableX509(OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_ASN1, decode_b64jose(b64der)))
@@ -333,12 +368,22 @@ def decode_cert(b64der):
raise errors.DeserializationError(error)
def encode_csr(csr):
"""Encode CSR as JOSE Base-64 DER."""
return b64.b64encode(OpenSSL.crypto.dump_certificate_request(
"""Encode CSR as JOSE Base-64 DER.
:type csr: `OpenSSL.crypto.X509Req` wrapped in `.ComparableX509`
:rtype: unicode
"""
return encode_b64jose(OpenSSL.crypto.dump_certificate_request(
OpenSSL.crypto.FILETYPE_ASN1, csr))
def decode_csr(b64der):
"""Decode JOSE Base-64 DER-encoded CSR."""
"""Decode JOSE Base-64 DER-encoded CSR.
:param unicode b64der:
:rtype: `OpenSSL.crypto.X509Req` wrapped in `.ComparableX509`
"""
try:
return util.ComparableX509(OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_ASN1, decode_b64jose(b64der)))
@@ -372,7 +417,7 @@ class TypedJSONObjectWithFields(JSONObjectWithFields):
@classmethod
def get_type_cls(cls, jobj):
"""Get the registered class for ``jobj``."""
if cls in cls.TYPES.itervalues():
if cls in six.itervalues(cls.TYPES):
assert jobj[cls.type_field_name]
# cls is already registered type_cls, force to use it
# so that, e.g Revocation.from_json(jobj) fails if

View File

@@ -3,6 +3,7 @@ import itertools
import unittest
import mock
import six
from acme import test_util
@@ -92,8 +93,8 @@ class JSONObjectWithFieldsMetaTest(unittest.TestCase):
self.field2 = Field('Baz2')
# pylint: disable=invalid-name,missing-docstring,too-few-public-methods
# pylint: disable=blacklisted-name
@six.add_metaclass(JSONObjectWithFieldsMeta)
class A(object):
__metaclass__ = JSONObjectWithFieldsMeta
__slots__ = ('bar',)
baz = self.field
class B(A):
@@ -207,62 +208,82 @@ class JSONObjectWithFieldsTest(unittest.TestCase):
class DeEncodersTest(unittest.TestCase):
def setUp(self):
self.b64_cert = (
'MIIB3jCCAYigAwIBAgICBTkwDQYJKoZIhvcNAQELBQAwdzELMAkGA1UEBhM'
'CVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRIwEAYDVQQHDAlBbm4gQXJib3IxKz'
'ApBgNVBAoMIlVuaXZlcnNpdHkgb2YgTWljaGlnYW4gYW5kIHRoZSBFRkYxF'
'DASBgNVBAMMC2V4YW1wbGUuY29tMB4XDTE0MTIxMTIyMzQ0NVoXDTE0MTIx'
'ODIyMzQ0NVowdzELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRI'
'wEAYDVQQHDAlBbm4gQXJib3IxKzApBgNVBAoMIlVuaXZlcnNpdHkgb2YgTW'
'ljaGlnYW4gYW5kIHRoZSBFRkYxFDASBgNVBAMMC2V4YW1wbGUuY29tMFwwD'
'QYJKoZIhvcNAQEBBQADSwAwSAJBAKx1c7RR7R_drnBSQ_zfx1vQLHUbFLh1'
'AQQQ5R8DZUXd36efNK79vukFhN9HFoHZiUvOjm0c-pVE6K-EdE_twuUCAwE'
'AATANBgkqhkiG9w0BAQsFAANBAC24z0IdwIVKSlntksllvr6zJepBH5fMnd'
'fk3XJp10jT6VE-14KNtjh02a56GoraAvJAT5_H67E8GvJ_ocNnB_o'
u'MIIB3jCCAYigAwIBAgICBTkwDQYJKoZIhvcNAQELBQAwdzELMAkGA1UEBhM'
u'CVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRIwEAYDVQQHDAlBbm4gQXJib3IxKz'
u'ApBgNVBAoMIlVuaXZlcnNpdHkgb2YgTWljaGlnYW4gYW5kIHRoZSBFRkYxF'
u'DASBgNVBAMMC2V4YW1wbGUuY29tMB4XDTE0MTIxMTIyMzQ0NVoXDTE0MTIx'
u'ODIyMzQ0NVowdzELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRI'
u'wEAYDVQQHDAlBbm4gQXJib3IxKzApBgNVBAoMIlVuaXZlcnNpdHkgb2YgTW'
u'ljaGlnYW4gYW5kIHRoZSBFRkYxFDASBgNVBAMMC2V4YW1wbGUuY29tMFwwD'
u'QYJKoZIhvcNAQEBBQADSwAwSAJBAKx1c7RR7R_drnBSQ_zfx1vQLHUbFLh1'
u'AQQQ5R8DZUXd36efNK79vukFhN9HFoHZiUvOjm0c-pVE6K-EdE_twuUCAwE'
u'AATANBgkqhkiG9w0BAQsFAANBAC24z0IdwIVKSlntksllvr6zJepBH5fMnd'
u'fk3XJp10jT6VE-14KNtjh02a56GoraAvJAT5_H67E8GvJ_ocNnB_o'
)
self.b64_csr = (
'MIIBXTCCAQcCAQAweTELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2F'
'uMRIwEAYDVQQHDAlBbm4gQXJib3IxDDAKBgNVBAoMA0VGRjEfMB0GA1UECw'
'wWVW5pdmVyc2l0eSBvZiBNaWNoaWdhbjEUMBIGA1UEAwwLZXhhbXBsZS5jb'
'20wXDANBgkqhkiG9w0BAQEFAANLADBIAkEArHVztFHtH92ucFJD_N_HW9As'
'dRsUuHUBBBDlHwNlRd3fp580rv2-6QWE30cWgdmJS86ObRz6lUTor4R0T-3'
'C5QIDAQABoCkwJwYJKoZIhvcNAQkOMRowGDAWBgNVHREEDzANggtleGFtcG'
'xlLmNvbTANBgkqhkiG9w0BAQsFAANBAHJH_O6BtC9aGzEVCMGOZ7z9iIRHW'
'Szr9x_bOzn7hLwsbXPAgO1QxEwL-X-4g20Gn9XBE1N9W6HCIEut2d8wACg'
u'MIIBXTCCAQcCAQAweTELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2F'
u'uMRIwEAYDVQQHDAlBbm4gQXJib3IxDDAKBgNVBAoMA0VGRjEfMB0GA1UECw'
u'wWVW5pdmVyc2l0eSBvZiBNaWNoaWdhbjEUMBIGA1UEAwwLZXhhbXBsZS5jb'
u'20wXDANBgkqhkiG9w0BAQEFAANLADBIAkEArHVztFHtH92ucFJD_N_HW9As'
u'dRsUuHUBBBDlHwNlRd3fp580rv2-6QWE30cWgdmJS86ObRz6lUTor4R0T-3'
u'C5QIDAQABoCkwJwYJKoZIhvcNAQkOMRowGDAWBgNVHREEDzANggtleGFtcG'
u'xlLmNvbTANBgkqhkiG9w0BAQsFAANBAHJH_O6BtC9aGzEVCMGOZ7z9iIRHW'
u'Szr9x_bOzn7hLwsbXPAgO1QxEwL-X-4g20Gn9XBE1N9W6HCIEut2d8wACg'
)
def test_decode_b64_jose_padding_error(self):
from acme.jose.json_util import decode_b64jose
self.assertRaises(errors.DeserializationError, decode_b64jose, 'x')
def test_encode_b64jose(self):
from acme.jose.json_util import encode_b64jose
encoded = encode_b64jose(b'x')
self.assertTrue(isinstance(encoded, six.string_types))
self.assertEqual(u'eA', encoded)
def test_decode_b64_jose_size(self):
def test_decode_b64jose(self):
from acme.jose.json_util import decode_b64jose
self.assertEqual('foo', decode_b64jose('Zm9v', size=3))
self.assertRaises(
errors.DeserializationError, decode_b64jose, 'Zm9v', size=2)
self.assertRaises(
errors.DeserializationError, decode_b64jose, 'Zm9v', size=4)
decoded = decode_b64jose(u'eA')
self.assertTrue(isinstance(decoded, six.binary_type))
self.assertEqual(b'x', decoded)
def test_decode_b64_jose_minimum_size(self):
def test_decode_b64jose_padding_error(self):
from acme.jose.json_util import decode_b64jose
self.assertEqual('foo', decode_b64jose('Zm9v', size=3, minimum=True))
self.assertEqual('foo', decode_b64jose('Zm9v', size=2, minimum=True))
self.assertRaises(errors.DeserializationError, decode_b64jose, u'x')
def test_decode_b64jose_size(self):
from acme.jose.json_util import decode_b64jose
self.assertEqual(b'foo', decode_b64jose(u'Zm9v', size=3))
self.assertRaises(
errors.DeserializationError, decode_b64jose, u'Zm9v', size=2)
self.assertRaises(
errors.DeserializationError, decode_b64jose, u'Zm9v', size=4)
def test_decode_b64jose_minimum_size(self):
from acme.jose.json_util import decode_b64jose
self.assertEqual(b'foo', decode_b64jose(u'Zm9v', size=3, minimum=True))
self.assertEqual(b'foo', decode_b64jose(u'Zm9v', size=2, minimum=True))
self.assertRaises(errors.DeserializationError, decode_b64jose,
'Zm9v', size=4, minimum=True)
u'Zm9v', size=4, minimum=True)
def test_encode_hex16(self):
from acme.jose.json_util import encode_hex16
encoded = encode_hex16(b'foo')
self.assertEqual(u'666f6f', encoded)
self.assertTrue(isinstance(encoded, six.string_types))
def test_decode_hex16(self):
from acme.jose.json_util import decode_hex16
self.assertEqual('foo', decode_hex16('666f6f'))
decoded = decode_hex16(u'666f6f')
self.assertEqual(b'foo', decoded)
self.assertTrue(isinstance(decoded, six.binary_type))
def test_decode_hex16_minimum_size(self):
from acme.jose.json_util import decode_hex16
self.assertEqual('foo', decode_hex16('666f6f', size=3, minimum=True))
self.assertEqual('foo', decode_hex16('666f6f', size=2, minimum=True))
self.assertEqual(b'foo', decode_hex16(u'666f6f', size=3, minimum=True))
self.assertEqual(b'foo', decode_hex16(u'666f6f', size=2, minimum=True))
self.assertRaises(errors.DeserializationError, decode_hex16,
'666f6f', size=4, minimum=True)
u'666f6f', size=4, minimum=True)
def test_decode_hex16_odd_length(self):
from acme.jose.json_util import decode_hex16
self.assertRaises(errors.DeserializationError, decode_hex16, 'x')
self.assertRaises(errors.DeserializationError, decode_hex16, u'x')
def test_encode_cert(self):
from acme.jose.json_util import encode_cert
@@ -273,7 +294,7 @@ class DeEncodersTest(unittest.TestCase):
cert = decode_cert(self.b64_cert)
self.assertTrue(isinstance(cert, util.ComparableX509))
self.assertEqual(cert, CERT)
self.assertRaises(errors.DeserializationError, decode_cert, '')
self.assertRaises(errors.DeserializationError, decode_cert, u'')
def test_encode_csr(self):
from acme.jose.json_util import encode_csr
@@ -284,7 +305,7 @@ class DeEncodersTest(unittest.TestCase):
csr = decode_csr(self.b64_csr)
self.assertTrue(isinstance(csr, util.ComparableX509))
self.assertEqual(csr, CSR)
self.assertRaises(errors.DeserializationError, decode_csr, '')
self.assertRaises(errors.DeserializationError, decode_csr, u'')
class TypedJSONObjectWithFieldsTest(unittest.TestCase):

View File

@@ -4,6 +4,7 @@ https://tools.ietf.org/html/draft-ietf-jose-json-web-algorithms-40
"""
import abc
import collections
import logging
import cryptography.exceptions
@@ -27,7 +28,7 @@ class JWA(interfaces.JSONDeSerializable): # pylint: disable=abstract-method
"""JSON Web Algorithm."""
class JWASignature(JWA):
class JWASignature(JWA, collections.Hashable):
"""JSON Web Signature Algorithm."""
SIGNATURES = {}
@@ -39,6 +40,9 @@ class JWASignature(JWA):
return NotImplemented
return self.name == other.name
def __hash__(self):
return hash((self.__class__, self.name))
def __ne__(self, other):
return not self == other

View File

@@ -58,12 +58,12 @@ class JWAHSTest(unittest.TestCase): # pylint: disable=too-few-public-methods
def test_it(self):
from acme.jose.jwa import HS256
sig = (
"\xceR\xea\xcd\x94\xab\xcf\xfb\xe0\xacA.:\x1a'\x08i\xe2\xc4"
"\r\x85+\x0e\x85\xaeUZ\xd4\xb3\x97zO"
b"\xceR\xea\xcd\x94\xab\xcf\xfb\xe0\xacA.:\x1a'\x08i\xe2\xc4"
b"\r\x85+\x0e\x85\xaeUZ\xd4\xb3\x97zO"
)
self.assertEqual(HS256.sign('some key', 'foo'), sig)
self.assertTrue(HS256.verify('some key', 'foo', sig) is True)
self.assertTrue(HS256.verify('some key', 'foo', sig + '!') is False)
self.assertEqual(HS256.sign(b'some key', b'foo'), sig)
self.assertTrue(HS256.verify(b'some key', b'foo', sig) is True)
self.assertTrue(HS256.verify(b'some key', b'foo', sig + b'!') is False)
class JWARSTest(unittest.TestCase):
@@ -71,32 +71,33 @@ class JWARSTest(unittest.TestCase):
def test_sign_no_private_part(self):
from acme.jose.jwa import RS256
self.assertRaises(
errors.Error, RS256.sign, RSA512_KEY.public_key(), 'foo')
errors.Error, RS256.sign, RSA512_KEY.public_key(), b'foo')
def test_sign_key_too_small(self):
from acme.jose.jwa import RS256
from acme.jose.jwa import PS256
self.assertRaises(errors.Error, RS256.sign, RSA256_KEY, 'foo')
self.assertRaises(errors.Error, PS256.sign, RSA256_KEY, 'foo')
self.assertRaises(errors.Error, RS256.sign, RSA256_KEY, b'foo')
self.assertRaises(errors.Error, PS256.sign, RSA256_KEY, b'foo')
def test_rs(self):
from acme.jose.jwa import RS256
sig = (
'|\xc6\xb2\xa4\xab(\x87\x99\xfa*:\xea\xf8\xa0N&}\x9f\x0f\xc0O'
'\xc6t\xa3\xe6\xfa\xbb"\x15Y\x80Y\xe0\x81\xb8\x88)\xba\x0c\x9c'
'\xa4\x99\x1e\x19&\xd8\xc7\x99S\x97\xfc\x85\x0cOV\xe6\x07\x99'
'\xd2\xb9.>}\xfd'
b'|\xc6\xb2\xa4\xab(\x87\x99\xfa*:\xea\xf8\xa0N&}\x9f\x0f\xc0O'
b'\xc6t\xa3\xe6\xfa\xbb"\x15Y\x80Y\xe0\x81\xb8\x88)\xba\x0c\x9c'
b'\xa4\x99\x1e\x19&\xd8\xc7\x99S\x97\xfc\x85\x0cOV\xe6\x07\x99'
b'\xd2\xb9.>}\xfd'
)
self.assertEqual(RS256.sign(RSA512_KEY, 'foo'), sig)
self.assertTrue(RS256.verify(RSA512_KEY.public_key(), 'foo', sig))
self.assertEqual(RS256.sign(RSA512_KEY, b'foo'), sig)
self.assertTrue(RS256.verify(RSA512_KEY.public_key(), b'foo', sig))
self.assertFalse(RS256.verify(
RSA512_KEY.public_key(), 'foo', sig + '!'))
RSA512_KEY.public_key(), b'foo', sig + b'!'))
def test_ps(self):
from acme.jose.jwa import PS256
sig = PS256.sign(RSA1024_KEY, 'foo')
self.assertTrue(PS256.verify(RSA1024_KEY.public_key(), 'foo', sig))
self.assertFalse(PS256.verify(RSA1024_KEY.public_key(), 'foo', sig + '!'))
sig = PS256.sign(RSA1024_KEY, b'foo')
self.assertTrue(PS256.verify(RSA1024_KEY.public_key(), b'foo', sig))
self.assertFalse(PS256.verify(
RSA1024_KEY.public_key(), b'foo', sig + b'!'))
if __name__ == '__main__':

View File

@@ -9,7 +9,8 @@ from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import rsa
from acme.jose import b64
import six
from acme.jose import errors
from acme.jose import json_util
from acme.jose import util
@@ -87,7 +88,7 @@ class JWK(json_util.TypedJSONObjectWithFields):
key, cls.cryptography_key_types):
raise errors.Error("Unable to deserialize {0} into {1}".format(
key.__class__, cls.__class__))
for jwk_cls in cls.TYPES.itervalues():
for jwk_cls in six.itervalues(cls.TYPES):
if isinstance(key, jwk_cls.cryptography_key_types):
return jwk_cls(key=key)
raise errors.Error("Unsupported algorithm: {0}".format(key.__class__))
@@ -127,11 +128,11 @@ class JWKOct(JWK):
# algorithm intended to be used with the key, unless the
# application uses another means or convention to determine
# the algorithm used.
return {'k': self.key}
return {'k': json_util.encode_b64jose(self.key)}
@classmethod
def fields_from_json(cls, jobj):
return cls(key=jobj['k'])
return cls(key=json_util.decode_b64jose(jobj['k']))
def public_key(self):
return self
@@ -158,18 +159,25 @@ class JWKRSA(JWK):
@classmethod
def _encode_param(cls, data):
"""Encode Base64urlUInt.
:type data: long
:rtype: unicode
"""
def _leading_zeros(arg):
if len(arg) % 2:
return '0' + arg
return arg
return b64.b64encode(binascii.unhexlify(
return json_util.encode_b64jose(binascii.unhexlify(
_leading_zeros(hex(data)[2:].rstrip('L'))))
@classmethod
def _decode_param(cls, data):
"""Decode Base64urlUInt."""
try:
return long(binascii.hexlify(json_util.decode_b64jose(data)), 16)
return int(binascii.hexlify(json_util.decode_b64jose(data)), 16)
except ValueError: # invalid literal for long() with base 16
raise errors.DeserializationError()
@@ -198,17 +206,20 @@ class JWKRSA(JWK):
raise errors.Error(
"Some private parameters are missing: {0}".format(
all_params))
p, q, dp, dq, qi = tuple(cls._decode_param(x) for x in all_params)
p, q, dp, dq, qi = tuple(
cls._decode_param(x) for x in all_params)
# TODO: check for oth
else:
p, q = rsa.rsa_recover_prime_factors(n, e, d) # cryptography>=0.8
# cryptography>=0.8
p, q = rsa.rsa_recover_prime_factors(n, e, d)
dp = rsa.rsa_crt_dmp1(d, p)
dq = rsa.rsa_crt_dmq1(d, q)
qi = rsa.rsa_crt_iqmp(p, q)
key = rsa.RSAPrivateNumbers(
p, q, d, dp, dq, qi, public_numbers).private_key(default_backend())
p, q, d, dp, dq, qi, public_numbers).private_key(
default_backend())
return cls(key=key)
@@ -234,4 +245,4 @@ class JWKRSA(JWK):
'qi': private.iqmp,
}
return dict((key, self._encode_param(value))
for key, value in params.iteritems())
for key, value in six.iteritems(params))

View File

@@ -4,6 +4,7 @@ import unittest
from acme import test_util
from acme.jose import errors
from acme.jose import json_util
from acme.jose import util
@@ -29,8 +30,8 @@ class JWKOctTest(unittest.TestCase):
def setUp(self):
from acme.jose.jwk import JWKOct
self.jwk = JWKOct(key='foo')
self.jobj = {'kty': 'oct', 'k': 'foo'}
self.jwk = JWKOct(key=b'foo')
self.jobj = {'kty': 'oct', 'k': json_util.encode_b64jose(b'foo')}
def test_to_partial_json(self):
self.assertEqual(self.jwk.to_partial_json(), self.jobj)
@@ -45,7 +46,7 @@ class JWKOctTest(unittest.TestCase):
def test_load(self):
from acme.jose.jwk import JWKOct
self.assertEqual(self.jwk, JWKOct.load('foo'))
self.assertEqual(self.jwk, JWKOct.load(b'foo'))
def test_public_key(self):
self.assertTrue(self.jwk.public_key() is self.jwk)
@@ -64,7 +65,8 @@ class JWKRSATest(unittest.TestCase):
'n': 'm2Fylv-Uz7trgTW8EBHP3FQSMeZs2GNQ6VRo1sIVJEk',
}
# pylint: disable=protected-access
self.jwk256_not_comparable = JWKRSA(key=RSA256_KEY.public_key()._wrapped)
self.jwk256_not_comparable = JWKRSA(
key=RSA256_KEY.public_key()._wrapped)
self.jwk512 = JWKRSA(key=RSA512_KEY.public_key())
self.jwk512json = {
'kty': 'RSA',
@@ -91,6 +93,12 @@ class JWKRSATest(unittest.TestCase):
self.jwk256_not_comparable.key, util.ComparableRSAKey))
self.assertEqual(self.jwk256, self.jwk256_not_comparable)
def test_encode_param_zero(self):
from acme.jose.jwk import JWKRSA
# pylint: disable=protected-access
# TODO: move encode/decode _param to separate class
self.assertEqual('AA', JWKRSA._encode_param(0))
def test_equals(self):
self.assertEqual(self.jwk256, self.jwk256)
self.assertEqual(self.jwk512, self.jwk512)

View File

@@ -4,6 +4,7 @@ import base64
import sys
import OpenSSL
import six
from acme.jose import b64
from acme.jose import errors
@@ -80,7 +81,7 @@ class Header(json_util.JSONObjectWithFields):
def not_omitted(self):
"""Fields that would not be omitted in the JSON object."""
return dict((name, getattr(self, name))
for name, field in self._fields.iteritems()
for name, field in six.iteritems(self._fields)
if not field.omit(getattr(self, name)))
def __add__(self, other):
@@ -148,15 +149,22 @@ class Signature(json_util.JSONObjectWithFields):
header_cls = Header
__slots__ = ('combined',)
protected = json_util.Field(
'protected', omitempty=True, default='',
decoder=json_util.decode_b64jose, encoder=b64.b64encode) # TODO: utf-8?
protected = json_util.Field('protected', omitempty=True, default='')
header = json_util.Field(
'header', omitempty=True, default=header_cls(),
decoder=header_cls.from_json)
signature = json_util.Field(
'signature', decoder=json_util.decode_b64jose,
encoder=b64.b64encode)
encoder=json_util.encode_b64jose)
@protected.encoder
def protected(value): # pylint: disable=missing-docstring,no-self-argument
# wrong type guess (Signature, not bytes) | pylint: disable=no-member
return json_util.encode_b64jose(value.encode('utf-8'))
@protected.decoder
def protected(value): # pylint: disable=missing-docstring,no-self-argument
return json_util.decode_b64jose(value).decode('utf-8')
def __init__(self, **kwargs):
if 'combined' not in kwargs:
@@ -178,6 +186,11 @@ class Signature(json_util.JSONObjectWithFields):
kwargs['combined'] = combined
return kwargs
@classmethod
def _msg(cls, protected, payload):
return (b64.b64encode(protected.encode('utf-8')) + b'.' +
b64.b64encode(payload))
def verify(self, payload, key=None):
"""Verify.
@@ -188,8 +201,7 @@ class Signature(json_util.JSONObjectWithFields):
key = self.combined.find_key() if key is None else key
return self.combined.alg.verify(
key=key.key, sig=self.signature,
msg=(b64.b64encode(self.protected) + '.' +
b64.b64encode(payload)))
msg=self._msg(self.protected, payload))
@classmethod
def sign(cls, payload, key, alg, include_jwk=True,
@@ -220,8 +232,7 @@ class Signature(json_util.JSONObjectWithFields):
protected = ''
header = cls.header_cls(**header_params) # pylint: disable=star-args
signature = alg.sign(key.key, b64.b64encode(protected)
+ '.' + b64.b64encode(payload))
signature = alg.sign(key.key, cls._msg(protected, payload))
return cls(protected=protected, header=header, signature=signature)
@@ -244,7 +255,7 @@ class JWS(json_util.JSONObjectWithFields):
"""JSON Web Signature.
:ivar str payload: JWS Payload.
:ivar str signaturea: JWS Signatures.
:ivar str signature: JWS Signatures.
"""
__slots__ = ('payload', 'signatures')
@@ -272,33 +283,45 @@ class JWS(json_util.JSONObjectWithFields):
return self.signatures[0]
def to_compact(self):
"""Compact serialization."""
"""Compact serialization.
:rtype: bytes
"""
assert len(self.signatures) == 1
assert 'alg' not in self.signature.header.not_omitted()
# ... it must be in protected
return '{0}.{1}.{2}'.format(
b64.b64encode(self.signature.protected),
b64.b64encode(self.payload),
return (
b64.b64encode(self.signature.protected.encode('utf-8'))
+ b'.' +
b64.b64encode(self.payload)
+ b'.' +
b64.b64encode(self.signature.signature))
@classmethod
def from_compact(cls, compact):
"""Compact deserialization."""
"""Compact deserialization.
:param bytes compact:
"""
try:
protected, payload, signature = compact.split('.')
protected, payload, signature = compact.split(b'.')
except ValueError:
raise errors.DeserializationError(
'Compact JWS serialization should comprise of exactly'
' 3 dot-separated components')
sig = cls.signature_cls(protected=json_util.decode_b64jose(protected),
signature=json_util.decode_b64jose(signature))
return cls(payload=json_util.decode_b64jose(payload), signatures=(sig,))
sig = cls.signature_cls(
protected=b64.b64decode(protected).decode('utf-8'),
signature=b64.b64decode(signature))
return cls(payload=b64.b64decode(payload), signatures=(sig,))
def to_partial_json(self, flat=True): # pylint: disable=arguments-differ
assert self.signatures
payload = b64.b64encode(self.payload)
payload = json_util.encode_b64jose(self.payload)
if flat and len(self.signatures) == 1:
ret = self.signatures[0].to_partial_json()
@@ -329,34 +352,36 @@ class CLI(object):
def sign(cls, args):
"""Sign."""
key = args.alg.kty.load(args.key.read())
args.key.close()
if args.protect is None:
args.protect = []
if args.compact:
args.protect.append('alg')
sig = JWS.sign(payload=sys.stdin.read(), key=key, alg=args.alg,
sig = JWS.sign(payload=sys.stdin.read().encode(), key=key, alg=args.alg,
protect=set(args.protect))
if args.compact:
print sig.to_compact()
six.print_(sig.to_compact().decode('utf-8'))
else: # JSON
print sig.json_dumps_pretty()
six.print_(sig.json_dumps_pretty())
@classmethod
def verify(cls, args):
"""Verify."""
if args.compact:
sig = JWS.from_compact(sys.stdin.read())
sig = JWS.from_compact(sys.stdin.read().encode())
else: # JSON
try:
sig = JWS.json_loads(sys.stdin.read())
except errors.Error as error:
print error
six.print_(error)
return -1
if args.key is not None:
assert args.kty is not None
key = args.kty.load(args.key.read()).public_key()
args.key.close()
else:
key = None
@@ -387,7 +412,7 @@ class CLI(object):
parser_sign = subparsers.add_parser('sign')
parser_sign.set_defaults(func=cls.sign)
parser_sign.add_argument(
'-k', '--key', type=argparse.FileType(), required=True)
'-k', '--key', type=argparse.FileType('rb'), required=True)
parser_sign.add_argument(
'-a', '--alg', type=cls._alg_type, default=jwa.RS256)
parser_sign.add_argument(
@@ -396,7 +421,7 @@ class CLI(object):
parser_verify = subparsers.add_parser('verify')
parser_verify.set_defaults(func=cls.verify)
parser_verify.add_argument(
'-k', '--key', type=argparse.FileType(), required=False)
'-k', '--key', type=argparse.FileType('rb'), required=False)
parser_verify.add_argument(
'--kty', type=cls._kty_type, required=False)

View File

@@ -7,8 +7,8 @@ import OpenSSL
from acme import test_util
from acme.jose import b64
from acme.jose import errors
from acme.jose import json_util
from acme.jose import jwa
from acme.jose import jwk
@@ -73,7 +73,7 @@ class HeaderTest(unittest.TestCase):
self.assertEqual(jobj, {'x5c': [cert_b64, cert_b64]})
self.assertEqual(header, Header.from_json(jobj))
jobj['x5c'][0] = base64.b64encode(
'xxx' + OpenSSL.crypto.dump_certificate(
b'xxx' + OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_ASN1, CERT))
self.assertRaises(errors.DeserializationError, Header.from_json, jobj)
@@ -90,7 +90,7 @@ class SignatureTest(unittest.TestCase):
from acme.jose.jws import Header
from acme.jose.jws import Signature
self.assertEqual(
Signature(signature='foo', header=Header(alg=jwa.RS256)),
Signature(signature=b'foo', header=Header(alg=jwa.RS256)),
Signature.from_json(
{'signature': 'Zm9v', 'header': {'alg': 'RS256'}}))
@@ -109,12 +109,12 @@ class JWSTest(unittest.TestCase):
from acme.jose.jws import JWS
self.unprotected = JWS.sign(
payload='foo', key=self.privkey, alg=jwa.RS256)
payload=b'foo', key=self.privkey, alg=jwa.RS256)
self.protected = JWS.sign(
payload='foo', key=self.privkey, alg=jwa.RS256,
payload=b'foo', key=self.privkey, alg=jwa.RS256,
protect=frozenset(['jwk', 'alg']))
self.mixed = JWS.sign(
payload='foo', key=self.privkey, alg=jwa.RS256,
payload=b'foo', key=self.privkey, alg=jwa.RS256,
protect=frozenset(['alg']))
def test_pubkey_jwk(self):
@@ -134,8 +134,8 @@ class JWSTest(unittest.TestCase):
def test_compact_lost_unprotected(self):
compact = self.mixed.to_compact()
self.assertEqual(
'eyJhbGciOiAiUlMyNTYifQ.Zm9v.OHdxFVj73l5LpxbFp1AmYX4yJM0Pyb'
'_893n1zQjpim_eLS5J1F61lkvrCrCDErTEJnBGOGesJ72M7b6Ve1cAJA',
b'eyJhbGciOiAiUlMyNTYifQ.Zm9v.OHdxFVj73l5LpxbFp1AmYX4yJM0Pyb'
b'_893n1zQjpim_eLS5J1F61lkvrCrCDErTEJnBGOGesJ72M7b6Ve1cAJA',
compact)
from acme.jose.jws import JWS
@@ -147,7 +147,7 @@ class JWSTest(unittest.TestCase):
def test_from_compact_missing_components(self):
from acme.jose.jws import JWS
self.assertRaises(errors.DeserializationError, JWS.from_compact, '.')
self.assertRaises(errors.DeserializationError, JWS.from_compact, b'.')
def test_json_omitempty(self):
protected_jobj = self.protected.to_partial_json(flat=True)
@@ -164,10 +164,12 @@ class JWSTest(unittest.TestCase):
def test_json_flat(self):
jobj_to = {
'signature': b64.b64encode(self.mixed.signature.signature),
'payload': b64.b64encode('foo'),
'signature': json_util.encode_b64jose(
self.mixed.signature.signature),
'payload': json_util.encode_b64jose(b'foo'),
'header': self.mixed.signature.header,
'protected': b64.b64encode(self.mixed.signature.protected),
'protected': json_util.encode_b64jose(
self.mixed.signature.protected.encode('utf-8')),
}
jobj_from = jobj_to.copy()
jobj_from['header'] = jobj_from['header'].to_json()
@@ -179,7 +181,7 @@ class JWSTest(unittest.TestCase):
def test_json_not_flat(self):
jobj_to = {
'signatures': (self.mixed.signature,),
'payload': b64.b64encode('foo'),
'payload': json_util.encode_b64jose(b'foo'),
}
jobj_from = jobj_to.copy()
jobj_from['signatures'] = [jobj_to['signatures'][0].to_json()]

View File

@@ -3,6 +3,7 @@ import collections
from cryptography.hazmat.primitives.asymmetric import rsa
import OpenSSL
import six
class abstractclassmethod(classmethod):
@@ -156,7 +157,8 @@ class ImmutableMap(collections.Mapping, collections.Hashable):
def __repr__(self):
return '{0}({1})'.format(self.__class__.__name__, ', '.join(
'{0}={1!r}'.format(key, value) for key, value in self.iteritems()))
'{0}={1!r}'.format(key, value)
for key, value in six.iteritems(self)))
class frozendict(collections.Mapping, collections.Hashable):
@@ -174,7 +176,7 @@ class frozendict(collections.Mapping, collections.Hashable):
# TODO: support generators/iterators
object.__setattr__(self, '_items', items)
object.__setattr__(self, '_keys', tuple(sorted(items.iterkeys())))
object.__setattr__(self, '_keys', tuple(sorted(six.iterkeys(items))))
def __getitem__(self, key):
return self._items[key]
@@ -185,8 +187,11 @@ class frozendict(collections.Mapping, collections.Hashable):
def __len__(self):
return len(self._items)
def _sorted_items(self):
return tuple((key, self[key]) for key in self._keys)
def __hash__(self):
return hash(tuple((key, value) for key, value in self.items()))
return hash(self._sorted_items())
def __getattr__(self, name):
try:
@@ -198,5 +203,5 @@ class frozendict(collections.Mapping, collections.Hashable):
raise AttributeError("can't set attribute")
def __repr__(self):
return 'frozendict({0})'.format(', '.join(
'{0}={1!r}'.format(key, value) for key, value in self.iteritems()))
return 'frozendict({0})'.format(', '.join('{0}={1!r}'.format(
key, value) for key, value in self._sorted_items()))

View File

@@ -2,6 +2,8 @@
import functools
import unittest
import six
from acme import test_util
@@ -168,13 +170,13 @@ class frozendictTest(unittest.TestCase): # pylint: disable=invalid-name
def test_init_other_raises_type_error(self):
from acme.jose.util import frozendict
# specifically fail for generators...
self.assertRaises(TypeError, frozendict, {'a': 'b'}.iteritems())
self.assertRaises(TypeError, frozendict, six.iteritems({'a': 'b'}))
def test_len(self):
self.assertEqual(2, len(self.fdict))
def test_hash(self):
self.assertEqual(1278944519403861804, hash(self.fdict))
self.assertTrue(isinstance(hash(self.fdict), int))
def test_getattr_proxy(self):
self.assertEqual(1, self.fdict.x)

View File

@@ -1,5 +1,4 @@
"""ACME JOSE JWS."""
from acme import errors
from acme import jose
@@ -9,29 +8,15 @@ class Header(jose.Header):
.. todo:: Implement ``acmePath``.
"""
nonce = jose.Field('nonce', omitempty=True)
@classmethod
def validate_nonce(cls, nonce):
"""Validate nonce.
:returns: ``None`` if ``nonce`` is valid, decoding errors otherwise.
"""
try:
jose.b64decode(nonce)
except (ValueError, TypeError) as error:
return error
else:
return None
nonce = jose.Field('nonce', omitempty=True, encoder=jose.encode_b64jose)
@nonce.decoder
def nonce(value): # pylint: disable=missing-docstring,no-self-argument
error = Header.validate_nonce(value)
if error is not None:
try:
return jose.decode_b64jose(value)
except jose.DeserializationError as error:
# TODO: custom error
raise errors.Error("Invalid nonce: {0}".format(error))
return value
raise jose.DeserializationError("Invalid nonce: {0}".format(error))
class Signature(jose.Signature):

View File

@@ -1,7 +1,6 @@
"""Tests for acme.jws."""
import unittest
from acme import errors
from acme import jose
from acme import test_util
@@ -12,8 +11,8 @@ KEY = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem'))
class HeaderTest(unittest.TestCase):
"""Tests for acme.jws.Header."""
good_nonce = jose.b64encode('foo')
wrong_nonce = 'F'
good_nonce = jose.encode_b64jose(b'foo')
wrong_nonce = u'F'
# Following just makes sure wrong_nonce is wrong
try:
jose.b64decode(wrong_nonce)
@@ -22,17 +21,13 @@ class HeaderTest(unittest.TestCase):
else:
assert False # pragma: no cover
def test_validate_nonce(self):
from acme.jws import Header
self.assertTrue(Header.validate_nonce(self.good_nonce) is None)
self.assertFalse(Header.validate_nonce(self.wrong_nonce) is None)
def test_nonce_decoder(self):
from acme.jws import Header
nonce_field = Header._fields['nonce']
self.assertRaises(errors.Error, nonce_field.decode, self.wrong_nonce)
self.assertEqual(self.good_nonce, nonce_field.decode(self.good_nonce))
self.assertRaises(
jose.DeserializationError, nonce_field.decode, self.wrong_nonce)
self.assertEqual(b'foo', nonce_field.decode(self.good_nonce))
class JWSTest(unittest.TestCase):
@@ -41,13 +36,16 @@ class JWSTest(unittest.TestCase):
def setUp(self):
self.privkey = KEY
self.pubkey = self.privkey.public_key()
self.nonce = jose.b64encode('Nonce')
self.nonce = jose.b64encode(b'Nonce')
def test_it(self):
from acme.jws import JWS
jws = JWS.sign(payload='foo', key=self.privkey,
jws = JWS.sign(payload=b'foo', key=self.privkey,
alg=jose.RS256, nonce=self.nonce)
JWS.from_json(jws.to_json())
self.assertEqual(jws.signature.combined.nonce, self.nonce)
# TODO: check that nonce is in protected header
self.assertEqual(jws, JWS.from_json(jws.to_json()))
if __name__ == '__main__':

View File

@@ -1,5 +1,7 @@
"""ACME protocol messages."""
import urlparse
import collections
from six.moves.urllib import parse as urllib_parse # pylint: disable=import-error
from acme import challenges
from acme import fields
@@ -12,6 +14,10 @@ class Error(jose.JSONObjectWithFields, Exception):
https://tools.ietf.org/html/draft-ietf-appsawg-http-problem-00
:ivar unicode typ:
:ivar unicode title:
:ivar unicode detail:
"""
ERROR_TYPE_NAMESPACE = 'urn:acme:error:'
ERROR_TYPE_DESCRIPTIONS = {
@@ -49,7 +55,11 @@ class Error(jose.JSONObjectWithFields, Exception):
@property
def description(self):
"""Hardcoded error description based on its type."""
"""Hardcoded error description based on its type.
:rtype: unicode
"""
return self.ERROR_TYPE_DESCRIPTIONS[self.typ]
def __str__(self):
@@ -59,7 +69,7 @@ class Error(jose.JSONObjectWithFields, Exception):
return str(self.detail)
class _Constant(jose.JSONDeSerializable):
class _Constant(jose.JSONDeSerializable, collections.Hashable):
"""ACME constant."""
__slots__ = ('name',)
POSSIBLE_NAMES = NotImplemented
@@ -84,6 +94,9 @@ class _Constant(jose.JSONDeSerializable):
def __eq__(self, other):
return isinstance(other, type(self)) and other.name == self.name
def __hash__(self):
return hash((self.__class__, self.name))
def __ne__(self, other):
return not self == other
@@ -108,7 +121,8 @@ IDENTIFIER_FQDN = IdentifierType('dns') # IdentifierDNS in Boulder
class Identifier(jose.JSONObjectWithFields):
"""ACME identifier.
:ivar acme.messages.IdentifierType typ:
:ivar IdentifierType typ:
:ivar unicode value:
"""
typ = jose.Field('type', decoder=IdentifierType.from_json)
@@ -127,7 +141,7 @@ class Resource(jose.JSONObjectWithFields):
class ResourceWithURI(Resource):
"""ACME Resource with URI.
:ivar str uri: Location of the resource.
:ivar unicode uri: Location of the resource.
"""
uri = jose.Field('uri') # no ChallengeResource.uri
@@ -141,7 +155,10 @@ class Registration(interfaces.ClientRequestableResource, ResourceBody):
"""Registration Resource Body.
:ivar acme.jose.jwk.JWK key: Public key.
:ivar tuple contact: Contact information following ACME spec
:ivar tuple contact: Contact information following ACME spec,
`tuple` of `unicode`.
:ivar unicode recovery_token:
:ivar unicode agreement:
"""
resource_type = 'new-reg'
@@ -188,8 +205,8 @@ class RegistrationResource(interfaces.ClientRequestableResource,
"""Registration Resource.
:ivar acme.messages.Registration body:
:ivar str new_authzr_uri: URI found in the 'next' ``Link`` header
:ivar str terms_of_service: URL for the CA TOS.
:ivar unicode new_authzr_uri: URI found in the 'next' ``Link`` header
:ivar unicode terms_of_service: URL for the CA TOS.
"""
resource_type = 'reg'
@@ -212,6 +229,7 @@ class ChallengeBody(ResourceBody):
call ``challb.x`` to get ``challb.chall.x`` contents.
:ivar acme.messages.Status status:
:ivar datetime.datetime validated:
:ivar Error error:
"""
__slots__ = ('chall',)
@@ -241,7 +259,7 @@ class ChallengeResource(Resource):
"""Challenge Resource.
:ivar acme.messages.ChallengeBody body:
:ivar str authzr_uri: URI found in the 'up' ``Link`` header.
:ivar unicode authzr_uri: URI found in the 'up' ``Link`` header.
"""
body = jose.Field('body', decoder=ChallengeBody.from_json)
@@ -261,8 +279,6 @@ class Authorization(interfaces.ClientRequestableResource, ResourceBody):
:ivar list challenges: `list` of `.ChallengeBody`
:ivar tuple combinations: Challenge combinations (`tuple` of `tuple`
of `int`, as opposed to `list` of `list` from the spec).
:ivar acme.jose.jwk.JWK key: Public key.
:ivar tuple contact:
:ivar acme.messages.Status status:
:ivar datetime.datetime expires:
@@ -294,7 +310,7 @@ class AuthorizationResource(ResourceWithURI):
"""Authorization Resource.
:ivar acme.messages.Authorization body:
:ivar str new_cert_uri: URI found in the 'next' ``Link`` header
:ivar unicode new_cert_uri: URI found in the 'next' ``Link`` header
"""
body = jose.Field('body', decoder=Authorization.from_json)
@@ -321,7 +337,7 @@ class CertificateResource(interfaces.ClientRequestableResource,
:ivar acme.jose.util.ComparableX509 body:
`OpenSSL.crypto.X509` wrapped in `.ComparableX509`
:ivar str cert_chain_uri: URI found in the 'up' ``Link`` header
:ivar unicode cert_chain_uri: URI found in the 'up' ``Link`` header
:ivar tuple authzrs: `tuple` of `AuthorizationResource`.
"""
@@ -353,4 +369,4 @@ class Revocation(interfaces.ClientRequestableResource,
:param str base: New Registration Resource or server (root) URL.
"""
return urlparse.urljoin(base, cls.PATH)
return urllib_parse.urljoin(base, cls.PATH)

View File

@@ -12,22 +12,20 @@ logger = logging.getLogger(__name__)
class Signature(jose.JSONObjectWithFields):
"""ACME signature.
:ivar str alg: Signature algorithm.
:ivar str sig: Signature.
:ivar str nonce: Nonce.
:ivar jwk: JWK.
:type jwk: :class:`JWK`
:ivar .JWASignature alg: Signature algorithm.
:ivar bytes sig: Signature.
:ivar bytes nonce: Nonce.
:ivar .JWK jwk: JWK.
"""
NONCE_SIZE = 16
"""Minimum size of nonce in bytes."""
alg = jose.Field('alg', decoder=jose.JWASignature.from_json)
sig = jose.Field('sig', encoder=jose.b64encode,
sig = jose.Field('sig', encoder=jose.encode_b64jose,
decoder=jose.decode_b64jose)
nonce = jose.Field(
'nonce', encoder=jose.b64encode, decoder=functools.partial(
'nonce', encoder=jose.encode_b64jose, decoder=functools.partial(
jose.decode_b64jose, size=NONCE_SIZE, minimum=True))
jwk = jose.Field('jwk', decoder=jose.JWK.from_json)
@@ -35,27 +33,26 @@ class Signature(jose.JSONObjectWithFields):
def from_msg(cls, msg, key, nonce=None, nonce_size=None, alg=jose.RS256):
"""Create signature with nonce prepended to the message.
.. todo:: Protect against crypto unicode errors... is this sufficient?
Do I need to escape?
:param str msg: Message to be signed.
:param bytes msg: Message to be signed.
:param key: Key used for signing.
:type key: `cryptography.hazmat.primitives.assymetric.rsa.RSAPrivateKey`
(optionally wrapped in `.ComparableRSAKey`).
:param str nonce: Nonce to be used. If None, nonce of
:param bytes nonce: Nonce to be used. If None, nonce of
``nonce_size`` will be randomly generated.
:param int nonce_size: Size of the automatically generated nonce.
Defaults to :const:`NONCE_SIZE`.
:param .JWASignature alg:
"""
nonce_size = cls.NONCE_SIZE if nonce_size is None else nonce_size
nonce = os.urandom(nonce_size) if nonce is None else nonce
msg_with_nonce = nonce + msg
sig = alg.sign(key, nonce + msg)
logger.debug('%s signed as %s', msg_with_nonce, sig)
logger.debug('%r signed as %r', msg_with_nonce, sig)
return cls(alg=alg, sig=sig, nonce=nonce,
jwk=alg.kty(key=key.public_key()))
@@ -63,7 +60,7 @@ class Signature(jose.JSONObjectWithFields):
def verify(self, msg):
"""Verify the signature.
:param str msg: Message that was used in signing.
:param bytes msg: Message that was used in signing.
"""
# self.alg is not Field, but JWA | pylint: disable=no-member

View File

@@ -13,12 +13,12 @@ class SignatureTest(unittest.TestCase):
"""Tests for acme.sig.Signature."""
def setUp(self):
self.msg = 'message'
self.sig = ('IC\xd8*\xe7\x14\x9e\x19S\xb7\xcf\xec3\x12\xe2\x8a\x03'
'\x98u\xff\xf0\x94\xe2\xd7<\x8f\xa8\xed\xa4KN\xc3\xaa'
'\xb9X\xc3w\xaa\xc0_\xd0\x05$y>l#\x10<\x96\xd2\xcdr\xa3'
'\x1b\xa1\xf5!f\xef\xc64\xb6\x13')
self.nonce = '\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9'
self.msg = b'message'
self.sig = (b'IC\xd8*\xe7\x14\x9e\x19S\xb7\xcf\xec3\x12\xe2\x8a\x03'
b'\x98u\xff\xf0\x94\xe2\xd7<\x8f\xa8\xed\xa4KN\xc3\xaa'
b'\xb9X\xc3w\xaa\xc0_\xd0\x05$y>l#\x10<\x96\xd2\xcdr\xa3'
b'\x1b\xa1\xf5!f\xef\xc64\xb6\x13')
self.nonce = b'\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9'
self.alg = jose.RS256
self.jwk = jose.JWKRSA(key=KEY.public_key())
@@ -54,7 +54,7 @@ class SignatureTest(unittest.TestCase):
self.assertTrue(self.signature.verify(self.msg))
def test_verify_bad_fails(self):
self.assertFalse(self.signature.verify(self.msg + 'x'))
self.assertFalse(self.signature.verify(self.msg + b'x'))
@classmethod
def _from_msg(cls, *args, **kwargs):

View File

@@ -14,16 +14,27 @@ install_requires = [
'PyOpenSSL',
'pytz',
'requests',
'six',
'werkzeug',
]
testing_extras = [
'nose',
'tox',
]
setup(
name='acme',
packages=find_packages(),
install_requires=install_requires,
extras_require={
'testing': testing_extras,
},
entry_points={
'console_scripts': [
'jws = acme.jose.jws:CLI.run',
],
},
test_suite='acme',
)

View File

@@ -41,7 +41,7 @@ authzr, authzr_response = acme.poll(authzr)
csr = OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_ASN1, pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'csr.der')))
'acme', os.path.join('testdata', 'csr.der')))
try:
acme.request_issuance(csr, (authzr,))
except messages.Error as error:

16
tox.ini
View File

@@ -6,13 +6,9 @@
# acme and letsencrypt are not yet on pypi, so when Tox invokes
# "install *.zip", it will not find deps
skipsdist = true
envlist = py26,py27,cover,lint
envlist = py26,py27,py33,py34,cover,lint
[testenv]
# share one venv across testenvs, instead of multiple
# .tox/{py26,py27,cover,lint}; but do NOT set envdir to
# {toxinidir}/venv as it will destroy existing dev venv
envdir = {toxinidir}/tox.venv
commands =
pip install -r requirements.txt -e acme -e .[testing] -e letsencrypt-apache -e letsencrypt-nginx
# -q does not suppress errors
@@ -26,6 +22,16 @@ setenv =
PYTHONHASHSEED = 0
# https://testrun.org/tox/latest/example/basic.html#special-handling-of-pythonhas
[testenv:py33]
commands =
pip install -e acme[testing]
nosetests acme
[testenv:py34]
commands =
pip install -e acme[testing]
nosetests acme
[testenv:cover]
basepython = python2.7
commands =