1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-21 19:01:07 +03:00

Merge pull request #404 from letsencrypt/renewer

Automatically renew certificates!
This commit is contained in:
Peter Eckersley
2015-05-26 15:56:11 -07:00
16 changed files with 1811 additions and 51 deletions

View File

@@ -82,7 +82,7 @@ required-attributes=
bad-functions=map,filter,apply,input,file
# Good variable names which should always be accepted, separated by a comma
good-names=i,j,k,ex,Run,_,fd
good-names=f,i,j,k,ex,Run,_,fd
# Bad variable names which should always be refused, separated by a comma
bad-names=foo,bar,baz,toto,tutu,tata

View File

@@ -98,13 +98,18 @@ def run(args, config, plugins):
return "Configurator could not be determined"
acme, doms = _common_run(args, config, acc, authenticator, installer)
cert_key, cert_path, chain_path = acme.obtain_certificate(doms)
acme.deploy_certificate(doms, cert_key, cert_path, chain_path)
# TODO: Handle errors from _common_run?
lineage = acme.obtain_and_enroll_certificate(doms, authenticator,
installer, plugins)
if not lineage:
return "Certificate could not be obtained"
acme.deploy_certificate(doms, lineage)
acme.enhance_config(doms, args.redirect)
def auth(args, config, plugins):
"""Obtain a certificate (no install)."""
# XXX: Update for renewer / RenewableCert
acc = _account_init(args, config)
if acc is None:
return None
@@ -119,13 +124,17 @@ def auth(args, config, plugins):
else:
installer = None
# TODO: Handle errors from _common_run?
acme, doms = _common_run(
args, config, acc, authenticator=authenticator, installer=installer)
acme.obtain_certificate(doms)
if not acme.obtain_and_enroll_certificate(doms, authenticator, installer,
plugins):
return "Certificate could not be obtained"
def install(args, config, plugins):
"""Install (no auth)."""
# XXX: Update for renewer/RenewableCert
acc = _account_init(args, config)
if acc is None:
return None
@@ -136,7 +145,8 @@ def install(args, config, plugins):
acme, doms = _common_run(
args, config, acc, authenticator=None, installer=installer)
assert args.cert_path is not None
acme.deploy_certificate(doms, acc.key, args.cert_path, args.chain_path)
# XXX: This API has changed as a result of RenewableCert!
# acme.deploy_certificate(doms, acc.key, args.cert_path, args.chain_path)
acme.enhance_config(doms, args.redirect)
@@ -330,6 +340,9 @@ def _paths_parser(parser):
add("--chain-path", default=flag_default("chain_path"),
help=config_help("chain_path"))
add("--enroll-autorenew", default=None, action="store_true",
help=config_help("enroll_autorenew"))
return parser

View File

@@ -19,6 +19,7 @@ from letsencrypt import le_util
from letsencrypt import network2
from letsencrypt import reverter
from letsencrypt import revoker
from letsencrypt import storage
from letsencrypt.display import ops as display_ops
from letsencrypt.display import enhancements
@@ -67,6 +68,10 @@ class Client(object):
self.config = config
# TODO: Check if self.config.enroll_autorenew is None. If
# so, set it based to the default: figure out if dv_auth is
# standalone (then default is False, otherwise default is True)
if dv_auth is not None:
cont_auth = continuity_auth.ContinuityAuthenticator(config,
installer)
@@ -95,7 +100,7 @@ class Client(object):
self.account.save()
def obtain_certificate(self, domains, csr=None):
def _obtain_certificate(self, domains, csr=None):
"""Obtains a certificate from the ACME server.
:meth:`.register` must be called before :meth:`.obtain_certificate`
@@ -108,8 +113,8 @@ class Client(object):
this CSR can be different than self.authkey
:type csr: :class:`CSR`
:returns: cert_key, cert_path, chain_path
:rtype: `tuple` of (:class:`letsencrypt.le_util.Key`, str, str)
:returns: cert_pem, key_pem, chain_pem
:rtype: `tuple` of (str, str, str)
"""
if self.auth_handler is None:
@@ -136,14 +141,55 @@ class Client(object):
M2Crypto.X509.load_request_der_string(csr.data)),
authzr)
# Save Certificate
cert_path, chain_path = self.save_certificate(
certr, self.config.cert_path, self.config.chain_path)
cert_pem = certr.body.as_pem()
chain_pem = None
if certr.cert_chain_uri is not None:
chain_pem = self.network.fetch_chain(certr)
revoker.Revoker.store_cert_key(
cert_path, self.account.key.file, self.config)
if chain_pem is None:
# XXX: just to stop RenewableCert from complaining; this is
# probably not a good solution
chain_pem = ""
else:
chain_pem = chain_pem.as_pem()
return cert_key, cert_path, chain_path
return cert_pem, cert_key.pem, chain_pem
def obtain_and_enroll_certificate(self, domains, authenticator, installer,
plugins, csr=None):
"""Get a new certificate for the specified domains using the specified
authenticator and installer, and then create a new renewable lineage
containing it.
:param list domains: Domains to request.
:param authenticator: The authenticator to use.
:type authenticator: :class:`letsencrypt.interfaces.IAuthenticator`
:param installer: The installer to use.
:type installer: :class:`letsencrypt.interfaces.IInstaller`
:param plugins: A PluginsFactory object.
:param str csr: A preexisting CSR to use with this request.
:returns: A new :class:`letsencrypt.storage.RenewableCert` instance
referred to the enrolled cert lineage, or False if the cert could
not be obtained.
"""
# TODO: fully identify object types in docstring.
cert_pem, privkey, chain_pem = self._obtain_certificate(domains, csr)
self.config.namespace.authenticator = plugins.find_init(
authenticator).name
if installer is not None:
self.config.namespace.installer = plugins.find_init(installer).name
return storage.RenewableCert.new_lineage(domains[0], cert_pem,
privkey, chain_pem,
vars(self.config.namespace))
def obtain_certificate(self, domains):
"""Public method to obtain a certificate for the specified domains
using this client object. Returns the tuple (cert, privkey, chain)."""
return self._obtain_certificate(domains, None)
def save_certificate(self, certr, cert_path, chain_path):
# pylint: disable=no-self-use
@@ -192,29 +238,32 @@ class Client(object):
return os.path.abspath(act_cert_path), cert_chain_abspath
def deploy_certificate(self, domains, privkey, cert_path, chain_path=None):
def deploy_certificate(self, domains, lineage):
"""Install certificate
:param list domains: list of domains to install the certificate
:param privkey: private key for certificate
:type privkey: :class:`letsencrypt.le_util.Key`
:param str cert_path: certificate file path
:param str chain_path: chain file path
:param lineage: RenewableCert object representing the certificate
:type lineage: :class:`letsencrypt.storage.RenewableCert`
"""
if self.installer is None:
logging.warning("No installer specified, client is unable to deploy"
"the certificate")
raise errors.LetsEncryptClientError("No installer available")
chain_path = None if chain_path is None else os.path.abspath(chain_path)
# TODO: Is it possible not to have a chain at all? (The
# RenewableCert class currently doesn't support this case, but
# perhaps the CA can issue according to ACME without providing
# a chain, which would currently be a problem for instantiating
# RenewableCert, and subsequently also for this method.)
for dom in domains:
self.installer.deploy_cert(
dom, os.path.abspath(cert_path),
os.path.abspath(privkey.file), chain_path)
# TODO: Provide a fullchain reference for installers like
# nginx that want it
self.installer.deploy_cert(dom,
lineage.cert,
lineage.privkey,
lineage.chain)
self.installer.save("Deployed Let's Encrypt Certificate")
# sites may have been enabled / final cleanup

View File

