1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-23 07:20:55 +03:00
Files
certbot/letsencrypt/client/client.py
2015-01-13 01:24:55 -08:00

436 lines
14 KiB
Python

"""ACME protocol client class and helper functions."""
import collections
import csv
import logging
import os
import shutil
import socket
import string
import sys
import M2Crypto
import zope.component
from letsencrypt.client import acme
from letsencrypt.client import auth_handler
from letsencrypt.client import client_authenticator
from letsencrypt.client import CONFIG
from letsencrypt.client import crypto_util
from letsencrypt.client import errors
from letsencrypt.client import interfaces
from letsencrypt.client import le_util
from letsencrypt.client import network
# It's weird to point to "chocolate" servers via raw IPv6 addresses, and
# such addresses can be %SCARY in some contexts (they may carry a
# %zoneindex, for example), so out of paranoia they are disabled by default.
# is_hostname_sane() only attempts IPv6 validation when this is True.
ALLOW_RAW_IPV6_SERVER = False
class Client(object):
    """ACME protocol client.

    Requests a challenge for every domain name, lets the authorization
    handler obtain the authorizations, requests the certificate from the
    server, saves it to disk and can deploy it through the installer.

    :ivar network: Network object for sending and receiving messages
    :type network: :class:`letsencrypt.client.network.Network`

    :ivar list names: Domain names (:class:`list` of :class:`str`).

    :ivar authkey: Authorization Key
    :type authkey: :class:`letsencrypt.client.client.Client.Key`

    :ivar auth_handler: Object that supports the IAuthenticator interface.
        auth_handler contains both a dv_authenticator and a
        client_authenticator
    :type auth_handler: :class:`letsencrypt.client.auth_handler.AuthHandler`

    :ivar installer: Object supporting the IInstaller interface.
    :type installer: :class:`letsencrypt.client.interfaces.IInstaller`

    """
    zope.interface.implements(interfaces.IAuthenticator)

    # Key: path of the key file and its PEM contents.
    Key = collections.namedtuple("Key", "file pem")
    # CSR: path of the CSR file, the CSR data and the encoding of that data
    # ("der" is the form produced by init_csr and csr_pem_to_der).
    CSR = collections.namedtuple("CSR", "file data form")

    def __init__(self, server, names, authkey, dv_auth, installer):
        """Initialize a client.

        Exits the process (status 81, via :func:`sanity_check_names`) when
        the server or any of the names is not a sane host name.

        :param str server: CA server to contact

        :param list names: Domain names to obtain a certificate for

        :param authkey: Authorization Key
        :type authkey: :class:`letsencrypt.client.client.Client.Key`

        :param dv_auth: IAuthenticator Interface that can solve the
            CONFIG.DV_CHALLENGES
        :type dv_auth: :class:`letsencrypt.client.interfaces.IAuthenticator`

        :param installer: Object supporting the IInstaller interface.
        :type installer: :class:`letsencrypt.client.interfaces.IInstaller`

        """
        # Validate the host names before any of them is handed to another
        # component; the check exists to keep hostile input out.
        sanity_check_names([server] + names)

        self.network = network.Network(server)
        self.names = names
        self.authkey = authkey
        self.installer = installer

        client_auth = client_authenticator.ClientAuthenticator(server)
        self.auth_handler = auth_handler.AuthHandler(
            dv_auth, client_auth, self.network)

    def obtain_certificate(self, csr,
                           cert_path=CONFIG.CERT_PATH,
                           chain_path=CONFIG.CHAIN_PATH):
        """Obtains a certificate from the ACME server.

        :param csr: A valid CSR in DER format for the certificate the client
            intends to receive.
        :type csr: :class:`CSR`

        :param str cert_path: Full desired path to end certificate.
        :param str chain_path: Full desired path to end chain file.

        :returns: cert_file, chain_file (paths to respective files);
            chain_file is None when the server sent no chain
        :rtype: `tuple` of `str`

        """
        # Request Challenges
        for name in self.names:
            self.auth_handler.add_chall_msg(
                name, self.acme_challenge(name), self.authkey)

        # Perform Challenges/Get Authorizations
        self.auth_handler.get_authorizations()

        # Retrieve certificate
        certificate_dict = self.acme_certificate(csr.data)

        # Save Certificate
        cert_file, chain_file = self.save_certificate(
            certificate_dict, cert_path, chain_path)

        # Keep an (unencrypted) backup copy of the certificate and key
        self.store_cert_key(cert_file, False)

        return cert_file, chain_file

    def acme_challenge(self, domain):
        """Handle ACME "challenge" phase.

        :param str domain: Domain name to request a challenge for

        :returns: ACME "challenge" message.
        :rtype: dict

        """
        return self.network.send_and_receive_expected(
            acme.challenge_request(domain), "challenge")

    def acme_certificate(self, csr_der):
        """Handle ACME "certificate" phase.

        :param str csr_der: CSR in DER format.

        :returns: ACME "certificate" message.
        :rtype: dict

        """
        logging.info("Preparing and sending CSR...")
        return self.network.send_and_receive_expected(
            acme.certificate_request(csr_der, self.authkey.pem), "certificate")

    # pylint: disable=no-self-use
    def save_certificate(self, certificate_dict, cert_path, chain_path):
        """Saves the certificate received from the ACME server.

        :param dict certificate_dict: certificate message from server;
            must hold a "certificate" entry and may hold a "chain" list
        :param str cert_path: Path to attempt to save the cert file
        :param str chain_path: Path to attempt to save the chain file

        :returns: cert_file, chain_file (absolute paths to the actual files);
            chain_file is None when the message carries no chain
        :rtype: `tuple` of `str`

        :raises IOError: If unable to find room to write the cert files

        """
        cert_chain_abspath = None

        cert_fd, cert_file = le_util.unique_file(cert_path, 0o644)
        try:
            cert_fd.write(
                crypto_util.b64_cert_to_pem(certificate_dict["certificate"]))
        finally:
            # Release the handle even when conversion or writing fails
            cert_fd.close()
        logging.info(
            "Server issued certificate; certificate written to %s", cert_file)

        chain = certificate_dict.get("chain")
        if chain:
            chain_fd, chain_fn = le_util.unique_file(chain_path, 0o644)
            try:
                for cert in chain:
                    chain_fd.write(crypto_util.b64_cert_to_pem(cert))
            finally:
                chain_fd.close()

            logging.info("Cert chain written to %s", chain_fn)

            # This expects a valid chain file
            cert_chain_abspath = os.path.abspath(chain_fn)

        return os.path.abspath(cert_file), cert_chain_abspath

    def deploy_certificate(self, privkey, cert_file, chain_file):
        """Install certificate

        Deploys the certificate to every virtual host chosen for
        ``self.names``, enables hosts that are not enabled yet, saves the
        installer configuration and restarts the server.

        :param privkey: Private key matching the certificate
        :type privkey: :class:`letsencrypt.client.client.Client.Key`

        :param str cert_file: Path to the certificate file
        :param chain_file: Path to the chain file, or None if there is none
        :type chain_file: str or None

        :returns: Virtual hosts the certificate was deployed to
        :rtype: set

        """
        # Find set of virtual hosts to deploy certificates to
        vhost = self.get_virtual_hosts(self.names)

        chain = None if chain_file is None else os.path.abspath(chain_file)

        for host in vhost:
            self.installer.deploy_cert(host,
                                       os.path.abspath(cert_file),
                                       os.path.abspath(privkey.file),
                                       chain)
            # Enable any vhost that was issued to, but not enabled
            if not host.enabled:
                logging.info("Enabling Site %s", host.filep)
                self.installer.enable_site(host)

        self.installer.save("Deployed Let's Encrypt Certificate")
        # sites may have been enabled / final cleanup
        self.installer.restart()

        zope.component.getUtility(
            interfaces.IDisplay).success_installation(self.names)

        return vhost

    def optimize_config(self, vhost, redirect=None):
        """Optimize the configuration.

        .. todo:: Handle multiple vhosts

        :param vhost: vhosts to optimize; passed on to
            :meth:`redirect_to_ssl`, which iterates over it
        :type vhost: iterable of
            :class:`letsencrypt.client.apache.obj.VirtualHost`

        :param redirect: If traffic should be forwarded from HTTP to HTTPS.
            When None, the IDisplay utility is asked for the default.
        :type redirect: bool or None

        """
        if redirect is None:
            redirect = zope.component.getUtility(
                interfaces.IDisplay).redirect_by_default()

        if redirect:
            self.redirect_to_ssl(vhost)
            self.installer.restart()

        # if self.ocsp_stapling is None:
        #     q = ("Would you like to protect the privacy of your users "
        #         "by enabling OCSP stapling? If so, your users will not have "
        #         "to query the Let's Encrypt CA separately about the current "
        #         "revocation status of your certificate.")
        #    self.ocsp_stapling = self.ocsp_stapling = display.ocsp_stapling(q)
        # if self.ocsp_stapling:
        #    # TODO enable OCSP Stapling
        #    continue

    def store_cert_key(self, cert_file, encrypt=False):
        """Store certificate key.

        Appends a row ``index, cert_file, key file`` to the CSV file
        ``LIST`` in CONFIG.CERT_KEY_BACKUP and copies the certificate and
        the authorization key into that directory with ``_<index>``
        appended to their base names.

        :param str cert_file: Path to a certificate file.

        :param bool encrypt: Should the certificate key be encrypted?

        :returns: True if key file was stored successfully, False otherwise.
        :rtype: bool

        """
        list_file = os.path.join(CONFIG.CERT_KEY_BACKUP, "LIST")
        le_util.make_or_verify_dir(CONFIG.CERT_KEY_BACKUP, 0o700)
        idx = 0

        if encrypt:
            logging.error(
                "Unfortunately securely storing the certificates/"
                "keys is not yet available. Stay tuned for the "
                "next update!")
            return False

        if os.path.isfile(list_file):
            with open(list_file, 'r+b') as csvfile:
                # The new index is one past the index of the last record.
                for row in csv.reader(csvfile):
                    # csv yields an empty list for a blank line; skip it
                    # instead of failing on row[0].
                    if row:
                        idx = int(row[0]) + 1
                # The reader consumed the file, so this appends at the end.
                csvwriter = csv.writer(csvfile)
                csvwriter.writerow([str(idx), cert_file, self.authkey.file])
        else:
            with open(list_file, 'wb') as csvfile:
                csvwriter = csv.writer(csvfile)
                csvwriter.writerow(["0", cert_file, self.authkey.file])

        shutil.copy2(self.authkey.file,
                     os.path.join(
                         CONFIG.CERT_KEY_BACKUP,
                         os.path.basename(self.authkey.file) + "_" + str(idx)))
        shutil.copy2(cert_file,
                     os.path.join(
                         CONFIG.CERT_KEY_BACKUP,
                         os.path.basename(cert_file) + "_" + str(idx)))

        return True

    def redirect_to_ssl(self, vhost):
        """Redirect all traffic from HTTP to HTTPS

        :param vhost: ssl vhosts to set up a redirect for
        :type vhost: iterable of
            :class:`letsencrypt.client.apache.obj.VirtualHost`

        """
        for ssl_vh in vhost:
            success, redirect_vhost = self.installer.enable_redirect(ssl_vh)
            logging.info(
                "\nRedirect vhost: %s - %s ", redirect_vhost.filep, success)
            # If successful, make sure redirect site is enabled
            if success:
                self.installer.enable_site(redirect_vhost)

    def get_virtual_hosts(self, domains):
        """Retrieve the appropriate virtual hosts for the domains

        Domains for which the installer offers no virtual host (None) are
        left out.

        :param list domains: Domains to find ssl vhosts for

        :returns: associated vhosts
        :rtype: set of :class:`letsencrypt.client.apache.obj.VirtualHost`

        """
        vhost = set()
        for name in domains:
            host = self.installer.choose_virtual_host(name)
            if host is not None:
                vhost.add(host)
        return vhost
