1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-26 07:41:33 +03:00

Certificate Issuance/Deployment/Redirection; recovery tokens/contact

This commit is contained in:
James Kasten
2014-11-08 06:11:29 -05:00
parent 412b28b219
commit 19bc2fa084
9 changed files with 480 additions and 159 deletions

View File

@@ -51,12 +51,18 @@ def main():
config = configurator.Configurator()
config.display_checkpoints()
sys.exit(0)
elif o == "--revoke":
# Do Stuff
continue
elif o == "--view-keys":
# Do Stuff
continue
elif o == "--test":
#put any temporary tests in here
continue
if not server:
print os.environ
if "ACMESERVER" in os.environ:
server = os.environ["ACMESERVER"]
else:
@@ -70,7 +76,7 @@ def main():
c.authenticate()
def usage():
print "Available options: --text, --privkey=, --csr=, --server=, --rollback=, --view-checkpoints"
print "Available options: --text, --privkey=, --csr=, --server=, --rollback=, --view-checkpoints, --revoke, --view-keys"
if __name__ == "__main__":
main()

View File

@@ -19,6 +19,8 @@ CERT_DIR = SERVER_ROOT + "certs/"
CHOC_CERT_CONF = CONFIG_DIR + "choc_cert_extensions.cnf"
# Contains standard Apache SSL directives
OPTIONS_SSL_CONF = CONFIG_DIR + "options-ssl.conf"
# Trustify SSL vhost configuration extension
TRUSTIFY_VHOST_EXT = "-trustify-ssl.conf"
# Temporary file for challenge virtual hosts
APACHE_CHALLENGE_CONF = CONFIG_DIR + "choc_sni_cert_challenge.conf"
@@ -36,11 +38,11 @@ chain_file = CERT_DIR + "trustify-chain.pem"
#Invalid Extension
INVALID_EXT = ".acme.invalid"
# Rewrite rule arguments used for redirections to https vhost
REWRITE_HTTPS_ARGS = ["^.*$", "https://%{SERVER_NAME}%{REQUEST_URI}", "[L,R=permanent]"]
# Challenge Preferences Dict for currently supported challenges
CHALLENGE_PREFERENCES = ["dvsni", "recoveryToken"]
# Mutually Exclusive Challenges - only solve 1
EXCLUSIVE_CHALLENGES = [set(["dvsni", "simpleHttps"])]
# Rewrite rule arguments used for redirections to https vhost
REWRITE_HTTPS_ARGS = ["^.*$", "https://%{SERVER_NAME}%{REQUEST_URI}", "[L,R=permanent]"]

270
trustify/client/acme.py Executable file
View File