@@ -1,4 +1,5 @@
"""Let's Encrypt constants."""
import configobj
import logging
from acme import challenges
@@ -26,6 +27,18 @@ CLI_DEFAULTS = dict(
"""Defaults for CLI flags and `.IConfig` attributes."""
RENEWER_DEFAULTS = configobj.ConfigObj(dict(
renewer_config_file="/etc/letsencrypt/renewer.conf",
renewal_configs_dir="/etc/letsencrypt/configs",
archive_dir="/etc/letsencrypt/archive",
live_dir="/etc/letsencrypt/live",
renewer_enabled="yes",
renew_before_expiry="30 days",
deploy_before_expiry="20 days",
))
"""Defaults for renewer script."""
EXCLUSIVE_CHALLENGES = frozenset([frozenset([
challenges.DVSNI, challenges.SimpleHTTPS])])
"""Mutually exclusive challenges."""
@@ -71,3 +84,7 @@ IConfig.work_dir)."""
NETSTAT = "/bin/netstat"
"""Location of netstat binary for checking whether a listener is already
running on the specified port (Linux-specific)."""
BOULDER_TEST_MODE_CHALLENGE_PORT = 5001
"""Port that Boulder will connect on for validations in test mode."""

View File

@@ -72,7 +72,7 @@ def init_save_csr(privkey, names, cert_dir, csrname="csr-letsencrypt.pem"):
csr_pem, csr_der = make_csr(privkey.pem, names)
# Save CSR
le_util.make_or_verify_dir(cert_dir, 0o755)
le_util.make_or_verify_dir(cert_dir, 0o755, os.geteuid())
csr_f, csr_filename = le_util.unique_file(
os.path.join(cert_dir, csrname), 0o644)
csr_f.write(csr_pem)
@@ -234,42 +234,78 @@ def make_ss_cert(key_str, domains, not_before=None,
return cert.as_pem()
def _request_san(req): # TODO: implement directly in PyOpenSSL!
def _pyopenssl_cert_or_req_san(cert_or_req):
"""Get Subject Alternative Names from certificate or CSR using pyOpenSSL.
.. todo:: Implement directly in PyOpenSSL!
:param cert_or_req: Certificate or CSR.
:type cert_or_req: `OpenSSL.crypto.X509` or `OpenSSL.crypto.X509Req`.
:returns: A list of Subject Alternative Names.
:rtype: list
"""
# constants based on implementation of
# OpenSSL.crypto.X509Error._subjectAltNameString
parts_separator = ", "
part_separator = ":"
extension_short_name = "subjectAltName"
if hasattr(cert_or_req, 'get_extensions'): # X509Req
extensions = cert_or_req.get_extensions()
else: # X509
extensions = [cert_or_req.get_extension(i)
for i in xrange(cert_or_req.get_extension_count())]
# pylint: disable=protected-access,no-member
label = OpenSSL.crypto.X509Extension._prefixes[OpenSSL.crypto._lib.GEN_DNS]
assert parts_separator not in label
prefix = label + part_separator
extensions = [ext._subjectAltNameString().split(parts_separator)
for ext in req.get_extensions()
if ext.get_short_name() == extension_short_name]
san_extensions = [
ext._subjectAltNameString().split(parts_separator)
for ext in extensions if ext.get_short_name() == extension_short_name]
# WARNING: this function assumes that no SAN can include
# parts_separator, hence the split!
return [part.split(part_separator)[1] for parts in extensions
return [part.split(part_separator)[1] for parts in san_extensions
for part in parts if part.startswith(prefix)]
def get_sans_from_csr(csr, typ=OpenSSL.crypto.FILETYPE_PEM):
"""Get list of Subject Alternative Names from signing request.
:param str csr: Certificate Signing Request in PEM format (must contain
one or more subjectAlternativeNames, or the function will fail,
raising ValueError)
:returns: List of referenced subject alternative names
:rtype: list
"""
def _get_sans_from_cert_or_req(
cert_or_req_str, load_func, typ=OpenSSL.crypto.FILETYPE_PEM):
try:
request = OpenSSL.crypto.load_certificate_request(typ, csr)
cert_or_req = load_func(typ, cert_or_req_str)
except OpenSSL.crypto.Error as error:
logging.exception(error)
raise
return _request_san(request)
return _pyopenssl_cert_or_req_san(cert_or_req)
def get_sans_from_cert(cert, typ=OpenSSL.crypto.FILETYPE_PEM):
"""Get a list of Subject Alternative Names from a certificate.
:param str csr: Certificate (encoded).
:param typ: `OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1`
:returns: A list of Subject Alternative Names.
:rtype: list
"""
return _get_sans_from_cert_or_req(
cert, OpenSSL.crypto.load_certificate, typ)
def get_sans_from_csr(csr, typ=OpenSSL.crypto.FILETYPE_PEM):
"""Get a list of Subject Alternative Names from a CSR.
:param str csr: CSR (encoded).
:param typ: `OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1`
:returns: A list of Subject Alternative Names.
:rtype: list
"""
return _get_sans_from_cert_or_req(
csr, OpenSSL.crypto.load_certificate_request, typ)

View File

@@ -175,6 +175,11 @@ class IConfig(zope.interface.Interface):
le_vhost_ext = zope.interface.Attribute(
"SSL vhost configuration extension.")
enroll_autorenew = zope.interface.Attribute(
"Register this certificate in the database to be renewed"
" automatically.")
cert_path = zope.interface.Attribute("Let's Encrypt certificate file path.")
chain_path = zope.interface.Attribute("Let's Encrypt chain file path.")

View File

@@ -60,7 +60,7 @@ def unique_file(path, mode=0o777):
:param str path: path/filename.ext
:param int mode: File mode
:return: tuple of file object and file name
:returns: tuple of file object and file name
"""
path, tail = os.path.split(path)
@@ -70,8 +70,45 @@ def unique_file(path, mode=0o777):
try:
file_d = os.open(fname, os.O_CREAT | os.O_EXCL | os.O_RDWR, mode)
return os.fdopen(file_d, "w"), fname
except OSError:
pass
except OSError as exception:
# "File exists," is okay, try a different name.
if exception.errno != errno.EEXIST:
raise
count += 1
def unique_lineage_name(path, filename, mode=0o777):
"""Safely finds a unique file for writing only (by default). Uses a
file lineage convention.
:param str path: directory path
:param str filename: proposed filename
:param int mode: file mode
:returns: tuple of file object and file name (which may be modified from
the requested one by appending digits to ensure uniqueness)
:raises OSError: if writing files fails for an unanticipated reason,
such as a full disk or a lack of permission to write to specified
location.
"""
fname = os.path.join(path, "%s.conf" % (filename))
try:
file_d = os.open(fname, os.O_CREAT | os.O_EXCL | os.O_RDWR, mode)
return os.fdopen(file_d, "w"), fname
except OSError as err:
if err.errno != errno.EEXIST:
raise err
count = 1
while True:
fname = os.path.join(path, "%s-%04d.conf" % (filename, count))
try:
file_d = os.open(fname, os.O_CREAT | os.O_EXCL | os.O_RDWR, mode)
return os.fdopen(file_d, "w"), fname
except OSError as err:
if err.errno != errno.EEXIST:
raise err
count += 1

30
letsencrypt/notify.py Normal file
View File

@@ -0,0 +1,30 @@
"""Send e-mail notification to system administrators."""
import email
import smtplib
import socket
import subprocess
def notify(subject, whom, what):
"""Try to notify the addressee (whom) by e-mail, with Subject:
defined by subject and message body by what."""
msg = email.message_from_string(what)
msg.add_header("From", "Let's Encrypt renewal agent <root>")
msg.add_header("To", whom)
msg.add_header("Subject", subject)
msg = msg.as_string()
try:
lmtp = smtplib.LMTP()
lmtp.connect()
lmtp.sendmail("root", [whom], msg)
except (smtplib.SMTPHeloError, smtplib.SMTPRecipientsRefused,
smtplib.SMTPSenderRefused, smtplib.SMTPDataError, socket.error):
# We should try using /usr/sbin/sendmail in this case
try:
proc = subprocess.Popen(["/usr/sbin/sendmail", "-t"],
stdin=subprocess.PIPE)
proc.communicate(msg)
except OSError:
return False
return True

View File

@@ -15,6 +15,7 @@ import zope.interface
from acme import challenges
from letsencrypt import achallenges
from letsencrypt import constants
from letsencrypt import interfaces
from letsencrypt.plugins import common
@@ -378,7 +379,10 @@ class StandaloneAuthenticator(common.Plugin):
results_if_failure.append(False)
if not self.tasks:
raise ValueError("nothing for .perform() to do")
if self.already_listening(challenges.DVSNI.PORT):
port = challenges.DVSNI.PORT
if self.config and self.config.test_mode:
port = constants.BOULDER_TEST_MODE_CHALLENGE_PORT
if self.already_listening(port):
# If we know a process is already listening on this port,
# tell the user, and don't even attempt to bind it. (This
# test is Linux-specific and won't indicate that the port
@@ -386,7 +390,7 @@ class StandaloneAuthenticator(common.Plugin):
return results_if_failure
# Try to do the authentication; note that this creates
# the listener subprocess via os.fork()
if self.start_listener(challenges.DVSNI.PORT, key):
if self.start_listener(port, key):
return results_if_success
else:
# TODO: This should probably raise a DVAuthError exception

140
letsencrypt/renewer.py Normal file
View File