def validate_key_csr(privkey, csr):
    """Validate CSR and key files.

    Verifies that the client key and csr arguments are valid and correspond to
    one another. This does not currently check the names in the CSR.

    Empty CSR data or an empty key skips the corresponding checks.

    :param privkey: Key associated with CSR
    :type privkey: :class:`letsencrypt.client.client.Client.Key`

    :param csr: CSR
    :type csr: :class:`letsencrypt.client.client.Client.CSR`

    :raises LetsEncryptClientError: if validation fails

    """
    # TODO: Handle all of these problems appropriately
    # The client can eventually do things like prompt the user
    # and allow the user to take more appropriate actions

    # The checks below are run on PEM data, so a DER request is converted
    # first. Only the PEM string is kept: rebuilding a CSR tuple here would
    # need a form label, and the previous code mislabelled PEM data as "der".
    csr_pem = csr.data
    if csr.form == "der":
        csr_pem = M2Crypto.X509.load_request_der_string(csr.data).as_pem()

    # If CSR is provided, it must be readable and valid.
    if csr_pem and not crypto_util.valid_csr(csr_pem):
        raise errors.LetsEncryptClientError(
            "The provided CSR is not a valid CSR")

    # If key is provided, it must be readable and valid.
    if privkey.pem and not crypto_util.valid_privkey(privkey.pem):
        raise errors.LetsEncryptClientError(
            "The provided key is not a valid key")

    # If CSR and key are provided, the key must be the same key used
    # in the CSR.
    if csr_pem and privkey.pem:
        if not crypto_util.csr_matches_pubkey(csr_pem, privkey.pem):
            raise errors.LetsEncryptClientError(
                "The key and CSR do not match")
