1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-26 07:41:33 +03:00

Fully support Revocation with menus

This commit is contained in:
James Kasten
2014-11-09 07:30:40 -05:00
parent 19bc2fa084
commit cbec87e181
4 changed files with 167 additions and 100 deletions

View File

@@ -14,7 +14,7 @@ def main():
sys.exit("\nOnly root can run trustify.\n")
# Parse options
try:
opts, args = getopt.getopt(sys.argv[1:], "", ["text", "test", "view-checkpoints", "privkey=", "csr=", "server=", "rollback="])
opts, args = getopt.getopt(sys.argv[1:], "", ["text", "test", "view-checkpoints", "privkey=", "csr=", "server=", "rollback=", "revoke"])
except getopt.GetoptError as err:
# print help info and exit
print str(err)
@@ -26,6 +26,7 @@ def main():
privkey = None
curses = True
names = args
flag_revoke = False
for o, a in opts:
if o == "--text":
@@ -53,10 +54,7 @@ def main():
sys.exit(0)
elif o == "--revoke":
# Do Stuff
continue
elif o == "--view-keys":
# Do Stuff
continue
flag_revoke = True
elif o == "--test":
#put any temporary tests in here
@@ -73,10 +71,13 @@ def main():
server = "54.183.196.250"
c = client.Client(server, args, csr, privkey, curses)
c.authenticate()
if flag_revoke:
c.list_certs_keys()
else:
c.authenticate()
def usage():
print "Available options: --text, --privkey=, --csr=, --server=, --rollback=, --view-checkpoints, --revoke, --view-keys"
print "Available options: --text, --privkey=, --csr=, --server=, --rollback=, --view-checkpoints, --revoke"
if __name__ == "__main__":
main()

View File