@@ -0,0 +1,270 @@
#!/usr/bin/env python
# acme.py
# validate JSON objects as ACME protocol messages
import json, jsonschema
schemas = {
"authorization": """{
"id": "https://letsencrypt.org/schema/01/authorization#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for an authorization message",
"type": "object",
"required": ["type"],
"properties": {
"type" : {
"enum" : [ "authorization" ]
},
"recoveryToken" : {
"type": "string"
},
"identifier" : {
"type": "string"
},
"jwk": {
"type": "object"
}
}
}""",
"authorizationRequest": """{
"id": "https://letsencrypt.org/schema/01/authorizationRequest#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for an authorizationRequest message",
"type": "object",
"required": ["type", "sessionID", "nonce", "signature", "responses"],
"properties": {
"type" : {
"enum" : [ "authorizationRequest" ]
},
"sessionID" : {
"type" : "string"
},
"nonce" : {
"type": "string"
},
"signature" : {
"type": "object"
},
"responses": {
"type": "array",
"minItems": 1,
"items": {
"anyOf": [
{ "type": "object" },
{ "type": "null" }
]
}
},
"contact": {
"type": "array",
"minItems": 1,
"items": {
"type": "string"
}
}
}
}""",
"certificate": """{
"id": "https://letsencrypt.org/schema/01/certificate#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for a certificate message",
"type": "object",
"required": ["type", "certificate"],
"properties": {
"type" : {
"enum" : [ "certificate" ]
},
"certificate" : {
"type" : "string"
},
"chain" : {
"type": "array",
"minItems": 1,
"items": {
"type": "string"
}
},
"refresh" : {
"type": "string"
}
}
}""",
"certificateRequest": """{
"id": "https://letsencrypt.org/schema/01/certificateRequest#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for a certificateRequest message",
"type": "object",
"required": ["type", "csr", "signature"],
"properties": {
"type" : {
"enum" : [ "certificateRequest" ]
},
"csr" : {
"type" : "string"
},
"signature" : {
"type": "object"
}
}
}""",
"challenge": """{
"id": "https://letsencrypt.org/schema/01/challenge#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for a challenge message",
"type": "object",
"required": ["type", "sessionID", "nonce", "challenges"],
"properties": {
"type" : {
"enum" : [ "challenge" ]
},
"sessionID" : {
"type" : "string"
},
"nonce" : {
"type": "string"
},
"challenges": {
"type": "array",
"minItems": 1,
"items": {
"type": "object"
}
},
"combinations": {
"type": "array",
"minItems": 1,
"items": {
"type": "array",
"minItems": 1,
"items": {
"type": "integer"
}
}
}
}
}""",
"challengeRequest": """{
"id": "https://letsencrypt.org/schema/01/challengeRequest#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for a challengeRequest message",
"type": "object",
"required": ["type", "identifier"],
"properties": {
"type" : {
"enum" : [ "challengeRequest" ]
},
"identifier" : {
"type": "string"
}
}
}""",
"defer": """{
"id": "https://letsencrypt.org/schema/01/defer#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for a defer message",
"type": "object",
"required": ["type", "token"],
"properties": {
"type" : {
"enum" : [ "defer" ]
},
"token" : {
"type": "string"
},
"interval" : {
"type": "integer"
},
"message": {
"type": "string"
}
}
}""",
"error": """{
"id": "https://letsencrypt.org/schema/01/error#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for an error message",
"type": "object",
"required": ["type", "error"],
"properties": {
"type" : {
"enum" : [ "error" ]
},
"error" : {
"enum" : [ "malformed", "unauthorized", "serverInternal", "nonSupported", "unknown", "badCSR" ]
},
"message" : {
"type": "string"
},
"moreInfo": {
"type": "string"
}
}
}""",
"revocation": """{
"id": "https://letsencrypt.org/schema/01/revocation#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for a revocation message",
"type": "object",
"required": ["type"],
"properties": {
"type" : {
"enum" : [ "revocation" ]
}
}
}""",
"revocationRequest": """{
"id": "https://letsencrypt.org/schema/01/revocationRequest#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for a revocationRequest message",
"type": "object",
"required": ["type", "certificate", "signature"],
"properties": {
"type" : {
"enum" : [ "revocationRequest" ]
},
"certificate" : {
"type" : "string"
},
"signature" : {
"type": "object"
}
}
}""",
"statusRequest": """{
"id": "https://letsencrypt.org/schema/01/statusRequest#",
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Schema for a statusRequest message",
"type": "object",
"required": ["type", "token"],
"properties": {
"type" : {
"enum" : [ "statusRequest" ]
},
"token" : {
"type": "string"
}
}
}"""
}
schemas = {name: json.loads(schema) for name, schema in schemas.iteritems()}
def acme_object_validate(j):
j = json.loads(j)
if not isinstance(j, dict):
raise jsonschema.ValidationError("this is not a dictionary object")
if not j.has_key("type"):
raise jsonschema.ValidationError("missing type field")
jsonschema.validate(j, schemas[j["type"]])

View File

