mirror of
https://github.com/certbot/certbot.git
synced 2026-01-23 07:20:55 +03:00
Clean crypto_util module.
This commit is contained in:
@@ -14,41 +14,51 @@ from letsencrypt.client import le_util
|
||||
|
||||
|
||||
def b64_cert_to_pem(b64_der_cert):
|
||||
x = M2Crypto.X509.load_cert_der_string(le_util.b64_url_dec(b64_der_cert))
|
||||
return x.as_pem()
|
||||
return M2Crypto.X509.load_cert_der_string(
|
||||
le_util.b64_url_dec(b64_der_cert)).as_pem()
|
||||
|
||||
def create_sig(msg, key_file, signer_nonce = None, signer_nonce_len = CONFIG.NONCE_SIZE):
|
||||
|
||||
def create_sig(msg, key_file, signer_nonce=None,
|
||||
signer_nonce_len=CONFIG.NONCE_SIZE):
|
||||
# DOES prepend signer_nonce to message
|
||||
# TODO: Change this over to M2Crypto... PKey
|
||||
# Protect against crypto unicode errors... is this sufficient? Do I need to escape?
|
||||
# Protect against crypto unicode errors... is this sufficient?
|
||||
# Do I need to escape?
|
||||
msg = str(msg)
|
||||
key = Crypto.PublicKey.RSA.importKey(open(key_file).read())
|
||||
if signer_nonce is None:
|
||||
signer_nonce = Random.get_random_bytes(signer_nonce_len)
|
||||
h = Crypto.Hash.SHA256.new(signer_nonce + msg)
|
||||
hashed = Crypto.Hash.SHA256.new(signer_nonce + msg)
|
||||
signer = Crypto.Signature.PKCS1_v1_5.new(key)
|
||||
signature = signer.sign(h)
|
||||
signature = signer.sign(hashed)
|
||||
#print "signing:", signer_nonce + msg
|
||||
#print "signature:", signature
|
||||
n, e = key.n, key.e
|
||||
n_bytes = binascii.unhexlify(leading_zeros(hex(n)[2:].replace("L", "")))
|
||||
e_bytes = binascii.unhexlify(leading_zeros(hex(e)[2:].replace("L", "")))
|
||||
n_bytes = binascii.unhexlify(leading_zeros(hex(key.n)[2:].replace("L", "")))
|
||||
e_bytes = binascii.unhexlify(leading_zeros(hex(key.e)[2:].replace("L", "")))
|
||||
n_encoded = le_util.b64_url_enc(n_bytes)
|
||||
e_encoded = le_util.b64_url_enc(e_bytes)
|
||||
signer_nonce_encoded = le_util.b64_url_enc(signer_nonce)
|
||||
sig_encoded = le_util.b64_url_enc(signature)
|
||||
jwk = { "kty": "RSA", "n": n_encoded, "e": e_encoded }
|
||||
signature = { "nonce": signer_nonce_encoded, "alg": "RS256", "jwk": jwk, "sig": sig_encoded }
|
||||
jwk = {"kty": "RSA", "n": n_encoded, "e": e_encoded}
|
||||
signature = {
|
||||
"nonce": signer_nonce_encoded,
|
||||
"alg": "RS256",
|
||||
"jwk": jwk,
|
||||
"sig": sig_encoded
|
||||
}
|
||||
# return json.dumps(signature)
|
||||
return (signature)
|
||||
return signature
|
||||
|
||||
def leading_zeros(s):
|
||||
if len(s) % 2:
|
||||
return "0" + s
|
||||
return s
|
||||
|
||||
def sha256(m):
|
||||
return hashlib.sha256(m).hexdigest()
|
||||
def leading_zeros(arg):
|
||||
if len(arg) % 2:
|
||||
return "0" + arg
|
||||
return arg
|
||||
|
||||
|
||||
def sha256(arg):
|
||||
return hashlib.sha256(arg).hexdigest()
|
||||
|
||||
|
||||
# based on M2Crypto unit test written by Toby Allsopp
|
||||
def make_key(bits=CONFIG.RSA_KEY_SIZE):
|
||||
@@ -70,12 +80,12 @@ def make_csr(key_file, domains):
|
||||
"""
|
||||
assert domains, "Must provide one or more hostnames for the CSR."
|
||||
rsa_key = M2Crypto.RSA.load_key(key_file)
|
||||
pk = M2Crypto.EVP.PKey()
|
||||
pk.assign_rsa(rsa_key)
|
||||
pubkey = M2Crypto.EVP.PKey()
|
||||
pubkey.assign_rsa(rsa_key)
|
||||
|
||||
x = M2Crypto.X509.Request()
|
||||
x.set_pubkey(pk)
|
||||
name = x.get_subject()
|
||||
csr = M2Crypto.X509.Request()
|
||||
csr.set_pubkey(pubkey)
|
||||
name = csr.get_subject()
|
||||
name.C = "US"
|
||||
name.ST = "Michigan"
|
||||
name.L = "Ann Arbor"
|
||||
@@ -84,71 +94,80 @@ def make_csr(key_file, domains):
|
||||
name.CN = domains[0]
|
||||
|
||||
extstack = M2Crypto.X509.X509_Extension_Stack()
|
||||
ext = M2Crypto.X509.new_extension('subjectAltName', ", ".join(["DNS:%s" % d for d in domains]))
|
||||
ext = M2Crypto.X509.new_extension(
|
||||
'subjectAltName', ", ".join(["DNS:%s" % d for d in domains]))
|
||||
|
||||
extstack.push(ext)
|
||||
x.add_extensions(extstack)
|
||||
x.sign(pk,'sha256')
|
||||
assert x.verify(pk)
|
||||
pk2 = x.get_pubkey()
|
||||
assert x.verify(pk2)
|
||||
return x.as_pem(), x.as_der()
|
||||
csr.add_extensions(extstack)
|
||||
csr.sign(pubkey, 'sha256')
|
||||
assert csr.verify(pubkey)
|
||||
pubkey2 = csr.get_pubkey()
|
||||
assert csr.verify(pubkey2)
|
||||
return csr.as_pem(), csr.as_der()
|
||||
|
||||
|
||||
def make_ss_cert(key_file, domains):
|
||||
"""
|
||||
Returns new self-signed cert in PEM form using key_file containing all domains
|
||||
"""Returns new self-signed cert in PEM form.
|
||||
|
||||
Uses key_file and contains all domains.
|
||||
"""
|
||||
assert domains, "Must provide one or more hostnames for the CSR."
|
||||
rsa_key = M2Crypto.RSA.load_key(key_file)
|
||||
pk = M2Crypto.EVP.PKey()
|
||||
pk.assign_rsa(rsa_key)
|
||||
pubkey = M2Crypto.EVP.PKey()
|
||||
pubkey.assign_rsa(rsa_key)
|
||||
|
||||
x = M2Crypto.X509.X509()
|
||||
x.set_pubkey(pk)
|
||||
x.set_serial_number(1337)
|
||||
x.set_version(2)
|
||||
cert = M2Crypto.X509.X509()
|
||||
cert.set_pubkey(pubkey)
|
||||
cert.set_serial_number(1337)
|
||||
cert.set_version(2)
|
||||
|
||||
t = long(time.time())
|
||||
current_ts = long(time.time())
|
||||
current = M2Crypto.ASN1.ASN1_UTCTIME()
|
||||
current.set_time(t)
|
||||
current.set_time(current_ts)
|
||||
expire = M2Crypto.ASN1.ASN1_UTCTIME()
|
||||
expire.set_time((7 * 24 * 60 * 60) + t)
|
||||
x.set_not_before(current)
|
||||
x.set_not_after(expire)
|
||||
expire.set_time((7 * 24 * 60 * 60) + current_ts)
|
||||
cert.set_not_before(current)
|
||||
cert.set_not_after(expire)
|
||||
|
||||
name = x.get_subject()
|
||||
name = cert.get_subject()
|
||||
name.C = "US"
|
||||
name.ST = "Michigan"
|
||||
name.L = "Ann Arbor"
|
||||
name.O = "University of Michigan and the EFF"
|
||||
name.CN = domains[0]
|
||||
x.set_issuer(x.get_subject())
|
||||
cert.set_issuer(cert.get_subject())
|
||||
|
||||
x.add_ext(M2Crypto.X509.new_extension('basicConstraints', 'CA:FALSE'))
|
||||
#x.add_ext(M2Crypto.X509.new_extension('extendedKeyUsage', 'TLS Web Server Authentication'))
|
||||
x.add_ext(M2Crypto.X509.new_extension('subjectAltName', ", ".join(["DNS:%s" % d for d in domains])))
|
||||
cert.add_ext(M2Crypto.X509.new_extension('basicConstraints', 'CA:FALSE'))
|
||||
#cert.add_ext(M2Crypto.X509.new_extension(
|
||||
# 'extendedKeyUsage', 'TLS Web Server Authentication'))
|
||||
cert.add_ext(M2Crypto.X509.new_extension(
|
||||
'subjectAltName', ", ".join(["DNS:%s" % d for d in domains])))
|
||||
|
||||
x.sign(pk, 'sha256')
|
||||
assert x.verify(pk)
|
||||
assert x.verify()
|
||||
cert.sign(pubkey, 'sha256')
|
||||
assert cert.verify(pubkey)
|
||||
assert cert.verify()
|
||||
#print check_purpose(,0
|
||||
return x.as_pem()
|
||||
return cert.as_pem()
|
||||
|
||||
|
||||
def get_cert_info(filename):
|
||||
d = {}
|
||||
# M2Crypto Library only supports RSA right now
|
||||
x = M2Crypto.X509.load_cert(filename)
|
||||
d["not_before"] = x.get_not_before().get_datetime()
|
||||
d["not_after"] = x.get_not_after().get_datetime()
|
||||
d["subject"] = x.get_subject().as_text()
|
||||
d["cn"] = x.get_subject().CN
|
||||
d["issuer"] = x.get_issuer().as_text()
|
||||
d["fingerprint"] = x.get_fingerprint(md='sha1')
|
||||
try:
|
||||
d["san"] = x.get_ext("subjectAltName").get_value()
|
||||
except:
|
||||
d["san"] = ""
|
||||
cert = M2Crypto.X509.load_cert(filename)
|
||||
|
||||
try:
|
||||
san = cert.get_ext("subjectAltName").get_value()
|
||||
except:
|
||||
san = ""
|
||||
|
||||
return {
|
||||
"not_before": cert.get_not_before().get_datetime(),
|
||||
"not_after": cert.get_not_after().get_datetime(),
|
||||
"subject": cert.get_subject().as_text(),
|
||||
"cn": cert.get_subject().CN,
|
||||
"issuer": cert.get_issuer().as_text(),
|
||||
"fingerprint": cert.get_fingerprint(md='sha1'),
|
||||
"san": san,
|
||||
"serial": cert.get_serial_number(),
|
||||
"pub_key": "RSA " + str(cert.get_pubkey().size() * 8),
|
||||
}
|
||||
|
||||
d["serial"] = x.get_serial_number()
|
||||
d["pub_key"] = "RSA " + str(x.get_pubkey().size() * 8)
|
||||
return d
|
||||
|
||||
Reference in New Issue
Block a user