@@ -9,8 +9,8 @@ import M2Crypto
# version.
import urllib2, json
# XXX TODO: per https://docs.google.com/document/pub?id=1roBIeSJsYq3Ntpf6N0PIeeAAvu4ddn7mGo6Qb7aL7ew, urllib2 is unsafe (!) and must be replaced
import os, grp, pwd, sys, time, random, sys
import hashlib, binascii, jose
import os, grp, pwd, sys, time, random, sys, shutil
import hashlib, binascii, jose, csv
import subprocess
from M2Crypto import EVP, X509, RSA
from Crypto.Random import get_random_bytes
@@ -24,8 +24,8 @@ from trustify.client.payment_challenge import Payment_Challenge
from trustify.client import configurator
from trustify.client import logger
from trustify.client import trustify_util
from trustify.client.CONFIG import NONCE_SIZE, cert_file, chain_file
from trustify.client.CONFIG import SERVER_ROOT, KEY_DIR, CERT_DIR
from trustify.client.CONFIG import NONCE_SIZE, CERT_PATH, CHAIN_PATH
from trustify.client.CONFIG import SERVER_ROOT, KEY_DIR, CERT_DIR, CERT_KEY_BACKUP
from trustify.client.CONFIG import CHALLENGE_PREFERENCES, EXCLUSIVE_CHALLENGES
# it's weird to point to chocolate servers via raw IPv6 addresses, and such
# addresses can be %SCARY in some contexts, so out of paranoia let's disable
@@ -43,6 +43,7 @@ class Client(object):
self.curses = use_curses
if self.curses:
import dialog
self.d = dialog.Dialog()
# Logger needs to be initialized before Configurator
self.init_logger()
@@ -97,7 +98,6 @@ class Client(object):
challenge_dict = self.is_expected_msg(challenge_dict, "challenge")
print challenge_dict
#assert self.is_challenge(challenge_dict)
#Perform Challenges
@@ -124,26 +124,127 @@ class Client(object):
self.install_certificate(certificate_dict, vhost)
# Perform optimal config changes
# self.handle_verification_response(r, challenges, vhost)
self.config.save("Completed Augeas Authentication")
self.store_cert_key(False)
return
def revoke(self, cert_file):
x = M2Crypto.X509.load_cert(cert_file)
def revoke(self, c):
x = M2Crypto.X509.load_cert(c["cert_file"])
cert_der = x.as_der()
#self.find_key_for_cert()
self.send(self.revoke_request(cert_der))
revocation_dict = self.send(self.revocation_request(cert_der))
revocation_dict = self.is_expected_msg(revocation_dict, "revocation")
self.d.msgbox("You have successfully revoked the certificate for %s" % c["cn"], width=70, height=16)
self.remove_cert_key(c["cert_file"], c["key_file"])
sys.exit(0)
def remove_cert_key(self, c_file, k_file):
list_file = CERT_KEY_BACKUP + "LIST"
list_file2 = CERT_KEY_BACKUP + "LIST.tmp"
with open(list_file, 'rb') as orgfile:
csvreader = csv.reader(orgfile)
with open(list_file2, 'wb') as newfile:
csvwriter = csv.writer(newfile)
for row in csvreader:
if not (row[1] == c_file and row[2] == k_file):
csvwriter.writerow(row)
shutil.copy2(list_file2, list_file)
os.remove(c_file)
os.remove(k_file)
def store_revocation_token(self, token):
return
def revoke_request(self, cert_der):
return {"type":"revokeRequest", "certificate":jose.b64encode_url(cert_der), "signature":self.create_sig(cert_der)}
def store_cert_key(self, encrypt = False):
list_file = CERT_KEY_BACKUP + "LIST"
trustify_util.make_or_verify_dir(CERT_KEY_BACKUP, 0700)
idx = 0
if encrypt:
logger.error("Unfortunately securely storing the certificates/keys is not yet available. Stay tuned for the next update!")
return False
else:
if os.path.isfile(list_file):
with open(list_file, 'r+b') as csvfile:
csvreader = csv.reader(csvfile)
for r in csvreader:
idx = int(r[0]) + 1
csvwriter = csv.writer(csvfile)
csvwriter.writerow([str(idx), self.cert_file, self.key_file])
else:
with open(list_file, 'wb') as csvfile:
csvwriter = csv.writer(csvfile)
csvwriter.writerow(["0", self.cert_file, self.key_file])
shutil.copy2(self.key_file, CERT_KEY_BACKUP + os.path.basename(self.key_file) + "_" + str(idx))
shutil.copy2(self.cert_file, CERT_KEY_BACKUP + os.path.basename(self.cert_file) + "_" + str(idx))
def list_certs_keys(self):
list_file = CERT_KEY_BACKUP + "LIST"
certs = []
if not os.path.isfile(CERT_KEY_BACKUP + "LIST"):
logger.info("You don't have any certificates saved from trustify")
return
with open(list_file, 'rb') as csvfile:
csvreader = csv.reader(csvfile)
for row in csvreader:
c = trustify_util.get_cert_info(row[1])
c["key_file"] = row[2]
c["cert_file"] = row[1]
certs.append(c)
self.__display_certs(certs)
def __display_certs(self, certs):
while True:
menu_choices = [(str(i+1), str(c["cn"]) + " - " + c["pub_key"] + " - " + str(c["not_before"])[:-6]) for i, c in enumerate(certs)]
if self.curses:
code, selection = self.d.menu("Which certificate would you like to revoke?", choices = menu_choices,
help_button=True, help_label="More Info", ok_label="Revoke",
width=70, height=16)
if code == self.d.DIALOG_OK:
self.__confirm_revocation(certs[int(selection)-1])
elif code == self.d.DIALOG_CANCEL:
exit(0)
elif code == "help":
self.__more_info_cert(certs[int(selection)-1])
def __more_info_cert(self, cert):
text = "Certificate Information:\n"
text += "-" * 66
text += trustify_util.cert_info_string(cert)
text += "-" * 66
self.d.msgbox(text, width=70, height=16)
def __confirm_revocation(self, cert):
text = "Are you sure you would like to revoke the following certificate:\n"
text += "-" * 66 + "\n"
text += trustify_util.cert_info_string(cert)
text += "-" * 66
text += "This action cannot be reversed!"
a = self.d.yesno(text, width=70, height=16)
if a == self.d.DIALOG_OK:
self.revoke(cert)
def revocation_request(self, cert_der):
return {"type":"revocationRequest", "certificate":jose.b64encode_url(cert_der), "signature":self.create_sig(cert_der)}
def convert_b64_cert_to_pem(self, b64_der_cert):
x = M2Crypto.X509.load_cert_der_string(jose.b64decode_url(b64_der_cert))
@@ -151,12 +252,12 @@ class Client(object):
def install_certificate(self, certificate_dict, vhost):
cert_chain_abspath = None
cert_fd, cert_fn = trustify_util.unique_file(cert_file, 644)
cert_fd, self.cert_file = trustify_util.unique_file(CERT_PATH, 644)
cert_fd.write(self.convert_b64_cert_to_pem(certificate_dict["certificate"]))
cert_fd.close()
logger.info("Server issued certificate; certificate written to %s" % cert_fn)
logger.info("Server issued certificate; certificate written to %s" % self.cert_file)
if certificate_dict.get("chain", None):
chain_fd, chain_fn = trustify_util.unique_file(chain_file, 644)
chain_fd, chain_fn = trustify_util.unique_file(CHAIN_PATH, 644)
for c in certificate_dict.get("chain", []):
chain_fd.write(self.convert_b64_cert_to_pem(c))
chain_fd.close()
@@ -167,7 +268,7 @@ class Client(object):
cert_chain_abspath = os.path.abspath(chain_fn)
for host in vhost:
self.config.deploy_cert(host, os.path.abspath(cert_fn), os.path.abspath(self.key_file), cert_chain_abspath)
self.config.deploy_cert(host, os.path.abspath(self.cert_file), os.path.abspath(self.key_file), cert_chain_abspath)
# Enable any vhost that was issued to, but not enabled
if not host.enabled:
logger.info("Enabling Site " + host.file)
@@ -177,7 +278,7 @@ class Client(object):
self.config.restart(quiet=self.curses)
if self.curses:
dialog.Dialog().msgbox("\nCongratulations! You have successfully enabled " + self.gen_https_names(self.names) + "!", width=70)
self.d.msgbox("\nCongratulations! You have successfully enabled " + self.gen_https_names(self.names) + "!", width=70)
if self.by_default():
self.config.enable_mod("rewrite")
self.redirect_to_ssl(vhost)
@@ -348,15 +449,6 @@ class Client(object):
return json.loads(response)
def all_payment_challenge(self, r):
if not r.challenge:
return False
for chall in r.challenge:
if chall.type != r.Payment:
return False
return True
def redirect_to_ssl(self, vhost):
for ssl_vh in vhost:
success, redirect_vhost = self.config.redirect_all_ssl(ssl_vh)
@@ -367,59 +459,7 @@ class Client(object):
self.config.enable_site(redirect_vhost)
logger.info("Enabling available site: " + redirect_vhost.file)
def check_payment(self, r, k):
while r.challenge and self.all_payment_challenge(r):
# dont need to change domain names here
paymentChallenges = self.challenge_factory(r)
for chall in paymentChallenges:
chall.perform(quiet=self.curses)
logger.info("User has continued Trustify after submitting payment")
proceed_msg = chocolatemessage()
self.init_message(proceed_msg)
proceed_msg.session = r.session
proceed_msg.proceed.timestamp = int(time.time())
proceed_msg.proceed.polldelay = 60
# Send the proceed message
# this used to be k?
r = self.decode(self.do(self.upstream, k))
while r.proceed.IsInitialized():
if r.proceed.IsInitialized():
delay = min(r.proceed.polldelay, 60)
logger.debug("waiting %d" % delay)
time.sleep(delay)
k.session = r.session
# this used to be k?
r = self.decode(self.do(self.upstream, k))
logger.debug(r)
return r
# Figure out k's purpose..
def notify_server_of_completion(self, r, k):
did_it = chocolatemessage()
self.init_message(did_it)
did_it.session = r.session
did_it.completedchallenge.extend(r.challenge)
r=self.decode(self.do(self.upstream, did_it))
logger.debug(r)
delay = 5
# TODO: Check this while statement
while r.proceed.IsInitialized() or (r.challenge and not self.all_payment_challenge(r)):
if r.proceed.IsInitialized():
delay = min(r.proceed.polldelay, 60)
logger.debug("waiting %d" % delay)
time.sleep(delay)
k.session = r.session
r = self.decode(self.do(self.upstream, k))
logger.debug(r)
return r
def get_virtual_hosts(self, domains):
vhost = set()
for name in domains:
@@ -538,18 +578,17 @@ class Client(object):
def filter_names(self, names):
d = dialog.Dialog()
choices = [(n, "", 1) for n in names]
result = d.checklist("Which names would you like to activate HTTPS for?", choices=choices)
result = self.d.checklist("Which names would you like to activate HTTPS for?", choices=choices)
if result[0] != 0 or not result[1]:
sys.exit(1)
return result[1]
def choice_of_ca(self):
d = dialog.Dialog()
choices = self.get_cas()
result = d.menu("Pick a Certificate Authority. They're all unique and special!", width=70, choices=choices)
result = self.d.menu("Pick a Certificate Authority. They're all unique and special!", width=70, choices=choices)
if result[0] != 0:
sys.exit(1)