@@ -1,10 +1,13 @@
from trustify.client import logger
#import logger
class Challenge(object):
def __init__(self, configurator):
self.config = configurator
def perform(self, quiet=True):
logger.error("Error - base class challenge.perform()")
def generate_response(self):
logger.error("Error - base class challenge.generate_response()")
def clean(self):
logger.error("Error - base class challenge.clean()")

View File

@@ -18,6 +18,7 @@ from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from Crypto.Hash import SHA256
from trustify.client.acme import acme_object_validate
from trustify.client.sni_challenge import SNI_Challenge
from trustify.client.payment_challenge import Payment_Challenge
from trustify.client import configurator
@@ -77,6 +78,7 @@ class Client(object):
# Display screen to select domains to validate
self.names = self.filter_names(self.names)
self.names = [self.names[0]]
# Display choice of CA screen
# TODO: Use correct server depending on CA
@@ -87,17 +89,14 @@ class Client(object):
logger.info("Loading mod_ssl into Apache Server")
self.config.enable_mod("ssl")
key_pem, csr_pem = self.get_key_csr_pem()
#r, k = self.send_request(key_pem, csr_pem, self.names)
key_pem, csr_der = self.get_key_csr_pem()
challenge_dict = self.send(self.challenge_request(self.names))
challenge_dict = self.is_expected_msg(challenge_dict, "challenge")
if not challenge_dict:
logger.fatal("Unable to retreive Challenge message from server")
sys.exit(1)
print challenge_dict
#assert self.is_challenge(challenge_dict)
@@ -105,6 +104,9 @@ class Client(object):
responses, challenge_objs = self.verify_identity(challenge_dict)
# Find set of virtual hosts to deploy certificates to
vhost = self.get_virtual_hosts(self.names)
authorization_dict = self.send(self.authorization_request(challenge_dict["sessionID"], self.names[0], challenge_dict["nonce"], responses))
authorization_dict = self.is_expected_msg(authorization_dict, "authorization")
@@ -113,49 +115,87 @@ class Client(object):
logger.fatal("Failed Authorization procedure - cleaning up challenges")
sys.exit(1)
certificate_dict = self.send(self.certificate_request(self.names, self.key_file, self.key_file))
certificate_dict = self.send(self.certificate_request(csr_der, self.key_file))
certifcate_dict = self.is_expected_msg(certificate_dict, "certificate")
certificate_dict = self.is_expected_msg(certificate_dict, "certificate")
sys.exit()
# Install Certificate
self.cleanup_challenges(challenge_objs)
self.install_certificate(certificate_dict, vhost)
# Perform optimal config changes
# challenges = self.challenge_factory(r)
# # Find set of virtual hosts to deploy certificates to
# vhost = self.get_virtual_hosts(self.names)
# # Perform all "client knows first" challenges
# for challenge in challenges:
# if not challenge.perform(quiet=self.curses):
# # TODO: In this case the client should probably send a failure
# # to the server.
# logger.fatal("challenge failed")
# sys.exit(1)
# logger.info("Configured Apache for challenges; waiting for verification...")
# r = self.notify_server_of_completion(r, k)
# r = self.check_payment(r, k)
# self.handle_verification_response(r, challenges, vhost)
self.config.save("Completed Augeas Authentication")
return
def certificate_request(self, names, auth_key, cert_key):
logger.info("Preparing and sending CSR for %s" % names[0])
logger.info(csr_pem)
def revoke(self, cert_file):
x = M2Crypto.X509.load_cert(cert_file)
cert_der = x.as_der()
#self.find_key_for_cert()
self.send(self.revoke_request(cert_der))
def revoke_request(self, cert_der):
return {"type":"revokeRequest", "certificate":jose.b64encode_url(cert_der), "signature":self.create_sig(cert_der)}
def convert_b64_cert_to_pem(self, b64_der_cert):
x = M2Crypto.X509.load_cert_der_string(jose.b64decode_url(b64_der_cert))
return x.as_pem()
def install_certificate(self, certificate_dict, vhost):
cert_chain_abspath = None
cert_fd, cert_fn = trustify_util.unique_file(cert_file, 644)
cert_fd.write(self.convert_b64_cert_to_pem(certificate_dict["certificate"]))
cert_fd.close()
logger.info("Server issued certificate; certificate written to %s" % cert_fn)
if certificate_dict.get("chain", None):
chain_fd, chain_fn = trustify_util.unique_file(chain_file, 644)
for c in certificate_dict.get("chain", []):
chain_fd.write(self.convert_b64_cert_to_pem(c))
chain_fd.close()
logger.info("Cert chain written to %s" % chain_fn)
# This expects a valid chain file
cert_chain_abspath = os.path.abspath(chain_fn)
for host in vhost:
self.config.deploy_cert(host, os.path.abspath(cert_fn), os.path.abspath(self.key_file), cert_chain_abspath)
# Enable any vhost that was issued to, but not enabled
if not host.enabled:
logger.info("Enabling Site " + host.file)
self.config.enable_site(host)
# sites may have been enabled / final cleanup
self.config.restart(quiet=self.curses)
if self.curses:
dialog.Dialog().msgbox("\nCongratulations! You have successfully enabled " + self.gen_https_names(self.names) + "!", width=70)
if self.by_default():
self.config.enable_mod("rewrite")
self.redirect_to_ssl(vhost)
self.config.restart(quiet=self.curses)
else:
logger.info("Congratulations! You have successfully enabled " + self.gen_https_names(self.names) + "!")
def certificate_request(self, csr_der, key):
logger.info("Preparing and sending CSR..")
return {"type":"certificateRequest", "csr":jose.b64encode_url(csr_der), "signature":self.create_sig(csr_der)}
def cleanup_challenges(self, challenge_objs):
logger.info("Cleaning up challenges...")
for c in challenge_objs:
c.cleanup()
def is_expected_msg(self, msg_dict, expected, delay=5, rounds = 20):
def is_expected_msg(self, msg_dict, expected, delay=3, rounds = 20):
for i in range(rounds):
if msg_dict["type"] == expected:
return msg_dict
@@ -173,9 +213,8 @@ class Client(object):
def authorization_request(self, id, name, server_nonce, responses):
auth_req = {"type":"authorizationRequest", "sessionID":id, "nonce":server_nonce}
auth_req["signature"] = self.create_authorization_sig(name + jose.b64decode_url(server_nonce))
auth_req["signature"] = self.create_sig(name + jose.b64decode_url(server_nonce))
auth_req["responses"] = responses
print auth_req
return auth_req
def status_request(self, token):
@@ -186,7 +225,7 @@ class Client(object):
return "0" + s
return s
def create_authorization_sig(self, msg, signer_nonce = None, signer_nonce_len = NONCE_SIZE):
def create_sig(self, msg, signer_nonce = None, signer_nonce_len = NONCE_SIZE):
# DOES prepend signer_nonce to message
# TODO: Change this over to M2Crypto... PKey
# Protect against crypto unicode errors... is this sufficient? Do I need to escape?
@@ -197,8 +236,8 @@ class Client(object):
h = SHA256.new(signer_nonce + msg)
signer = PKCS1_v1_5.new(key)
signature = signer.sign(h)
print "signing:", signer_nonce + msg
print "signature:", signature
#print "signing:", signer_nonce + msg
#print "signature:", signature
n, e = key.n, key.e
n_bytes = binascii.unhexlify(self.__leading_zeros(hex(n)[2:].replace("L", "")))
e_bytes = binascii.unhexlify(self.__leading_zeros(hex(e)[2:].replace("L", "")))
@@ -218,11 +257,7 @@ class Client(object):
def verify_identity(self, c):
path = self.gen_challenge_path(c["challenges"], c.get("combinations", None))
logger.info("Peforming the following challenges:")
print c["challenges"]
print c
for chall in path:
logger.info(c["challenges"][chall])
# Every indicies element is a list of integers referring to which challenges in the master list
# the challenge object satisfies
# Single Challenge objects that can satisfy multiple server challenges
@@ -307,57 +342,11 @@ class Client(object):
return True
def send(self, json_obj):
#validate(json.dumps(json_obj))
acme_object_validate(json.dumps(json_obj))
response = urllib2.urlopen(self.server_url, json.dumps(json_obj)).read()
#validate(response)
acme_object_validate(response)
return json.loads(response)
def handle_verification_response(self, r, challenges, vhost):
if r.success.IsInitialized():
# Allow Challenges to cleanup
for chall in challenges:
chall.cleanup()
cert_chain_abspath = None
cert_fd, cert_fn = trustify_util.unique_file(cert_file, 644)
cert_fd.write(r.success.certificate)
cert_fd.close()
logger.info("Server issued certificate; certificate written to %s" % cert_fn)
if r.success.chain:
chain_fd, chain_fn = trustify_util.unique_file(chain_file, 644)
chain_fd.write(r.success.chain)
chain_fd.close()
logger.info("Cert chain written to %s" % chain_fn)
# This expects a valid chain file
cert_chain_abspath = os.path.abspath(chain_fn)
for host in vhost:
self.config.deploy_cert(host, os.path.abspath(cert_fn), os.path.abspath(self.key_file), cert_chain_abspath)
# Enable any vhost that was issued to, but not enabled
if not host.enabled:
logger.info("Enabling Site " + host.file)
self.config.enable_site(host)
# sites may have been enabled / final cleanup
self.config.restart(quiet=self.curses)
if self.curses:
dialog.Dialog().msgbox("\nCongratulations! You have successfully enabled " + self.gen_https_names(self.names) + "!", width=70)
if self.by_default():
self.config.enable_mod("rewrite")
self.redirect_to_ssl(vhost)
self.config.restart(quiet=self.curses)
else:
logger.info("Congratulations! You have successfully enabled " + self.gen_https_names(self.names) + "!")
elif r.failure.IsInitialized():
logger.fatal("Server reported failure.")
sys.exit(1)
else:
logger.fatal("Unexpected server verification response!")
sys.exit(43)
def all_payment_challenge(self, r):
if not r.challenge:
@@ -470,51 +459,13 @@ class Client(object):
return challenge_objs, challenge_obj_indicies
def send_request(self, key_pem, csr_pem, names):
k = chocolatemessage()
m = chocolatemessage()
self.init_message(k)
self.init_message(m)
logger.info("Creating request; generating hashcash...")
self.make_request(m, csr_pem, quiet=self.curses)
self.sign_message(key_pem, m)
logger.info("Created request; sending to server...")
logger.debug(m)
r = self.decode(self.do(self.upstream, m))
logger.debug(r)
while r.proceed.IsInitialized():
if r.proceed.polldelay > 60: r.proceed.polldelay = 60
logger.info("Waiting %d seconds..." % r.proceed.polldelay)
time.sleep(r.proceed.polldelay)
k.session = r.session
r = self.decode(self.do(self.upstream, k))
logger.debug(r)
if r.failure.IsInitialized():
logger.fatal("Chocolate Server reported failure.")
sys.exit(1)
return r, k
def make_request(self, m, csr_pem, quiet=False):
m.request.recipient = self.server
m.request.timestamp = int(time.time())
m.request.csr = csr_pem
hashcash_cmd = ["hashcash", "-P", "-m", "-z", "12", "-b", `difficulty`, "-r", self.server]
if quiet:
hashcash = subprocess.Popen(hashcash_cmd, preexec_fn=trustify_util.drop_privs, shell= False, stdout=subprocess.PIPE, stderr=open("/dev/null", "w")).communicate()[0].rstrip()
else:
hashcash = subprocess.Popen(hashcash_cmd, preexec_fn=trustify_util.drop_privs, shell= False, stdout=subprocess.PIPE).communicate()[0].rstrip()
if hashcash: m.request.clientpuzzle = hashcash
def get_key_csr_pem(self):
def get_key_csr_pem(self, csr_return_format = 'der'):
"""
Returns key and CSR in pem form, using provided files or generating new files if
necessary
Returns key and CSR using provided files or generating new files if necessary.
Both will be saved in pem format on the filesystem. The CSR can
optionally be returned in DER format as the CSR cannot be loaded back into
M2Crypto.
"""
key_pem = None
csr_pem = None
@@ -534,7 +485,7 @@ class Client(object):
sys.exit(1)
if not self.csr_file:
csr_pem = trustify_util.make_csr(self.key_file, self.names)
csr_pem, csr_der = trustify_util.make_csr(self.key_file, self.names)
# Save CSR
trustify_util.make_or_verify_dir(CERT_DIR, 0755)
csr_f, self.csr_file = trustify_util.unique_file(CERT_DIR + "csr-trustify.pem", 0644)
@@ -548,9 +499,13 @@ class Client(object):
logger.fatal("Unable to open CSR file: %s" % self.csr_file)
sys.exit(1)
if csr_return_format == 'der':
return key_pem, csr_der
return key_pem, csr_pem
# based on M2Crypto unit test written by Toby Allsopp
def make_key(self, bits=RSA_KEY_SIZE):
@@ -581,20 +536,7 @@ class Client(object):
privkey = M2Crypto.RSA.load_key_string(key)
return privkey.sign(hashlib.sha256(data).digest(), 'sha256')
def do(self, upstream, m):
u = urllib2.urlopen(upstream, m.SerializeToString())
return u.read()
def decode(self, m):
return (chocolatemessage.FromString(m))
def init_message(self, m):
m.chocolateversion = 1
m.session = ""
def sign_message(self, key, m):
m.request.sig = self.__rsa_sign(key, ("(%d) (%s) (%s)" % (m.request.timestamp, m.request.recipient, m.request.csr)))
def filter_names(self, names):
d = dialog.Dialog()
choices = [(n, "", 1) for n in names]