@@ -0,0 +1,140 @@
"""Renewer tool to handle autorenewal and autodeployment of renewed
certs within lineages of successor certificates, according to
configuration."""
# TODO: sanity checking consistency, validity, freshness?
# TODO: call new installer API to restart servers after deployment
import copy
import os
import configobj
from letsencrypt import configuration
from letsencrypt import constants
from letsencrypt import client
from letsencrypt import crypto_util
from letsencrypt import notify
from letsencrypt import storage
from letsencrypt.plugins import disco as plugins_disco
class AttrDict(dict):
"""A trick to allow accessing dictionary keys as object
attributes."""
def __init__(self, *args, **kwargs):
super(AttrDict, self).__init__(*args, **kwargs)
self.__dict__ = self
def renew(cert, old_version):
"""Perform automated renewal of the referenced cert, if possible.
:param class:`letsencrypt.storage.RenewableCert` cert: the certificate
lineage to attempt to renew.
:param int old_version: the version of the certificate lineage relative
to which the renewal should be attempted.
:returns: int referring to newly created version of this cert lineage,
or False if renewal was not successful."""
# TODO: handle partial success (some names can be renewed but not
# others)
# TODO: handle obligatory key rotation vs. optional key rotation vs.
# requested key rotation
if "renewalparams" not in cert.configfile:
# TODO: notify user?
return False
renewalparams = cert.configfile["renewalparams"]
if "authenticator" not in renewalparams:
# TODO: notify user?
return False
# Instantiate the appropriate authenticator
plugins = plugins_disco.PluginsRegistry.find_all()
config = configuration.NamespaceConfig(AttrDict(renewalparams))
# XXX: this loses type data (for example, the fact that key_size
# was an int, not a str)
config.rsa_key_size = int(config.rsa_key_size)
try:
authenticator = plugins[renewalparams["authenticator"]]
except KeyError:
# TODO: Notify user? (authenticator could not be found)
return False
authenticator = authenticator.init(config)
authenticator.prepare()
account = client.determine_account(config)
# TODO: are there other ways to get the right account object, e.g.
# based on the email parameter that might be present in
# renewalparams?
our_client = client.Client(config, account, authenticator, None)
with open(cert.version("cert", old_version)) as f:
sans = crypto_util.get_sans_from_cert(f.read())
new_cert, new_key, new_chain = our_client.obtain_certificate(sans)
if new_cert and new_key and new_chain:
# XXX: Assumes that there was no key change. We need logic
# for figuring out whether there was or not. Probably
# best is to have obtain_certificate return None for
# new_key if the old key is to be used (since save_successor
# already understands this distinction!)
return cert.save_successor(old_version, new_cert, new_key, new_chain)
# TODO: Notify results
else:
# TODO: Notify negative results
return False
# TODO: Consider the case where the renewal was partially successful
# (where fewer than all names were renewed)
def main(config=None):
"""main function for autorenewer script."""
# TODO: Distinguish automated invocation from manual invocation,
# perhaps by looking at sys.argv[0] and inhibiting automated
# invocations if /etc/letsencrypt/renewal.conf defaults have
# turned it off. (The boolean parameter should probably be
# called renewer_enabled.)
# Merge supplied config, if provided, on top of builtin defaults
defaults_copy = copy.deepcopy(constants.RENEWER_DEFAULTS)
defaults_copy.merge(config if config is not None else configobj.ConfigObj())
config = defaults_copy
# Now attempt to read the renewer config file and augment or replace
# the renewer defaults with any options contained in that file. If
# renewer_config_file is undefined or if the file is nonexistent or
# empty, this .merge() will have no effect. TODO: when we have a more
# elaborate renewer command line, we will presumably also be able to
# specify a config file on the command line, which, if provided, should
# take precedence over this one.
config.merge(configobj.ConfigObj(config.get("renewer_config_file", "")))
for i in os.listdir(config["renewal_configs_dir"]):
print "Processing", i
if not i.endswith(".conf"):
continue
rc_config = configobj.ConfigObj(
os.path.join(config["renewal_configs_dir"], i))
try:
cert = storage.RenewableCert(rc_config)
except ValueError:
# This indicates an invalid renewal configuration file, such
# as one missing a required parameter (in the future, perhaps
# also one that is internally inconsistent or is missing a
# required parameter). As a TODO, maybe we should warn the
# user about the existence of an invalid or corrupt renewal
# config rather than simply ignoring it.
continue
if cert.should_autodeploy():
cert.update_all_links_to(cert.latest_common_version())
# TODO: restart web server (invoke IInstaller.restart() method)
notify.notify("Autodeployed a cert!!!", "root", "It worked!")
# TODO: explain what happened
if cert.should_autorenew():
# Note: not cert.current_version() because the basis for
# the renewal is the latest version, even if it hasn't been
# deployed yet!
old_version = cert.latest_common_version()
renew(cert, old_version)
notify.notify("Autorenewed a cert!!!", "root", "It worked!")
# TODO: explain what happened

641
letsencrypt/storage.py Normal file
View File