def init_key():
    """Initializes privkey.

    Generates a new key of CONFIG.RSA_KEY_SIZE with crypto_util.make_key
    and saves it in PEM format in CONFIG.KEY_DIR. The file name is derived
    from ``key-letsencrypt.pem`` by le_util.unique_file, and the file is
    requested with mode 0600.

    :returns: Path of the key file together with the key in PEM format
    :rtype: :class:`letsencrypt.client.client.Client.Key`

    """
    key_pem = crypto_util.make_key(CONFIG.RSA_KEY_SIZE)

    # Save file
    le_util.make_or_verify_dir(CONFIG.KEY_DIR, 0o700)
    key_f, key_filename = le_util.unique_file(
        os.path.join(CONFIG.KEY_DIR, "key-letsencrypt.pem"), 0o600)
    try:
        key_f.write(key_pem)
    finally:
        # Do not leak the handle if the write fails
        key_f.close()

    logging.info("Generating key: %s", key_filename)

    return Client.Key(key_filename, key_pem)
def init_csr(privkey, names):
    """Initialize a CSR with the given private key.

    The CSR is saved in PEM format in CONFIG.CERT_DIR (file name derived
    from ``csr-letsencrypt.pem`` by le_util.unique_file), while the
    returned tuple carries the DER encoding.

    :param privkey: Key to create the CSR with
    :type privkey: :class:`letsencrypt.client.client.Client.Key`

    :param list names: Names passed on to crypto_util.make_csr

    :returns: Path of the CSR file, the CSR in DER format and "der"
    :rtype: :class:`letsencrypt.client.client.Client.CSR`

    """
    csr_pem, csr_der = crypto_util.make_csr(privkey.pem, names)

    # Save CSR
    le_util.make_or_verify_dir(CONFIG.CERT_DIR, 0o755)
    csr_f, csr_filename = le_util.unique_file(
        os.path.join(CONFIG.CERT_DIR, "csr-letsencrypt.pem"), 0o644)
    try:
        csr_f.write(csr_pem)
    finally:
        # Do not leak the handle if the write fails
        csr_f.close()

    logging.info("Creating CSR: %s", csr_filename)

    return Client.CSR(csr_filename, csr_der, "der")
