1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-23 07:20:55 +03:00
Files
certbot/letsencrypt/client/client.py

532 lines
18 KiB
Python

"""ACME protocol client class and helper functions."""
import csv
import logging
import os
import shutil
import sys
import Crypto.PublicKey.RSA
import M2Crypto
import zope.component
from letsencrypt import acme
from letsencrypt.client import auth_handler
from letsencrypt.client import client_authenticator
from letsencrypt.client import crypto_util
from letsencrypt.client import errors
from letsencrypt.client import interfaces
from letsencrypt.client import le_util
from letsencrypt.client import network
from letsencrypt.client import reverter
from letsencrypt.client import revoker
from letsencrypt.client.apache import configurator
class Client(object):
"""ACME protocol client.
:ivar network: Network object for sending and receiving messages
:type network: :class:`letsencrypt.client.network.Network`
:ivar authkey: Authorization Key
:type authkey: :class:`letsencrypt.client.le_util.Key`
:ivar auth_handler: Object that supports the IAuthenticator interface.
auth_handler contains both a dv_authenticator and a client_authenticator
:type auth_handler: :class:`letsencrypt.client.auth_handler.AuthHandler`
:ivar installer: Object supporting the IInstaller interface.
:type installer: :class:`letsencrypt.client.interfaces.IInstaller`
:ivar config: Configuration.
:type config: :class:`~letsencrypt.client.interfaces.IConfig`
"""
def __init__(self, config, authkey, dv_auth, installer):
"""Initialize a client.
:param dv_auth: IAuthenticator that can solve the
:const:`letsencrypt.client.constants.DV_CHALLENGES`
:type dv_auth: :class:`letsencrypt.client.interfaces.IAuthenticator`
"""
self.network = network.Network(config.server)
self.authkey = authkey
self.installer = installer
self.config = config
if dv_auth is not None:
client_auth = client_authenticator.ClientAuthenticator(config)
self.auth_handler = auth_handler.AuthHandler(
dv_auth, client_auth, self.network)
else:
self.auth_handler = None
def obtain_certificate(self, domains, csr=None):
"""Obtains a certificate from the ACME server.
:param str domains: list of domains to get a certificate
:param csr: CSR must contain requested domains, the key used to generate
this CSR can be different than self.authkey
:type csr: :class:`CSR`
:returns: cert_file, chain_file (paths to respective files)
:rtype: `tuple` of `str`
"""
if self.auth_handler is None:
logging.warning("Unable to obtain a certificate, because client "
"does not have a valid auth handler.")
# Request Challenges
for name in domains:
self.auth_handler.add_chall_msg(
name, self.acme_challenge(name), self.authkey)
# Perform Challenges/Get Authorizations
self.auth_handler.get_authorizations()
# Create CSR from names
if csr is None:
csr = init_csr(self.authkey, domains, self.config.cert_dir)
# Retrieve certificate
certificate_msg = self.acme_certificate(csr.data)
# Save Certificate
cert_file, chain_file = self.save_certificate(
certificate_msg, self.config.cert_path, self.config.chain_path)
self.store_cert_key(cert_file, False)
return cert_file, chain_file
def acme_challenge(self, domain):
"""Handle ACME "challenge" phase.
:returns: ACME "challenge" message.
:rtype: :class:`letsencrypt.acme.messages.Challenge`
"""
return self.network.send_and_receive_expected(
acme.messages.ChallengeRequest(identifier=domain),
acme.messages.Challenge)
def acme_certificate(self, csr_der):
"""Handle ACME "certificate" phase.
:param str csr_der: CSR in DER format.
:returns: ACME "certificate" message.
:rtype: :class:`letsencrypt.acme.message.Certificate`
"""
logging.info("Preparing and sending CSR...")
return self.network.send_and_receive_expected(
acme.messages.CertificateRequest.create(
csr=M2Crypto.X509.load_request_der_string(csr_der),
key=Crypto.PublicKey.RSA.importKey(self.authkey.pem)),
acme.messages.Certificate)
def save_certificate(self, certificate_msg, cert_path, chain_path):
# pylint: disable=no-self-use
"""Saves the certificate received from the ACME server.
:param certificate_msg: ACME "certificate" message from server.
:type certificate_msg: :class:`letsencrypt.acme.messages.Certificate`
:param str cert_path: Path to attempt to save the cert file
:param str chain_path: Path to attempt to save the chain file
:returns: cert_file, chain_file (absolute paths to the actual files)
:rtype: `tuple` of `str`
:raises IOError: If unable to find room to write the cert files
"""
cert_chain_abspath = None
cert_fd, cert_file = le_util.unique_file(cert_path, 0o644)
cert_fd.write(certificate_msg.certificate.as_pem())
cert_fd.close()
logging.info(
"Server issued certificate; certificate written to %s", cert_file)
if certificate_msg.chain:
chain_fd, chain_fn = le_util.unique_file(chain_path, 0o644)
for cert in certificate_msg.chain:
chain_fd.write(cert.to_pem())
chain_fd.close()
logging.info("Cert chain written to %s", chain_fn)
# This expects a valid chain file
cert_chain_abspath = os.path.abspath(chain_fn)
return os.path.abspath(cert_file), cert_chain_abspath
def deploy_certificate(self, domains, privkey, cert_file, chain_file=None):
"""Install certificate
:param list domains: list of domains to install the certificate
:param privkey: private key for certificate
:type privkey: :class:`letsencrypt.client.le_util.Key`
:param str cert_file: certificate file path
:param str chain_file: chain file path
"""
if self.installer is None:
logging.warning("No installer specified, client is unable to deploy"
"the certificate")
raise errors.LetsEncryptClientError("No installer available")
chain = None if chain_file is None else os.path.abspath(chain_file)
for dom in domains:
self.installer.deploy_cert(dom,
os.path.abspath(cert_file),
os.path.abspath(privkey.file),
chain)
self.installer.save("Deployed Let's Encrypt Certificate")
# sites may have been enabled / final cleanup
self.installer.restart()
zope.component.getUtility(
interfaces.IDisplay).success_installation(domains)
def enhance_config(self, domains, redirect=None):
"""Enhance the configuration.
.. todo:: This needs to handle the specific enhancements offered by the
installer. We will also have to find a method to pass in the chosen
values efficiently.
:param list domains: list of domains to configure
:param redirect: If traffic should be forwarded from HTTP to HTTPS.
:type redirect: bool or None
:raises :class:`letsencrypt.client.errors.LetsEncryptClientError`: if
no installer is specified in the client.
"""
if self.installer is None:
logging.warning("No installer is specified, there isn't any "
"configuration to enhance.")
raise errors.LetsEncryptClientError("No installer available")
if redirect is None:
redirect = zope.component.getUtility(
interfaces.IDisplay).redirect_by_default()
if redirect:
self.redirect_to_ssl(domains)
def store_cert_key(self, cert_file, encrypt=False):
"""Store certificate key. (Used to allow quick revocation)
:param str cert_file: Path to a certificate file.
:param bool encrypt: Should the certificate key be encrypted?
:returns: True if key file was stored successfully, False otherwise.
:rtype: bool
"""
list_file = os.path.join(self.config.cert_key_backup, "LIST")
le_util.make_or_verify_dir(self.config.cert_key_backup, 0o700)
idx = 0
if encrypt:
logging.error(
"Unfortunately securely storing the certificates/"
"keys is not yet available. Stay tuned for the "
"next update!")
return False
if os.path.isfile(list_file):
with open(list_file, 'r+b') as csvfile:
csvreader = csv.reader(csvfile)
for row in csvreader:
idx = int(row[0]) + 1
csvwriter = csv.writer(csvfile)
csvwriter.writerow([str(idx), cert_file, self.authkey.file])
else:
with open(list_file, 'wb') as csvfile:
csvwriter = csv.writer(csvfile)
csvwriter.writerow(["0", cert_file, self.authkey.file])
shutil.copy2(self.authkey.file,
os.path.join(
self.config.cert_key_backup,
os.path.basename(self.authkey.file) + "_" + str(idx)))
shutil.copy2(cert_file,
os.path.join(
self.config.cert_key_backup,
os.path.basename(cert_file) + "_" + str(idx)))
return True
def redirect_to_ssl(self, domains):
"""Redirect all traffic from HTTP to HTTPS
:param vhost: list of ssl_vhosts
:type vhost: :class:`letsencrypt.client.interfaces.IInstaller`
"""
for dom in domains:
try:
self.installer.enhance(dom, "redirect")
except errors.LetsEncryptConfiguratorError:
logging.warn('Unable to perform redirect for %s', dom)
self.installer.save("Add Redirects")
self.installer.restart()
def validate_key_csr(privkey, csr=None):
"""Validate Key and CSR files.
Verifies that the client key and csr arguments are valid and correspond to
one another. This does not currently check the names in the CSR due to
the inability to read SANs from CSRs in python crypto libraries.
If csr is left as None, only the key will be validated.
:param privkey: Key associated with CSR
:type privkey: :class:`letsencrypt.client.le_util.Key`
:param csr: CSR
:type csr: :class:`letsencrypt.client.le_util.CSR`
:raises LetsEncryptClientError: if validation fails
"""
# TODO: Handle all of these problems appropriately
# The client can eventually do things like prompt the user
# and allow the user to take more appropriate actions
# Key must be readable and valid.
if privkey.pem and not crypto_util.valid_privkey(privkey.pem):
raise errors.LetsEncryptClientError(
"The provided key is not a valid key")
if csr:
if csr.form == "der":
csr_obj = M2Crypto.X509.load_request_der_string(csr.data)
csr = le_util.CSR(csr.file, csr_obj.as_pem(), "der")
# If CSR is provided, it must be readable and valid.
if csr.data and not crypto_util.valid_csr(csr.data):
raise errors.LetsEncryptClientError(
"The provided CSR is not a valid CSR")
# If both CSR and key are provided, the key must be the same key used
# in the CSR.
if csr.data and privkey.pem:
if not crypto_util.csr_matches_pubkey(
csr.data, privkey.pem):
raise errors.LetsEncryptClientError(
"The key and CSR do not match")
def init_key(key_size, key_dir):
"""Initializes privkey.
Inits key and CSR using provided files or generating new files
if necessary. Both will be saved in PEM format on the
filesystem. The CSR is placed into DER format to allow
the namedtuple to easily work with the protocol.
:param str key_dir: Key save directory.
"""
try:
key_pem = crypto_util.make_key(key_size)
except ValueError as err:
logging.fatal(str(err))
sys.exit(1)
# Save file
le_util.make_or_verify_dir(key_dir, 0o700)
key_f, key_filename = le_util.unique_file(
os.path.join(key_dir, "key-letsencrypt.pem"), 0o600)
key_f.write(key_pem)
key_f.close()
logging.info("Generating key (%d bits): %s", key_size, key_filename)
return le_util.Key(key_filename, key_pem)
def init_csr(privkey, names, cert_dir):
"""Initialize a CSR with the given private key.
:param str cert_dir: Certificate save directory.
"""
csr_pem, csr_der = crypto_util.make_csr(privkey.pem, names)
# Save CSR
le_util.make_or_verify_dir(cert_dir, 0o755)
csr_f, csr_filename = le_util.unique_file(
os.path.join(cert_dir, "csr-letsencrypt.pem"), 0o644)
csr_f.write(csr_pem)
csr_f.close()
logging.info("Creating CSR: %s", csr_filename)
return le_util.CSR(csr_filename, csr_der, "der")
# This should be controlled by commandline parameters
def determine_authenticator(config):
"""Returns a valid IAuthenticator.
:param config: Configuration.
:type config: :class:`letsencrypt.client.interfaces.IConfig`
"""
try:
return configurator.ApacheConfigurator(config)
except errors.LetsEncryptNoInstallationError:
logging.info("Unable to determine a way to authenticate the server")
def determine_installer(config):
"""Returns a valid installer if one exists.
:param config: Configuration.
:type config: :class:`letsencrypt.client.interfaces.IConfig`
"""
try:
return configurator.ApacheConfigurator(config)
except errors.LetsEncryptNoInstallationError:
logging.info("Unable to find a way to install the certificate.")
def rollback(checkpoints, config):
"""Revert configuration the specified number of checkpoints.
.. note:: If another installer uses something other than the reverter class
to do their configuration changes, the correct reverter will have to be
determined.
.. note:: This function restarts the server even if there weren't any
rollbacks. The user may be confused or made an error and simply needs
to restart the server.
.. todo:: This function will have to change depending on the functionality
of future installers. Perhaps the interface should define errors that
are thrown for the various functions.
:param int checkpoints: Number of checkpoints to revert.
:param config: Configuration.
:type config: :class:`letsencrypt.client.interfaces.IConfig`
"""
# Misconfigurations are only a slight problems... allow the user to rollback
try:
installer = determine_installer(config)
except errors.LetsEncryptMisconfigurationError:
_misconfigured_rollback(checkpoints, config)
return
# No Errors occurred during init... proceed normally
# If installer is None... couldn't find an installer... there shouldn't be
# anything to rollback
if installer is not None:
installer.rollback_checkpoints(checkpoints)
installer.restart()
def _misconfigured_rollback(checkpoints, config):
"""Handles the case where the Installer is misconfigured.
:param int checkpoints: Number of checkpoints to revert.
:param config: Configuration.
:type config: :class:`letsencrypt.client.interfaces.IConfig`
"""
yes = zope.component.getUtility(interfaces.IDisplay).generic_yesno(
"Oh, no! The web server is currently misconfigured.{0}{0}"
"Would you still like to rollback the "
"configuration?".format(os.linesep))
if not yes:
logging.info("The error message is above.")
logging.info("Configuration was not rolled back.")
return
logging.info("Rolling back using the Reverter module")
# recovery routine has probably already been run by installer
# in the__init__ attempt, run it again for safety... it shouldn't hurt
# Also... not sure how future installers will handle recovery.
rev = reverter.Reverter(config)
rev.recovery_routine()
rev.rollback_checkpoints(checkpoints)
# We should try to restart the server
try:
installer = determine_installer(config)
installer.restart()
logging.info("Hooray! Rollback solved the misconfiguration!")
logging.info("Your web server is back up and running.")
except errors.LetsEncryptMisconfigurationError:
logging.warning(
"Rollback was unable to solve the misconfiguration issues")
def revoke(config):
"""Revoke certificates.
:param config: Configuration.
:type config: :class:`letsencrypt.client.interfaces.IConfig`
"""
# Misconfigurations don't really matter. Determine installer better choose
# correctly though.
try:
installer = determine_installer(config)
except errors.LetsEncryptMisconfigurationError:
zope.component.getUtility(interfaces.IDisplay).generic_notification(
"The web server is currently misconfigured. Some "
"abilities like seeing which certificates are currently "
"installed may not be available.")
installer = None
# This is a temporary fix to avoid errors. The Revoker is not fully
# developed.
if installer is None:
zope.component.getUtility(interfaces.IDisplay).generic_notification(
"The Let's Encrypt Revoker module does not currently support "
"revocation without a valid installer. This feature should come "
"soon.")
return
revoc = revoker.Revoker(installer, config)
revoc.list_certs_keys()
def view_config_changes(config):
"""View checkpoints and associated configuration changes.
.. note:: This assumes that the installation is using a Reverter object.
:param config: Configuration.
:type config: :class:`letsencrypt.client.interfaces.IConfig`
"""
rev = reverter.Reverter(config)
rev.recovery_routine()
rev.view_config_changes()