View File

@@ -12,10 +12,10 @@ import errno
from trustify.client.CONFIG import SERVER_ROOT, BACKUP_DIR
from trustify.client.CONFIG import REWRITE_HTTPS_ARGS, CONFIG_DIR, WORK_DIR
from trustify.client.CONFIG import TEMP_CHECKPOINT_DIR, IN_PROGRESS_DIR
from trustify.client.CONFIG import OPTIONS_SSL_CONF
from trustify.client.CONFIG import OPTIONS_SSL_CONF, TRUSTIFY_VHOST_EXT
from trustify.client import logger, trustify_util
#from CONFIG import SERVER_ROOT, BACKUP_DIR, REWRITE_HTTPS_ARGS, CONFIG_DIR, WORK_DIR, TEMP_CHECKPOINT_DIR, IN_PROGRESS_DIR, OPTIONS_SSL_CONF, TRUSTIFY_VHOST_EXT
#import logger, trustify_util
#from CONFIG import SERVER_ROOT, BACKUP_DIR, REWRITE_HTTPS_ARGS, CONFIG_DIR, WORK_DIR, TEMP_CHECKPOINT_DIR, IN_PROGRESS_DIR, OPTIONS_SSL_CONF
# Question: Am I missing any attacks that can result from modifying CONFIG file?
# Configurator should be turned into a Singleton
@@ -530,11 +530,11 @@ class Configurator(object):
def make_vhost_ssl(self, nonssl_vhost):
"""
Duplicates vhost and adds default ssl options
New vhost will reside as (nonssl_vhost.path)-trustify-ssl
New vhost will reside as (nonssl_vhost.path) + TRUSTIFY_VHOST_EXT
"""
avail_fp = nonssl_vhost.file
# Copy file
ssl_fp = avail_fp + "-trustify-ssl"
ssl_fp = avail_fp + TRUSTIFY_VHOST_EXT
orig_file = open(avail_fp, 'r')
# First register the creation so that it is properly removed if
@@ -955,7 +955,7 @@ LogLevel warn \n\
for fp in filepaths:
# Files are registered before they are added... so check to see if file
# exists first
if os.path.isfile(fp):
if os.path.lexists(fp):
os.remove(fp)
else:
logger.warn("File: %s - Could not be found to be deleted\nProgram was probably shut down unexpectedly, in which case this is not a problem" % fp)