@@ -0,0 +1,641 @@
"""The RenewableCert class, representing renewable lineages of
certificates and storing the associated cert data and metadata."""
import copy
import datetime
import os
import re
import time
import configobj
import OpenSSL
import parsedatetime
import pytz
import pyrfc3339
from letsencrypt import constants
from letsencrypt import le_util
ALL_FOUR = ("cert", "privkey", "chain", "fullchain")
def parse_time_interval(interval, textparser=parsedatetime.Calendar()):
"""Parse the time specified time interval.
The interval can be in the English-language format understood by
parsedatetime, e.g., '10 days', '3 weeks', '6 months', '9 hours',
or a sequence of such intervals like '6 months 1 week' or '3 days
12 hours'. If an integer is found with no associated unit, it is
interpreted by default as a number of days.
:param str interval: the time interval to parse.
:returns: the interpretation of the time interval.
:rtype: :class:`datetime.timedelta`"""
if interval.strip().isdigit():
interval += " days"
return datetime.timedelta(0, time.mktime(textparser.parse(
interval, time.localtime(0))[0]))
class RenewableCert(object): # pylint: disable=too-many-instance-attributes
"""Represents a lineage of certificates that is under the management
of the Let's Encrypt client, indicated by the existence of an
associated renewal configuration file.
Note that the notion of "current version" for a lineage is maintained
on disk in the structure of symbolic links, and is not explicitly
stored in any instance variable in this object. The RenewableCert
object is able to determine information about the current (or other)
version by accessing data on disk, but does not inherently know any
of this information except by examining the symbolic links as needed.
The instance variables mentioned below point to symlinks that reflect
the notion of "current version" of each managed object, and it is
these paths that should be used when configuring servers to use the
certificate managed in a lineage. These paths are normally within
the "live" directory, and their symlink targets -- the actual cert
files -- are normally found within the "archive" directory.
:ivar cert: The path to the symlink representing the current version
of the certificate managed by this lineage.
:type cert: str
:ivar privkey: The path to the symlink representing the current version
of the private key managed by this lineage.
:type privkey: str
:ivar chain: The path to the symlink representing the current version
of the chain managed by this lineage.
:type chain: str
:ivar fullchain: The path to the symlink representing the current version
of the fullchain (combined chain and cert) managed by this lineage.
:type fullchain: str
:ivar configuration: The renewal configuration options associated with
this lineage, obtained from parsing the renewal configuration file
and/or systemwide defaults.
:type configuration: :class:`configobj.ConfigObj`"""
def __init__(self, configfile, config_opts=None):
"""Instantiate a RenewableCert object from an existing lineage.
:param :class:`configobj.ConfigObj` configfile: an already-parsed
ConfigObj object made from reading the renewal config file that
defines this lineage.
:param :class:`configobj.ConfigObj` config_opts: systemwide defaults
for renewal properties not otherwise specified in the individual
renewal config file.
:raises ValueError: if the configuration file's name didn't end in
".conf", or the file is missing or broken.
:raises TypeError: if the provided renewal configuration isn't a
ConfigObj object."""
if isinstance(configfile, configobj.ConfigObj):
if not os.path.basename(configfile.filename).endswith(".conf"):
raise ValueError("renewal config file name must end in .conf")
self.lineagename = os.path.basename(
configfile.filename)[:-len(".conf")]
else:
raise TypeError("RenewableCert config must be ConfigObj object")
# self.configuration should be used to read parameters that
# may have been chosen based on default values from the
# systemwide renewal configuration; self.configfile should be
# used to make and save changes.
self.configfile = configfile
# TODO: Do we actually use anything from defaults and do we want to
# read further defaults from the systemwide renewal configuration
# file at this stage?
defaults_copy = copy.deepcopy(constants.RENEWER_DEFAULTS)
defaults_copy.merge(config_opts if config_opts is not None
else configobj.ConfigObj())
self.configuration = defaults_copy
self.configuration.merge(self.configfile)
if not all(x in self.configuration for x in ALL_FOUR):
raise ValueError("renewal config file {0} is missing a required "
"file reference".format(configfile))
self.cert = self.configuration["cert"]
self.privkey = self.configuration["privkey"]
self.chain = self.configuration["chain"]
self.fullchain = self.configuration["fullchain"]
def consistent(self):
"""Are the files associated with this lineage self-consistent?
:returns: whether the files stored in connection with this
lineage appear to be correct and consistent with one another.
:rtype: bool"""
# Each element must be referenced with an absolute path
if any(not os.path.isabs(x) for x in
(self.cert, self.privkey, self.chain, self.fullchain)):
return False
# Each element must exist and be a symbolic link
if any(not os.path.islink(x) for x in
(self.cert, self.privkey, self.chain, self.fullchain)):
return False
for kind in ALL_FOUR:
link = getattr(self, kind)
where = os.path.dirname(link)
target = os.readlink(link)
if not os.path.isabs(target):
target = os.path.join(where, target)
# Each element's link must point within the cert lineage's
# directory within the official archive directory
desired_directory = os.path.join(
self.configuration["archive_dir"], self.lineagename)
if not os.path.samefile(os.path.dirname(target),
desired_directory):
return False
# The link must point to a file that exists
if not os.path.exists(target):
return False
# The link must point to a file that follows the archive
# naming convention
pattern = re.compile(r"^{0}([0-9]+)\.pem$".format(kind))
if not pattern.match(os.path.basename(target)):
return False
# It is NOT required that the link's target be a regular
# file (it may itself be a symlink). But we should probably
# do a recursive check that ultimately the target does
# exist?
# XXX: Additional possible consistency checks (e.g.
# cryptographic validation of the chain being a chain,
# the chain matching the cert, and the cert matching
# the subject key)
# XXX: All four of the targets are in the same directory
# (This check is redundant with the check that they
# are all in the desired directory!)
# len(set(os.path.basename(self.current_target(x)
# for x in ALL_FOUR))) == 1
return True
def fix(self):
"""Attempt to fix defects or inconsistencies in this lineage.
(Currently unimplemented.)"""
# TODO: Figure out what kinds of fixes are possible. For
# example, checking if there is a valid version that
# we can update the symlinks to. (Maybe involve
# parsing keys and certs to see if they exist and
# if a key corresponds to the subject key of a cert?)
# TODO: In general, the symlink-reading functions below are not
# cautious enough about the possibility that links or their
# targets may not exist. (This shouldn't happen, but might
# happen as a result of random tampering by a sysadmin, or
# filesystem errors, or crashes.)
def current_target(self, kind):
"""Returns full path to which the specified item currently points.
:param str kind: the lineage member item ("cert", "privkey",
"chain", or "fullchain")
:returns: the path to the current version of the specified member.
:rtype: str"""
if kind not in ALL_FOUR:
raise ValueError("unknown kind of item")
link = getattr(self, kind)
if not os.path.exists(link):
return None
target = os.readlink(link)
if not os.path.isabs(target):
target = os.path.join(os.path.dirname(link), target)
return target
def current_version(self, kind):
"""Returns numerical version of the specified item.
For example, if kind
is "chain" and the current chain link points to a file named
"chain7.pem", returns the integer 7.
:param str kind: the lineage member item ("cert", "privkey",
"chain", or "fullchain")
:returns: the current version of the specified member.
:rtype: int"""
if kind not in ALL_FOUR:
raise ValueError("unknown kind of item")
pattern = re.compile(r"^{0}([0-9]+)\.pem$".format(kind))
target = self.current_target(kind)
if target is None or not os.path.exists(target):
target = ""
matches = pattern.match(os.path.basename(target))
if matches:
return int(matches.groups()[0])
else:
return None
def version(self, kind, version):
"""The filename that corresponds to the specified version and kind.
Warning: the specified version may not exist in this lineage. There
is no guarantee that the file path returned by this method actually
exists.
:param str kind: the lineage member item ("cert", "privkey",
"chain", or "fullchain")
:param int version: the desired version
:returns: the path to the specified version of the specified member.
:rtype: str"""
if kind not in ALL_FOUR:
raise ValueError("unknown kind of item")
where = os.path.dirname(self.current_target(kind))
return os.path.join(where, "{0}{1}.pem".format(kind, version))
def available_versions(self, kind):
"""Which alternative versions of the specified kind of item exist?
The archive directory where the current version is stored is
consulted to obtain the list of alternatives.
:param str kind: the lineage member item ("cert", "privkey",
"chain", or "fullchain")
:returns: all of the version numbers that currently exist
:rtype: list of int"""
if kind not in ALL_FOUR:
raise ValueError("unknown kind of item")
where = os.path.dirname(self.current_target(kind))
files = os.listdir(where)
pattern = re.compile(r"^{0}([0-9]+)\.pem$".format(kind))
matches = [pattern.match(f) for f in files]
return sorted([int(m.groups()[0]) for m in matches if m])
def newest_available_version(self, kind):
"""What is the newest available version of the specified kind of item?
:param str kind: the lineage member item ("cert", "privkey",
"chain", or "fullchain")
:returns: the newest available version of this member
:rtype: int"""
return max(self.available_versions(kind))
def latest_common_version(self):
"""What is the newest version for which all items are available?
:returns: the newest available version for which all members (cert,
privkey, chain, and fullchain) exist
:rtype: int"""
# TODO: this can raise ValueError if there is no version overlap
# (it should probably return None instead)
# TODO: this can raise a spurious AttributeError if the current
# link for any kind is missing (it should probably return None)
versions = [self.available_versions(x) for x in ALL_FOUR]
return max(n for n in versions[0] if all(n in v for v in versions[1:]))
def next_free_version(self):
"""What is the smallest version newer than all full or partial versions?
:returns: the smallest version number that is larger than any version
of any item currently stored in this lineage
:rtype: int
"""
# TODO: consider locking/mutual exclusion between updating processes
# This isn't self.latest_common_version() + 1 because we don't want
# collide with a version that might exist for one file type but not
# for the others.
return max(self.newest_available_version(x) for x in ALL_FOUR) + 1
def has_pending_deployment(self):
"""Is there a later version of all of the managed items?
:returns: True if there is a complete version of this lineage with
a larger version number than the current version, and False
otherwise
:rtype: bool"""
# TODO: consider whether to assume consistency or treat
# inconsistent/consistent versions differently
smallest_current = min(self.current_version(x) for x in ALL_FOUR)
return smallest_current < self.latest_common_version()
def update_link_to(self, kind, version):
"""Make the specified item point at the specified version.
(Note that this method doesn't verify that the specified version
exists.)
:param str kind: the lineage member item ("cert", "privkey",
"chain", or "fullchain")
:param int version: the desired version"""
if kind not in ALL_FOUR:
raise ValueError("unknown kind of item")
link = getattr(self, kind)
filename = "{0}{1}.pem".format(kind, version)
# Relative rather than absolute target directory
target_directory = os.path.dirname(os.readlink(link))
# TODO: it could be safer to make the link first under a temporary
# filename, then unlink the old link, then rename the new link
# to the old link; this ensures that this process is able to
# create symlinks.
# TODO: we might also want to check consistency of related links
# for the other corresponding items
os.unlink(link)
os.symlink(os.path.join(target_directory, filename), link)
def update_all_links_to(self, version):
"""Change all member objects to point to the specified version.
:param int version: the desired version"""
for kind in ALL_FOUR:
self.update_link_to(kind, version)
def _notafterbefore(self, method, version):
"""Internal helper function for finding notbefore/notafter."""
if version is None:
target = self.current_target("cert")
else:
target = self.version("cert", version)
pem = open(target).read()
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
pem)
i = method(x509)
return pyrfc3339.parse(i[0:4] + "-" + i[4:6] + "-" + i[6:8] + "T" +
i[8:10] + ":" + i[10:12] + ":" + i[12:])
def notbefore(self, version=None):
"""When does the specified cert version start being valid?
(If no version is specified, use the current version.)
:param int version: the desired version number
:returns: the notBefore value from the specified cert version in this
lineage
:rtype: :class:`datetime.datetime`"""
return self._notafterbefore(lambda x509: x509.get_notBefore(), version)
def notafter(self, version=None):
"""When does the specified cert version stop being valid?
(If no version is specified, use the current version.)
:param int version: the desired version number
:returns: the notAfter value from the specified cert version in this
lineage
:rtype: :class:`datetime.datetime`"""
return self._notafterbefore(lambda x509: x509.get_notAfter(), version)
def should_autodeploy(self):
"""Should this lineage now automatically deploy a newer version?
This is a policy question and does not only depend on whether there
is a newer version of the cert. (This considers whether autodeployment
is enabled, whether a relevant newer version exists, and whether the
time interval for autodeployment has been reached.)
:returns: whether the lineage now ought to autodeploy an existing
newer cert version
:rtype: bool"""
if ("autodeploy" not in self.configuration or
self.configuration.as_bool("autodeploy")):
if self.has_pending_deployment():
interval = self.configuration.get("deploy_before_expiry",
"5 days")
autodeploy_interval = parse_time_interval(interval)
expiry = self.notafter()
now = datetime.datetime.utcnow().replace(tzinfo=pytz.UTC)
remaining = expiry - now
if remaining < autodeploy_interval:
return True
return False
def ocsp_revoked(self, version=None):
# pylint: disable=no-self-use,unused-argument
"""Is the specified cert version revoked according to OCSP?
Also returns True if the cert version is declared as intended to be
revoked according to Let's Encrypt OCSP extensions. (If no version
is specified, uses the current version.)
This method is not yet implemented and currently always returns False.
:param int version: the desired version number
:returns: whether the certificate is or will be revoked
:rtype: bool"""
# XXX: This query and its associated network service aren't
# implemented yet, so we currently return False (indicating that the
# certificate is not revoked).
return False
def should_autorenew(self):
"""Should we now try to autorenew the most recent cert version?
This is a policy question and does not only depend on whether the
cert is expired. (This considers whether autorenewal is enabled,
whether the cert is revoked, and whether the time interval for
autorenewal has been reached.)
Note that this examines the numerically most recent cert version,
not the currently deployed version.
:returns: whether an attempt should now be made to autorenew the
most current cert version in this lineage
:rtype: bool"""
if ("autorenew" not in self.configuration
or self.configuration.as_bool("autorenew")):
# Consider whether to attempt to autorenew this cert now
# Renewals on the basis of revocation
if self.ocsp_revoked(self.latest_common_version()):
return True
# Renewals on the basis of expiry time
interval = self.configuration.get("renew_before_expiry", "10 days")
autorenew_interval = parse_time_interval(interval)
expiry = self.notafter(self.latest_common_version())
now = datetime.datetime.utcnow().replace(tzinfo=pytz.UTC)
remaining = expiry - now
if remaining < autorenew_interval:
return True
return False
@classmethod
def new_lineage(cls, lineagename, cert, privkey, chain,
renewalparams=None, config=None):
# pylint: disable=too-many-locals,too-many-arguments
"""Create a new certificate lineage.
Attempts to create a certificate lineage -- enrolled for potential
future renewal -- with the (suggested) lineage name lineagename,
and the associated cert, privkey, and chain (the associated
fullchain will be created automatically). Optional configurator
and renewalparams record the configuration that was originally
used to obtain this cert, so that it can be reused later during
automated renewal.
Returns a new RenewableCert object referring to the created
lineage. (The actual lineage name, as well as all the relevant
file paths, will be available within this object.)
:param str lineagename: the suggested name for this lineage
(normally the current cert's first subject DNS name)
:param str cert: the initial certificate version in PEM format
:param str privkey: the private key in PEM format
:param str chain: the certificate chain in PEM format
:param :class:`configobj.ConfigObj` renewalparams: parameters that
should be used when instantiating authenticator and installer
objects in the future to attempt to renew this cert or deploy
new versions of it
:param :class:`configobj.ConfigObj` config: renewal configuration
defaults, affecting, for example, the locations of the
directories where the associated files will be saved
:returns: the newly-created RenewalCert object
:rtype: :class:`storage.renewableCert`"""
defaults_copy = copy.deepcopy(constants.RENEWER_DEFAULTS)
defaults_copy.merge(config if config is not None
else configobj.ConfigObj())
config = defaults_copy
# This attempts to read the renewer config file and augment or replace
# the renewer defaults with any options contained in that file. If
# renewer_config_file is undefined or if the file is nonexistent or
# empty, this .merge() will have no effect.
config.merge(configobj.ConfigObj(config.get("renewer_config_file", "")))
# Examine the configuration and find the new lineage's name
configs_dir = config["renewal_configs_dir"]
archive_dir = config["archive_dir"]
live_dir = config["live_dir"]
for i in (configs_dir, archive_dir, live_dir):
if not os.path.exists(i):
os.makedirs(i, 0700)
config_file, config_filename = le_util.unique_lineage_name(configs_dir,
lineagename)
if not config_filename.endswith(".conf"):
raise ValueError("renewal config file name must end in .conf")
# Determine where on disk everything will go
# lineagename will now potentially be modified based on which
# renewal configuration file could actually be created
lineagename = os.path.basename(config_filename)[:-len(".conf")]
archive = os.path.join(archive_dir, lineagename)
live_dir = os.path.join(live_dir, lineagename)
if os.path.exists(archive):
raise ValueError("archive directory exists for " + lineagename)
if os.path.exists(live_dir):
raise ValueError("live directory exists for " + lineagename)
os.mkdir(archive)
os.mkdir(live_dir)
relative_archive = os.path.join("..", "..", "archive", lineagename)
# Put the data into the appropriate files on disk
target = dict([(kind, os.path.join(live_dir, kind + ".pem"))
for kind in ALL_FOUR])
for kind in ALL_FOUR:
os.symlink(os.path.join(relative_archive, kind + "1.pem"),
target[kind])
with open(target["cert"], "w") as f:
f.write(cert)
with open(target["privkey"], "w") as f:
f.write(privkey)
# XXX: Let's make sure to get the file permissions right here
with open(target["chain"], "w") as f:
f.write(chain)
with open(target["fullchain"], "w") as f:
f.write(cert + chain)
# Document what we've done in a new renewal config file
config_file.close()
new_config = configobj.ConfigObj(config_filename, create_empty=True)
for kind in ALL_FOUR:
new_config[kind] = target[kind]
if renewalparams:
new_config["renewalparams"] = renewalparams
new_config.comments["renewalparams"] = ["",
"Options and defaults used"
" in the renewal process"]
# TODO: add human-readable comments explaining other available
# parameters
new_config.write()
return cls(new_config, config)
def save_successor(self, prior_version, new_cert, new_privkey, new_chain):
"""Save new cert and chain as a successor of a prior version.
Returns the new version number that was created. Note: does NOT
update links to deploy this version.
:param int prior_version: the old version to which this version is
regarded as a successor (used to choose a privkey, if the key
has not changed, but otherwise this information is not permanently
recorded anywhere)
:param str new_cert: the new certificate, in PEM format
:param str new_privkey: the new private key, in PEM format, or None,
if the private key has not changed
:param str new_chain: the new chain, in PEM format
:returns: the new version number that was created
:rtype: int"""
# XXX: assumes official archive location rather than examining links
# XXX: consider using os.open for availablity of os.O_EXCL
# XXX: ensure file permissions are correct; also create directories
# if needed (ensuring their permissions are correct)
# Figure out what the new version is and hence where to save things
target_version = self.next_free_version()
archive = self.configuration["archive_dir"]
prefix = os.path.join(archive, self.lineagename)
target = dict(
[(kind,
os.path.join(prefix, "{0}{1}.pem".format(kind, target_version)))
for kind in ALL_FOUR])
# Distinguish the cases where the privkey has changed and where it
# has not changed (in the latter case, making an appropriate symlink
# to an earlier privkey version)
if new_privkey is None:
# The behavior below keeps the prior key by creating a new
# symlink to the old key or the target of the old key symlink.
old_privkey = os.path.join(
prefix, "privkey{0}.pem".format(prior_version))
if os.path.islink(old_privkey):
old_privkey = os.readlink(old_privkey)
else:
old_privkey = "privkey{0}.pem".format(prior_version)
os.symlink(old_privkey, target["privkey"])
else:
with open(target["privkey"], "w") as f:
f.write(new_privkey)
# Save everything else
with open(target["cert"], "w") as f:
f.write(new_cert)
with open(target["chain"], "w") as f:
f.write(new_chain)
with open(target["fullchain"], "w") as f:
f.write(new_cert + new_chain)
return target_version

