diff --git a/letsencrypt/client/crypto_util.py b/letsencrypt/client/crypto_util.py index 875858351..815e795cd 100644 --- a/letsencrypt/client/crypto_util.py +++ b/letsencrypt/client/crypto_util.py @@ -14,41 +14,51 @@ from letsencrypt.client import le_util def b64_cert_to_pem(b64_der_cert): - x = M2Crypto.X509.load_cert_der_string(le_util.b64_url_dec(b64_der_cert)) - return x.as_pem() + return M2Crypto.X509.load_cert_der_string( + le_util.b64_url_dec(b64_der_cert)).as_pem() -def create_sig(msg, key_file, signer_nonce = None, signer_nonce_len = CONFIG.NONCE_SIZE): + +def create_sig(msg, key_file, signer_nonce=None, + signer_nonce_len=CONFIG.NONCE_SIZE): # DOES prepend signer_nonce to message # TODO: Change this over to M2Crypto... PKey - # Protect against crypto unicode errors... is this sufficient? Do I need to escape? + # Protect against crypto unicode errors... is this sufficient? + # Do I need to escape? msg = str(msg) key = Crypto.PublicKey.RSA.importKey(open(key_file).read()) if signer_nonce is None: signer_nonce = Random.get_random_bytes(signer_nonce_len) - h = Crypto.Hash.SHA256.new(signer_nonce + msg) + hashed = Crypto.Hash.SHA256.new(signer_nonce + msg) signer = Crypto.Signature.PKCS1_v1_5.new(key) - signature = signer.sign(h) + signature = signer.sign(hashed) #print "signing:", signer_nonce + msg #print "signature:", signature - n, e = key.n, key.e - n_bytes = binascii.unhexlify(leading_zeros(hex(n)[2:].replace("L", ""))) - e_bytes = binascii.unhexlify(leading_zeros(hex(e)[2:].replace("L", ""))) + n_bytes = binascii.unhexlify(leading_zeros(hex(key.n)[2:].replace("L", ""))) + e_bytes = binascii.unhexlify(leading_zeros(hex(key.e)[2:].replace("L", ""))) n_encoded = le_util.b64_url_enc(n_bytes) e_encoded = le_util.b64_url_enc(e_bytes) signer_nonce_encoded = le_util.b64_url_enc(signer_nonce) sig_encoded = le_util.b64_url_enc(signature) - jwk = { "kty": "RSA", "n": n_encoded, "e": e_encoded } - signature = { "nonce": signer_nonce_encoded, "alg": "RS256", "jwk": jwk, "sig": sig_encoded } + jwk = {"kty": "RSA", "n": n_encoded, "e": e_encoded} + signature = { + "nonce": signer_nonce_encoded, + "alg": "RS256", + "jwk": jwk, + "sig": sig_encoded + } # return json.dumps(signature) - return (signature) + return signature -def leading_zeros(s): - if len(s) % 2: - return "0" + s - return s -def sha256(m): - return hashlib.sha256(m).hexdigest() +def leading_zeros(arg): + if len(arg) % 2: + return "0" + arg + return arg + + +def sha256(arg): + return hashlib.sha256(arg).hexdigest() + # based on M2Crypto unit test written by Toby Allsopp def make_key(bits=CONFIG.RSA_KEY_SIZE): @@ -70,12 +80,12 @@ def make_csr(key_file, domains): """ assert domains, "Must provide one or more hostnames for the CSR." rsa_key = M2Crypto.RSA.load_key(key_file) - pk = M2Crypto.EVP.PKey() - pk.assign_rsa(rsa_key) + pubkey = M2Crypto.EVP.PKey() + pubkey.assign_rsa(rsa_key) - x = M2Crypto.X509.Request() - x.set_pubkey(pk) - name = x.get_subject() + csr = M2Crypto.X509.Request() + csr.set_pubkey(pubkey) + name = csr.get_subject() name.C = "US" name.ST = "Michigan" name.L = "Ann Arbor" @@ -84,71 +94,80 @@ def make_csr(key_file, domains): name.CN = domains[0] extstack = M2Crypto.X509.X509_Extension_Stack() - ext = M2Crypto.X509.new_extension('subjectAltName', ", ".join(["DNS:%s" % d for d in domains])) + ext = M2Crypto.X509.new_extension( + 'subjectAltName', ", ".join(["DNS:%s" % d for d in domains])) extstack.push(ext) - x.add_extensions(extstack) - x.sign(pk,'sha256') - assert x.verify(pk) - pk2 = x.get_pubkey() - assert x.verify(pk2) - return x.as_pem(), x.as_der() + csr.add_extensions(extstack) + csr.sign(pubkey, 'sha256') + assert csr.verify(pubkey) + pubkey2 = csr.get_pubkey() + assert csr.verify(pubkey2) + return csr.as_pem(), csr.as_der() + def make_ss_cert(key_file, domains): - """ - Returns new self-signed cert in PEM form using key_file containing all domains + """Returns new self-signed cert in PEM form. + + Uses key_file and contains all domains. """ assert domains, "Must provide one or more hostnames for the CSR." rsa_key = M2Crypto.RSA.load_key(key_file) - pk = M2Crypto.EVP.PKey() - pk.assign_rsa(rsa_key) + pubkey = M2Crypto.EVP.PKey() + pubkey.assign_rsa(rsa_key) - x = M2Crypto.X509.X509() - x.set_pubkey(pk) - x.set_serial_number(1337) - x.set_version(2) + cert = M2Crypto.X509.X509() + cert.set_pubkey(pubkey) + cert.set_serial_number(1337) + cert.set_version(2) - t = long(time.time()) + current_ts = long(time.time()) current = M2Crypto.ASN1.ASN1_UTCTIME() - current.set_time(t) + current.set_time(current_ts) expire = M2Crypto.ASN1.ASN1_UTCTIME() - expire.set_time((7 * 24 * 60 * 60) + t) - x.set_not_before(current) - x.set_not_after(expire) + expire.set_time((7 * 24 * 60 * 60) + current_ts) + cert.set_not_before(current) + cert.set_not_after(expire) - name = x.get_subject() + name = cert.get_subject() name.C = "US" name.ST = "Michigan" name.L = "Ann Arbor" name.O = "University of Michigan and the EFF" name.CN = domains[0] - x.set_issuer(x.get_subject()) + cert.set_issuer(cert.get_subject()) - x.add_ext(M2Crypto.X509.new_extension('basicConstraints', 'CA:FALSE')) - #x.add_ext(M2Crypto.X509.new_extension('extendedKeyUsage', 'TLS Web Server Authentication')) - x.add_ext(M2Crypto.X509.new_extension('subjectAltName', ", ".join(["DNS:%s" % d for d in domains]))) + cert.add_ext(M2Crypto.X509.new_extension('basicConstraints', 'CA:FALSE')) + #cert.add_ext(M2Crypto.X509.new_extension( + # 'extendedKeyUsage', 'TLS Web Server Authentication')) + cert.add_ext(M2Crypto.X509.new_extension( + 'subjectAltName', ", ".join(["DNS:%s" % d for d in domains]))) - x.sign(pk, 'sha256') - assert x.verify(pk) - assert x.verify() + cert.sign(pubkey, 'sha256') + assert cert.verify(pubkey) + assert cert.verify() #print check_purpose(,0 - return x.as_pem() + return cert.as_pem() + def get_cert_info(filename): - d = {} # M2Crypto Library only supports RSA right now - x = M2Crypto.X509.load_cert(filename) - d["not_before"] = x.get_not_before().get_datetime() - d["not_after"] = x.get_not_after().get_datetime() - d["subject"] = x.get_subject().as_text() - d["cn"] = x.get_subject().CN - d["issuer"] = x.get_issuer().as_text() - d["fingerprint"] = x.get_fingerprint(md='sha1') - try: - d["san"] = x.get_ext("subjectAltName").get_value() - except: - d["san"] = "" + cert = M2Crypto.X509.load_cert(filename) + + try: + san = cert.get_ext("subjectAltName").get_value() + except: + san = "" + + return { + "not_before": cert.get_not_before().get_datetime(), + "not_after": cert.get_not_after().get_datetime(), + "subject": cert.get_subject().as_text(), + "cn": cert.get_subject().CN, + "issuer": cert.get_issuer().as_text(), + "fingerprint": cert.get_fingerprint(md='sha1'), + "san": san, + "serial": cert.get_serial_number(), + "pub_key": "RSA " + str(cert.get_pubkey().size() * 8), + } - d["serial"] = x.get_serial_number() - d["pub_key"] = "RSA " + str(x.get_pubkey().size() * 8) - return d