diff --git a/.pylintrc b/.pylintrc index 345e26af1..5302133ad 100644 --- a/.pylintrc +++ b/.pylintrc @@ -82,7 +82,7 @@ required-attributes= bad-functions=map,filter,apply,input,file # Good variable names which should always be accepted, separated by a comma -good-names=i,j,k,ex,Run,_,fd +good-names=f,i,j,k,ex,Run,_,fd # Bad variable names which should always be refused, separated by a comma bad-names=foo,bar,baz,toto,tutu,tata diff --git a/letsencrypt/cli.py b/letsencrypt/cli.py index defa7633d..fb9a7244a 100644 --- a/letsencrypt/cli.py +++ b/letsencrypt/cli.py @@ -98,13 +98,18 @@ def run(args, config, plugins): return "Configurator could not be determined" acme, doms = _common_run(args, config, acc, authenticator, installer) - cert_key, cert_path, chain_path = acme.obtain_certificate(doms) - acme.deploy_certificate(doms, cert_key, cert_path, chain_path) + # TODO: Handle errors from _common_run? + lineage = acme.obtain_and_enroll_certificate(doms, authenticator, + installer, plugins) + if not lineage: + return "Certificate could not be obtained" + acme.deploy_certificate(doms, lineage) acme.enhance_config(doms, args.redirect) def auth(args, config, plugins): """Obtain a certificate (no install).""" + # XXX: Update for renewer / RenewableCert acc = _account_init(args, config) if acc is None: return None @@ -119,13 +124,17 @@ def auth(args, config, plugins): else: installer = None + # TODO: Handle errors from _common_run? acme, doms = _common_run( args, config, acc, authenticator=authenticator, installer=installer) - acme.obtain_certificate(doms) + if not acme.obtain_and_enroll_certificate(doms, authenticator, installer, + plugins): + return "Certificate could not be obtained" def install(args, config, plugins): """Install (no auth).""" + # XXX: Update for renewer/RenewableCert acc = _account_init(args, config) if acc is None: return None @@ -136,7 +145,8 @@ def install(args, config, plugins): acme, doms = _common_run( args, config, acc, authenticator=None, installer=installer) assert args.cert_path is not None - acme.deploy_certificate(doms, acc.key, args.cert_path, args.chain_path) + # XXX: This API has changed as a result of RenewableCert! + # acme.deploy_certificate(doms, acc.key, args.cert_path, args.chain_path) acme.enhance_config(doms, args.redirect) @@ -330,6 +340,9 @@ def _paths_parser(parser): add("--chain-path", default=flag_default("chain_path"), help=config_help("chain_path")) + add("--enroll-autorenew", default=None, action="store_true", + help=config_help("enroll_autorenew")) + return parser diff --git a/letsencrypt/client.py b/letsencrypt/client.py index ae1667dfa..58bdb6fda 100644 --- a/letsencrypt/client.py +++ b/letsencrypt/client.py @@ -19,6 +19,7 @@ from letsencrypt import le_util from letsencrypt import network2 from letsencrypt import reverter from letsencrypt import revoker +from letsencrypt import storage from letsencrypt.display import ops as display_ops from letsencrypt.display import enhancements @@ -67,6 +68,10 @@ class Client(object): self.config = config + # TODO: Check if self.config.enroll_autorenew is None. If + # so, set it based to the default: figure out if dv_auth is + # standalone (then default is False, otherwise default is True) + if dv_auth is not None: cont_auth = continuity_auth.ContinuityAuthenticator(config, installer) @@ -95,7 +100,7 @@ class Client(object): self.account.save() - def obtain_certificate(self, domains, csr=None): + def _obtain_certificate(self, domains, csr=None): """Obtains a certificate from the ACME server. :meth:`.register` must be called before :meth:`.obtain_certificate` @@ -108,8 +113,8 @@ class Client(object): this CSR can be different than self.authkey :type csr: :class:`CSR` - :returns: cert_key, cert_path, chain_path - :rtype: `tuple` of (:class:`letsencrypt.le_util.Key`, str, str) + :returns: cert_pem, key_pem, chain_pem + :rtype: `tuple` of (str, str, str) """ if self.auth_handler is None: @@ -136,14 +141,55 @@ class Client(object): M2Crypto.X509.load_request_der_string(csr.data)), authzr) - # Save Certificate - cert_path, chain_path = self.save_certificate( - certr, self.config.cert_path, self.config.chain_path) + cert_pem = certr.body.as_pem() + chain_pem = None + if certr.cert_chain_uri is not None: + chain_pem = self.network.fetch_chain(certr) - revoker.Revoker.store_cert_key( - cert_path, self.account.key.file, self.config) + if chain_pem is None: + # XXX: just to stop RenewableCert from complaining; this is + # probably not a good solution + chain_pem = "" + else: + chain_pem = chain_pem.as_pem() - return cert_key, cert_path, chain_path + return cert_pem, cert_key.pem, chain_pem + + def obtain_and_enroll_certificate(self, domains, authenticator, installer, + plugins, csr=None): + """Get a new certificate for the specified domains using the specified + authenticator and installer, and then create a new renewable lineage + containing it. + + :param list domains: Domains to request. + :param authenticator: The authenticator to use. + :type authenticator: :class:`letsencrypt.interfaces.IAuthenticator` + + :param installer: The installer to use. + :type installer: :class:`letsencrypt.interfaces.IInstaller` + + :param plugins: A PluginsFactory object. + + :param str csr: A preexisting CSR to use with this request. + + :returns: A new :class:`letsencrypt.storage.RenewableCert` instance + referred to the enrolled cert lineage, or False if the cert could + not be obtained. + """ + # TODO: fully identify object types in docstring. + cert_pem, privkey, chain_pem = self._obtain_certificate(domains, csr) + self.config.namespace.authenticator = plugins.find_init( + authenticator).name + if installer is not None: + self.config.namespace.installer = plugins.find_init(installer).name + return storage.RenewableCert.new_lineage(domains[0], cert_pem, + privkey, chain_pem, + vars(self.config.namespace)) + + def obtain_certificate(self, domains): + """Public method to obtain a certificate for the specified domains + using this client object. Returns the tuple (cert, privkey, chain).""" + return self._obtain_certificate(domains, None) def save_certificate(self, certr, cert_path, chain_path): # pylint: disable=no-self-use @@ -192,29 +238,32 @@ class Client(object): return os.path.abspath(act_cert_path), cert_chain_abspath - def deploy_certificate(self, domains, privkey, cert_path, chain_path=None): + def deploy_certificate(self, domains, lineage): """Install certificate :param list domains: list of domains to install the certificate - :param privkey: private key for certificate - :type privkey: :class:`letsencrypt.le_util.Key` - - :param str cert_path: certificate file path - :param str chain_path: chain file path - + :param lineage: RenewableCert object representing the certificate + :type lineage: :class:`letsencrypt.storage.RenewableCert` """ if self.installer is None: logging.warning("No installer specified, client is unable to deploy" "the certificate") raise errors.LetsEncryptClientError("No installer available") - chain_path = None if chain_path is None else os.path.abspath(chain_path) + # TODO: Is it possible not to have a chain at all? (The + # RenewableCert class currently doesn't support this case, but + # perhaps the CA can issue according to ACME without providing + # a chain, which would currently be a problem for instantiating + # RenewableCert, and subsequently also for this method.) for dom in domains: - self.installer.deploy_cert( - dom, os.path.abspath(cert_path), - os.path.abspath(privkey.file), chain_path) + # TODO: Provide a fullchain reference for installers like + # nginx that want it + self.installer.deploy_cert(dom, + lineage.cert, + lineage.privkey, + lineage.chain) self.installer.save("Deployed Let's Encrypt Certificate") # sites may have been enabled / final cleanup diff --git a/letsencrypt/constants.py b/letsencrypt/constants.py index 9ff0b128c..4b3b79e36 100644 --- a/letsencrypt/constants.py +++ b/letsencrypt/constants.py @@ -1,4 +1,5 @@ """Let's Encrypt constants.""" +import configobj import logging from acme import challenges @@ -26,6 +27,18 @@ CLI_DEFAULTS = dict( """Defaults for CLI flags and `.IConfig` attributes.""" +RENEWER_DEFAULTS = configobj.ConfigObj(dict( + renewer_config_file="/etc/letsencrypt/renewer.conf", + renewal_configs_dir="/etc/letsencrypt/configs", + archive_dir="/etc/letsencrypt/archive", + live_dir="/etc/letsencrypt/live", + renewer_enabled="yes", + renew_before_expiry="30 days", + deploy_before_expiry="20 days", +)) +"""Defaults for renewer script.""" + + EXCLUSIVE_CHALLENGES = frozenset([frozenset([ challenges.DVSNI, challenges.SimpleHTTPS])]) """Mutually exclusive challenges.""" @@ -71,3 +84,7 @@ IConfig.work_dir).""" NETSTAT = "/bin/netstat" """Location of netstat binary for checking whether a listener is already running on the specified port (Linux-specific).""" + +BOULDER_TEST_MODE_CHALLENGE_PORT = 5001 +"""Port that Boulder will connect on for validations in test mode.""" + diff --git a/letsencrypt/crypto_util.py b/letsencrypt/crypto_util.py index 94617eef6..1eb565289 100644 --- a/letsencrypt/crypto_util.py +++ b/letsencrypt/crypto_util.py @@ -72,7 +72,7 @@ def init_save_csr(privkey, names, cert_dir, csrname="csr-letsencrypt.pem"): csr_pem, csr_der = make_csr(privkey.pem, names) # Save CSR - le_util.make_or_verify_dir(cert_dir, 0o755) + le_util.make_or_verify_dir(cert_dir, 0o755, os.geteuid()) csr_f, csr_filename = le_util.unique_file( os.path.join(cert_dir, csrname), 0o644) csr_f.write(csr_pem) @@ -234,42 +234,78 @@ def make_ss_cert(key_str, domains, not_before=None, return cert.as_pem() -def _request_san(req): # TODO: implement directly in PyOpenSSL! +def _pyopenssl_cert_or_req_san(cert_or_req): + """Get Subject Alternative Names from certificate or CSR using pyOpenSSL. + + .. todo:: Implement directly in PyOpenSSL! + + :param cert_or_req: Certificate or CSR. + :type cert_or_req: `OpenSSL.crypto.X509` or `OpenSSL.crypto.X509Req`. + + :returns: A list of Subject Alternative Names. + :rtype: list + + """ # constants based on implementation of # OpenSSL.crypto.X509Error._subjectAltNameString parts_separator = ", " part_separator = ":" extension_short_name = "subjectAltName" + if hasattr(cert_or_req, 'get_extensions'): # X509Req + extensions = cert_or_req.get_extensions() + else: # X509 + extensions = [cert_or_req.get_extension(i) + for i in xrange(cert_or_req.get_extension_count())] + # pylint: disable=protected-access,no-member label = OpenSSL.crypto.X509Extension._prefixes[OpenSSL.crypto._lib.GEN_DNS] assert parts_separator not in label prefix = label + part_separator - extensions = [ext._subjectAltNameString().split(parts_separator) - for ext in req.get_extensions() - if ext.get_short_name() == extension_short_name] + san_extensions = [ + ext._subjectAltNameString().split(parts_separator) + for ext in extensions if ext.get_short_name() == extension_short_name] # WARNING: this function assumes that no SAN can include # parts_separator, hence the split! - return [part.split(part_separator)[1] for parts in extensions + return [part.split(part_separator)[1] for parts in san_extensions for part in parts if part.startswith(prefix)] -def get_sans_from_csr(csr, typ=OpenSSL.crypto.FILETYPE_PEM): - """Get list of Subject Alternative Names from signing request. - - :param str csr: Certificate Signing Request in PEM format (must contain - one or more subjectAlternativeNames, or the function will fail, - raising ValueError) - - :returns: List of referenced subject alternative names - :rtype: list - - """ +def _get_sans_from_cert_or_req( + cert_or_req_str, load_func, typ=OpenSSL.crypto.FILETYPE_PEM): try: - request = OpenSSL.crypto.load_certificate_request(typ, csr) + cert_or_req = load_func(typ, cert_or_req_str) except OpenSSL.crypto.Error as error: logging.exception(error) raise - return _request_san(request) + return _pyopenssl_cert_or_req_san(cert_or_req) + + +def get_sans_from_cert(cert, typ=OpenSSL.crypto.FILETYPE_PEM): + """Get a list of Subject Alternative Names from a certificate. + + :param str csr: Certificate (encoded). + :param typ: `OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1` + + :returns: A list of Subject Alternative Names. + :rtype: list + + """ + return _get_sans_from_cert_or_req( + cert, OpenSSL.crypto.load_certificate, typ) + + +def get_sans_from_csr(csr, typ=OpenSSL.crypto.FILETYPE_PEM): + """Get a list of Subject Alternative Names from a CSR. + + :param str csr: CSR (encoded). + :param typ: `OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1` + + :returns: A list of Subject Alternative Names. + :rtype: list + + """ + return _get_sans_from_cert_or_req( + csr, OpenSSL.crypto.load_certificate_request, typ) diff --git a/letsencrypt/interfaces.py b/letsencrypt/interfaces.py index 609b9410a..e154bd5d3 100644 --- a/letsencrypt/interfaces.py +++ b/letsencrypt/interfaces.py @@ -175,6 +175,11 @@ class IConfig(zope.interface.Interface): le_vhost_ext = zope.interface.Attribute( "SSL vhost configuration extension.") + + enroll_autorenew = zope.interface.Attribute( + "Register this certificate in the database to be renewed" + " automatically.") + cert_path = zope.interface.Attribute("Let's Encrypt certificate file path.") chain_path = zope.interface.Attribute("Let's Encrypt chain file path.") diff --git a/letsencrypt/le_util.py b/letsencrypt/le_util.py index 27d795749..ec0be7514 100644 --- a/letsencrypt/le_util.py +++ b/letsencrypt/le_util.py @@ -60,7 +60,7 @@ def unique_file(path, mode=0o777): :param str path: path/filename.ext :param int mode: File mode - :return: tuple of file object and file name + :returns: tuple of file object and file name """ path, tail = os.path.split(path) @@ -70,8 +70,45 @@ def unique_file(path, mode=0o777): try: file_d = os.open(fname, os.O_CREAT | os.O_EXCL | os.O_RDWR, mode) return os.fdopen(file_d, "w"), fname - except OSError: - pass + except OSError as exception: + # "File exists," is okay, try a different name. + if exception.errno != errno.EEXIST: + raise + count += 1 + + +def unique_lineage_name(path, filename, mode=0o777): + """Safely finds a unique file for writing only (by default). Uses a + file lineage convention. + + :param str path: directory path + :param str filename: proposed filename + :param int mode: file mode + + :returns: tuple of file object and file name (which may be modified from + the requested one by appending digits to ensure uniqueness) + + :raises OSError: if writing files fails for an unanticipated reason, + such as a full disk or a lack of permission to write to specified + location. + + """ + fname = os.path.join(path, "%s.conf" % (filename)) + try: + file_d = os.open(fname, os.O_CREAT | os.O_EXCL | os.O_RDWR, mode) + return os.fdopen(file_d, "w"), fname + except OSError as err: + if err.errno != errno.EEXIST: + raise err + count = 1 + while True: + fname = os.path.join(path, "%s-%04d.conf" % (filename, count)) + try: + file_d = os.open(fname, os.O_CREAT | os.O_EXCL | os.O_RDWR, mode) + return os.fdopen(file_d, "w"), fname + except OSError as err: + if err.errno != errno.EEXIST: + raise err count += 1 diff --git a/letsencrypt/notify.py b/letsencrypt/notify.py new file mode 100644 index 000000000..b4eec938f --- /dev/null +++ b/letsencrypt/notify.py @@ -0,0 +1,30 @@ +"""Send e-mail notification to system administrators.""" + +import email +import smtplib +import socket +import subprocess + + +def notify(subject, whom, what): + """Try to notify the addressee (whom) by e-mail, with Subject: + defined by subject and message body by what.""" + msg = email.message_from_string(what) + msg.add_header("From", "Let's Encrypt renewal agent ") + msg.add_header("To", whom) + msg.add_header("Subject", subject) + msg = msg.as_string() + try: + lmtp = smtplib.LMTP() + lmtp.connect() + lmtp.sendmail("root", [whom], msg) + except (smtplib.SMTPHeloError, smtplib.SMTPRecipientsRefused, + smtplib.SMTPSenderRefused, smtplib.SMTPDataError, socket.error): + # We should try using /usr/sbin/sendmail in this case + try: + proc = subprocess.Popen(["/usr/sbin/sendmail", "-t"], + stdin=subprocess.PIPE) + proc.communicate(msg) + except OSError: + return False + return True diff --git a/letsencrypt/plugins/standalone/authenticator.py b/letsencrypt/plugins/standalone/authenticator.py index 7d5c3ea6e..3947c1e3e 100644 --- a/letsencrypt/plugins/standalone/authenticator.py +++ b/letsencrypt/plugins/standalone/authenticator.py @@ -15,6 +15,7 @@ import zope.interface from acme import challenges from letsencrypt import achallenges +from letsencrypt import constants from letsencrypt import interfaces from letsencrypt.plugins import common @@ -378,7 +379,10 @@ class StandaloneAuthenticator(common.Plugin): results_if_failure.append(False) if not self.tasks: raise ValueError("nothing for .perform() to do") - if self.already_listening(challenges.DVSNI.PORT): + port = challenges.DVSNI.PORT + if self.config and self.config.test_mode: + port = constants.BOULDER_TEST_MODE_CHALLENGE_PORT + if self.already_listening(port): # If we know a process is already listening on this port, # tell the user, and don't even attempt to bind it. (This # test is Linux-specific and won't indicate that the port @@ -386,7 +390,7 @@ class StandaloneAuthenticator(common.Plugin): return results_if_failure # Try to do the authentication; note that this creates # the listener subprocess via os.fork() - if self.start_listener(challenges.DVSNI.PORT, key): + if self.start_listener(port, key): return results_if_success else: # TODO: This should probably raise a DVAuthError exception diff --git a/letsencrypt/renewer.py b/letsencrypt/renewer.py new file mode 100644 index 000000000..c436c2ccd --- /dev/null +++ b/letsencrypt/renewer.py @@ -0,0 +1,140 @@ +"""Renewer tool to handle autorenewal and autodeployment of renewed +certs within lineages of successor certificates, according to +configuration.""" + +# TODO: sanity checking consistency, validity, freshness? + +# TODO: call new installer API to restart servers after deployment + +import copy +import os + +import configobj + +from letsencrypt import configuration +from letsencrypt import constants +from letsencrypt import client +from letsencrypt import crypto_util +from letsencrypt import notify +from letsencrypt import storage +from letsencrypt.plugins import disco as plugins_disco + + +class AttrDict(dict): + """A trick to allow accessing dictionary keys as object + attributes.""" + def __init__(self, *args, **kwargs): + super(AttrDict, self).__init__(*args, **kwargs) + self.__dict__ = self + + +def renew(cert, old_version): + """Perform automated renewal of the referenced cert, if possible. + + :param class:`letsencrypt.storage.RenewableCert` cert: the certificate + lineage to attempt to renew. + :param int old_version: the version of the certificate lineage relative + to which the renewal should be attempted. + + :returns: int referring to newly created version of this cert lineage, + or False if renewal was not successful.""" + + # TODO: handle partial success (some names can be renewed but not + # others) + # TODO: handle obligatory key rotation vs. optional key rotation vs. + # requested key rotation + if "renewalparams" not in cert.configfile: + # TODO: notify user? + return False + renewalparams = cert.configfile["renewalparams"] + if "authenticator" not in renewalparams: + # TODO: notify user? + return False + # Instantiate the appropriate authenticator + plugins = plugins_disco.PluginsRegistry.find_all() + config = configuration.NamespaceConfig(AttrDict(renewalparams)) + # XXX: this loses type data (for example, the fact that key_size + # was an int, not a str) + config.rsa_key_size = int(config.rsa_key_size) + try: + authenticator = plugins[renewalparams["authenticator"]] + except KeyError: + # TODO: Notify user? (authenticator could not be found) + return False + authenticator = authenticator.init(config) + + authenticator.prepare() + account = client.determine_account(config) + # TODO: are there other ways to get the right account object, e.g. + # based on the email parameter that might be present in + # renewalparams? + + our_client = client.Client(config, account, authenticator, None) + with open(cert.version("cert", old_version)) as f: + sans = crypto_util.get_sans_from_cert(f.read()) + new_cert, new_key, new_chain = our_client.obtain_certificate(sans) + if new_cert and new_key and new_chain: + # XXX: Assumes that there was no key change. We need logic + # for figuring out whether there was or not. Probably + # best is to have obtain_certificate return None for + # new_key if the old key is to be used (since save_successor + # already understands this distinction!) + return cert.save_successor(old_version, new_cert, new_key, new_chain) + # TODO: Notify results + else: + # TODO: Notify negative results + return False + # TODO: Consider the case where the renewal was partially successful + # (where fewer than all names were renewed) + + +def main(config=None): + """main function for autorenewer script.""" + # TODO: Distinguish automated invocation from manual invocation, + # perhaps by looking at sys.argv[0] and inhibiting automated + # invocations if /etc/letsencrypt/renewal.conf defaults have + # turned it off. (The boolean parameter should probably be + # called renewer_enabled.) + + # Merge supplied config, if provided, on top of builtin defaults + defaults_copy = copy.deepcopy(constants.RENEWER_DEFAULTS) + defaults_copy.merge(config if config is not None else configobj.ConfigObj()) + config = defaults_copy + # Now attempt to read the renewer config file and augment or replace + # the renewer defaults with any options contained in that file. If + # renewer_config_file is undefined or if the file is nonexistent or + # empty, this .merge() will have no effect. TODO: when we have a more + # elaborate renewer command line, we will presumably also be able to + # specify a config file on the command line, which, if provided, should + # take precedence over this one. + config.merge(configobj.ConfigObj(config.get("renewer_config_file", ""))) + + for i in os.listdir(config["renewal_configs_dir"]): + print "Processing", i + if not i.endswith(".conf"): + continue + rc_config = configobj.ConfigObj( + os.path.join(config["renewal_configs_dir"], i)) + try: + cert = storage.RenewableCert(rc_config) + except ValueError: + # This indicates an invalid renewal configuration file, such + # as one missing a required parameter (in the future, perhaps + # also one that is internally inconsistent or is missing a + # required parameter). As a TODO, maybe we should warn the + # user about the existence of an invalid or corrupt renewal + # config rather than simply ignoring it. + continue + if cert.should_autodeploy(): + cert.update_all_links_to(cert.latest_common_version()) + # TODO: restart web server (invoke IInstaller.restart() method) + notify.notify("Autodeployed a cert!!!", "root", "It worked!") + # TODO: explain what happened + if cert.should_autorenew(): + # Note: not cert.current_version() because the basis for + # the renewal is the latest version, even if it hasn't been + # deployed yet! + old_version = cert.latest_common_version() + renew(cert, old_version) + notify.notify("Autorenewed a cert!!!", "root", "It worked!") + # TODO: explain what happened diff --git a/letsencrypt/storage.py b/letsencrypt/storage.py new file mode 100644 index 000000000..1fb17c561 --- /dev/null +++ b/letsencrypt/storage.py @@ -0,0 +1,641 @@ +"""The RenewableCert class, representing renewable lineages of +certificates and storing the associated cert data and metadata.""" + +import copy +import datetime +import os +import re +import time + +import configobj +import OpenSSL +import parsedatetime +import pytz +import pyrfc3339 + +from letsencrypt import constants +from letsencrypt import le_util + +ALL_FOUR = ("cert", "privkey", "chain", "fullchain") + + +def parse_time_interval(interval, textparser=parsedatetime.Calendar()): + """Parse the time specified time interval. + + The interval can be in the English-language format understood by + parsedatetime, e.g., '10 days', '3 weeks', '6 months', '9 hours', + or a sequence of such intervals like '6 months 1 week' or '3 days + 12 hours'. If an integer is found with no associated unit, it is + interpreted by default as a number of days. + + :param str interval: the time interval to parse. + + :returns: the interpretation of the time interval. + :rtype: :class:`datetime.timedelta`""" + + if interval.strip().isdigit(): + interval += " days" + return datetime.timedelta(0, time.mktime(textparser.parse( + interval, time.localtime(0))[0])) + + +class RenewableCert(object): # pylint: disable=too-many-instance-attributes + """Represents a lineage of certificates that is under the management + of the Let's Encrypt client, indicated by the existence of an + associated renewal configuration file. + + Note that the notion of "current version" for a lineage is maintained + on disk in the structure of symbolic links, and is not explicitly + stored in any instance variable in this object. The RenewableCert + object is able to determine information about the current (or other) + version by accessing data on disk, but does not inherently know any + of this information except by examining the symbolic links as needed. + The instance variables mentioned below point to symlinks that reflect + the notion of "current version" of each managed object, and it is + these paths that should be used when configuring servers to use the + certificate managed in a lineage. These paths are normally within + the "live" directory, and their symlink targets -- the actual cert + files -- are normally found within the "archive" directory. + + :ivar cert: The path to the symlink representing the current version + of the certificate managed by this lineage. + :type cert: str + + :ivar privkey: The path to the symlink representing the current version + of the private key managed by this lineage. + :type privkey: str + + :ivar chain: The path to the symlink representing the current version + of the chain managed by this lineage. + :type chain: str + + :ivar fullchain: The path to the symlink representing the current version + of the fullchain (combined chain and cert) managed by this lineage. + :type fullchain: str + + :ivar configuration: The renewal configuration options associated with + this lineage, obtained from parsing the renewal configuration file + and/or systemwide defaults. + :type configuration: :class:`configobj.ConfigObj`""" + + def __init__(self, configfile, config_opts=None): + """Instantiate a RenewableCert object from an existing lineage. + + :param :class:`configobj.ConfigObj` configfile: an already-parsed + ConfigObj object made from reading the renewal config file that + defines this lineage. + :param :class:`configobj.ConfigObj` config_opts: systemwide defaults + for renewal properties not otherwise specified in the individual + renewal config file. + + :raises ValueError: if the configuration file's name didn't end in + ".conf", or the file is missing or broken. + :raises TypeError: if the provided renewal configuration isn't a + ConfigObj object.""" + + if isinstance(configfile, configobj.ConfigObj): + if not os.path.basename(configfile.filename).endswith(".conf"): + raise ValueError("renewal config file name must end in .conf") + self.lineagename = os.path.basename( + configfile.filename)[:-len(".conf")] + else: + raise TypeError("RenewableCert config must be ConfigObj object") + + # self.configuration should be used to read parameters that + # may have been chosen based on default values from the + # systemwide renewal configuration; self.configfile should be + # used to make and save changes. + self.configfile = configfile + # TODO: Do we actually use anything from defaults and do we want to + # read further defaults from the systemwide renewal configuration + # file at this stage? + defaults_copy = copy.deepcopy(constants.RENEWER_DEFAULTS) + defaults_copy.merge(config_opts if config_opts is not None + else configobj.ConfigObj()) + self.configuration = defaults_copy + self.configuration.merge(self.configfile) + + if not all(x in self.configuration for x in ALL_FOUR): + raise ValueError("renewal config file {0} is missing a required " + "file reference".format(configfile)) + + self.cert = self.configuration["cert"] + self.privkey = self.configuration["privkey"] + self.chain = self.configuration["chain"] + self.fullchain = self.configuration["fullchain"] + + def consistent(self): + """Are the files associated with this lineage self-consistent? + + :returns: whether the files stored in connection with this + lineage appear to be correct and consistent with one another. + :rtype: bool""" + + # Each element must be referenced with an absolute path + if any(not os.path.isabs(x) for x in + (self.cert, self.privkey, self.chain, self.fullchain)): + return False + + # Each element must exist and be a symbolic link + if any(not os.path.islink(x) for x in + (self.cert, self.privkey, self.chain, self.fullchain)): + return False + for kind in ALL_FOUR: + link = getattr(self, kind) + where = os.path.dirname(link) + target = os.readlink(link) + if not os.path.isabs(target): + target = os.path.join(where, target) + + # Each element's link must point within the cert lineage's + # directory within the official archive directory + desired_directory = os.path.join( + self.configuration["archive_dir"], self.lineagename) + if not os.path.samefile(os.path.dirname(target), + desired_directory): + return False + + # The link must point to a file that exists + if not os.path.exists(target): + return False + + # The link must point to a file that follows the archive + # naming convention + pattern = re.compile(r"^{0}([0-9]+)\.pem$".format(kind)) + if not pattern.match(os.path.basename(target)): + return False + + # It is NOT required that the link's target be a regular + # file (it may itself be a symlink). But we should probably + # do a recursive check that ultimately the target does + # exist? + # XXX: Additional possible consistency checks (e.g. + # cryptographic validation of the chain being a chain, + # the chain matching the cert, and the cert matching + # the subject key) + # XXX: All four of the targets are in the same directory + # (This check is redundant with the check that they + # are all in the desired directory!) + # len(set(os.path.basename(self.current_target(x) + # for x in ALL_FOUR))) == 1 + return True + + def fix(self): + """Attempt to fix defects or inconsistencies in this lineage. + (Currently unimplemented.)""" + # TODO: Figure out what kinds of fixes are possible. For + # example, checking if there is a valid version that + # we can update the symlinks to. (Maybe involve + # parsing keys and certs to see if they exist and + # if a key corresponds to the subject key of a cert?) + + # TODO: In general, the symlink-reading functions below are not + # cautious enough about the possibility that links or their + # targets may not exist. (This shouldn't happen, but might + # happen as a result of random tampering by a sysadmin, or + # filesystem errors, or crashes.) + + def current_target(self, kind): + """Returns full path to which the specified item currently points. + + :param str kind: the lineage member item ("cert", "privkey", + "chain", or "fullchain") + + :returns: the path to the current version of the specified member. + :rtype: str""" + + if kind not in ALL_FOUR: + raise ValueError("unknown kind of item") + link = getattr(self, kind) + if not os.path.exists(link): + return None + target = os.readlink(link) + if not os.path.isabs(target): + target = os.path.join(os.path.dirname(link), target) + return target + + def current_version(self, kind): + """Returns numerical version of the specified item. + + For example, if kind + is "chain" and the current chain link points to a file named + "chain7.pem", returns the integer 7. + + :param str kind: the lineage member item ("cert", "privkey", + "chain", or "fullchain") + + :returns: the current version of the specified member. + :rtype: int""" + + if kind not in ALL_FOUR: + raise ValueError("unknown kind of item") + pattern = re.compile(r"^{0}([0-9]+)\.pem$".format(kind)) + target = self.current_target(kind) + if target is None or not os.path.exists(target): + target = "" + matches = pattern.match(os.path.basename(target)) + if matches: + return int(matches.groups()[0]) + else: + return None + + def version(self, kind, version): + """The filename that corresponds to the specified version and kind. + + Warning: the specified version may not exist in this lineage. There + is no guarantee that the file path returned by this method actually + exists. + + :param str kind: the lineage member item ("cert", "privkey", + "chain", or "fullchain") + :param int version: the desired version + + :returns: the path to the specified version of the specified member. + :rtype: str""" + + if kind not in ALL_FOUR: + raise ValueError("unknown kind of item") + where = os.path.dirname(self.current_target(kind)) + return os.path.join(where, "{0}{1}.pem".format(kind, version)) + + def available_versions(self, kind): + """Which alternative versions of the specified kind of item exist? + + The archive directory where the current version is stored is + consulted to obtain the list of alternatives. + + :param str kind: the lineage member item ("cert", "privkey", + "chain", or "fullchain") + + :returns: all of the version numbers that currently exist + :rtype: list of int""" + + if kind not in ALL_FOUR: + raise ValueError("unknown kind of item") + where = os.path.dirname(self.current_target(kind)) + files = os.listdir(where) + pattern = re.compile(r"^{0}([0-9]+)\.pem$".format(kind)) + matches = [pattern.match(f) for f in files] + return sorted([int(m.groups()[0]) for m in matches if m]) + + def newest_available_version(self, kind): + """What is the newest available version of the specified kind of item? + + :param str kind: the lineage member item ("cert", "privkey", + "chain", or "fullchain") + + :returns: the newest available version of this member + :rtype: int""" + + return max(self.available_versions(kind)) + + def latest_common_version(self): + """What is the newest version for which all items are available? + + :returns: the newest available version for which all members (cert, + privkey, chain, and fullchain) exist + :rtype: int""" + + # TODO: this can raise ValueError if there is no version overlap + # (it should probably return None instead) + # TODO: this can raise a spurious AttributeError if the current + # link for any kind is missing (it should probably return None) + versions = [self.available_versions(x) for x in ALL_FOUR] + return max(n for n in versions[0] if all(n in v for v in versions[1:])) + + def next_free_version(self): + """What is the smallest version newer than all full or partial versions? + + :returns: the smallest version number that is larger than any version + of any item currently stored in this lineage + :rtype: int + """ + + # TODO: consider locking/mutual exclusion between updating processes + # This isn't self.latest_common_version() + 1 because we don't want + # collide with a version that might exist for one file type but not + # for the others. + return max(self.newest_available_version(x) for x in ALL_FOUR) + 1 + + def has_pending_deployment(self): + """Is there a later version of all of the managed items? + + :returns: True if there is a complete version of this lineage with + a larger version number than the current version, and False + otherwise + :rtype: bool""" + + # TODO: consider whether to assume consistency or treat + # inconsistent/consistent versions differently + smallest_current = min(self.current_version(x) for x in ALL_FOUR) + return smallest_current < self.latest_common_version() + + def update_link_to(self, kind, version): + """Make the specified item point at the specified version. + + (Note that this method doesn't verify that the specified version + exists.) + + :param str kind: the lineage member item ("cert", "privkey", + "chain", or "fullchain") + :param int version: the desired version""" + + if kind not in ALL_FOUR: + raise ValueError("unknown kind of item") + link = getattr(self, kind) + filename = "{0}{1}.pem".format(kind, version) + # Relative rather than absolute target directory + target_directory = os.path.dirname(os.readlink(link)) + # TODO: it could be safer to make the link first under a temporary + # filename, then unlink the old link, then rename the new link + # to the old link; this ensures that this process is able to + # create symlinks. + # TODO: we might also want to check consistency of related links + # for the other corresponding items + os.unlink(link) + os.symlink(os.path.join(target_directory, filename), link) + + def update_all_links_to(self, version): + """Change all member objects to point to the specified version. + + :param int version: the desired version""" + + for kind in ALL_FOUR: + self.update_link_to(kind, version) + + def _notafterbefore(self, method, version): + """Internal helper function for finding notbefore/notafter.""" + if version is None: + target = self.current_target("cert") + else: + target = self.version("cert", version) + pem = open(target).read() + x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, + pem) + i = method(x509) + return pyrfc3339.parse(i[0:4] + "-" + i[4:6] + "-" + i[6:8] + "T" + + i[8:10] + ":" + i[10:12] + ":" + i[12:]) + + def notbefore(self, version=None): + """When does the specified cert version start being valid? + + (If no version is specified, use the current version.) + + :param int version: the desired version number + + :returns: the notBefore value from the specified cert version in this + lineage + :rtype: :class:`datetime.datetime`""" + + return self._notafterbefore(lambda x509: x509.get_notBefore(), version) + + def notafter(self, version=None): + """When does the specified cert version stop being valid? + + (If no version is specified, use the current version.) + + :param int version: the desired version number + + :returns: the notAfter value from the specified cert version in this + lineage + :rtype: :class:`datetime.datetime`""" + + return self._notafterbefore(lambda x509: x509.get_notAfter(), version) + + def should_autodeploy(self): + """Should this lineage now automatically deploy a newer version? + + This is a policy question and does not only depend on whether there + is a newer version of the cert. (This considers whether autodeployment + is enabled, whether a relevant newer version exists, and whether the + time interval for autodeployment has been reached.) + + :returns: whether the lineage now ought to autodeploy an existing + newer cert version + :rtype: bool""" + + if ("autodeploy" not in self.configuration or + self.configuration.as_bool("autodeploy")): + if self.has_pending_deployment(): + interval = self.configuration.get("deploy_before_expiry", + "5 days") + autodeploy_interval = parse_time_interval(interval) + expiry = self.notafter() + now = datetime.datetime.utcnow().replace(tzinfo=pytz.UTC) + remaining = expiry - now + if remaining < autodeploy_interval: + return True + return False + + def ocsp_revoked(self, version=None): + # pylint: disable=no-self-use,unused-argument + """Is the specified cert version revoked according to OCSP? + + Also returns True if the cert version is declared as intended to be + revoked according to Let's Encrypt OCSP extensions. (If no version + is specified, uses the current version.) + + This method is not yet implemented and currently always returns False. + + :param int version: the desired version number + + :returns: whether the certificate is or will be revoked + :rtype: bool""" + + # XXX: This query and its associated network service aren't + # implemented yet, so we currently return False (indicating that the + # certificate is not revoked). + return False + + def should_autorenew(self): + """Should we now try to autorenew the most recent cert version? + + This is a policy question and does not only depend on whether the + cert is expired. (This considers whether autorenewal is enabled, + whether the cert is revoked, and whether the time interval for + autorenewal has been reached.) + + Note that this examines the numerically most recent cert version, + not the currently deployed version. + + :returns: whether an attempt should now be made to autorenew the + most current cert version in this lineage + :rtype: bool""" + + if ("autorenew" not in self.configuration + or self.configuration.as_bool("autorenew")): + # Consider whether to attempt to autorenew this cert now + + # Renewals on the basis of revocation + if self.ocsp_revoked(self.latest_common_version()): + return True + + # Renewals on the basis of expiry time + interval = self.configuration.get("renew_before_expiry", "10 days") + autorenew_interval = parse_time_interval(interval) + expiry = self.notafter(self.latest_common_version()) + now = datetime.datetime.utcnow().replace(tzinfo=pytz.UTC) + remaining = expiry - now + if remaining < autorenew_interval: + return True + return False + + @classmethod + def new_lineage(cls, lineagename, cert, privkey, chain, + renewalparams=None, config=None): + # pylint: disable=too-many-locals,too-many-arguments + """Create a new certificate lineage. + + Attempts to create a certificate lineage -- enrolled for potential + future renewal -- with the (suggested) lineage name lineagename, + and the associated cert, privkey, and chain (the associated + fullchain will be created automatically). Optional configurator + and renewalparams record the configuration that was originally + used to obtain this cert, so that it can be reused later during + automated renewal. + + Returns a new RenewableCert object referring to the created + lineage. (The actual lineage name, as well as all the relevant + file paths, will be available within this object.) + + :param str lineagename: the suggested name for this lineage + (normally the current cert's first subject DNS name) + :param str cert: the initial certificate version in PEM format + :param str privkey: the private key in PEM format + :param str chain: the certificate chain in PEM format + :param :class:`configobj.ConfigObj` renewalparams: parameters that + should be used when instantiating authenticator and installer + objects in the future to attempt to renew this cert or deploy + new versions of it + :param :class:`configobj.ConfigObj` config: renewal configuration + defaults, affecting, for example, the locations of the + directories where the associated files will be saved + + :returns: the newly-created RenewalCert object + :rtype: :class:`storage.renewableCert`""" + + defaults_copy = copy.deepcopy(constants.RENEWER_DEFAULTS) + defaults_copy.merge(config if config is not None + else configobj.ConfigObj()) + config = defaults_copy + # This attempts to read the renewer config file and augment or replace + # the renewer defaults with any options contained in that file. If + # renewer_config_file is undefined or if the file is nonexistent or + # empty, this .merge() will have no effect. + config.merge(configobj.ConfigObj(config.get("renewer_config_file", ""))) + + # Examine the configuration and find the new lineage's name + configs_dir = config["renewal_configs_dir"] + archive_dir = config["archive_dir"] + live_dir = config["live_dir"] + for i in (configs_dir, archive_dir, live_dir): + if not os.path.exists(i): + os.makedirs(i, 0700) + config_file, config_filename = le_util.unique_lineage_name(configs_dir, + lineagename) + if not config_filename.endswith(".conf"): + raise ValueError("renewal config file name must end in .conf") + + # Determine where on disk everything will go + # lineagename will now potentially be modified based on which + # renewal configuration file could actually be created + lineagename = os.path.basename(config_filename)[:-len(".conf")] + archive = os.path.join(archive_dir, lineagename) + live_dir = os.path.join(live_dir, lineagename) + if os.path.exists(archive): + raise ValueError("archive directory exists for " + lineagename) + if os.path.exists(live_dir): + raise ValueError("live directory exists for " + lineagename) + os.mkdir(archive) + os.mkdir(live_dir) + relative_archive = os.path.join("..", "..", "archive", lineagename) + + # Put the data into the appropriate files on disk + target = dict([(kind, os.path.join(live_dir, kind + ".pem")) + for kind in ALL_FOUR]) + for kind in ALL_FOUR: + os.symlink(os.path.join(relative_archive, kind + "1.pem"), + target[kind]) + with open(target["cert"], "w") as f: + f.write(cert) + with open(target["privkey"], "w") as f: + f.write(privkey) + # XXX: Let's make sure to get the file permissions right here + with open(target["chain"], "w") as f: + f.write(chain) + with open(target["fullchain"], "w") as f: + f.write(cert + chain) + + # Document what we've done in a new renewal config file + config_file.close() + new_config = configobj.ConfigObj(config_filename, create_empty=True) + for kind in ALL_FOUR: + new_config[kind] = target[kind] + if renewalparams: + new_config["renewalparams"] = renewalparams + new_config.comments["renewalparams"] = ["", + "Options and defaults used" + " in the renewal process"] + # TODO: add human-readable comments explaining other available + # parameters + new_config.write() + return cls(new_config, config) + + + def save_successor(self, prior_version, new_cert, new_privkey, new_chain): + """Save new cert and chain as a successor of a prior version. + + Returns the new version number that was created. Note: does NOT + update links to deploy this version. + + :param int prior_version: the old version to which this version is + regarded as a successor (used to choose a privkey, if the key + has not changed, but otherwise this information is not permanently + recorded anywhere) + :param str new_cert: the new certificate, in PEM format + :param str new_privkey: the new private key, in PEM format, or None, + if the private key has not changed + :param str new_chain: the new chain, in PEM format + + :returns: the new version number that was created + :rtype: int""" + + # XXX: assumes official archive location rather than examining links + # XXX: consider using os.open for availablity of os.O_EXCL + # XXX: ensure file permissions are correct; also create directories + # if needed (ensuring their permissions are correct) + # Figure out what the new version is and hence where to save things + + target_version = self.next_free_version() + archive = self.configuration["archive_dir"] + prefix = os.path.join(archive, self.lineagename) + target = dict( + [(kind, + os.path.join(prefix, "{0}{1}.pem".format(kind, target_version))) + for kind in ALL_FOUR]) + + # Distinguish the cases where the privkey has changed and where it + # has not changed (in the latter case, making an appropriate symlink + # to an earlier privkey version) + if new_privkey is None: + # The behavior below keeps the prior key by creating a new + # symlink to the old key or the target of the old key symlink. + old_privkey = os.path.join( + prefix, "privkey{0}.pem".format(prior_version)) + if os.path.islink(old_privkey): + old_privkey = os.readlink(old_privkey) + else: + old_privkey = "privkey{0}.pem".format(prior_version) + os.symlink(old_privkey, target["privkey"]) + else: + with open(target["privkey"], "w") as f: + f.write(new_privkey) + + # Save everything else + with open(target["cert"], "w") as f: + f.write(new_cert) + with open(target["chain"], "w") as f: + f.write(new_chain) + with open(target["fullchain"], "w") as f: + f.write(new_cert + new_chain) + return target_version diff --git a/letsencrypt/tests/crypto_util_test.py b/letsencrypt/tests/crypto_util_test.py index 92cb4014b..e2d996640 100644 --- a/letsencrypt/tests/crypto_util_test.py +++ b/letsencrypt/tests/crypto_util_test.py @@ -15,6 +15,10 @@ RSA256_KEY = pkg_resources.resource_string( 'acme.jose', os.path.join('testdata', 'rsa256_key.pem')) RSA512_KEY = pkg_resources.resource_string( 'acme.jose', os.path.join('testdata', 'rsa512_key.pem')) +CERT = pkg_resources.resource_string( + 'letsencrypt.tests', os.path.join('testdata', 'cert.pem')) +SAN_CERT = pkg_resources.resource_string( + 'letsencrypt.tests', os.path.join('testdata', 'cert-san.pem')) class InitSaveKeyTest(unittest.TestCase): @@ -68,6 +72,27 @@ class InitSaveCSRTest(unittest.TestCase): self.assertEqual(csr.data, 'csr_der') self.assertTrue('csr-letsencrypt.pem' in csr.file) + +class MakeCSRTest(unittest.TestCase): + """Tests for letsencrypt.crypto_util.make_csr.""" + + @classmethod + def _call(cls, *args, **kwargs): + from letsencrypt.crypto_util import make_csr + return make_csr(*args, **kwargs) + + def test_san(self): + from letsencrypt.crypto_util import get_sans_from_csr + # TODO: Fails for RSA256_KEY + csr_pem, csr_der = self._call( + RSA512_KEY, ['example.com', 'www.example.com']) + self.assertEqual( + ['example.com', 'www.example.com'], get_sans_from_csr(csr_pem)) + self.assertEqual( + ['example.com', 'www.example.com'], get_sans_from_csr( + csr_der, OpenSSL.crypto.FILETYPE_ASN1)) + + class ValidCSRTest(unittest.TestCase): """Tests for letsencrypt.crypto_util.valid_csr.""" @@ -151,7 +176,26 @@ class MakeSSCertTest(unittest.TestCase): make_ss_cert(RSA512_KEY, ['example.com', 'www.example.com']) -class GetSansFromCsrTest(unittest.TestCase): +class GetSANsFromCertTest(unittest.TestCase): + """Tests for letsencrypt.crypto_util.get_sans_from_cert.""" + + @classmethod + def _call(cls, *args, **kwargs): + from letsencrypt.crypto_util import get_sans_from_cert + return get_sans_from_cert(*args, **kwargs) + + def test_single(self): + self.assertEqual([], self._call(pkg_resources.resource_string( + __name__, os.path.join('testdata', 'cert.pem')))) + + def test_san(self): + self.assertEqual( + ['example.com', 'www.example.com'], + self._call(pkg_resources.resource_string( + __name__, os.path.join('testdata', 'cert-san.pem')))) + + +class GetSANsFromCSRTest(unittest.TestCase): """Tests for letsencrypt.crypto_util.get_sans_from_csr.""" def test_extract_one_san(self): from letsencrypt.crypto_util import get_sans_from_csr diff --git a/letsencrypt/tests/le_util_test.py b/letsencrypt/tests/le_util_test.py index c9da155c5..3f5f08c4c 100644 --- a/letsencrypt/tests/le_util_test.py +++ b/letsencrypt/tests/le_util_test.py @@ -1,4 +1,5 @@ """Tests for letsencrypt.le_util.""" +import errno import os import shutil import stat @@ -78,7 +79,7 @@ class CheckPermissionsTest(unittest.TestCase): class UniqueFileTest(unittest.TestCase): - """Tests for letsencrypt.class.le_util.unique_file.""" + """Tests for letsencrypt.le_util.unique_file.""" def setUp(self): self.root_path = tempfile.mkdtemp() @@ -122,5 +123,45 @@ class UniqueFileTest(unittest.TestCase): self.assertTrue(basename3.endswith('foo.txt')) +class UniqueLineageNameTest(unittest.TestCase): + """Tests for letsencrypt.le_util.unique_lineage_name.""" + + def setUp(self): + self.root_path = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.root_path, ignore_errors=True) + + def _call(self, filename, mode=0o777): + from letsencrypt.le_util import unique_lineage_name + return unique_lineage_name(self.root_path, filename, mode) + + def test_basic(self): + f, name = self._call("wow") + self.assertTrue(isinstance(f, file)) + self.assertTrue(isinstance(name, str)) + + def test_multiple(self): + for _ in range(10): + f, name = self._call("wow") + self.assertTrue(isinstance(f, file)) + self.assertTrue(isinstance(name, str)) + self.assertTrue("wow-0009.conf" in name) + + @mock.patch("letsencrypt.le_util.os.fdopen") + def test_failure(self, mock_fdopen): + err = OSError("whoops") + err.errno = errno.EIO + mock_fdopen.side_effect = err + self.assertRaises(OSError, self._call, "wow") + + @mock.patch("letsencrypt.le_util.os.fdopen") + def test_subsequent_failure(self, mock_fdopen): + self._call("wow") + err = OSError("whoops") + err.errno = errno.EIO + mock_fdopen.side_effect = err + self.assertRaises(OSError, self._call, "wow") + if __name__ == '__main__': unittest.main() # pragma: no cover diff --git a/letsencrypt/tests/notify_test.py b/letsencrypt/tests/notify_test.py new file mode 100644 index 000000000..c6f76c42e --- /dev/null +++ b/letsencrypt/tests/notify_test.py @@ -0,0 +1,51 @@ +"""Tests for letsencrypt/notify.py""" + +import mock +import socket +import unittest + +class NotifyTests(unittest.TestCase): + """Tests for the notifier.""" + + @mock.patch("letsencrypt.notify.smtplib.LMTP") + def test_smtp_success(self, mock_lmtp): + from letsencrypt.notify import notify + lmtp_obj = mock.MagicMock() + mock_lmtp.return_value = lmtp_obj + self.assertTrue(notify("Goose", "auntrhody@example.com", + "The old grey goose is dead.")) + self.assertEqual(lmtp_obj.connect.call_count, 1) + self.assertEqual(lmtp_obj.sendmail.call_count, 1) + + @mock.patch("letsencrypt.notify.smtplib.LMTP") + @mock.patch("letsencrypt.notify.subprocess.Popen") + def test_smtp_failure(self, mock_popen, mock_lmtp): + from letsencrypt.notify import notify + lmtp_obj = mock.MagicMock() + mock_lmtp.return_value = lmtp_obj + lmtp_obj.sendmail.side_effect = socket.error(17) + proc = mock.MagicMock() + mock_popen.return_value = proc + self.assertTrue(notify("Goose", "auntrhody@example.com", + "The old grey goose is dead.")) + self.assertEqual(lmtp_obj.sendmail.call_count, 1) + self.assertEqual(proc.communicate.call_count, 1) + + @mock.patch("letsencrypt.notify.smtplib.LMTP") + @mock.patch("letsencrypt.notify.subprocess.Popen") + def test_everything_fails(self, mock_popen, mock_lmtp): + from letsencrypt.notify import notify + lmtp_obj = mock.MagicMock() + mock_lmtp.return_value = lmtp_obj + lmtp_obj.sendmail.side_effect = socket.error(17) + proc = mock.MagicMock() + mock_popen.return_value = proc + proc.communicate.side_effect = OSError("What we have here is a " + "failure to communicate.") + self.assertFalse(notify("Goose", "auntrhody@example.com", + "The old grey goose is dead.")) + self.assertEqual(lmtp_obj.sendmail.call_count, 1) + self.assertEqual(proc.communicate.call_count, 1) + +if __name__ == "__main__": + unittest.main() # pragma: no cover diff --git a/letsencrypt/tests/renewer_test.py b/letsencrypt/tests/renewer_test.py new file mode 100644 index 000000000..4f1fe6bfc --- /dev/null +++ b/letsencrypt/tests/renewer_test.py @@ -0,0 +1,650 @@ +"""Tests for letsencrypt/renewer.py""" + +import datetime +import os +import tempfile +import pkg_resources +import shutil +import unittest + +import configobj +import mock +import pytz + +from letsencrypt.storage import ALL_FOUR + +def unlink_all(rc_object): + """Unlink all four items associated with this RenewableCert. + (Helper function.)""" + for kind in ALL_FOUR: + os.unlink(getattr(rc_object, kind)) + +def fill_with_sample_data(rc_object): + """Put dummy data into all four files of this RenewableCert. + (Helper function.)""" + for kind in ALL_FOUR: + with open(getattr(rc_object, kind), "w") as f: + f.write(kind) + +class RenewableCertTests(unittest.TestCase): + # pylint: disable=too-many-public-methods + """Tests for the RenewableCert class as well as other functions + within renewer.py.""" + def setUp(self): + from letsencrypt import storage + self.tempdir = tempfile.mkdtemp() + os.makedirs(os.path.join(self.tempdir, "live", "example.org")) + os.makedirs(os.path.join(self.tempdir, "archive", "example.org")) + os.makedirs(os.path.join(self.tempdir, "configs")) + defaults = configobj.ConfigObj() + defaults["live_dir"] = os.path.join(self.tempdir, "live") + defaults["archive_dir"] = os.path.join(self.tempdir, "archive") + defaults["renewal_configs_dir"] = os.path.join(self.tempdir, + "configs") + config = configobj.ConfigObj() + for kind in ALL_FOUR: + config[kind] = os.path.join(self.tempdir, "live", "example.org", + kind + ".pem") + config.filename = os.path.join(self.tempdir, "configs", + "example.org.conf") + self.defaults = defaults # for main() test + self.test_rc = storage.RenewableCert(config, defaults) + + def tearDown(self): + shutil.rmtree(self.tempdir) + + def test_initialization(self): + self.assertEqual(self.test_rc.lineagename, "example.org") + for kind in ALL_FOUR: + self.assertEqual( + getattr(self.test_rc, kind), os.path.join( + self.tempdir, "live", "example.org", kind + ".pem")) + + def test_renewal_bad_config(self): + """Test that the RenewableCert constructor will complain if + the renewal configuration file doesn't end in ".conf" or if it + isn't a ConfigObj.""" + from letsencrypt import storage + defaults = configobj.ConfigObj() + config = configobj.ConfigObj() + # These files don't exist and aren't created here; the point of the test + # is to confirm that the constructor rejects them outright because of + # the configfile's name. + for kind in ALL_FOUR: + config["cert"] = "nonexistent_" + kind + ".pem" + config.filename = "nonexistent_sillyfile" + self.assertRaises(ValueError, storage.RenewableCert, config, defaults) + self.assertRaises(TypeError, storage.RenewableCert, "fun", defaults) + + def test_renewal_incomplete_config(self): + """Test that the RenewableCert constructor will complain if + the renewal configuration file is missing a required file element.""" + from letsencrypt import storage + defaults = configobj.ConfigObj() + config = configobj.ConfigObj() + config["cert"] = "imaginary_cert.pem" + # Here the required privkey is missing. + config["chain"] = "imaginary_chain.pem" + config["fullchain"] = "imaginary_fullchain.pem" + config.filename = "imaginary_config.conf" + self.assertRaises(ValueError, storage.RenewableCert, config, defaults) + + def test_consistent(self): # pylint: disable=too-many-statements + oldcert = self.test_rc.cert + self.test_rc.cert = "relative/path" + # Absolute path for item requirement + self.assertFalse(self.test_rc.consistent()) + self.test_rc.cert = oldcert + # Items must exist requirement + self.assertFalse(self.test_rc.consistent()) + # Items must be symlinks requirements + fill_with_sample_data(self.test_rc) + self.assertFalse(self.test_rc.consistent()) + unlink_all(self.test_rc) + # Items must point to desired place if they are relative + for kind in ALL_FOUR: + os.symlink(os.path.join("..", kind + "17.pem"), + getattr(self.test_rc, kind)) + self.assertFalse(self.test_rc.consistent()) + unlink_all(self.test_rc) + # Items must point to desired place if they are absolute + for kind in ALL_FOUR: + os.symlink(os.path.join(self.tempdir, kind + "17.pem"), + getattr(self.test_rc, kind)) + self.assertFalse(self.test_rc.consistent()) + unlink_all(self.test_rc) + # Items must point to things that exist + for kind in ALL_FOUR: + os.symlink(os.path.join("..", "..", "archive", "example.org", + kind + "17.pem"), + getattr(self.test_rc, kind)) + self.assertFalse(self.test_rc.consistent()) + # This version should work + fill_with_sample_data(self.test_rc) + self.assertTrue(self.test_rc.consistent()) + # Items must point to things that follow the naming convention + os.unlink(self.test_rc.fullchain) + os.symlink(os.path.join("..", "..", "archive", "example.org", + "fullchain_17.pem"), self.test_rc.fullchain) + with open(self.test_rc.fullchain, "w") as f: + f.write("wrongly-named fullchain") + self.assertFalse(self.test_rc.consistent()) + + def test_current_target(self): + # Relative path logic + os.symlink(os.path.join("..", "..", "archive", "example.org", + "cert17.pem"), self.test_rc.cert) + with open(self.test_rc.cert, "w") as f: + f.write("cert") + self.assertTrue(os.path.samefile(self.test_rc.current_target("cert"), + os.path.join(self.tempdir, "archive", + "example.org", + "cert17.pem"))) + # Absolute path logic + os.unlink(self.test_rc.cert) + os.symlink(os.path.join(self.tempdir, "archive", "example.org", + "cert17.pem"), self.test_rc.cert) + with open(self.test_rc.cert, "w") as f: + f.write("cert") + self.assertTrue(os.path.samefile(self.test_rc.current_target("cert"), + os.path.join(self.tempdir, "archive", + "example.org", + "cert17.pem"))) + + def test_current_version(self): + for ver in (1, 5, 10, 20): + os.symlink(os.path.join("..", "..", "archive", "example.org", + "cert{0}.pem".format(ver)), + self.test_rc.cert) + with open(self.test_rc.cert, "w") as f: + f.write("cert") + os.unlink(self.test_rc.cert) + os.symlink(os.path.join("..", "..", "archive", "example.org", + "cert10.pem"), self.test_rc.cert) + self.assertEqual(self.test_rc.current_version("cert"), 10) + + def test_no_current_version(self): + self.assertEqual(self.test_rc.current_version("cert"), None) + + def test_latest_and_next_versions(self): + for ver in range(1, 6): + for kind in ALL_FOUR: + where = getattr(self.test_rc, kind) + if os.path.islink(where): + os.unlink(where) + os.symlink(os.path.join("..", "..", "archive", "example.org", + "{0}{1}.pem".format(kind, ver)), where) + with open(where, "w") as f: + f.write(kind) + self.assertEqual(self.test_rc.latest_common_version(), 5) + self.assertEqual(self.test_rc.next_free_version(), 6) + # Having one kind of file of a later version doesn't change the + # result + os.unlink(self.test_rc.privkey) + os.symlink(os.path.join("..", "..", "archive", "example.org", + "privkey7.pem"), self.test_rc.privkey) + with open(self.test_rc.privkey, "w") as f: + f.write("privkey") + self.assertEqual(self.test_rc.latest_common_version(), 5) + # ... although it does change the next free version + self.assertEqual(self.test_rc.next_free_version(), 8) + # Nor does having three out of four change the result + os.unlink(self.test_rc.cert) + os.symlink(os.path.join("..", "..", "archive", "example.org", + "cert7.pem"), self.test_rc.cert) + with open(self.test_rc.cert, "w") as f: + f.write("cert") + os.unlink(self.test_rc.fullchain) + os.symlink(os.path.join("..", "..", "archive", "example.org", + "fullchain7.pem"), self.test_rc.fullchain) + with open(self.test_rc.fullchain, "w") as f: + f.write("fullchain") + self.assertEqual(self.test_rc.latest_common_version(), 5) + # If we have everything from a much later version, it does change + # the result + ver = 17 + for kind in ALL_FOUR: + where = getattr(self.test_rc, kind) + if os.path.islink(where): + os.unlink(where) + os.symlink(os.path.join("..", "..", "archive", "example.org", + "{0}{1}.pem".format(kind, ver)), where) + with open(where, "w") as f: + f.write(kind) + self.assertEqual(self.test_rc.latest_common_version(), 17) + self.assertEqual(self.test_rc.next_free_version(), 18) + + def test_update_link_to(self): + for ver in range(1, 6): + for kind in ALL_FOUR: + where = getattr(self.test_rc, kind) + if os.path.islink(where): + os.unlink(where) + os.symlink(os.path.join("..", "..", "archive", "example.org", + "{0}{1}.pem".format(kind, ver)), where) + with open(where, "w") as f: + f.write(kind) + self.assertEqual(ver, self.test_rc.current_version(kind)) + self.test_rc.update_link_to("cert", 3) + self.test_rc.update_link_to("privkey", 2) + self.assertEqual(3, self.test_rc.current_version("cert")) + self.assertEqual(2, self.test_rc.current_version("privkey")) + self.assertEqual(5, self.test_rc.current_version("chain")) + self.assertEqual(5, self.test_rc.current_version("fullchain")) + # Currently we are allowed to update to a version that doesn't exist + self.test_rc.update_link_to("chain", 3000) + # However, current_version doesn't allow querying the resulting + # version (because it's a broken link). + self.assertEqual(os.path.basename(os.readlink(self.test_rc.chain)), + "chain3000.pem") + + def test_version(self): + os.symlink(os.path.join("..", "..", "archive", "example.org", + "cert12.pem"), self.test_rc.cert) + with open(self.test_rc.cert, "w") as f: + f.write("cert") + # TODO: We should probably test that the directory is still the + # same, but it's tricky because we can get an absolute + # path out when we put a relative path in. + self.assertEqual("cert8.pem", + os.path.basename(self.test_rc.version("cert", 8))) + + def test_update_all_links_to(self): + for ver in range(1, 6): + for kind in ALL_FOUR: + where = getattr(self.test_rc, kind) + if os.path.islink(where): + os.unlink(where) + os.symlink(os.path.join("..", "..", "archive", "example.org", + "{0}{1}.pem".format(kind, ver)), where) + with open(where, "w") as f: + f.write(kind) + self.assertEqual(ver, self.test_rc.current_version(kind)) + self.assertEqual(self.test_rc.latest_common_version(), 5) + for ver in range(1, 6): + self.test_rc.update_all_links_to(ver) + for kind in ALL_FOUR: + self.assertEqual(ver, self.test_rc.current_version(kind)) + self.assertEqual(self.test_rc.latest_common_version(), 5) + + def test_has_pending_deployment(self): + for ver in range(1, 6): + for kind in ALL_FOUR: + where = getattr(self.test_rc, kind) + if os.path.islink(where): + os.unlink(where) + os.symlink(os.path.join("..", "..", "archive", "example.org", + "{0}{1}.pem".format(kind, ver)), where) + with open(where, "w") as f: + f.write(kind) + self.assertEqual(ver, self.test_rc.current_version(kind)) + for ver in range(1, 6): + self.test_rc.update_all_links_to(ver) + for kind in ALL_FOUR: + self.assertEqual(ver, self.test_rc.current_version(kind)) + if ver < 5: + self.assertTrue(self.test_rc.has_pending_deployment()) + else: + self.assertFalse(self.test_rc.has_pending_deployment()) + + def _test_notafterbefore(self, function, timestamp): + test_cert = pkg_resources.resource_string( + "letsencrypt.tests", "testdata/cert.pem") + os.symlink(os.path.join("..", "..", "archive", "example.org", + "cert12.pem"), self.test_rc.cert) + with open(self.test_rc.cert, "w") as f: + f.write(test_cert) + desired_time = datetime.datetime.utcfromtimestamp(timestamp) + desired_time = desired_time.replace(tzinfo=pytz.UTC) + for result in (function(), function(12)): + self.assertEqual(result, desired_time) + self.assertEqual(result.utcoffset(), datetime.timedelta(0)) + + def test_notbefore(self): + self._test_notafterbefore(self.test_rc.notbefore, 1418337285) + # 2014-12-11 22:34:45+00:00 = Unix time 1418337285 + + def test_notafter(self): + self._test_notafterbefore(self.test_rc.notafter, 1418942085) + # 2014-12-18 22:34:45+00:00 = Unix time 1418942085 + + @mock.patch("letsencrypt.storage.datetime") + def test_time_interval_judgments(self, mock_datetime): + """Test should_autodeploy() and should_autorenew() on the basis + of expiry time windows.""" + test_cert = pkg_resources.resource_string( + "letsencrypt.tests", "testdata/cert.pem") + for kind in ALL_FOUR: + where = getattr(self.test_rc, kind) + os.symlink(os.path.join("..", "..", "archive", "example.org", + "{0}12.pem".format(kind)), where) + with open(where, "w") as f: + f.write(kind) + os.unlink(where) + os.symlink(os.path.join("..", "..", "archive", "example.org", + "{0}11.pem".format(kind)), where) + with open(where, "w") as f: + f.write(kind) + self.test_rc.update_all_links_to(12) + with open(self.test_rc.cert, "w") as f: + f.write(test_cert) + self.test_rc.update_all_links_to(11) + with open(self.test_rc.cert, "w") as f: + f.write(test_cert) + + mock_datetime.timedelta = datetime.timedelta + + for (current_time, interval, result) in [ + # 2014-12-13 12:00:00+00:00 (about 5 days prior to expiry) + # Times that should result in autorenewal/autodeployment + (1418472000, "2 months", True), (1418472000, "1 week", True), + # Times that should not + (1418472000, "4 days", False), (1418472000, "2 days", False), + # 2009-05-01 12:00:00+00:00 (about 5 years prior to expiry) + # Times that should result in autorenewal/autodeployment + (1241179200, "7 years", True), + (1241179200, "11 years 2 months", True), + # Times that should not + (1241179200, "8 hours", False), (1241179200, "2 days", False), + (1241179200, "40 days", False), (1241179200, "9 months", False), + # 2015-01-01 (after expiry has already happened, so all + # intervals should cause autorenewal/autodeployment) + (1420070400, "0 seconds", True), + (1420070400, "10 seconds", True), + (1420070400, "10 minutes", True), + (1420070400, "10 weeks", True), (1420070400, "10 months", True), + (1420070400, "10 years", True), (1420070400, "99 months", True), + ]: + sometime = datetime.datetime.utcfromtimestamp(current_time) + mock_datetime.datetime.utcnow.return_value = sometime + self.test_rc.configuration["deploy_before_expiry"] = interval + self.test_rc.configuration["renew_before_expiry"] = interval + self.assertEqual(self.test_rc.should_autodeploy(), result) + self.assertEqual(self.test_rc.should_autorenew(), result) + + def test_should_autodeploy(self): + """Test should_autodeploy() on the basis of reasons other than + expiry time window.""" + # pylint: disable=too-many-statements + # Autodeployment turned off + self.test_rc.configuration["autodeploy"] = "0" + self.assertFalse(self.test_rc.should_autodeploy()) + self.test_rc.configuration["autodeploy"] = "1" + # No pending deployment + for ver in range(1, 6): + for kind in ALL_FOUR: + where = getattr(self.test_rc, kind) + if os.path.islink(where): + os.unlink(where) + os.symlink(os.path.join("..", "..", "archive", "example.org", + "{0}{1}.pem".format(kind, ver)), where) + with open(where, "w") as f: + f.write(kind) + self.assertFalse(self.test_rc.should_autodeploy()) + + @mock.patch("letsencrypt.storage.RenewableCert.ocsp_revoked") + def test_should_autorenew(self, mock_ocsp): + """Test should_autorenew on the basis of reasons other than + expiry time window.""" + # pylint: disable=too-many-statements + # Autorenewal turned off + self.test_rc.configuration["autorenew"] = "0" + self.assertFalse(self.test_rc.should_autorenew()) + self.test_rc.configuration["autorenew"] = "1" + for kind in ALL_FOUR: + where = getattr(self.test_rc, kind) + os.symlink(os.path.join("..", "..", "archive", "example.org", + "{0}12.pem".format(kind)), where) + with open(where, "w") as f: + f.write(kind) + # Mandatory renewal on the basis of OCSP revocation + mock_ocsp.return_value = True + self.assertTrue(self.test_rc.should_autorenew()) + mock_ocsp.return_value = False + + def test_save_successor(self): + for ver in range(1, 6): + for kind in ALL_FOUR: + where = getattr(self.test_rc, kind) + if os.path.islink(where): + os.unlink(where) + os.symlink(os.path.join("..", "..", "archive", "example.org", + "{0}{1}.pem".format(kind, ver)), where) + with open(where, "w") as f: + f.write(kind) + self.test_rc.update_all_links_to(3) + self.assertEqual(6, self.test_rc.save_successor(3, "new cert", None, + "new chain")) + with open(self.test_rc.version("cert", 6)) as f: + self.assertEqual(f.read(), "new cert") + with open(self.test_rc.version("chain", 6)) as f: + self.assertEqual(f.read(), "new chain") + with open(self.test_rc.version("fullchain", 6)) as f: + self.assertEqual(f.read(), "new cert" + "new chain") + # version 6 of the key should be a link back to version 3 + self.assertFalse(os.path.islink(self.test_rc.version("privkey", 3))) + self.assertTrue(os.path.islink(self.test_rc.version("privkey", 6))) + # Let's try two more updates + self.assertEqual(7, self.test_rc.save_successor(6, "again", None, + "newer chain")) + self.assertEqual(8, self.test_rc.save_successor(7, "hello", None, + "other chain")) + # All of the subsequent versions should link directly to the original + # privkey. + for i in (6, 7, 8): + self.assertTrue(os.path.islink(self.test_rc.version("privkey", i))) + self.assertEqual("privkey3.pem", os.path.basename(os.readlink( + self.test_rc.version("privkey", i)))) + + for kind in ALL_FOUR: + self.assertEqual(self.test_rc.available_versions(kind), range(1, 9)) + self.assertEqual(self.test_rc.current_version(kind), 3) + # Test updating from latest version rather than old version + self.test_rc.update_all_links_to(8) + self.assertEqual(9, self.test_rc.save_successor(8, "last", None, + "attempt")) + for kind in ALL_FOUR: + self.assertEqual(self.test_rc.available_versions(kind), + range(1, 10)) + self.assertEqual(self.test_rc.current_version(kind), 8) + with open(self.test_rc.version("fullchain", 9)) as f: + self.assertEqual(f.read(), "last" + "attempt") + # Test updating when providing a new privkey. The key should + # be saved in a new file rather than creating a new symlink. + self.assertEqual(10, self.test_rc.save_successor(9, "with", "a", + "key")) + self.assertTrue(os.path.exists(self.test_rc.version("privkey", 10))) + self.assertFalse(os.path.islink(self.test_rc.version("privkey", 10))) + + def test_new_lineage(self): + """Test for new_lineage() class method.""" + from letsencrypt import storage + config_dir = self.defaults["renewal_configs_dir"] + archive_dir = self.defaults["archive_dir"] + live_dir = self.defaults["live_dir"] + result = storage.RenewableCert.new_lineage("the-lineage.com", "cert", + "privkey", "chain", None, + self.defaults) + # This consistency check tests most relevant properties about the + # newly created cert lineage. + self.assertTrue(result.consistent()) + self.assertTrue(os.path.exists(os.path.join(config_dir, + "the-lineage.com.conf"))) + with open(result.fullchain) as f: + self.assertEqual(f.read(), "cert" + "chain") + # Let's do it again and make sure it makes a different lineage + result = storage.RenewableCert.new_lineage("the-lineage.com", "cert2", + "privkey2", "chain2", None, + self.defaults) + self.assertTrue(os.path.exists( + os.path.join(config_dir, "the-lineage.com-0001.conf"))) + # Now trigger the detection of already existing files + os.mkdir(os.path.join(live_dir, "the-lineage.com-0002")) + self.assertRaises(ValueError, storage.RenewableCert.new_lineage, + "the-lineage.com", "cert3", "privkey3", "chain3", + None, self.defaults) + os.mkdir(os.path.join(archive_dir, "other-example.com")) + self.assertRaises(ValueError, storage.RenewableCert.new_lineage, + "other-example.com", "cert4", "privkey4", "chain4", + None, self.defaults) + # Make sure it can accept renewal parameters + params = {"stuff": "properties of stuff", "great": "awesome"} + result = storage.RenewableCert.new_lineage("the-lineage.com", "cert2", + "privkey2", "chain2", + params, self.defaults) + # TODO: Conceivably we could test that the renewal parameters actually + # got saved + + def test_new_lineage_nonexistent_dirs(self): + """Test that directories can be created if they don't exist.""" + from letsencrypt import storage + config_dir = self.defaults["renewal_configs_dir"] + archive_dir = self.defaults["archive_dir"] + live_dir = self.defaults["live_dir"] + shutil.rmtree(config_dir) + shutil.rmtree(archive_dir) + shutil.rmtree(live_dir) + storage.RenewableCert.new_lineage("the-lineage.com", "cert2", + "privkey2", "chain2", + None, self.defaults) + self.assertTrue(os.path.exists( + os.path.join(config_dir, "the-lineage.com.conf"))) + self.assertTrue(os.path.exists( + os.path.join(live_dir, "the-lineage.com", "privkey.pem"))) + self.assertTrue(os.path.exists( + os.path.join(archive_dir, "the-lineage.com", "privkey1.pem"))) + + @mock.patch("letsencrypt.storage.le_util.unique_lineage_name") + def test_invalid_config_filename(self, mock_uln): + from letsencrypt import storage + mock_uln.return_value = "this_does_not_end_with_dot_conf", "yikes" + self.assertRaises(ValueError, storage.RenewableCert.new_lineage, + "example.com", "cert", "privkey", "chain", + None, self.defaults) + + def test_bad_kind(self): + self.assertRaises(ValueError, self.test_rc.current_target, "elephant") + self.assertRaises(ValueError, self.test_rc.current_version, "elephant") + self.assertRaises(ValueError, self.test_rc.version, "elephant", 17) + self.assertRaises(ValueError, self.test_rc.available_versions, + "elephant") + self.assertRaises(ValueError, self.test_rc.newest_available_version, + "elephant") + self.assertRaises(ValueError, self.test_rc.update_link_to, + "elephant", 17) + + def test_ocsp_revoked(self): + # XXX: This is currently hardcoded to False due to a lack of an + # OCSP server to test against. + self.assertFalse(self.test_rc.ocsp_revoked()) + + def test_parse_time_interval(self): + from letsencrypt import storage + # XXX: I'm not sure if intervals related to years and months + # take account of the current date (if so, some of these + # may fail in the future, like in leap years or even in + # months of different lengths!) + intended = {"": 0, "17 days": 17, "23": 23, "1 month": 31, + "7 weeks": 49, "1 year 1 day": 366, "1 year-1 day": 364, + "4 years": 1461} + for time in intended: + self.assertEqual(storage.parse_time_interval(time), + datetime.timedelta(intended[time])) + + @mock.patch("letsencrypt.renewer.plugins_disco") + @mock.patch("letsencrypt.client.determine_account") + @mock.patch("letsencrypt.client.Client") + def test_renew(self, mock_c, mock_da, mock_pd): + """Tests for renew().""" + from letsencrypt import renewer + + test_cert = pkg_resources.resource_string( + "letsencrypt.tests", "testdata/cert-san.pem") + for kind in ALL_FOUR: + os.symlink(os.path.join("..", "..", "archive", "example.org", + kind + "1.pem"), + getattr(self.test_rc, kind)) + fill_with_sample_data(self.test_rc) + with open(self.test_rc.cert, "w") as f: + f.write(test_cert) + + # Fails because renewalparams are missing + self.assertFalse(renewer.renew(self.test_rc, 1)) + self.test_rc.configfile["renewalparams"] = {"some": "stuff"} + # Fails because there's no authenticator specified + self.assertFalse(renewer.renew(self.test_rc, 1)) + self.test_rc.configfile["renewalparams"]["rsa_key_size"] = "2048" + self.test_rc.configfile["renewalparams"]["server"] = "acme.example.com" + self.test_rc.configfile["renewalparams"]["authenticator"] = "fake" + mock_auth = mock.MagicMock() + mock_pd.PluginsRegistry.find_all.return_value = {"apache": mock_auth} + # Fails because "fake" != "apache" + self.assertFalse(renewer.renew(self.test_rc, 1)) + self.test_rc.configfile["renewalparams"]["authenticator"] = "apache" + mock_client = mock.MagicMock() + mock_client.obtain_certificate.return_value = ("cert", "key", "chain") + mock_c.return_value = mock_client + self.assertEqual(2, renewer.renew(self.test_rc, 1)) + # TODO: We could also make several assertions about calls that should + # have been made to the mock functions here. + self.assertEqual(mock_da.call_count, 1) + mock_client.obtain_certificate.return_value = (None, None, None) + # This should fail because the renewal itself appears to fail + self.assertEqual(False, renewer.renew(self.test_rc, 1)) + + + @mock.patch("letsencrypt.renewer.notify") + @mock.patch("letsencrypt.storage.RenewableCert") + @mock.patch("letsencrypt.renewer.renew") + def test_main(self, mock_renew, mock_rc, mock_notify): + """Test for main() function.""" + from letsencrypt import renewer + mock_rc_instance = mock.MagicMock() + mock_rc_instance.should_autodeploy.return_value = True + mock_rc_instance.should_autorenew.return_value = True + mock_rc_instance.latest_common_version.return_value = 10 + mock_rc.return_value = mock_rc_instance + with open(os.path.join(self.defaults["renewal_configs_dir"], + "README"), "w") as f: + f.write("This is a README file to make sure that the renewer is") + f.write("able to correctly ignore files that don't end in .conf.") + with open(os.path.join(self.defaults["renewal_configs_dir"], + "example.org.conf"), "w") as f: + # This isn't actually parsed in this test; we have a separate + # test_initialization that tests the initialization, assuming + # that configobj can correctly parse the config file. + f.write("cert = cert.pem\nprivkey = privkey.pem\n") + f.write("chain = chain.pem\nfullchain = fullchain.pem\n") + with open(os.path.join(self.defaults["renewal_configs_dir"], + "example.com.conf"), "w") as f: + f.write("cert = cert.pem\nprivkey = privkey.pem\n") + f.write("chain = chain.pem\nfullchain = fullchain.pem\n") + renewer.main(self.defaults) + self.assertEqual(mock_rc.call_count, 2) + self.assertEqual(mock_rc_instance.update_all_links_to.call_count, 2) + self.assertEqual(mock_notify.notify.call_count, 4) + self.assertEqual(mock_renew.call_count, 2) + # If we have instances that don't need any work done, no work should + # be done (call counts associated with processing deployments or + # renewals should not increase). + mock_happy_instance = mock.MagicMock() + mock_happy_instance.should_autodeploy.return_value = False + mock_happy_instance.should_autorenew.return_value = False + mock_happy_instance.latest_common_version.return_value = 10 + mock_rc.return_value = mock_happy_instance + renewer.main(self.defaults) + self.assertEqual(mock_rc.call_count, 4) + self.assertEqual(mock_happy_instance.update_all_links_to.call_count, 0) + self.assertEqual(mock_notify.notify.call_count, 4) + self.assertEqual(mock_renew.call_count, 2) + + def test_bad_config_file(self): + from letsencrypt import renewer + with open(os.path.join(self.defaults["renewal_configs_dir"], + "bad.conf"), "w") as f: + f.write("incomplete = configfile\n") + renewer.main(self.defaults) + # The ValueError is caught inside and nothing happens. + +if __name__ == "__main__": + unittest.main() # pragma: no cover diff --git a/setup.py b/setup.py index 815403b5a..145b75a69 100644 --- a/setup.py +++ b/setup.py @@ -35,6 +35,7 @@ install_requires = [ 'jsonschema', 'mock', 'ndg-httpsclient', # urllib3 InsecurePlatformWarning (#304) + 'parsedatetime', 'psutil>=2.1.0', # net_connections introduced in 2.1.0 'pyasn1', # urllib3 InsecurePlatformWarning (#304) 'pycrypto', @@ -115,6 +116,7 @@ setup( entry_points={ 'console_scripts': [ 'letsencrypt = letsencrypt.cli:main', + 'letsencrypt-renewer = letsencrypt.renewer:main', 'jws = letsencrypt.acme.jose.jws:CLI.run', ], 'letsencrypt.plugins': [