diff --git a/trustify/client/CONFIG.py b/trustify/client/CONFIG.py index f261e31f7..af817a793 100644 --- a/trustify/client/CONFIG.py +++ b/trustify/client/CONFIG.py @@ -10,6 +10,8 @@ BACKUP_DIR = WORK_DIR + "backups/" TEMP_CHECKPOINT_DIR = WORK_DIR + "temp_checkpoint/" # Directory used before a permanent checkpoint is finalized IN_PROGRESS_DIR = BACKUP_DIR + "IN_PROGRESS/" +# Directory where all certificates/keys are stored - used for easy revocation +CERT_KEY_BACKUP = WORK_DIR + "keys-certs/" # Where all keys should be stored KEY_DIR = SERVER_ROOT + "ssl/" # Certificate storage @@ -28,12 +30,15 @@ APACHE_CHALLENGE_CONF = CONFIG_DIR + "choc_sni_cert_challenge.conf" S_SIZE = 32 NONCE_SIZE = 16 +# Key Sizes +RSA_KEY_SIZE = 2048 + # bits of hashcash to generate difficulty = 23 # Trustify cert and chain files -cert_file = CERT_DIR + "trustify-cert.pem" -chain_file = CERT_DIR + "trustify-chain.pem" +CERT_PATH = CERT_DIR + "trustify-cert.pem" +CHAIN_PATH = CERT_DIR + "trustify-chain.pem" #Invalid Extension INVALID_EXT = ".acme.invalid" diff --git a/trustify/client/client.py b/trustify/client/client.py index e4c92ec85..268254b3e 100755 --- a/trustify/client/client.py +++ b/trustify/client/client.py @@ -10,7 +10,7 @@ import M2Crypto import urllib2, json # XXX TODO: per https://docs.google.com/document/pub?id=1roBIeSJsYq3Ntpf6N0PIeeAAvu4ddn7mGo6Qb7aL7ew, urllib2 is unsafe (!) and must be replaced import os, grp, pwd, sys, time, random, sys, shutil -import hashlib, binascii, jose, csv +import jose, csv import subprocess from M2Crypto import EVP, X509, RSA from Crypto.Random import get_random_bytes @@ -23,15 +23,14 @@ from trustify.client.sni_challenge import SNI_Challenge from trustify.client.payment_challenge import Payment_Challenge from trustify.client import configurator from trustify.client import logger -from trustify.client import trustify_util -from trustify.client.CONFIG import NONCE_SIZE, CERT_PATH, CHAIN_PATH +from trustify.client import trustify_util, crypto_util, display +from trustify.client.CONFIG import NONCE_SIZE, RSA_KEY_SIZE, CERT_PATH, CHAIN_PATH from trustify.client.CONFIG import SERVER_ROOT, KEY_DIR, CERT_DIR, CERT_KEY_BACKUP from trustify.client.CONFIG import CHALLENGE_PREFERENCES, EXCLUSIVE_CHALLENGES # it's weird to point to chocolate servers via raw IPv6 addresses, and such # addresses can be %SCARY in some contexts, so out of paranoia let's disable # them by default allow_raw_ipv6_server = False -RSA_KEY_SIZE = 2048 class Client(object): # In case of import, dialog needs scope over the class @@ -97,8 +96,6 @@ class Client(object): challenge_dict = self.is_expected_msg(challenge_dict, "challenge") - - #assert self.is_challenge(challenge_dict) #Perform Challenges @@ -124,6 +121,7 @@ class Client(object): self.install_certificate(certificate_dict, vhost) # Perform optimal config changes + self.optimize_config(vhost) self.config.save("Completed Augeas Authentication") @@ -133,20 +131,20 @@ class Client(object): def revoke(self, c): - x = M2Crypto.X509.load_cert(c["cert_file"]) + x = M2Crypto.X509.load_cert(c["backup_cert_file"]) cert_der = x.as_der() #self.find_key_for_cert() - revocation_dict = self.send(self.revocation_request(cert_der)) + revocation_dict = self.send(self.revocation_request(c["backup_key_file"], cert_der)) revocation_dict = self.is_expected_msg(revocation_dict, "revocation") self.d.msgbox("You have successfully revoked the certificate for %s" % c["cn"], width=70, height=16) - self.remove_cert_key(c["cert_file"], c["key_file"]) + self.remove_cert_key(c) sys.exit(0) - def remove_cert_key(self, c_file, k_file): + def remove_cert_key(self, c): list_file = CERT_KEY_BACKUP + "LIST" list_file2 = CERT_KEY_BACKUP + "LIST.tmp" with open(list_file, 'rb') as orgfile: @@ -154,12 +152,15 @@ class Client(object): with open(list_file2, 'wb') as newfile: csvwriter = csv.writer(newfile) for row in csvreader: - if not (row[1] == c_file and row[2] == k_file): + if not (row[0] == str(c["idx"]) and row[1] == c["orig_cert_file"] and row[2] == c["orig_key_file"]): csvwriter.writerow(row) + + # remember that these are shutil.copy2(list_file2, list_file) - os.remove(c_file) - os.remove(k_file) + os.remove(list_file2) + os.remove(c['backup_cert_file']) + os.remove(c['backup_key_file']) def store_revocation_token(self, token): return @@ -202,9 +203,12 @@ class Client(object): with open(list_file, 'rb') as csvfile: csvreader = csv.reader(csvfile) for row in csvreader: - c = trustify_util.get_cert_info(row[1]) - c["key_file"] = row[2] - c["cert_file"] = row[1] + c = crypto_util.get_cert_info(row[1]) + c["orig_key_file"] = row[2] + c["orig_cert_file"] = row[1] + c["backup_key_file"] = CERT_KEY_BACKUP + os.path.basename(row[2]) + "_" + row[0] + c["backup_cert_file"] = CERT_KEY_BACKUP + os.path.basename(row[1]) + "_" + row[0] + c["idx"] = int(row[0]) certs.append(c) self.__display_certs(certs) @@ -219,47 +223,30 @@ class Client(object): if code == self.d.DIALOG_OK: - self.__confirm_revocation(certs[int(selection)-1]) + if display.confirm_revocation(certs[int(selection)-1]): + self.revoke(c) + elif code == self.d.DIALOG_CANCEL: exit(0) elif code == "help": - self.__more_info_cert(certs[int(selection)-1]) + display.more_info_cert(certs[int(selection)-1]) - def __more_info_cert(self, cert): - text = "Certificate Information:\n" - text += "-" * 66 - text += trustify_util.cert_info_string(cert) - text += "-" * 66 - self.d.msgbox(text, width=70, height=16) - - def __confirm_revocation(self, cert): - text = "Are you sure you would like to revoke the following certificate:\n" - text += "-" * 66 + "\n" - text += trustify_util.cert_info_string(cert) - text += "-" * 66 - text += "This action cannot be reversed!" - a = self.d.yesno(text, width=70, height=16) - if a == self.d.DIALOG_OK: - self.revoke(cert) - + - def revocation_request(self, cert_der): - return {"type":"revocationRequest", "certificate":jose.b64encode_url(cert_der), "signature":self.create_sig(cert_der)} - - def convert_b64_cert_to_pem(self, b64_der_cert): - x = M2Crypto.X509.load_cert_der_string(jose.b64decode_url(b64_der_cert)) - return x.as_pem() + def revocation_request(self, key_file, cert_der): + return {"type":"revocationRequest", "certificate":jose.b64encode_url(cert_der), "signature":crypto_util.create_sig(cert_der, key_file)} + def install_certificate(self, certificate_dict, vhost): cert_chain_abspath = None cert_fd, self.cert_file = trustify_util.unique_file(CERT_PATH, 644) - cert_fd.write(self.convert_b64_cert_to_pem(certificate_dict["certificate"])) + cert_fd.write(crypto_util.convert_b64_cert_to_pem(certificate_dict["certificate"])) cert_fd.close() logger.info("Server issued certificate; certificate written to %s" % self.cert_file) if certificate_dict.get("chain", None): chain_fd, chain_fn = trustify_util.unique_file(CHAIN_PATH, 644) for c in certificate_dict.get("chain", []): - chain_fd.write(self.convert_b64_cert_to_pem(c)) + chain_fd.write(crypto_util.convert_b64_cert_to_pem(c)) chain_fd.close() logger.info("Cert chain written to %s" % chain_fn) @@ -277,19 +264,18 @@ class Client(object): # sites may have been enabled / final cleanup self.config.restart(quiet=self.curses) - if self.curses: - self.d.msgbox("\nCongratulations! You have successfully enabled " + self.gen_https_names(self.names) + "!", width=70) - if self.by_default(): - self.config.enable_mod("rewrite") - self.redirect_to_ssl(vhost) - self.config.restart(quiet=self.curses) - else: - logger.info("Congratulations! You have successfully enabled " + self.gen_https_names(self.names) + "!") + display.success_installation(self.curses, self.names) + def optimize_config(self, vhost): + if display.redirect_by_default(self.curses): + self.config.enable_mod("rewrite") + self.redirect_to_ssl(vhost) + self.config.restart(quiet=self.curses) + def certificate_request(self, csr_der, key): logger.info("Preparing and sending CSR..") - return {"type":"certificateRequest", "csr":jose.b64encode_url(csr_der), "signature":self.create_sig(csr_der)} + return {"type":"certificateRequest", "csr":jose.b64encode_url(csr_der), "signature":crypto_util.create_sig(csr_der, self.key_file)} def cleanup_challenges(self, challenge_objs): logger.info("Cleaning up challenges...") @@ -307,6 +293,11 @@ class Client(object): logger.info("Waiting for %d seconds..." % delay) time.sleep(delay) msg_dict = self.send(self.status_request(msg_dict["token"])) + else: + logger.fatal("Received unexpected message") + logger.fatal("Expected: %s" % expected) + logger.fatal("Received: " + msg_dict) + sys.exit(33) logger.error("Server has deferred past the max of %d seconds" % (rounds * delay)) return None @@ -314,45 +305,15 @@ class Client(object): def authorization_request(self, id, name, server_nonce, responses): auth_req = {"type":"authorizationRequest", "sessionID":id, "nonce":server_nonce} - auth_req["signature"] = self.create_sig(name + jose.b64decode_url(server_nonce)) + auth_req["signature"] = crypto_util.create_sig(name + jose.b64decode_url(server_nonce), self.key_file) auth_req["responses"] = responses return auth_req def status_request(self, token): return {"type":"statusRequest", "token":token} - - def __leading_zeros(self, s): - if len(s) % 2: - return "0" + s - return s - - def create_sig(self, msg, signer_nonce = None, signer_nonce_len = NONCE_SIZE): - # DOES prepend signer_nonce to message - # TODO: Change this over to M2Crypto... PKey - # Protect against crypto unicode errors... is this sufficient? Do I need to escape? - msg = str(msg) - key = RSA.importKey(open(self.key_file).read()) - if signer_nonce is None: - signer_nonce = get_random_bytes(signer_nonce_len) - h = SHA256.new(signer_nonce + msg) - signer = PKCS1_v1_5.new(key) - signature = signer.sign(h) - #print "signing:", signer_nonce + msg - #print "signature:", signature - n, e = key.n, key.e - n_bytes = binascii.unhexlify(self.__leading_zeros(hex(n)[2:].replace("L", ""))) - e_bytes = binascii.unhexlify(self.__leading_zeros(hex(e)[2:].replace("L", ""))) - n_encoded = jose.b64encode_url(n_bytes) - e_encoded = jose.b64encode_url(e_bytes) - signer_nonce_encoded = jose.b64encode_url(signer_nonce) - sig_encoded = jose.b64encode_url(signature) - jwk = { "kty": "RSA", "n": n_encoded, "e": e_encoded } - signature = { "nonce": signer_nonce_encoded, "alg": "RS256", "jwk": jwk, "sig": sig_encoded } - # return json.dumps(signature) - return (signature) - + def challenge_request(self, names): - logger.info("Temporarily only enabling one name") + #logger.info("Temporarily only enabling one name") return {"type":"challengeRequest", "identifier": names[0]} def verify_identity(self, c): @@ -510,7 +471,7 @@ class Client(object): key_pem = None csr_pem = None if not self.key_file: - key_pem = self.make_key(RSA_KEY_SIZE) + key_pem = crypto_util.make_key(RSA_KEY_SIZE) # Save file trustify_util.make_or_verify_dir(KEY_DIR, 0700) key_f, self.key_file = trustify_util.unique_file(KEY_DIR + "key-trustify.pem", 0600) @@ -525,7 +486,7 @@ class Client(object): sys.exit(1) if not self.csr_file: - csr_pem, csr_der = trustify_util.make_csr(self.key_file, self.names) + csr_pem, csr_der = crypto_util.make_csr(self.key_file, self.names) # Save CSR trustify_util.make_or_verify_dir(CERT_DIR, 0755) csr_f, self.csr_file = trustify_util.unique_file(CERT_DIR + "csr-trustify.pem", 0644) @@ -533,6 +494,7 @@ class Client(object): csr_f.close() logger.info("Creating CSR: %s" % self.csr_file) else: + #TODO fix this der situation try: csr_pem = open(self.csr_file).read().replace("\r", "") except: @@ -544,48 +506,15 @@ class Client(object): return key_pem, csr_pem - - - # based on M2Crypto unit test written by Toby Allsopp - - def make_key(self, bits=RSA_KEY_SIZE): - """ - Returns new RSA key in PEM form with specified bits - """ - rsa = RSA.gen_key(bits, 65537) - key_pem = rsa.as_pem(cipher=None) - rsa = None # should not be freed here - - return key_pem - - def __rsa_sign(self, key, data): - """ - Sign this data with this private key. For client-side use. - - @type key: str - @param key: PEM-encoded string of the private key. - - @type data: str - @param data: The data to be signed. Will be hashed (sha256) prior to - signing. - - @return: binary string of the signature - """ - key = str(key) - data = str(data) - privkey = M2Crypto.RSA.load_key_string(key) - return privkey.sign(hashlib.sha256(data).digest(), 'sha256') - def filter_names(self, names): - choices = [(n, "", 1) for n in names] + choices = [(n, "", 0) for n in names] result = self.d.checklist("Which names would you like to activate HTTPS for?", choices=choices) if result[0] != 0 or not result[1]: sys.exit(1) return result[1] def choice_of_ca(self): - choices = self.get_cas() result = self.d.menu("Pick a Certificate Authority. They're all unique and special!", width=70, choices=choices) @@ -611,11 +540,11 @@ class Client(object): else: EV_choices.append(choice) - random.shuffle(DV_choices) - random.shuffle(OV_choices) - random.shuffle(EV_choices) + # random.shuffle(DV_choices) + # random.shuffle(OV_choices) + # random.shuffle(EV_choices) choices = DV_choices + OV_choices + EV_choices - #choices = [line.split(";", 1) for line in f] + except IOError as e: logger.fatal("Unable to find .ca_offerings file") sys.exit(1) @@ -664,34 +593,6 @@ class Client(object): except: return False - def gen_https_names(self, domains): - """ - Returns a string of the domains formatted nicely with https:// prepended - to each - """ - result = "" - if len(domains) > 2: - for i in range(len(domains)-1): - result = result + "https://" + domains[i] + ", " - result = result + "and " - if len(domains) == 2: - return "https://" + domains[0] + " and https://" + domains[1] - - if domains: - result = result + "https://" + domains[len(domains)-1] - return result - - def by_default(self): - d = dialog.Dialog() - choices = [("Easy", "Allow both HTTP and HTTPS access to these sites"), ("Secure", "Make all requests redirect to secure HTTPS access")] - result = d.menu("Please choose whether HTTPS access is required or optional.", width=70, choices=choices) - if result[0] != 0: - sys.exit(1) - return result[1] == "Secure" - -def sha256(m): - return hashlib.sha256(m).hexdigest() - def old_cert(cert_filename, days_left): cert = M2Crypto.X509.load_cert(cert_filename) diff --git a/trustify/client/configurator.py b/trustify/client/configurator.py index 6b0628f90..1e85fe2b1 100644 --- a/trustify/client/configurator.py +++ b/trustify/client/configurator.py @@ -12,7 +12,7 @@ import errno from trustify.client.CONFIG import SERVER_ROOT, BACKUP_DIR from trustify.client.CONFIG import REWRITE_HTTPS_ARGS, CONFIG_DIR, WORK_DIR from trustify.client.CONFIG import TEMP_CHECKPOINT_DIR, IN_PROGRESS_DIR -from trustify.client.CONFIG import OPTIONS_SSL_CONF +from trustify.client.CONFIG import OPTIONS_SSL_CONF, TRUSTIFY_VHOST_EXT from trustify.client import logger, trustify_util #from CONFIG import SERVER_ROOT, BACKUP_DIR, REWRITE_HTTPS_ARGS, CONFIG_DIR, WORK_DIR, TEMP_CHECKPOINT_DIR, IN_PROGRESS_DIR, OPTIONS_SSL_CONF, TRUSTIFY_VHOST_EXT #import logger, trustify_util diff --git a/trustify/client/crypto_util.py b/trustify/client/crypto_util.py new file mode 100644 index 000000000..672c8fbd4 --- /dev/null +++ b/trustify/client/crypto_util.py @@ -0,0 +1,151 @@ +import M2Crypto +import time, jose, binascii +import hashlib +from Crypto.Random import get_random_bytes +from Crypto.PublicKey import RSA +from Crypto.Signature import PKCS1_v1_5 +from Crypto.Hash import SHA256 +from M2Crypto import EVP, X509, ASN1 + + +from trustify.client import logger +from trustify.client.CONFIG import NONCE_SIZE, RSA_KEY_SIZE + + +def convert_b64_cert_to_pem(b64_der_cert): + x = M2Crypto.X509.load_cert_der_string(jose.b64decode_url(b64_der_cert)) + return x.as_pem() + +def create_sig(msg, key_file, signer_nonce = None, signer_nonce_len = NONCE_SIZE): + # DOES prepend signer_nonce to message + # TODO: Change this over to M2Crypto... PKey + # Protect against crypto unicode errors... is this sufficient? Do I need to escape? + msg = str(msg) + key = RSA.importKey(open(key_file).read()) + if signer_nonce is None: + signer_nonce = get_random_bytes(signer_nonce_len) + h = SHA256.new(signer_nonce + msg) + signer = PKCS1_v1_5.new(key) + signature = signer.sign(h) + #print "signing:", signer_nonce + msg + #print "signature:", signature + n, e = key.n, key.e + n_bytes = binascii.unhexlify(leading_zeros(hex(n)[2:].replace("L", ""))) + e_bytes = binascii.unhexlify(leading_zeros(hex(e)[2:].replace("L", ""))) + n_encoded = jose.b64encode_url(n_bytes) + e_encoded = jose.b64encode_url(e_bytes) + signer_nonce_encoded = jose.b64encode_url(signer_nonce) + sig_encoded = jose.b64encode_url(signature) + jwk = { "kty": "RSA", "n": n_encoded, "e": e_encoded } + signature = { "nonce": signer_nonce_encoded, "alg": "RS256", "jwk": jwk, "sig": sig_encoded } + # return json.dumps(signature) + return (signature) + +def leading_zeros(s): + if len(s) % 2: + return "0" + s + return s + +def sha256(m): + return hashlib.sha256(m).hexdigest() + +# based on M2Crypto unit test written by Toby Allsopp +def make_key(bits=RSA_KEY_SIZE): + """ + Returns new RSA key in PEM form with specified bits + """ + rsa = M2Crypto.RSA.gen_key(bits, 65537) + key_pem = rsa.as_pem(cipher=None) + rsa = None # should not be freed here + + return key_pem + + +def make_csr(key_file, domains): + """ + Returns new CSR in PEM and DER form using key_file containing all domains + """ + assert domains, "Must provide one or more hostnames for the CSR." + rsa_key = M2Crypto.RSA.load_key(key_file) + pk = EVP.PKey() + pk.assign_rsa(rsa_key) + + x = X509.Request() + x.set_pubkey(pk) + name = x.get_subject() + name.C = "US" + name.ST = "Michigan" + name.L = "Ann Arbor" + name.O = "EFF" + name.OU = "University of Michigan" + name.CN = domains[0] + + extstack = X509.X509_Extension_Stack() + ext = X509.new_extension('subjectAltName', ", ".join(["DNS:%s" % d for d in domains])) + + extstack.push(ext) + x.add_extensions(extstack) + x.sign(pk,'sha256') + assert x.verify(pk) + pk2 = x.get_pubkey() + assert x.verify(pk2) + return x.as_pem(), x.as_der() + +def make_ss_cert(key_file, domains): + """ + Returns new self-signed cert in PEM form using key_file containing all domains + """ + assert domains, "Must provide one or more hostnames for the CSR." + rsa_key = M2Crypto.RSA.load_key(key_file) + pk = EVP.PKey() + pk.assign_rsa(rsa_key) + + x = X509.X509() + x.set_pubkey(pk) + x.set_serial_number(1337) + x.set_version(2) + + t = long(time.time()) + current = ASN1.ASN1_UTCTIME() + current.set_time(t) + expire = ASN1.ASN1_UTCTIME() + expire.set_time((7 * 24 * 60 * 60) + t) + x.set_not_before(current) + x.set_not_after(expire) + + name = x.get_subject() + name.C = "US" + name.ST = "Michigan" + name.L = "Ann Arbor" + name.O = "University of Michigan and the EFF" + name.CN = domains[0] + x.set_issuer(x.get_subject()) + + x.add_ext(X509.new_extension('basicConstraints', 'CA:FALSE')) + #x.add_ext(X509.new_extension('extendedKeyUsage', 'TLS Web Server Authentication')) + x.add_ext(X509.new_extension('subjectAltName', ", ".join(["DNS:%s" % d for d in domains]))) + + x.sign(pk, 'sha256') + assert x.verify(pk) + assert x.verify() + #print check_purpose(,0 + return x.as_pem() + +def get_cert_info(filename): + d = {} + # M2Crypto Library only supports RSA right now + x = M2Crypto.X509.load_cert(filename) + d["not_before"] = x.get_not_before().get_datetime() + d["not_after"] = x.get_not_after().get_datetime() + d["subject"] = x.get_subject().as_text() + d["cn"] = x.get_subject().CN + d["issuer"] = x.get_issuer().as_text() + d["fingerprint"] = x.get_fingerprint(md='sha1') + try: + d["san"] = x.get_ext("subjectAltName").get_value() + except: + d["san"] = "" + + d["serial"] = x.get_serial_number() + d["pub_key"] = "RSA " + str(x.get_pubkey().size() * 8) + return d diff --git a/trustify/client/display.py b/trustify/client/display.py new file mode 100644 index 000000000..5b0ca962a --- /dev/null +++ b/trustify/client/display.py @@ -0,0 +1,76 @@ +import dialog +from trustify.client import logger + +d = dialog.Dialog() + +WIDTH = 70 +HEIGHT = 16 + + +# TODO: This whole class really needs to be refactored into two classes - one for curses one for text + +def success_installation(curses, domains): + if curses: + d.msgbox("\nCongratulations! You have successfully enabled " + gen_https_names(domains) + "!", width=WIDTH) + else: + logger.info("Congratulations! You have successfully enabled " + gen_https_names(domains) + "!") + +def redirect_by_default(curses): + if curses: + choices = [("Easy", "Allow both HTTP and HTTPS access to these sites"), ("Secure", "Make all requests redirect to secure HTTPS access")] + result = d.menu("Please choose whether HTTPS access is required or optional.", width=WIDTH, choices=choices) + if result[0] != 0: + return False + return result[1] == "Secure" + + else: + ans = raw_input("Would you like to redirect all normal HTTP traffic to HTTPS? y/n") + return ans.startswith('y') or ans.startswith('Y') + + +def gen_https_names(domains): + """ + Returns a string of the domains formatted nicely with https:// prepended + to each + """ + result = "" + if len(domains) > 2: + for i in range(len(domains)-1): + result = result + "https://" + domains[i] + ", " + result = result + "and " + if len(domains) == 2: + return "https://" + domains[0] + " and https://" + domains[1] + + if domains: + result = result + "https://" + domains[len(domains)-1] + return result + + +def confirm_revocation(cert): + text = "Are you sure you would like to revoke the following certificate:\n" + text += cert_info_frame(cert) + text += "This action cannot be reversed!" + a = d.yesno(text, width=WIDTH, height=HEIGHT) + return a == d.DIALOG_OK + +def cert_info_frame(cert): + text = "-" * (WIDTH - 4) + "\n" + text += cert_info_string(cert) + text += "-" * (WIDTH - 4) + return text + +def more_info_cert(cert): + text = "Certificate Information:\n" + text += cert_info_frame(cert) + d.msgbox(text, width=WIDTH, height=HEIGHT) + +def cert_info_string(cert): + text = "Subject: %s\n" % cert["subject"] + text += "SAN: %s\n" % cert["san"] + text += "Issuer: %s\n" % cert["issuer"] + text += "Public Key: %s\n" % cert["pub_key"] + text += "Not Before: %s\n" % str(cert["not_before"]) + text += "Not After: %s\n" % str(cert["not_after"]) + text += "Serial Number: %s\n" % cert["serial"] + text += "SHA1: %s\n" % cert["fingerprint"] + return text diff --git a/trustify/client/sni_challenge.py b/trustify/client/sni_challenge.py index 1fff63f70..4c1607cf2 100755 --- a/trustify/client/sni_challenge.py +++ b/trustify/client/sni_challenge.py @@ -17,7 +17,7 @@ from trustify.client import configurator from trustify.client.CONFIG import CONFIG_DIR, WORK_DIR, SERVER_ROOT from trustify.client.CONFIG import CHOC_CERT_CONF, OPTIONS_SSL_CONF, APACHE_CHALLENGE_CONF, INVALID_EXT from trustify.client.CONFIG import S_SIZE, NONCE_SIZE -from trustify.client import logger, trustify_util +from trustify.client import logger, crypto_util from trustify.client.challenge import Challenge # import configurator @@ -141,7 +141,7 @@ DocumentRoot " + CONFIG_DIR + "challenge_page/ \n \ self.createCHOC_CERT_CONF(name, ext) self.configurator.register_file_creation(True, self.getDvsniCertFile(nonce)) - cert_pem = trustify_util.make_ss_cert(key, [nonce + INVALID_EXT, name, ext]) + cert_pem = crypto_util.make_ss_cert(key, [nonce + INVALID_EXT, name, ext]) with open(self.getDvsniCertFile(nonce), 'w') as f: f.write(cert_pem) diff --git a/trustify/client/trustify_util.py b/trustify/client/trustify_util.py index c5c76d9b9..f93c963a1 100644 --- a/trustify/client/trustify_util.py +++ b/trustify/client/trustify_util.py @@ -4,75 +4,10 @@ import stat import os, pwd, grp import M2Crypto import time -from M2Crypto import EVP, X509, RSA, ASN1 from trustify.client import logger #import logger -def make_csr(key_file, domains): - """ - Returns new CSR in PEM and DER form using key_file containing all domains - """ - assert domains, "Must provide one or more hostnames for the CSR." - rsa_key = M2Crypto.RSA.load_key(key_file) - pk = EVP.PKey() - pk.assign_rsa(rsa_key) - - x = X509.Request() - x.set_pubkey(pk) - name = x.get_subject() - name.CN = domains[0] - - extstack = X509.X509_Extension_Stack() - ext = X509.new_extension('subjectAltName', ", ".join(["DNS:%s" % d for d in domains])) - - extstack.push(ext) - x.add_extensions(extstack) - x.sign(pk,'sha256') - assert x.verify(pk) - pk2 = x.get_pubkey() - assert x.verify(pk2) - return x.as_pem(), x.as_der() - -def make_ss_cert(key_file, domains): - """ - Returns new self-signed cert in PEM form using key_file containing all domains - """ - assert domains, "Must provide one or more hostnames for the CSR." - rsa_key = M2Crypto.RSA.load_key(key_file) - pk = EVP.PKey() - pk.assign_rsa(rsa_key) - - x = X509.X509() - x.set_pubkey(pk) - x.set_serial_number(1337) - x.set_version(2) - - t = long(time.time()) - current = ASN1.ASN1_UTCTIME() - current.set_time(t) - expire = ASN1.ASN1_UTCTIME() - expire.set_time((7 * 24 * 60 * 60) + t) - x.set_not_before(current) - x.set_not_after(expire) - - name = x.get_subject() - name.C = "US" - name.ST = "Michigan" - name.L = "Ann Arbor" - name.O = "University of Michigan and the EFF" - name.CN = domains[0] - x.set_issuer(x.get_subject()) - - x.add_ext(X509.new_extension('basicConstraints', 'CA:FALSE')) - #x.add_ext(X509.new_extension('extendedKeyUsage', 'TLS Web Server Authentication')) - x.add_ext(X509.new_extension('subjectAltName', ", ".join(["DNS:%s" % d for d in domains]))) - - x.sign(pk, 'sha256') - assert x.verify(pk) - assert x.verify() - #print check_purpose(,0 - return x.as_pem() def make_or_verify_dir(directory, permissions=0755, uid=0): try: @@ -107,31 +42,6 @@ def unique_file(default_name, mode = 0777): count += 1 -def get_cert_info(filename): - d = {} - # M2Crypto Library only supports RSA right now - x = M2Crypto.X509.load_cert(filename) - d["not_before"] = x.get_not_before().get_datetime() - d["not_after"] = x.get_not_after().get_datetime() - d["subject"] = x.get_subject().as_text() - d["cn"] = x.get_subject().CN - d["issuer"] = x.get_issuer().as_text() - d["fingerprint"] = x.get_fingerprint(md='sha1') - d["san"] = x.get_ext("subjectAltName").get_value() - d["serial"] = x.get_serial_number() - d["pub_key"] = "RSA " + str(x.get_pubkey().size() * 8) - return d - -def cert_info_string(cert): - text = "Subject: %s\n" % cert["subject"] - text += "SAN: %s\n" % cert["san"] - text += "Issuer: %s\n" % cert["issuer"] - text += "Public Key: %s\n" % cert["pub_key"] - text += "Not Before: %s\n" % str(cert["not_before"]) - text += "Not After: %s\n" % str(cert["not_after"]) - text += "Serial Number: %s\n" % cert["serial"] - text += "SHA1: %s\n" % cert["fingerprint"] - return text def drop_privs(): nogroup = grp.getgrnam("nogroup").gr_gid