View File

@@ -15,6 +15,10 @@ RSA256_KEY = pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa256_key.pem'))
RSA512_KEY = pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa512_key.pem'))
CERT = pkg_resources.resource_string(
'letsencrypt.tests', os.path.join('testdata', 'cert.pem'))
SAN_CERT = pkg_resources.resource_string(
'letsencrypt.tests', os.path.join('testdata', 'cert-san.pem'))
class InitSaveKeyTest(unittest.TestCase):
@@ -68,6 +72,27 @@ class InitSaveCSRTest(unittest.TestCase):
self.assertEqual(csr.data, 'csr_der')
self.assertTrue('csr-letsencrypt.pem' in csr.file)
class MakeCSRTest(unittest.TestCase):
"""Tests for letsencrypt.crypto_util.make_csr."""
@classmethod
def _call(cls, *args, **kwargs):
from letsencrypt.crypto_util import make_csr
return make_csr(*args, **kwargs)
def test_san(self):
from letsencrypt.crypto_util import get_sans_from_csr
# TODO: Fails for RSA256_KEY
csr_pem, csr_der = self._call(
RSA512_KEY, ['example.com', 'www.example.com'])
self.assertEqual(
['example.com', 'www.example.com'], get_sans_from_csr(csr_pem))
self.assertEqual(
['example.com', 'www.example.com'], get_sans_from_csr(
csr_der, OpenSSL.crypto.FILETYPE_ASN1))
class ValidCSRTest(unittest.TestCase):
"""Tests for letsencrypt.crypto_util.valid_csr."""
@@ -151,7 +176,26 @@ class MakeSSCertTest(unittest.TestCase):
make_ss_cert(RSA512_KEY, ['example.com', 'www.example.com'])
class GetSansFromCsrTest(unittest.TestCase):
class GetSANsFromCertTest(unittest.TestCase):
"""Tests for letsencrypt.crypto_util.get_sans_from_cert."""
@classmethod
def _call(cls, *args, **kwargs):
from letsencrypt.crypto_util import get_sans_from_cert
return get_sans_from_cert(*args, **kwargs)
def test_single(self):
self.assertEqual([], self._call(pkg_resources.resource_string(
__name__, os.path.join('testdata', 'cert.pem'))))
def test_san(self):
self.assertEqual(
['example.com', 'www.example.com'],
self._call(pkg_resources.resource_string(
__name__, os.path.join('testdata', 'cert-san.pem'))))
class GetSANsFromCSRTest(unittest.TestCase):
"""Tests for letsencrypt.crypto_util.get_sans_from_csr."""
def test_extract_one_san(self):
from letsencrypt.crypto_util import get_sans_from_csr

View File

@@ -1,4 +1,5 @@
"""Tests for letsencrypt.le_util."""
import errno
import os
import shutil
import stat
@@ -78,7 +79,7 @@ class CheckPermissionsTest(unittest.TestCase):
class UniqueFileTest(unittest.TestCase):
"""Tests for letsencrypt.class.le_util.unique_file."""
"""Tests for letsencrypt.le_util.unique_file."""
def setUp(self):
self.root_path = tempfile.mkdtemp()
@@ -122,5 +123,45 @@ class UniqueFileTest(unittest.TestCase):
self.assertTrue(basename3.endswith('foo.txt'))
class UniqueLineageNameTest(unittest.TestCase):
"""Tests for letsencrypt.le_util.unique_lineage_name."""
def setUp(self):
self.root_path = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.root_path, ignore_errors=True)
def _call(self, filename, mode=0o777):
from letsencrypt.le_util import unique_lineage_name
return unique_lineage_name(self.root_path, filename, mode)
def test_basic(self):
f, name = self._call("wow")
self.assertTrue(isinstance(f, file))
self.assertTrue(isinstance(name, str))
def test_multiple(self):
for _ in range(10):
f, name = self._call("wow")
self.assertTrue(isinstance(f, file))
self.assertTrue(isinstance(name, str))
self.assertTrue("wow-0009.conf" in name)
@mock.patch("letsencrypt.le_util.os.fdopen")
def test_failure(self, mock_fdopen):
err = OSError("whoops")
err.errno = errno.EIO
mock_fdopen.side_effect = err
self.assertRaises(OSError, self._call, "wow")
@mock.patch("letsencrypt.le_util.os.fdopen")
def test_subsequent_failure(self, mock_fdopen):
self._call("wow")
err = OSError("whoops")
err.errno = errno.EIO
mock_fdopen.side_effect = err
self.assertRaises(OSError, self._call, "wow")
if __name__ == '__main__':
unittest.main() # pragma: no cover

View File

@@ -0,0 +1,51 @@
"""Tests for letsencrypt/notify.py"""
import mock
import socket
import unittest
class NotifyTests(unittest.TestCase):
"""Tests for the notifier."""
@mock.patch("letsencrypt.notify.smtplib.LMTP")
def test_smtp_success(self, mock_lmtp):
from letsencrypt.notify import notify
lmtp_obj = mock.MagicMock()
mock_lmtp.return_value = lmtp_obj
self.assertTrue(notify("Goose", "auntrhody@example.com",
"The old grey goose is dead."))
self.assertEqual(lmtp_obj.connect.call_count, 1)
self.assertEqual(lmtp_obj.sendmail.call_count, 1)
@mock.patch("letsencrypt.notify.smtplib.LMTP")
@mock.patch("letsencrypt.notify.subprocess.Popen")
def test_smtp_failure(self, mock_popen, mock_lmtp):
from letsencrypt.notify import notify
lmtp_obj = mock.MagicMock()
mock_lmtp.return_value = lmtp_obj
lmtp_obj.sendmail.side_effect = socket.error(17)
proc = mock.MagicMock()
mock_popen.return_value = proc
self.assertTrue(notify("Goose", "auntrhody@example.com",
"The old grey goose is dead."))
self.assertEqual(lmtp_obj.sendmail.call_count, 1)
self.assertEqual(proc.communicate.call_count, 1)
@mock.patch("letsencrypt.notify.smtplib.LMTP")
@mock.patch("letsencrypt.notify.subprocess.Popen")
def test_everything_fails(self, mock_popen, mock_lmtp):
from letsencrypt.notify import notify
lmtp_obj = mock.MagicMock()
mock_lmtp.return_value = lmtp_obj
lmtp_obj.sendmail.side_effect = socket.error(17)
proc = mock.MagicMock()
mock_popen.return_value = proc
proc.communicate.side_effect = OSError("What we have here is a "
"failure to communicate.")
self.assertFalse(notify("Goose", "auntrhody@example.com",
"The old grey goose is dead."))
self.assertEqual(lmtp_obj.sendmail.call_count, 1)
self.assertEqual(proc.communicate.call_count, 1)
if __name__ == "__main__":
unittest.main() # pragma: no cover