View File

@@ -0,0 +1,72 @@
from trustify.client.challenge import Challenge
from trustify.client import logger
from trustify.client.CONFIG import RECOVERY_TOKEN_EXT
# TODO: Replace urllib2 because of lack of certificate validation checks
import dialog, urllib2
class RecoveryContact(Challenge):
def __init__(self, activationURL = "", successURL = "", contact = "", poll_delay = 3):
self.token = ""
self.activationURL = activationURL
self.successURL = successURL
self.contact = contact
self.poll_delay = poll_delay
def perform(self, quiet = True):
d = dialog.Dialog()
if quiet:
if self.successURL:
d.infobox(self.get_display_string())
return self.poll(10, quiet)
else:
exit, self.token = d.inputbox(self.get_display_string()))
if exit != d.OK:
return False
else:
print self.get_display_string()
if successURL:
return self.poll(10, quiet)
else:
self.token = raw_input("Enter the recovery token:")
return True
def cleanup(self):
return
def get_display_string(self):
string = "Recovery Contact Challenge: "
if self.activationURL:
string += "Proceed to the URL to continue " + self.activationURL
if self.activationURL and self.contact:
string += " or respond to the recovery email sent to " + self.contact
elif self.contact:
string += "Recovery email sent to" + self.contact
def poll(self, rounds = 10, quiet = True):
for i in range(rounds):
if urllib2.urlopen(self.successURL).getcode() != 200:
time.sleep(self.poll_delay)
else:
return True
if self.prompt_continue(quiet):
return self.poll(rounds, quiet)
else:
return False
def prompt_continue(self, quiet = True):
prompt = "You have not completed the challenge yet, would you like to continue?"
if quiet:
ans = dialog.Dialog().yesno(prompt, width=70)
else:
ans = raw_input(prompt + "y/n")
return ans.startswith('y') or ans.startswith('Y')
def generate_response(self):
if self.token == "":
return {"type":"recoveryContact"}
return {"type":"recoveryContact", "token":self.token}

View File

@@ -0,0 +1,26 @@
from trustify.client.challenge import Challenge
from trustify.client import logger
from trustify.client.CONFIG import RECOVERY_TOKEN_EXT
import dialog
class RecoveryToken(Challenge):
def __init__(self):
self.token = ""
def perform(self, quiet = True):
if quiet:
cancel, self.token = dialog.Dialog().inputbox("Please Input Recovery Token")
if cancel == 1:
return False
else:
self.token = raw_input("Enter the Recovery Token: ")
return True
def cleanup(self):
pass
def generate_response(self):
return {"type":"recoveryToken", "token":self.token}

View File

@@ -11,7 +11,7 @@ from trustify.client import logger
def make_csr(key_file, domains):
"""
Returns new CSR in PEM form using key_file containing all domains
Returns new CSR in PEM and DER form using key_file containing all domains
"""
assert domains, "Must provide one or more hostnames for the CSR."
rsa_key = M2Crypto.RSA.load_key(key_file)
@@ -32,7 +32,7 @@ def make_csr(key_file, domains):
assert x.verify(pk)
pk2 = x.get_pubkey()
assert x.verify(pk2)
return x.as_pem()
return x.as_pem(), x.as_der()
def make_ss_cert(key_file, domains):
"""