View File

@@ -12,7 +12,7 @@ import errno
from trustify.client.CONFIG import SERVER_ROOT, BACKUP_DIR
from trustify.client.CONFIG import REWRITE_HTTPS_ARGS, CONFIG_DIR, WORK_DIR
from trustify.client.CONFIG import TEMP_CHECKPOINT_DIR, IN_PROGRESS_DIR
from trustify.client.CONFIG import OPTIONS_SSL_CONF, TRUSTIFY_VHOST_EXT
from trustify.client.CONFIG import OPTIONS_SSL_CONF
from trustify.client import logger, trustify_util
#from CONFIG import SERVER_ROOT, BACKUP_DIR, REWRITE_HTTPS_ARGS, CONFIG_DIR, WORK_DIR, TEMP_CHECKPOINT_DIR, IN_PROGRESS_DIR, OPTIONS_SSL_CONF, TRUSTIFY_VHOST_EXT
#import logger, trustify_util
@@ -1134,9 +1134,9 @@ LogLevel warn \n\
# Create Checkpoint
if temporary:
self.__add_to_checkpoint(TEMP_CHECKPOINT_DIR, save_files)
self.add_to_checkpoint(TEMP_CHECKPOINT_DIR, save_files)
else:
self.__add_to_checkpoint(IN_PROGRESS_DIR, save_files)
self.add_to_checkpoint(IN_PROGRESS_DIR, save_files)
if title and not temporary and os.path.isdir(IN_PROGRESS_DIR):
@@ -1176,7 +1176,7 @@ LogLevel warn \n\
return False
return True
def __add_to_checkpoint(self, cp_dir, save_files):
def add_to_checkpoint(self, cp_dir, save_files):
trustify_util.make_or_verify_dir(cp_dir, 0755)
existing_filepaths = []

