1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-21 19:01:07 +03:00

Merge branch 'master' into fix_771

This commit is contained in:
James Kasten
2015-09-15 22:49:16 -07:00
10 changed files with 115 additions and 32 deletions

View File

@@ -429,20 +429,34 @@ class Client(object): # pylint: disable=too-many-instance-attributes
# respond with status code 403 (Forbidden)
return self.check_cert(certr)
def fetch_chain(self, certr):
def fetch_chain(self, certr, max_length=10):
"""Fetch chain for certificate.
:param certr: Certificate Resource
:type certr: `.CertificateResource`
:param .CertificateResource certr: Certificate Resource
:param int max_length: Maximum allowed length of the chain.
Note that each element in the certificate requires new
``HTTP GET`` request, and the length of the chain is
controlled by the ACME CA.
:returns: Certificate chain, or `None` if no "up" Link was provided.
:rtype: `OpenSSL.crypto.X509` wrapped in `.ComparableX509`
:raises errors.Error: if recursion exceeds `max_length`
:returns: Certificate chain for the Certificate Resource. It is
a list ordered so that the first element is a signer of the
certificate from Certificate Resource. Will be empty if
``cert_chain_uri`` is ``None``.
:rtype: `list` of `OpenSSL.crypto.X509` wrapped in `.ComparableX509`
"""
if certr.cert_chain_uri is not None:
return self._get_cert(certr.cert_chain_uri)[1]
else:
return None
chain = []
uri = certr.cert_chain_uri
while uri is not None and len(chain) < max_length:
response, cert = self._get_cert(uri)
uri = response.links.get('up', {}).get('url')
chain.append(cert)
if uri is not None:
raise errors.Error(
"Recursion limit reached. Didn't get {0}".format(uri))
return chain
def revoke(self, cert):
"""Revoke certificate.

View File

@@ -348,16 +348,34 @@ class ClientTest(unittest.TestCase):
self.assertEqual(
self.client.check_cert(self.certr), self.client.refresh(self.certr))
def test_fetch_chain(self):
def test_fetch_chain_no_up_link(self):
self.assertEqual([], self.client.fetch_chain(self.certr.update(
cert_chain_uri=None)))
def test_fetch_chain_single(self):
# pylint: disable=protected-access
self.client._get_cert = mock.MagicMock()
self.client._get_cert.return_value = ("response", "certificate")
self.assertEqual(self.client._get_cert(self.certr.cert_chain_uri)[1],
self.client._get_cert.return_value = (
mock.MagicMock(links={}), "certificate")
self.assertEqual([self.client._get_cert(self.certr.cert_chain_uri)[1]],
self.client.fetch_chain(self.certr))
def test_fetch_chain_no_up_link(self):
self.assertTrue(self.client.fetch_chain(self.certr.update(
cert_chain_uri=None)) is None)
def test_fetch_chain_max(self):
# pylint: disable=protected-access
up_response = mock.MagicMock(links={'up': {'url': 'http://cert'}})
noup_response = mock.MagicMock(links={})
self.client._get_cert = mock.MagicMock()
self.client._get_cert.side_effect = [
(up_response, "cert")] * 9 + [(noup_response, "last_cert")]
chain = self.client.fetch_chain(self.certr, max_length=10)
self.assertEqual(chain, ["cert"] * 9 + ["last_cert"])
def test_fetch_chain_too_many(self): # recursive
# pylint: disable=protected-access
response = mock.MagicMock(links={'up': {'url': 'http://cert'}})
self.client._get_cert = mock.MagicMock()
self.client._get_cert.return_value = (response, "certificate")
self.assertRaises(errors.Error, self.client.fetch_chain, self.certr)
def test_revoke(self):
self.client.revoke(self.certr.body)

View File

@@ -270,8 +270,7 @@ def auth_from_domains(le_client, config, domains, plugins):
lineage.save_successor(
lineage.latest_common_version(), OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, new_certr.body),
new_key.pem, OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, new_chain))
new_key.pem, crypto_util.dump_pyopenssl_chain(new_chain))
lineage.update_all_links_to(lineage.latest_common_version())
# TODO: Check return value of save_successor

View File

@@ -259,14 +259,11 @@ class Client(object):
"Non-standard path(s), might not work with crontab installed "
"by your operating system package manager")
# XXX: just to stop RenewableCert from complaining; this is
# probably not a good solution
chain_pem = "" if chain is None else OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, chain)
lineage = storage.RenewableCert.new_lineage(
domains[0], OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, certr.body),
key.pem, chain_pem, params, config, cli_config)
key.pem, crypto_util.dump_pyopenssl_chain(chain),
params, config, cli_config)
self._report_renewal_status(lineage)
return lineage
@@ -304,7 +301,7 @@ class Client(object):
:param certr: ACME "certificate" resource.
:type certr: :class:`acme.messages.Certificate`
:param chain_cert:
:param list chain_cert:
:param str cert_path: Candidate path to a certificate.
:param str chain_path: Candidate path to a certificate chain.
@@ -331,12 +328,11 @@ class Client(object):
logger.info("Server issued certificate; certificate written to %s",
act_cert_path)
if chain_cert is not None:
if chain_cert:
chain_file, act_chain_path = le_util.unique_file(
chain_path, 0o644)
# TODO: Except
chain_pem = OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, chain_cert)
chain_pem = crypto_util.dump_pyopenssl_chain(chain_cert)
try:
chain_file.write(chain_pem)
finally:

View File

@@ -11,6 +11,7 @@ import os
import OpenSSL
from acme import crypto_util as acme_crypto_util
from acme import jose
from letsencrypt import errors
from letsencrypt import le_util
@@ -270,3 +271,24 @@ def asn1_generalizedtime_to_dt(timestamp):
def pyopenssl_x509_name_as_text(x509name):
"""Convert `OpenSSL.crypto.X509Name` to text."""
return "/".join("{0}={1}" for key, value in x509name.get_components())
def dump_pyopenssl_chain(chain, filetype=OpenSSL.crypto.FILETYPE_PEM):
"""Dump certificate chain into a bundle.
:param list chain: List of `OpenSSL.crypto.X509` (or wrapped in
`acme.jose.ComparableX509`).
"""
# XXX: returns empty string when no chain is available, which
# shuts up RenewableCert, but might not be the best solution...
def _dump_cert(cert):
if isinstance(cert, jose.ComparableX509):
# pylint: disable=protected-access
cert = cert._wrapped
return OpenSSL.crypto.dump_certificate(filetype, cert)
# assumes that OpenSSL.crypto.dump_certificate includes ending
# newline character
return "".join(_dump_cert(cert) for cert in chain)

View File

@@ -85,7 +85,7 @@ def renew(cert, old_version):
with open(cert.version("cert", old_version)) as f:
sans = crypto_util.get_sans_from_cert(f.read())
new_certr, new_chain, new_key, _ = le_client.obtain_certificate(sans)
if new_chain is not None:
if new_chain:
# XXX: Assumes that there was no key change. We need logic
# for figuring out whether there was or not. Probably
# best is to have obtain_certificate return None for
@@ -94,8 +94,7 @@ def renew(cert, old_version):
return cert.save_successor(
old_version, OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, new_certr.body),
new_key.pem, OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, new_chain))
new_key.pem, crypto_util.dump_pyopenssl_chain(new_chain))
# TODO: Notify results
else:
# TODO: Notify negative results

View File

@@ -604,6 +604,8 @@ class RenewableCert(object): # pylint: disable=too-many-instance-attributes
with open(target["chain"], "w") as f:
f.write(chain)
with open(target["fullchain"], "w") as f:
# assumes that OpenSSL.crypto.dump_certificate includes
# ending newline character
f.write(cert + chain)
# Document what we've done in a new renewal config file

View File

@@ -1,4 +1,7 @@
"""Tests for letsencrypt.client."""
import os
import shutil
import tempfile
import unittest
import configobj
@@ -145,6 +148,36 @@ class ClientTest(unittest.TestCase):
self.assertTrue("renewal but not automatic deployment" in msg)
self.assertTrue(cert.cli_config.renewal_configs_dir in msg)
def test_save_certificate(self):
certs = ["matching_cert.pem", "cert.pem", "cert-san.pem"]
tmp_path = tempfile.mkdtemp()
os.chmod(tmp_path, 0o755) # TODO: really??
certr = mock.MagicMock(body=test_util.load_cert(certs[0]))
cert1 = test_util.load_cert(certs[1])
cert2 = test_util.load_cert(certs[2])
candidate_cert_path = os.path.join(tmp_path, "certs", "cert.pem")
candidate_chain_path = os.path.join(tmp_path, "chains", "chain.pem")
cert_path, chain_path = self.client.save_certificate(
certr, [cert1, cert2], candidate_cert_path, candidate_chain_path)
self.assertEqual(os.path.dirname(cert_path),
os.path.dirname(candidate_cert_path))
self.assertEqual(os.path.dirname(chain_path),
os.path.dirname(candidate_chain_path))
with open(cert_path, "r") as cert_file:
cert_contents = cert_file.read()
self.assertEqual(cert_contents, test_util.load_vector(certs[0]))
with open(chain_path, "r") as chain_file:
chain_contents = chain_file.read()
self.assertEqual(chain_contents, test_util.load_vector(certs[1]) +
test_util.load_vector(certs[2]))
shutil.rmtree(tmp_path)
class RollbackTest(unittest.TestCase):
"""Tests for letsencrypt.client.rollback."""

View File

@@ -633,7 +633,7 @@ class RenewableCertTests(BaseRenewableCertTest):
mock_client = mock.MagicMock()
# pylint: disable=star-args
mock_client.obtain_certificate.return_value = (
mock.MagicMock(body=CERT), CERT, mock.Mock(pem="key"),
mock.MagicMock(body=CERT), [CERT], mock.Mock(pem="key"),
mock.sentinel.csr)
mock_c.return_value = mock_client
self.assertEqual(2, renewer.renew(self.test_rc, 1))
@@ -641,7 +641,7 @@ class RenewableCertTests(BaseRenewableCertTest):
# have been made to the mock functions here.
mock_acc_storage().load.assert_called_once_with(account_id="abcde")
mock_client.obtain_certificate.return_value = (
mock.sentinel.certr, None, mock.sentinel.key, mock.sentinel.csr)
mock.sentinel.certr, [], mock.sentinel.key, mock.sentinel.csr)
# This should fail because the renewal itself appears to fail
self.assertFalse(renewer.renew(self.test_rc, 1))

View File

@@ -6,7 +6,7 @@
# acme and letsencrypt are not yet on pypi, so when Tox invokes
# "install *.zip", it will not find deps
skipsdist = true
envlist = py26,py27,py33,py34,cover,lint
envlist = py27,cover,lint
[testenv]
commands =