View File

@@ -0,0 +1,650 @@
"""Tests for letsencrypt/renewer.py"""
import datetime
import os
import tempfile
import pkg_resources
import shutil
import unittest
import configobj
import mock
import pytz
from letsencrypt.storage import ALL_FOUR
def unlink_all(rc_object):
"""Unlink all four items associated with this RenewableCert.
(Helper function.)"""
for kind in ALL_FOUR:
os.unlink(getattr(rc_object, kind))
def fill_with_sample_data(rc_object):
"""Put dummy data into all four files of this RenewableCert.
(Helper function.)"""
for kind in ALL_FOUR:
with open(getattr(rc_object, kind), "w") as f:
f.write(kind)
class RenewableCertTests(unittest.TestCase):
# pylint: disable=too-many-public-methods
"""Tests for the RenewableCert class as well as other functions
within renewer.py."""
def setUp(self):
from letsencrypt import storage
self.tempdir = tempfile.mkdtemp()
os.makedirs(os.path.join(self.tempdir, "live", "example.org"))
os.makedirs(os.path.join(self.tempdir, "archive", "example.org"))
os.makedirs(os.path.join(self.tempdir, "configs"))
defaults = configobj.ConfigObj()
defaults["live_dir"] = os.path.join(self.tempdir, "live")
defaults["archive_dir"] = os.path.join(self.tempdir, "archive")
defaults["renewal_configs_dir"] = os.path.join(self.tempdir,
"configs")
config = configobj.ConfigObj()
for kind in ALL_FOUR:
config[kind] = os.path.join(self.tempdir, "live", "example.org",
kind + ".pem")
config.filename = os.path.join(self.tempdir, "configs",
"example.org.conf")
self.defaults = defaults # for main() test
self.test_rc = storage.RenewableCert(config, defaults)
def tearDown(self):
shutil.rmtree(self.tempdir)
def test_initialization(self):
self.assertEqual(self.test_rc.lineagename, "example.org")
for kind in ALL_FOUR:
self.assertEqual(
getattr(self.test_rc, kind), os.path.join(
self.tempdir, "live", "example.org", kind + ".pem"))
def test_renewal_bad_config(self):
"""Test that the RenewableCert constructor will complain if
the renewal configuration file doesn't end in ".conf" or if it
isn't a ConfigObj."""
from letsencrypt import storage
defaults = configobj.ConfigObj()
config = configobj.ConfigObj()
# These files don't exist and aren't created here; the point of the test
# is to confirm that the constructor rejects them outright because of
# the configfile's name.
for kind in ALL_FOUR:
config["cert"] = "nonexistent_" + kind + ".pem"
config.filename = "nonexistent_sillyfile"
self.assertRaises(ValueError, storage.RenewableCert, config, defaults)
self.assertRaises(TypeError, storage.RenewableCert, "fun", defaults)
def test_renewal_incomplete_config(self):
"""Test that the RenewableCert constructor will complain if
the renewal configuration file is missing a required file element."""
from letsencrypt import storage
defaults = configobj.ConfigObj()
config = configobj.ConfigObj()
config["cert"] = "imaginary_cert.pem"
# Here the required privkey is missing.
config["chain"] = "imaginary_chain.pem"
config["fullchain"] = "imaginary_fullchain.pem"
config.filename = "imaginary_config.conf"
self.assertRaises(ValueError, storage.RenewableCert, config, defaults)
def test_consistent(self): # pylint: disable=too-many-statements
oldcert = self.test_rc.cert
self.test_rc.cert = "relative/path"
# Absolute path for item requirement
self.assertFalse(self.test_rc.consistent())
self.test_rc.cert = oldcert
# Items must exist requirement
self.assertFalse(self.test_rc.consistent())
# Items must be symlinks requirements
fill_with_sample_data(self.test_rc)
self.assertFalse(self.test_rc.consistent())
unlink_all(self.test_rc)
# Items must point to desired place if they are relative
for kind in ALL_FOUR:
os.symlink(os.path.join("..", kind + "17.pem"),
getattr(self.test_rc, kind))
self.assertFalse(self.test_rc.consistent())
unlink_all(self.test_rc)
# Items must point to desired place if they are absolute
for kind in ALL_FOUR:
os.symlink(os.path.join(self.tempdir, kind + "17.pem"),
getattr(self.test_rc, kind))
self.assertFalse(self.test_rc.consistent())
unlink_all(self.test_rc)
# Items must point to things that exist
for kind in ALL_FOUR:
os.symlink(os.path.join("..", "..", "archive", "example.org",
kind + "17.pem"),
getattr(self.test_rc, kind))
self.assertFalse(self.test_rc.consistent())
# This version should work
fill_with_sample_data(self.test_rc)
self.assertTrue(self.test_rc.consistent())
# Items must point to things that follow the naming convention
os.unlink(self.test_rc.fullchain)
os.symlink(os.path.join("..", "..", "archive", "example.org",
"fullchain_17.pem"), self.test_rc.fullchain)
with open(self.test_rc.fullchain, "w") as f:
f.write("wrongly-named fullchain")
self.assertFalse(self.test_rc.consistent())
def test_current_target(self):
# Relative path logic
os.symlink(os.path.join("..", "..", "archive", "example.org",
"cert17.pem"), self.test_rc.cert)
with open(self.test_rc.cert, "w") as f:
f.write("cert")
self.assertTrue(os.path.samefile(self.test_rc.current_target("cert"),
os.path.join(self.tempdir, "archive",
"example.org",
"cert17.pem")))
# Absolute path logic
os.unlink(self.test_rc.cert)
os.symlink(os.path.join(self.tempdir, "archive", "example.org",
"cert17.pem"), self.test_rc.cert)
with open(self.test_rc.cert, "w") as f:
f.write("cert")
self.assertTrue(os.path.samefile(self.test_rc.current_target("cert"),
os.path.join(self.tempdir, "archive",
"example.org",
"cert17.pem")))
def test_current_version(self):
for ver in (1, 5, 10, 20):
os.symlink(os.path.join("..", "..", "archive", "example.org",
"cert{0}.pem".format(ver)),
self.test_rc.cert)
with open(self.test_rc.cert, "w") as f:
f.write("cert")
os.unlink(self.test_rc.cert)
os.symlink(os.path.join("..", "..", "archive", "example.org",
"cert10.pem"), self.test_rc.cert)
self.assertEqual(self.test_rc.current_version("cert"), 10)
def test_no_current_version(self):
self.assertEqual(self.test_rc.current_version("cert"), None)
def test_latest_and_next_versions(self):
for ver in range(1, 6):
for kind in ALL_FOUR:
where = getattr(self.test_rc, kind)
if os.path.islink(where):
os.unlink(where)
os.symlink(os.path.join("..", "..", "archive", "example.org",
"{0}{1}.pem".format(kind, ver)), where)
with open(where, "w") as f:
f.write(kind)
self.assertEqual(self.test_rc.latest_common_version(), 5)
self.assertEqual(self.test_rc.next_free_version(), 6)
# Having one kind of file of a later version doesn't change the
# result
os.unlink(self.test_rc.privkey)
os.symlink(os.path.join("..", "..", "archive", "example.org",
"privkey7.pem"), self.test_rc.privkey)
with open(self.test_rc.privkey, "w") as f:
f.write("privkey")
self.assertEqual(self.test_rc.latest_common_version(), 5)
# ... although it does change the next free version
self.assertEqual(self.test_rc.next_free_version(), 8)
# Nor does having three out of four change the result
os.unlink(self.test_rc.cert)
os.symlink(os.path.join("..", "..", "archive", "example.org",
"cert7.pem"), self.test_rc.cert)
with open(self.test_rc.cert, "w") as f:
f.write("cert")
os.unlink(self.test_rc.fullchain)
os.symlink(os.path.join("..", "..", "archive", "example.org",
"fullchain7.pem"), self.test_rc.fullchain)
with open(self.test_rc.fullchain, "w") as f:
f.write("fullchain")
self.assertEqual(self.test_rc.latest_common_version(), 5)
# If we have everything from a much later version, it does change
# the result
ver = 17
for kind in ALL_FOUR:
where = getattr(self.test_rc, kind)
if os.path.islink(where):
os.unlink(where)
os.symlink(os.path.join("..", "..", "archive", "example.org",
"{0}{1}.pem".format(kind, ver)), where)
with open(where, "w") as f:
f.write(kind)
self.assertEqual(self.test_rc.latest_common_version(), 17)
self.assertEqual(self.test_rc.next_free_version(), 18)
def test_update_link_to(self):
for ver in range(1, 6):
for kind in ALL_FOUR:
where = getattr(self.test_rc, kind)
if os.path.islink(where):
os.unlink(where)
os.symlink(os.path.join("..", "..", "archive", "example.org",
"{0}{1}.pem".format(kind, ver)), where)
with open(where, "w") as f:
f.write(kind)
self.assertEqual(ver, self.test_rc.current_version(kind))
self.test_rc.update_link_to("cert", 3)
self.test_rc.update_link_to("privkey", 2)
self.assertEqual(3, self.test_rc.current_version("cert"))
self.assertEqual(2, self.test_rc.current_version("privkey"))
self.assertEqual(5, self.test_rc.current_version("chain"))
self.assertEqual(5, self.test_rc.current_version("fullchain"))
# Currently we are allowed to update to a version that doesn't exist
self.test_rc.update_link_to("chain", 3000)
# However, current_version doesn't allow querying the resulting
# version (because it's a broken link).
self.assertEqual(os.path.basename(os.readlink(self.test_rc.chain)),
"chain3000.pem")
def test_version(self):
os.symlink(os.path.join("..", "..", "archive", "example.org",
"cert12.pem"), self.test_rc.cert)
with open(self.test_rc.cert, "w") as f:
f.write("cert")
# TODO: We should probably test that the directory is still the
# same, but it's tricky because we can get an absolute
# path out when we put a relative path in.
self.assertEqual("cert8.pem",
os.path.basename(self.test_rc.version("cert", 8)))
def test_update_all_links_to(self):
for ver in range(1, 6):
for kind in ALL_FOUR:
where = getattr(self.test_rc, kind)
if os.path.islink(where):
os.unlink(where)
os.symlink(os.path.join("..", "..", "archive", "example.org",
"{0}{1}.pem".format(kind, ver)), where)
with open(where, "w") as f:
f.write(kind)
self.assertEqual(ver, self.test_rc.current_version(kind))
self.assertEqual(self.test_rc.latest_common_version(), 5)
for ver in range(1, 6):
self.test_rc.update_all_links_to(ver)
for kind in ALL_FOUR:
self.assertEqual(ver, self.test_rc.current_version(kind))
self.assertEqual(self.test_rc.latest_common_version(), 5)
def test_has_pending_deployment(self):
for ver in range(1, 6):
for kind in ALL_FOUR:
where = getattr(self.test_rc, kind)
if os.path.islink(where):
os.unlink(where)
os.symlink(os.path.join("..", "..", "archive", "example.org",
"{0}{1}.pem".format(kind, ver)), where)
with open(where, "w") as f:
f.write(kind)
self.assertEqual(ver, self.test_rc.current_version(kind))
for ver in range(1, 6):
self.test_rc.update_all_links_to(ver)
for kind in ALL_FOUR:
self.assertEqual(ver, self.test_rc.current_version(kind))
if ver < 5:
self.assertTrue(self.test_rc.has_pending_deployment())
else:
self.assertFalse(self.test_rc.has_pending_deployment())
def _test_notafterbefore(self, function, timestamp):
test_cert = pkg_resources.resource_string(
"letsencrypt.tests", "testdata/cert.pem")
os.symlink(os.path.join("..", "..", "archive", "example.org",
"cert12.pem"), self.test_rc.cert)
with open(self.test_rc.cert, "w") as f:
f.write(test_cert)
desired_time = datetime.datetime.utcfromtimestamp(timestamp)
desired_time = desired_time.replace(tzinfo=pytz.UTC)
for result in (function(), function(12)):
self.assertEqual(result, desired_time)
self.assertEqual(result.utcoffset(), datetime.timedelta(0))
def test_notbefore(self):
self._test_notafterbefore(self.test_rc.notbefore, 1418337285)
# 2014-12-11 22:34:45+00:00 = Unix time 1418337285
def test_notafter(self):
self._test_notafterbefore(self.test_rc.notafter, 1418942085)
# 2014-12-18 22:34:45+00:00 = Unix time 1418942085
@mock.patch("letsencrypt.storage.datetime")
def test_time_interval_judgments(self, mock_datetime):
"""Test should_autodeploy() and should_autorenew() on the basis
of expiry time windows."""
test_cert = pkg_resources.resource_string(
"letsencrypt.tests", "testdata/cert.pem")
for kind in ALL_FOUR:
where = getattr(self.test_rc, kind)
os.symlink(os.path.join("..", "..", "archive", "example.org",
"{0}12.pem".format(kind)), where)
with open(where, "w") as f:
f.write(kind)
os.unlink(where)
os.symlink(os.path.join("..", "..", "archive", "example.org",
"{0}11.pem".format(kind)), where)
with open(where, "w") as f:
f.write(kind)
self.test_rc.update_all_links_to(12)
with open(self.test_rc.cert, "w") as f:
f.write(test_cert)
self.test_rc.update_all_links_to(11)
with open(self.test_rc.cert, "w") as f:
f.write(test_cert)
mock_datetime.timedelta = datetime.timedelta
for (current_time, interval, result) in [
# 2014-12-13 12:00:00+00:00 (about 5 days prior to expiry)
# Times that should result in autorenewal/autodeployment
(1418472000, "2 months", True), (1418472000, "1 week", True),
# Times that should not
(1418472000, "4 days", False), (1418472000, "2 days", False),
# 2009-05-01 12:00:00+00:00 (about 5 years prior to expiry)
# Times that should result in autorenewal/autodeployment
(1241179200, "7 years", True),
(1241179200, "11 years 2 months", True),
# Times that should not
(1241179200, "8 hours", False), (1241179200, "2 days", False),
(1241179200, "40 days", False), (1241179200, "9 months", False),
# 2015-01-01 (after expiry has already happened, so all
# intervals should cause autorenewal/autodeployment)
(1420070400, "0 seconds", True),
(1420070400, "10 seconds", True),
(1420070400, "10 minutes", True),
(1420070400, "10 weeks", True), (1420070400, "10 months", True),
(1420070400, "10 years", True), (1420070400, "99 months", True),
]:
sometime = datetime.datetime.utcfromtimestamp(current_time)
mock_datetime.datetime.utcnow.return_value = sometime
self.test_rc.configuration["deploy_before_expiry"] = interval
self.test_rc.configuration["renew_before_expiry"] = interval
self.assertEqual(self.test_rc.should_autodeploy(), result)
self.assertEqual(self.test_rc.should_autorenew(), result)
def test_should_autodeploy(self):
"""Test should_autodeploy() on the basis of reasons other than
expiry time window."""
# pylint: disable=too-many-statements
# Autodeployment turned off
self.test_rc.configuration["autodeploy"] = "0"
self.assertFalse(self.test_rc.should_autodeploy())
self.test_rc.configuration["autodeploy"] = "1"
# No pending deployment
for ver in range(1, 6):
for kind in ALL_FOUR:
where = getattr(self.test_rc, kind)
if os.path.islink(where):
os.unlink(where)
os.symlink(os.path.join("..", "..", "archive", "example.org",
"{0}{1}.pem".format(kind, ver)), where)
with open(where, "w") as f:
f.write(kind)
self.assertFalse(self.test_rc.should_autodeploy())
@mock.patch("letsencrypt.storage.RenewableCert.ocsp_revoked")
def test_should_autorenew(self, mock_ocsp):
"""Test should_autorenew on the basis of reasons other than
expiry time window."""
# pylint: disable=too-many-statements
# Autorenewal turned off
self.test_rc.configuration["autorenew"] = "0"
self.assertFalse(self.test_rc.should_autorenew())
self.test_rc.configuration["autorenew"] = "1"
for kind in ALL_FOUR:
where = getattr(self.test_rc, kind)
os.symlink(os.path.join("..", "..", "archive", "example.org",
"{0}12.pem".format(kind)), where)
with open(where, "w") as f:
f.write(kind)
# Mandatory renewal on the basis of OCSP revocation
mock_ocsp.return_value = True
self.assertTrue(self.test_rc.should_autorenew())
mock_ocsp.return_value = False
def test_save_successor(self):
for ver in range(1, 6):
for kind in ALL_FOUR:
where = getattr(self.test_rc, kind)
if os.path.islink(where):
os.unlink(where)
os.symlink(os.path.join("..", "..", "archive", "example.org",
"{0}{1}.pem".format(kind, ver)), where)
with open(where, "w") as f:
f.write(kind)
self.test_rc.update_all_links_to(3)
self.assertEqual(6, self.test_rc.save_successor(3, "new cert", None,
"new chain"))
with open(self.test_rc.version("cert", 6)) as f:
self.assertEqual(f.read(), "new cert")
with open(self.test_rc.version("chain", 6)) as f:
self.assertEqual(f.read(), "new chain")
with open(self.test_rc.version("fullchain", 6)) as f:
self.assertEqual(f.read(), "new cert" + "new chain")
# version 6 of the key should be a link back to version 3
self.assertFalse(os.path.islink(self.test_rc.version("privkey", 3)))
self.assertTrue(os.path.islink(self.test_rc.version("privkey", 6)))
# Let's try two more updates
self.assertEqual(7, self.test_rc.save_successor(6, "again", None,
"newer chain"))
self.assertEqual(8, self.test_rc.save_successor(7, "hello", None,
"other chain"))
# All of the subsequent versions should link directly to the original
# privkey.
for i in (6, 7, 8):
self.assertTrue(os.path.islink(self.test_rc.version("privkey", i)))
self.assertEqual("privkey3.pem", os.path.basename(os.readlink(
self.test_rc.version("privkey", i))))
for kind in ALL_FOUR:
self.assertEqual(self.test_rc.available_versions(kind), range(1, 9))
self.assertEqual(self.test_rc.current_version(kind), 3)
# Test updating from latest version rather than old version
self.test_rc.update_all_links_to(8)
self.assertEqual(9, self.test_rc.save_successor(8, "last", None,
"attempt"))
for kind in ALL_FOUR:
self.assertEqual(self.test_rc.available_versions(kind),
range(1, 10))
self.assertEqual(self.test_rc.current_version(kind), 8)
with open(self.test_rc.version("fullchain", 9)) as f:
self.assertEqual(f.read(), "last" + "attempt")
# Test updating when providing a new privkey. The key should
# be saved in a new file rather than creating a new symlink.
self.assertEqual(10, self.test_rc.save_successor(9, "with", "a",
"key"))
self.assertTrue(os.path.exists(self.test_rc.version("privkey", 10)))
self.assertFalse(os.path.islink(self.test_rc.version("privkey", 10)))
def test_new_lineage(self):
"""Test for new_lineage() class method."""
from letsencrypt import storage
config_dir = self.defaults["renewal_configs_dir"]
archive_dir = self.defaults["archive_dir"]
live_dir = self.defaults["live_dir"]
result = storage.RenewableCert.new_lineage("the-lineage.com", "cert",
"privkey", "chain", None,
self.defaults)
# This consistency check tests most relevant properties about the
# newly created cert lineage.
self.assertTrue(result.consistent())
self.assertTrue(os.path.exists(os.path.join(config_dir,
"the-lineage.com.conf")))
with open(result.fullchain) as f:
self.assertEqual(f.read(), "cert" + "chain")
# Let's do it again and make sure it makes a different lineage
result = storage.RenewableCert.new_lineage("the-lineage.com", "cert2",
"privkey2", "chain2", None,
self.defaults)
self.assertTrue(os.path.exists(
os.path.join(config_dir, "the-lineage.com-0001.conf")))
# Now trigger the detection of already existing files
os.mkdir(os.path.join(live_dir, "the-lineage.com-0002"))
self.assertRaises(ValueError, storage.RenewableCert.new_lineage,
"the-lineage.com", "cert3", "privkey3", "chain3",
None, self.defaults)
os.mkdir(os.path.join(archive_dir, "other-example.com"))
self.assertRaises(ValueError, storage.RenewableCert.new_lineage,
"other-example.com", "cert4", "privkey4", "chain4",
None, self.defaults)
# Make sure it can accept renewal parameters
params = {"stuff": "properties of stuff", "great": "awesome"}
result = storage.RenewableCert.new_lineage("the-lineage.com", "cert2",
"privkey2", "chain2",
params, self.defaults)
# TODO: Conceivably we could test that the renewal parameters actually
# got saved
def test_new_lineage_nonexistent_dirs(self):
"""Test that directories can be created if they don't exist."""
from letsencrypt import storage
config_dir = self.defaults["renewal_configs_dir"]
archive_dir = self.defaults["archive_dir"]
live_dir = self.defaults["live_dir"]
shutil.rmtree(config_dir)
shutil.rmtree(archive_dir)
shutil.rmtree(live_dir)
storage.RenewableCert.new_lineage("the-lineage.com", "cert2",
"privkey2", "chain2",
None, self.defaults)
self.assertTrue(os.path.exists(
os.path.join(config_dir, "the-lineage.com.conf")))
self.assertTrue(os.path.exists(
os.path.join(live_dir, "the-lineage.com", "privkey.pem")))
self.assertTrue(os.path.exists(
os.path.join(archive_dir, "the-lineage.com", "privkey1.pem")))
@mock.patch("letsencrypt.storage.le_util.unique_lineage_name")
def test_invalid_config_filename(self, mock_uln):
from letsencrypt import storage
mock_uln.return_value = "this_does_not_end_with_dot_conf", "yikes"
self.assertRaises(ValueError, storage.RenewableCert.new_lineage,
"example.com", "cert", "privkey", "chain",
None, self.defaults)
def test_bad_kind(self):
self.assertRaises(ValueError, self.test_rc.current_target, "elephant")
self.assertRaises(ValueError, self.test_rc.current_version, "elephant")
self.assertRaises(ValueError, self.test_rc.version, "elephant", 17)
self.assertRaises(ValueError, self.test_rc.available_versions,
"elephant")
self.assertRaises(ValueError, self.test_rc.newest_available_version,
"elephant")
self.assertRaises(ValueError, self.test_rc.update_link_to,
"elephant", 17)
def test_ocsp_revoked(self):
# XXX: This is currently hardcoded to False due to a lack of an
# OCSP server to test against.
self.assertFalse(self.test_rc.ocsp_revoked())
def test_parse_time_interval(self):
from letsencrypt import storage
# XXX: I'm not sure if intervals related to years and months
# take account of the current date (if so, some of these
# may fail in the future, like in leap years or even in
# months of different lengths!)
intended = {"": 0, "17 days": 17, "23": 23, "1 month": 31,
"7 weeks": 49, "1 year 1 day": 366, "1 year-1 day": 364,
"4 years": 1461}
for time in intended:
self.assertEqual(storage.parse_time_interval(time),
datetime.timedelta(intended[time]))
@mock.patch("letsencrypt.renewer.plugins_disco")
@mock.patch("letsencrypt.client.determine_account")
@mock.patch("letsencrypt.client.Client")
def test_renew(self, mock_c, mock_da, mock_pd):
"""Tests for renew()."""
from letsencrypt import renewer
test_cert = pkg_resources.resource_string(
"letsencrypt.tests", "testdata/cert-san.pem")
for kind in ALL_FOUR:
os.symlink(os.path.join("..", "..", "archive", "example.org",
kind + "1.pem"),
getattr(self.test_rc, kind))
fill_with_sample_data(self.test_rc)
with open(self.test_rc.cert, "w") as f:
f.write(test_cert)
# Fails because renewalparams are missing
self.assertFalse(renewer.renew(self.test_rc, 1))
self.test_rc.configfile["renewalparams"] = {"some": "stuff"}
# Fails because there's no authenticator specified
self.assertFalse(renewer.renew(self.test_rc, 1))
self.test_rc.configfile["renewalparams"]["rsa_key_size"] = "2048"
self.test_rc.configfile["renewalparams"]["server"] = "acme.example.com"
self.test_rc.configfile["renewalparams"]["authenticator"] = "fake"
mock_auth = mock.MagicMock()
mock_pd.PluginsRegistry.find_all.return_value = {"apache": mock_auth}
# Fails because "fake" != "apache"
self.assertFalse(renewer.renew(self.test_rc, 1))
self.test_rc.configfile["renewalparams"]["authenticator"] = "apache"
mock_client = mock.MagicMock()
mock_client.obtain_certificate.return_value = ("cert", "key", "chain")
mock_c.return_value = mock_client
self.assertEqual(2, renewer.renew(self.test_rc, 1))
# TODO: We could also make several assertions about calls that should
# have been made to the mock functions here.
self.assertEqual(mock_da.call_count, 1)
mock_client.obtain_certificate.return_value = (None, None, None)
# This should fail because the renewal itself appears to fail
self.assertEqual(False, renewer.renew(self.test_rc, 1))
@mock.patch("letsencrypt.renewer.notify")
@mock.patch("letsencrypt.storage.RenewableCert")
@mock.patch("letsencrypt.renewer.renew")
def test_main(self, mock_renew, mock_rc, mock_notify):
"""Test for main() function."""
from letsencrypt import renewer
mock_rc_instance = mock.MagicMock()
mock_rc_instance.should_autodeploy.return_value = True
mock_rc_instance.should_autorenew.return_value = True
mock_rc_instance.latest_common_version.return_value = 10
mock_rc.return_value = mock_rc_instance
with open(os.path.join(self.defaults["renewal_configs_dir"],
"README"), "w") as f:
f.write("This is a README file to make sure that the renewer is")
f.write("able to correctly ignore files that don't end in .conf.")
with open(os.path.join(self.defaults["renewal_configs_dir"],
"example.org.conf"), "w") as f:
# This isn't actually parsed in this test; we have a separate
# test_initialization that tests the initialization, assuming
# that configobj can correctly parse the config file.
f.write("cert = cert.pem\nprivkey = privkey.pem\n")
f.write("chain = chain.pem\nfullchain = fullchain.pem\n")
with open(os.path.join(self.defaults["renewal_configs_dir"],
"example.com.conf"), "w") as f:
f.write("cert = cert.pem\nprivkey = privkey.pem\n")
f.write("chain = chain.pem\nfullchain = fullchain.pem\n")
renewer.main(self.defaults)
self.assertEqual(mock_rc.call_count, 2)
self.assertEqual(mock_rc_instance.update_all_links_to.call_count, 2)
self.assertEqual(mock_notify.notify.call_count, 4)
self.assertEqual(mock_renew.call_count, 2)
# If we have instances that don't need any work done, no work should
# be done (call counts associated with processing deployments or
# renewals should not increase).
mock_happy_instance = mock.MagicMock()
mock_happy_instance.should_autodeploy.return_value = False
mock_happy_instance.should_autorenew.return_value = False
mock_happy_instance.latest_common_version.return_value = 10
mock_rc.return_value = mock_happy_instance
renewer.main(self.defaults)
self.assertEqual(mock_rc.call_count, 4)
self.assertEqual(mock_happy_instance.update_all_links_to.call_count, 0)
self.assertEqual(mock_notify.notify.call_count, 4)
self.assertEqual(mock_renew.call_count, 2)
def test_bad_config_file(self):
from letsencrypt import renewer
with open(os.path.join(self.defaults["renewal_configs_dir"],
"bad.conf"), "w") as f:
f.write("incomplete = configfile\n")
renewer.main(self.defaults)
# The ValueError is caught inside and nothing happens.
if __name__ == "__main__":
unittest.main() # pragma: no cover

View File

@@ -35,6 +35,7 @@ install_requires = [
'jsonschema',
'mock',
'ndg-httpsclient', # urllib3 InsecurePlatformWarning (#304)
'parsedatetime',
'psutil>=2.1.0', # net_connections introduced in 2.1.0
'pyasn1', # urllib3 InsecurePlatformWarning (#304)
'pycrypto',
@@ -115,6 +116,7 @@ setup(
entry_points={
'console_scripts': [
'letsencrypt = letsencrypt.cli:main',
'letsencrypt-renewer = letsencrypt.renewer:main',
'jws = letsencrypt.acme.jose.jws:CLI.run',
],
'letsencrypt.plugins': [