View File

@@ -17,7 +17,7 @@ def make_csr(key_file, domains):
rsa_key = M2Crypto.RSA.load_key(key_file)
pk = EVP.PKey()
pk.assign_rsa(rsa_key)
x = X509.Request()
x.set_pubkey(pk)
name = x.get_subject()
@@ -42,7 +42,7 @@ def make_ss_cert(key_file, domains):
rsa_key = M2Crypto.RSA.load_key(key_file)
pk = EVP.PKey()
pk.assign_rsa(rsa_key)
x = X509.X509()
x.set_pubkey(pk)
x.set_serial_number(1337)
@@ -67,13 +67,13 @@ def make_ss_cert(key_file, domains):
x.add_ext(X509.new_extension('basicConstraints', 'CA:FALSE'))
#x.add_ext(X509.new_extension('extendedKeyUsage', 'TLS Web Server Authentication'))
x.add_ext(X509.new_extension('subjectAltName', ", ".join(["DNS:%s" % d for d in domains])))
x.sign(pk, 'sha256')
assert x.verify(pk)
assert x.verify()
#print check_purpose(,0
return x.as_pem()
def make_or_verify_dir(directory, permissions=0755, uid=0):
try:
os.makedirs(directory, permissions)
@@ -106,6 +106,33 @@ def unique_file(default_name, mode = 0777):
default_name = f_parsed[0] + '_' + str(count) + f_parsed[1]
count += 1
def get_cert_info(filename):
d = {}
# M2Crypto Library only supports RSA right now
x = M2Crypto.X509.load_cert(filename)
d["not_before"] = x.get_not_before().get_datetime()
d["not_after"] = x.get_not_after().get_datetime()
d["subject"] = x.get_subject().as_text()
d["cn"] = x.get_subject().CN
d["issuer"] = x.get_issuer().as_text()
d["fingerprint"] = x.get_fingerprint(md='sha1')
d["san"] = x.get_ext("subjectAltName").get_value()
d["serial"] = x.get_serial_number()
d["pub_key"] = "RSA " + str(x.get_pubkey().size() * 8)
return d
def cert_info_string(cert):
text = "Subject: %s\n" % cert["subject"]
text += "SAN: %s\n" % cert["san"]
text += "Issuer: %s\n" % cert["issuer"]
text += "Public Key: %s\n" % cert["pub_key"]
text += "Not Before: %s\n" % str(cert["not_before"])
text += "Not After: %s\n" % str(cert["not_after"])
text += "Serial Number: %s\n" % cert["serial"]
text += "SHA1: %s\n" % cert["fingerprint"]
return text
def drop_privs():
nogroup = grp.getgrnam("nogroup").gr_gid
nobody = pwd.getpwnam("nobody").pw_uid