def csr_pem_to_der(csr):
    """Return a CSR tuple holding the DER encoding of a PEM CSR.

    :param csr: CSR whose ``data`` is PEM encoded
    :type csr: :class:`letsencrypt.client.client.Client.CSR`

    :returns: CSR with the same ``file``, DER ``data`` and form "der"
    :rtype: :class:`letsencrypt.client.client.Client.CSR`

    """
    request = M2Crypto.X509.load_request_string(csr.data)
    der_data = request.as_der()
    return Client.CSR(file=csr.file, data=der_data, form="der")
def sanity_check_names(names):
    """Make sure host names are valid.

    Logs a fatal message and exits the process with status 81 at the
    first name that :func:`is_hostname_sane` rejects.

    :param list names: List of host names

    """
    for candidate in names:
        if is_hostname_sane(candidate):
            continue
        logging.fatal("%r is an impossible hostname", candidate)
        sys.exit(81)
def is_hostname_sane(hostname):
    """Make sure the given host name is sane.

    Do enough to avoid shellcode from the environment. There's
    no need to do more.

    A name made up only of ASCII letters, digits, "-" and "." is accepted
    right away (this includes the empty string). Anything else is accepted
    only if ALLOW_RAW_IPV6_SERVER is set and the IPv6 lookup succeeds.

    :param str hostname: Host name to validate

    :returns: True if hostname is valid, otherwise false.
    :rtype: bool

    """
    # hostnames & IPv4
    permitted = string.ascii_letters + string.digits + "-."
    if not any(char not in permitted for char in hostname):
        return True

    if ALLOW_RAW_IPV6_SERVER:
        # ipv6 is messy and complicated, can contain %zoneindex etc.
        try:
            # is this a valid IPv6 address?
            socket.getaddrinfo(hostname, 443, socket.AF_INET6)
        except socket.error:
            return False
        return True

    return False