mirror of
https://github.com/certbot/certbot.git
synced 2026-01-23 07:20:55 +03:00
Move jose b64 to acme.jose
This commit is contained in:
@@ -1,11 +1,11 @@
|
||||
"""JOSE."""
|
||||
import base64
|
||||
import binascii
|
||||
import zope.interface
|
||||
|
||||
import Crypto.PublicKey.RSA
|
||||
import zope.interface
|
||||
|
||||
from letsencrypt.acme import interfaces
|
||||
from letsencrypt.client import le_util
|
||||
|
||||
|
||||
def _leading_zeros(arg):
|
||||
@@ -43,13 +43,13 @@ class JWK(object):
|
||||
@classmethod
|
||||
def _encode_param(cls, param):
|
||||
"""Encode numeric key parameter."""
|
||||
return le_util.jose_b64encode(binascii.unhexlify(
|
||||
return b64encode(binascii.unhexlify(
|
||||
_leading_zeros(hex(param)[2:].rstrip("L"))))
|
||||
|
||||
@classmethod
|
||||
def _decode_param(cls, param):
|
||||
"""Decode numeric key parameter."""
|
||||
return long(binascii.hexlify(le_util.jose_b64decode(param)), 16)
|
||||
return long(binascii.hexlify(b64decode(param)), 16)
|
||||
|
||||
def to_json(self):
|
||||
"""Serialize to JSON."""
|
||||
@@ -66,3 +66,54 @@ class JWK(object):
|
||||
return cls(Crypto.PublicKey.RSA.construct(
|
||||
(cls._decode_param(json_object["n"]),
|
||||
cls._decode_param(json_object["e"]))))
|
||||
|
||||
|
||||
# https://tools.ietf.org/html/draft-ietf-jose-json-web-signature-37#appendix-C
|
||||
#
|
||||
# Jose Base64:
|
||||
#
|
||||
# - URL-safe Base64
|
||||
#
|
||||
# - padding stripped
|
||||
|
||||
|
||||
def b64encode(data):
|
||||
"""JOSE Base64 encode.
|
||||
|
||||
:param data: Data to be encoded.
|
||||
:type data: str or bytearray
|
||||
|
||||
:returns: JOSE Base64 string.
|
||||
:rtype: str
|
||||
|
||||
:raises TypeError: if `data` is of incorrect type
|
||||
|
||||
"""
|
||||
if not isinstance(data, str):
|
||||
raise TypeError('argument should be str or bytearray')
|
||||
return base64.urlsafe_b64encode(data).rstrip('=')
|
||||
|
||||
|
||||
def b64decode(data):
|
||||
"""JOSE Base64 decode.
|
||||
|
||||
:param data: Base64 string to be decoded. If it's unicode, then
|
||||
only ASCII characters are allowed.
|
||||
:type data: str or unicode
|
||||
|
||||
:returns: Decoded data.
|
||||
|
||||
:raises TypeError: if input is of incorrect type
|
||||
:raises ValueError: if input is unicode with non-ASCII characters
|
||||
|
||||
"""
|
||||
if isinstance(data, unicode):
|
||||
try:
|
||||
data = data.encode('ascii')
|
||||
except UnicodeEncodeError:
|
||||
raise ValueError(
|
||||
'unicode argument should contain only ASCII characters')
|
||||
elif not isinstance(data, str):
|
||||
raise TypeError('argument should be a str or unicode')
|
||||
|
||||
return base64.urlsafe_b64decode(data + '=' * (4 - (len(data) % 4)))
|
||||
|
||||
@@ -57,5 +57,71 @@ class JWKTest(unittest.TestCase):
|
||||
JWK.from_json(self.jwk256json)))
|
||||
|
||||
|
||||
# https://en.wikipedia.org/wiki/Base64#Examples
|
||||
B64_PADDING_EXAMPLES = {
|
||||
'any carnal pleasure.': ('YW55IGNhcm5hbCBwbGVhc3VyZS4', '='),
|
||||
'any carnal pleasure': ('YW55IGNhcm5hbCBwbGVhc3VyZQ', '=='),
|
||||
'any carnal pleasur': ('YW55IGNhcm5hbCBwbGVhc3Vy', ''),
|
||||
'any carnal pleasu': ('YW55IGNhcm5hbCBwbGVhc3U', '='),
|
||||
'any carnal pleas': ('YW55IGNhcm5hbCBwbGVhcw', '=='),
|
||||
}
|
||||
|
||||
|
||||
B64_URL_UNSAFE_EXAMPLES = {
|
||||
chr(251) + chr(239): '--8',
|
||||
chr(255) * 2: '__8',
|
||||
}
|
||||
|
||||
|
||||
class B64EncodeTest(unittest.TestCase):
|
||||
"""Tests for letsencrypt.acme.jose.b64encode."""
|
||||
|
||||
@classmethod
|
||||
def _call(cls, data):
|
||||
from letsencrypt.acme.jose import b64encode
|
||||
return b64encode(data)
|
||||
|
||||
def test_unsafe_url(self):
|
||||
for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems():
|
||||
self.assertEqual(self._call(text), b64)
|
||||
|
||||
def test_different_paddings(self):
|
||||
for text, (b64, _) in B64_PADDING_EXAMPLES.iteritems():
|
||||
self.assertEqual(self._call(text), b64)
|
||||
|
||||
def test_unicode_fails_with_type_error(self):
|
||||
self.assertRaises(TypeError, self._call, u'some unicode')
|
||||
|
||||
|
||||
class B64DecodeTest(unittest.TestCase):
|
||||
"""Tests for letsencrypt.acme.jose.b64decode."""
|
||||
|
||||
@classmethod
|
||||
def _call(cls, data):
|
||||
from letsencrypt.acme.jose import b64decode
|
||||
return b64decode(data)
|
||||
|
||||
def test_unsafe_url(self):
|
||||
for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems():
|
||||
self.assertEqual(self._call(b64), text)
|
||||
|
||||
def test_input_without_padding(self):
|
||||
for text, (b64, _) in B64_PADDING_EXAMPLES.iteritems():
|
||||
self.assertEqual(self._call(b64), text)
|
||||
|
||||
def test_input_with_padding(self):
|
||||
for text, (b64, pad) in B64_PADDING_EXAMPLES.iteritems():
|
||||
self.assertEqual(self._call(b64 + pad), text)
|
||||
|
||||
def test_unicode_with_ascii(self):
|
||||
self.assertEqual(self._call(u'YQ'), 'a')
|
||||
|
||||
def test_non_ascii_unicode_fails(self):
|
||||
self.assertRaises(ValueError, self._call, u'\u0105')
|
||||
|
||||
def test_type_error_no_unicode_or_str(self):
|
||||
self.assertRaises(TypeError, self._call, object())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -12,8 +12,6 @@ from letsencrypt.acme import jose
|
||||
from letsencrypt.acme import other
|
||||
from letsencrypt.acme import util
|
||||
|
||||
from letsencrypt.client import le_util
|
||||
|
||||
|
||||
SCHEMATA = dict([
|
||||
(schema, json.load(open(pkg_resources.resource_filename(
|
||||
@@ -186,7 +184,7 @@ class Challenge(Message):
|
||||
def _fields_to_json(self):
|
||||
fields = {
|
||||
"sessionID": self.session_id,
|
||||
"nonce": le_util.jose_b64encode(self.nonce),
|
||||
"nonce": jose.b64encode(self.nonce),
|
||||
"challenges": self.challenges,
|
||||
}
|
||||
if self.combinations:
|
||||
@@ -196,7 +194,7 @@ class Challenge(Message):
|
||||
@classmethod
|
||||
def _valid_from_json(cls, json_object):
|
||||
return cls(json_object["sessionID"],
|
||||
le_util.jose_b64decode(json_object["nonce"]),
|
||||
jose.b64decode(json_object["nonce"]),
|
||||
json_object["challenges"], json_object.get("combinations"))
|
||||
|
||||
|
||||
@@ -339,7 +337,7 @@ class Certificate(Message):
|
||||
|
||||
def _fields_to_json(self):
|
||||
fields = {
|
||||
"certificate": le_util.jose_b64encode(self.certificate.as_der())}
|
||||
"certificate": jose.b64encode(self.certificate.as_der())}
|
||||
if self.chain is not None:
|
||||
fields["chain"] = self.chain
|
||||
if self.refresh is not None:
|
||||
@@ -349,7 +347,7 @@ class Certificate(Message):
|
||||
@classmethod
|
||||
def _valid_from_json(cls, json_object):
|
||||
certificate = M2Crypto.X509.load_cert_der_string(
|
||||
le_util.jose_b64decode(json_object["certificate"]))
|
||||
jose.b64decode(json_object["certificate"]))
|
||||
return cls(certificate,
|
||||
json_object.get("chain"),
|
||||
json_object.get("refresh"))
|
||||
@@ -398,13 +396,13 @@ class CertificateRequest(Message):
|
||||
|
||||
def _fields_to_json(self):
|
||||
return {
|
||||
"csr": le_util.jose_b64encode(self.csr),
|
||||
"csr": jose.b64encode(self.csr),
|
||||
"signature": self.signature,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def _valid_from_json(cls, json_object):
|
||||
return cls(le_util.jose_b64decode(json_object["csr"]),
|
||||
return cls(jose.b64decode(json_object["csr"]),
|
||||
other.Signature.from_json(json_object["signature"]))
|
||||
|
||||
|
||||
@@ -524,13 +522,13 @@ class RevocationRequest(Message):
|
||||
|
||||
def _fields_to_json(self):
|
||||
return {
|
||||
"certificate": le_util.jose_b64encode(self.certificate),
|
||||
"certificate": jose.b64encode(self.certificate),
|
||||
"signature": self.signature,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def _valid_from_json(cls, json_string):
|
||||
return cls(le_util.jose_b64decode(json_string["certificate"]),
|
||||
return cls(jose.b64decode(json_string["certificate"]),
|
||||
other.Signature.from_json(json_string["signature"]))
|
||||
|
||||
|
||||
|
||||
@@ -11,7 +11,6 @@ from letsencrypt.acme import interfaces
|
||||
from letsencrypt.acme import jose
|
||||
|
||||
from letsencrypt.client import CONFIG
|
||||
from letsencrypt.client import le_util
|
||||
|
||||
|
||||
class Signature(object):
|
||||
@@ -88,15 +87,14 @@ class Signature(object):
|
||||
"""Seriliaze to JSON."""
|
||||
return {
|
||||
"alg": self.alg,
|
||||
"sig": le_util.jose_b64encode(self.sig),
|
||||
"nonce": le_util.jose_b64encode(self.nonce),
|
||||
"sig": jose.b64encode(self.sig),
|
||||
"nonce": jose.b64encode(self.nonce),
|
||||
"jwk": self.jwk,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, json_object):
|
||||
"""Deserialize from JSON."""
|
||||
return cls(json_object["alg"],
|
||||
le_util.jose_b64decode(json_object["sig"]),
|
||||
le_util.jose_b64decode(json_object["nonce"]),
|
||||
return cls(json_object["alg"], jose.b64decode(json_object["sig"]),
|
||||
jose.b64decode(json_object["nonce"]),
|
||||
jose.JWK.from_json(json_object["jwk"]))
|
||||
|
||||
@@ -4,9 +4,10 @@ import hashlib
|
||||
|
||||
from Crypto import Random
|
||||
|
||||
from letsencrypt.acme import jose
|
||||
|
||||
from letsencrypt.client import CONFIG
|
||||
from letsencrypt.client import crypto_util
|
||||
from letsencrypt.client import le_util
|
||||
|
||||
|
||||
# Authenticator Challenges
|
||||
@@ -45,7 +46,7 @@ def dvsni_gen_cert(name, r_b64, nonce, key):
|
||||
"""
|
||||
# Generate S
|
||||
dvsni_s = Random.get_random_bytes(CONFIG.S_SIZE)
|
||||
dvsni_r = le_util.jose_b64decode(r_b64)
|
||||
dvsni_r = jose.b64decode(r_b64)
|
||||
|
||||
# Generate extension
|
||||
ext = _dvsni_gen_ext(dvsni_r, dvsni_s)
|
||||
@@ -53,7 +54,7 @@ def dvsni_gen_cert(name, r_b64, nonce, key):
|
||||
cert_pem = crypto_util.make_ss_cert(
|
||||
key.pem, [nonce + CONFIG.INVALID_EXT, name, ext])
|
||||
|
||||
return cert_pem, le_util.jose_b64encode(dvsni_s)
|
||||
return cert_pem, jose.b64encode(dvsni_s)
|
||||
|
||||
|
||||
def _dvsni_gen_ext(dvsni_r, dvsni_s):
|
||||
|
||||
@@ -7,7 +7,7 @@ import Crypto.Signature.PKCS1_v1_5
|
||||
|
||||
import M2Crypto
|
||||
|
||||
from letsencrypt.client import le_util
|
||||
from letsencrypt.acme import jose
|
||||
|
||||
|
||||
def make_csr(key_str, domains):
|
||||
@@ -196,4 +196,4 @@ def get_cert_info(filename):
|
||||
def b64_cert_to_pem(b64_der_cert):
|
||||
"""Convert JOSE Base-64 encoded DER cert to PEM."""
|
||||
return M2Crypto.X509.load_cert_der_string(
|
||||
le_util.jose_b64decode(b64_der_cert)).as_pem()
|
||||
jose.b64decode(b64_der_cert)).as_pem()
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
"""Utilities for all Let's Encrypt."""
|
||||
import base64
|
||||
import errno
|
||||
import os
|
||||
import stat
|
||||
@@ -68,54 +67,3 @@ def unique_file(path, mode=0o777):
|
||||
except OSError:
|
||||
pass
|
||||
count += 1
|
||||
|
||||
|
||||
# https://tools.ietf.org/html/draft-ietf-jose-json-web-signature-37#appendix-C
|
||||
#
|
||||
# Jose Base64:
|
||||
#
|
||||
# - URL-safe Base64
|
||||
#
|
||||
# - padding stripped
|
||||
|
||||
|
||||
def jose_b64encode(data):
|
||||
"""JOSE Base64 encode.
|
||||
|
||||
:param data: Data to be encoded.
|
||||
:type data: str or bytearray
|
||||
|
||||
:returns: JOSE Base64 string.
|
||||
:rtype: str
|
||||
|
||||
:raises TypeError: if `data` is of incorrect type
|
||||
|
||||
"""
|
||||
if not isinstance(data, str):
|
||||
raise TypeError('argument should be str or bytearray')
|
||||
return base64.urlsafe_b64encode(data).rstrip('=')
|
||||
|
||||
|
||||
def jose_b64decode(data):
|
||||
"""JOSE Base64 decode.
|
||||
|
||||
:param data: Base64 string to be decoded. If it's unicode, then
|
||||
only ASCII characters are allowed.
|
||||
:type data: str or unicode
|
||||
|
||||
:returns: Decoded data.
|
||||
|
||||
:raises TypeError: if input is of incorrect type
|
||||
:raises ValueError: if input is unicode with non-ASCII characters
|
||||
|
||||
"""
|
||||
if isinstance(data, unicode):
|
||||
try:
|
||||
data = data.encode('ascii')
|
||||
except UnicodeEncodeError:
|
||||
raise ValueError(
|
||||
'unicode argument should contain only ASCII characters')
|
||||
elif not isinstance(data, str):
|
||||
raise TypeError('argument should be a str or unicode')
|
||||
|
||||
return base64.urlsafe_b64decode(data + '=' * (4 - (len(data) % 4)))
|
||||
|
||||
@@ -6,10 +6,11 @@ import unittest
|
||||
|
||||
import M2Crypto
|
||||
|
||||
from letsencrypt.acme import jose
|
||||
|
||||
from letsencrypt.client import challenge_util
|
||||
from letsencrypt.client import client
|
||||
from letsencrypt.client import CONFIG
|
||||
from letsencrypt.client import le_util
|
||||
|
||||
|
||||
class DvsniGenCertTest(unittest.TestCase):
|
||||
@@ -20,7 +21,7 @@ class DvsniGenCertTest(unittest.TestCase):
|
||||
"""Basic test for straightline code."""
|
||||
domain = "example.com"
|
||||
dvsni_r = "r_value"
|
||||
r_b64 = le_util.jose_b64encode(dvsni_r)
|
||||
r_b64 = jose.b64encode(dvsni_r)
|
||||
pem = pkg_resources.resource_string(
|
||||
__name__, os.path.join("testdata", "rsa256_key.pem"))
|
||||
key = client.Client.Key("path", pem)
|
||||
@@ -29,7 +30,7 @@ class DvsniGenCertTest(unittest.TestCase):
|
||||
|
||||
# pylint: disable=protected-access
|
||||
ext = challenge_util._dvsni_gen_ext(
|
||||
dvsni_r, le_util.jose_b64decode(s_b64))
|
||||
dvsni_r, jose.b64decode(s_b64))
|
||||
self._standard_check_cert(cert_pem, domain, nonce, ext)
|
||||
|
||||
def _standard_check_cert(self, pem, domain, nonce, ext):
|
||||
|
||||
@@ -121,71 +121,5 @@ class UniqueFileTest(unittest.TestCase):
|
||||
self.assertTrue(basename3.endswith('foo.txt'))
|
||||
|
||||
|
||||
# https://en.wikipedia.org/wiki/Base64#Examples
|
||||
JOSE_B64_PADDING_EXAMPLES = {
|
||||
'any carnal pleasure.': ('YW55IGNhcm5hbCBwbGVhc3VyZS4', '='),
|
||||
'any carnal pleasure': ('YW55IGNhcm5hbCBwbGVhc3VyZQ', '=='),
|
||||
'any carnal pleasur': ('YW55IGNhcm5hbCBwbGVhc3Vy', ''),
|
||||
'any carnal pleasu': ('YW55IGNhcm5hbCBwbGVhc3U', '='),
|
||||
'any carnal pleas': ('YW55IGNhcm5hbCBwbGVhcw', '=='),
|
||||
}
|
||||
|
||||
|
||||
B64_URL_UNSAFE_EXAMPLES = {
|
||||
chr(251) + chr(239): '--8',
|
||||
chr(255) * 2: '__8',
|
||||
}
|
||||
|
||||
|
||||
class JOSEB64EncodeTest(unittest.TestCase):
|
||||
"""Tests for letsencrypt.client.le_util.jose_b64encode."""
|
||||
|
||||
@classmethod
|
||||
def _call(cls, data):
|
||||
from letsencrypt.client.le_util import jose_b64encode
|
||||
return jose_b64encode(data)
|
||||
|
||||
def test_unsafe_url(self):
|
||||
for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems():
|
||||
self.assertEqual(self._call(text), b64)
|
||||
|
||||
def test_different_paddings(self):
|
||||
for text, (b64, _) in JOSE_B64_PADDING_EXAMPLES.iteritems():
|
||||
self.assertEqual(self._call(text), b64)
|
||||
|
||||
def test_unicode_fails_with_type_error(self):
|
||||
self.assertRaises(TypeError, self._call, u'some unicode')
|
||||
|
||||
|
||||
class JOSEB64DecodeTest(unittest.TestCase):
|
||||
"""Tests for letsencrypt.client.le_util.jose_b64decode."""
|
||||
|
||||
@classmethod
|
||||
def _call(cls, data):
|
||||
from letsencrypt.client.le_util import jose_b64decode
|
||||
return jose_b64decode(data)
|
||||
|
||||
def test_unsafe_url(self):
|
||||
for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems():
|
||||
self.assertEqual(self._call(b64), text)
|
||||
|
||||
def test_input_without_padding(self):
|
||||
for text, (b64, _) in JOSE_B64_PADDING_EXAMPLES.iteritems():
|
||||
self.assertEqual(self._call(b64), text)
|
||||
|
||||
def test_input_with_padding(self):
|
||||
for text, (b64, pad) in JOSE_B64_PADDING_EXAMPLES.iteritems():
|
||||
self.assertEqual(self._call(b64 + pad), text)
|
||||
|
||||
def test_unicode_with_ascii(self):
|
||||
self.assertEqual(self._call(u'YQ'), 'a')
|
||||
|
||||
def test_non_ascii_unicode_fails(self):
|
||||
self.assertRaises(ValueError, self._call, u'\u0105')
|
||||
|
||||
def test_type_error_no_unicode_or_str(self):
|
||||
self.assertRaises(TypeError, self._call, object())
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user