mirror of
https://github.com/certbot/certbot.git
synced 2025-09-10 22:11:53 +03:00
* Drop support for EOL Python 2.6 * Use more helpful assertIn/NotIn instead of assertTrue/False * Drop support for EOL Python 3.3 * Remove redundant Python 3.3 code * Restore code for RHEL 6 and virtualenv for Py2.7 * Revert pipstrap.py to upstream * Merge py26_packages and non_py26_packages into all_packages * Revert changes to *-auto in root * Update by calling letsencrypt-auto-source/build.py * Revert permissions for pipstrap.py
384 lines
15 KiB
Python
384 lines
15 KiB
Python
"""Tests for letsencrypt-auto"""
|
|
|
|
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
|
|
from contextlib import contextmanager
|
|
from functools import partial
|
|
from json import dumps
|
|
from os import chmod, environ, makedirs
|
|
from os.path import abspath, dirname, exists, join
|
|
import re
|
|
from shutil import copy, rmtree
|
|
import socket
|
|
import ssl
|
|
from stat import S_IRUSR, S_IXUSR
|
|
from subprocess import CalledProcessError, Popen, PIPE
|
|
import sys
|
|
from tempfile import mkdtemp
|
|
from threading import Thread
|
|
from unittest import TestCase
|
|
|
|
from pytest import mark
|
|
|
|
|
|
@mark.skip
|
|
def tests_dir():
|
|
"""Return a path to the "tests" directory."""
|
|
return dirname(abspath(__file__))
|
|
|
|
|
|
sys.path.insert(0, dirname(tests_dir()))
|
|
from build import build as build_le_auto
|
|
|
|
|
|
BOOTSTRAP_FILENAME = 'certbot-auto-bootstrap-version.txt'
|
|
"""Name of the file where certbot-auto saves its bootstrap version."""
|
|
|
|
|
|
class RequestHandler(BaseHTTPRequestHandler):
|
|
"""An HTTPS request handler which is quiet and serves a specific folder."""
|
|
|
|
def __init__(self, resources, *args, **kwargs):
|
|
"""
|
|
:arg resources: A dict of resource paths pointing to content bytes
|
|
|
|
"""
|
|
self.resources = resources
|
|
BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
|
|
|
|
def log_message(self, format, *args):
|
|
"""Don't log each request to the terminal."""
|
|
|
|
def do_GET(self):
|
|
"""Serve a GET request."""
|
|
content = self.send_head()
|
|
if content is not None:
|
|
self.wfile.write(content)
|
|
|
|
def send_head(self):
|
|
"""Common code for GET and HEAD commands
|
|
|
|
This sends the response code and MIME headers and returns either a
|
|
bytestring of content or, if none is found, None.
|
|
|
|
"""
|
|
path = self.path[1:] # Strip leading slash.
|
|
content = self.resources.get(path)
|
|
if content is None:
|
|
self.send_error(404, 'Path "%s" not found in self.resources' % path)
|
|
else:
|
|
self.send_response(200)
|
|
self.send_header('Content-type', 'text/plain')
|
|
self.send_header('Content-Length', str(len(content)))
|
|
self.end_headers()
|
|
return content
|
|
|
|
|
|
def server_and_port(resources):
|
|
"""Return an unstarted HTTPS server and the port it will use."""
|
|
# Find a port, and bind to it. I can't get the OS to close the socket
|
|
# promptly after we shut down the server, so we typically need to try
|
|
# a couple ports after the first test case. Setting
|
|
# TCPServer.allow_reuse_address = True seems to have nothing to do
|
|
# with this behavior.
|
|
worked = False
|
|
for port in xrange(4443, 4543):
|
|
try:
|
|
server = HTTPServer(('localhost', port),
|
|
partial(RequestHandler, resources))
|
|
except socket.error:
|
|
pass
|
|
else:
|
|
worked = True
|
|
server.socket = ssl.wrap_socket(
|
|
server.socket,
|
|
certfile=join(tests_dir(), 'certs', 'localhost', 'server.pem'),
|
|
server_side=True)
|
|
break
|
|
if not worked:
|
|
raise RuntimeError("Couldn't find an unused socket for the testing HTTPS server.")
|
|
return server, port
|
|
|
|
|
|
@contextmanager
|
|
def serving(resources):
|
|
"""Spin up a local HTTPS server, and yield its base URL.
|
|
|
|
Use a self-signed cert generated as outlined by
|
|
https://coolaj86.com/articles/create-your-own-certificate-authority-for-
|
|
testing/.
|
|
|
|
"""
|
|
server, port = server_and_port(resources)
|
|
thread = Thread(target=server.serve_forever)
|
|
try:
|
|
thread.start()
|
|
yield 'https://localhost:{port}/'.format(port=port)
|
|
finally:
|
|
server.shutdown()
|
|
thread.join()
|
|
|
|
|
|
LE_AUTO_PATH = join(dirname(tests_dir()), 'letsencrypt-auto')
|
|
|
|
|
|
@contextmanager
|
|
def temp_paths():
|
|
"""Creates and deletes paths for letsencrypt-auto and its venv."""
|
|
dir = mkdtemp(prefix='le-test-')
|
|
try:
|
|
yield join(dir, 'letsencrypt-auto'), join(dir, 'venv')
|
|
finally:
|
|
rmtree(dir, ignore_errors=True)
|
|
|
|
|
|
def out_and_err(command, input=None, shell=False, env=None):
|
|
"""Run a shell command, and return stderr and stdout as string.
|
|
|
|
If the command returns nonzero, raise CalledProcessError.
|
|
|
|
:arg command: A list of commandline args
|
|
:arg input: Data to pipe to stdin. Omit for none.
|
|
|
|
Remaining args have the same meaning as for Popen.
|
|
|
|
"""
|
|
process = Popen(command,
|
|
stdout=PIPE,
|
|
stdin=PIPE,
|
|
stderr=PIPE,
|
|
shell=shell,
|
|
env=env)
|
|
out, err = process.communicate(input=input)
|
|
status = process.poll() # same as in check_output(), though wait() sounds better
|
|
if status:
|
|
error = CalledProcessError(status, command)
|
|
error.output = out
|
|
raise error
|
|
return out, err
|
|
|
|
|
|
def signed(content, private_key_name='signing.key'):
|
|
"""Return the signed SHA-256 hash of ``content``, using the given key file."""
|
|
command = ['openssl', 'dgst', '-sha256', '-sign',
|
|
join(tests_dir(), private_key_name)]
|
|
out, err = out_and_err(command, input=content)
|
|
return out
|
|
|
|
|
|
def install_le_auto(contents, install_path):
|
|
"""Install some given source code as the letsencrypt-auto script at the
|
|
root level of a virtualenv.
|
|
|
|
:arg contents: The contents of the built letsencrypt-auto script
|
|
:arg install_path: The path where to install the script
|
|
|
|
"""
|
|
with open(install_path, 'w') as le_auto:
|
|
le_auto.write(contents)
|
|
chmod(install_path, S_IRUSR | S_IXUSR)
|
|
|
|
|
|
def run_le_auto(le_auto_path, venv_dir, base_url, **kwargs):
|
|
"""Run the prebuilt version of letsencrypt-auto, returning stdout and
|
|
stderr strings.
|
|
|
|
If the command returns other than 0, raise CalledProcessError.
|
|
|
|
"""
|
|
env = environ.copy()
|
|
d = dict(VENV_PATH=venv_dir,
|
|
# URL to PyPI-style JSON that tell us the latest released version
|
|
# of LE:
|
|
LE_AUTO_JSON_URL=base_url + 'certbot/json',
|
|
# URL to dir containing letsencrypt-auto and letsencrypt-auto.sig:
|
|
LE_AUTO_DIR_TEMPLATE=base_url + '%s/',
|
|
# The public key corresponding to signing.key:
|
|
LE_AUTO_PUBLIC_KEY="""-----BEGIN PUBLIC KEY-----
|
|
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAsMoSzLYQ7E1sdSOkwelg
|
|
tzKIh2qi3bpXuYtcfFC0XrvWig071NwIj+dZiT0OLZ2hPispEH0B7ISuuWg1ll7G
|
|
hFW0VdbxL6JdGzS2ShNWkX9hE9z+j8VqwDPOBn3ZHm03qwpYkBDwQib3KqOdYbTT
|
|
uUtJmmGcuk3a9Aq/sCT6DdfmTSdP5asdQYwIcaQreDrOosaS84DTWI3IU+UYJVgl
|
|
LsIVPBuy9IcgHidUQ96hJnoPsDCWsHwX62495QKEarauyKQrJzFes0EY95orDM47
|
|
Z5o/NDiQB11m91yNB0MmPYY9QSbnOA9j7IaaC97AwRLuwXY+/R2ablTcxurWou68
|
|
iQIDAQAB
|
|
-----END PUBLIC KEY-----""",
|
|
NO_CERT_VERIFY='1',
|
|
**kwargs)
|
|
env.update(d)
|
|
return out_and_err(
|
|
le_auto_path + ' --version',
|
|
shell=True,
|
|
env=env)
|
|
|
|
|
|
def set_le_script_version(venv_dir, version):
|
|
"""Tell the letsencrypt script to report a certain version.
|
|
|
|
We actually replace the script with a dummy version that knows only how to
|
|
print its version.
|
|
|
|
"""
|
|
letsencrypt_path = join(venv_dir, 'bin', 'letsencrypt')
|
|
with open(letsencrypt_path, 'w') as script:
|
|
script.write("#!/usr/bin/env python\n"
|
|
"from sys import stderr\n"
|
|
"stderr.write('letsencrypt %s\\n')" % version)
|
|
chmod(letsencrypt_path, S_IRUSR | S_IXUSR)
|
|
|
|
|
|
class AutoTests(TestCase):
|
|
"""Test the major branch points of letsencrypt-auto:
|
|
|
|
* An le-auto upgrade is needed.
|
|
* An le-auto upgrade is not needed.
|
|
* There was an out-of-date LE script installed.
|
|
* There was a current LE script installed.
|
|
* There was no LE script installed (less important).
|
|
* Pip hash-verification passes.
|
|
* Pip has a hash mismatch.
|
|
* The OpenSSL sig matches.
|
|
* The OpenSSL sig mismatches.
|
|
|
|
For tests which get to the end, we run merely ``letsencrypt --version``.
|
|
The functioning of the rest of the certbot script is covered by other
|
|
test suites.
|
|
|
|
"""
|
|
NEW_LE_AUTO = build_le_auto(
|
|
version='99.9.9',
|
|
requirements='letsencrypt==99.9.9 --hash=sha256:1cc14d61ab424cdee446f51e50f1123f8482ec740587fe78626c933bba2873a0')
|
|
NEW_LE_AUTO_SIG = signed(NEW_LE_AUTO)
|
|
|
|
def test_successes(self):
|
|
"""Exercise most branches of letsencrypt-auto.
|
|
|
|
They just happen to be the branches in which everything goes well.
|
|
|
|
I violate my usual rule of having small, decoupled tests, because...
|
|
|
|
1. We shouldn't need to run a Cartesian product of the branches: the
|
|
phases run in separate shell processes, containing state leakage
|
|
pretty effectively. The only shared state is FS state, and it's
|
|
limited to a temp dir, assuming (if we dare) all functions properly.
|
|
2. One combination of branches happens to set us up nicely for testing
|
|
the next, saving code.
|
|
|
|
"""
|
|
with temp_paths() as (le_auto_path, venv_dir):
|
|
# This serves a PyPI page with a higher version, a GitHub-alike
|
|
# with a corresponding le-auto script, and a matching signature.
|
|
resources = {'certbot/json': dumps({'releases': {'99.9.9': None}}),
|
|
'v99.9.9/letsencrypt-auto': self.NEW_LE_AUTO,
|
|
'v99.9.9/letsencrypt-auto.sig': self.NEW_LE_AUTO_SIG}
|
|
with serving(resources) as base_url:
|
|
run_letsencrypt_auto = partial(
|
|
run_le_auto,
|
|
le_auto_path,
|
|
venv_dir,
|
|
base_url,
|
|
PIP_FIND_LINKS=join(tests_dir(),
|
|
'fake-letsencrypt',
|
|
'dist'))
|
|
|
|
# Test when a phase-1 upgrade is needed, there's no LE binary
|
|
# installed, and pip hashes verify:
|
|
install_le_auto(build_le_auto(version='50.0.0'), le_auto_path)
|
|
out, err = run_letsencrypt_auto()
|
|
self.assertTrue(re.match(r'letsencrypt \d+\.\d+\.\d+',
|
|
err.strip().splitlines()[-1]))
|
|
# Make a few assertions to test the validity of the next tests:
|
|
self.assertIn('Upgrading certbot-auto ', out)
|
|
self.assertIn('Creating virtual environment...', out)
|
|
|
|
# Now we have le-auto 99.9.9 and LE 99.9.9 installed. This
|
|
# conveniently sets us up to test the next 2 cases.
|
|
|
|
# Test when neither phase-1 upgrade nor phase-2 upgrade is
|
|
# needed (probably a common case):
|
|
out, err = run_letsencrypt_auto()
|
|
self.assertNotIn('Upgrading certbot-auto ', out)
|
|
self.assertNotIn('Creating virtual environment...', out)
|
|
|
|
def test_phase2_upgrade(self):
|
|
"""Test a phase-2 upgrade without a phase-1 upgrade."""
|
|
resources = {'certbot/json': dumps({'releases': {'99.9.9': None}}),
|
|
'v99.9.9/letsencrypt-auto': self.NEW_LE_AUTO,
|
|
'v99.9.9/letsencrypt-auto.sig': self.NEW_LE_AUTO_SIG}
|
|
with serving(resources) as base_url:
|
|
pip_find_links=join(tests_dir(), 'fake-letsencrypt', 'dist')
|
|
with temp_paths() as (le_auto_path, venv_dir):
|
|
install_le_auto(self.NEW_LE_AUTO, le_auto_path)
|
|
|
|
# Create venv saving the correct bootstrap script version
|
|
out, err = run_le_auto(le_auto_path, venv_dir, base_url,
|
|
PIP_FIND_LINKS=pip_find_links)
|
|
self.assertNotIn('Upgrading certbot-auto ', out)
|
|
self.assertIn('Creating virtual environment...', out)
|
|
with open(join(venv_dir, BOOTSTRAP_FILENAME)) as f:
|
|
bootstrap_version = f.read()
|
|
|
|
# Create a new venv with an old letsencrypt version
|
|
with temp_paths() as (le_auto_path, venv_dir):
|
|
venv_bin = join(venv_dir, 'bin')
|
|
makedirs(venv_bin)
|
|
set_le_script_version(venv_dir, '0.0.1')
|
|
with open(join(venv_dir, BOOTSTRAP_FILENAME), 'w') as f:
|
|
f.write(bootstrap_version)
|
|
|
|
install_le_auto(self.NEW_LE_AUTO, le_auto_path)
|
|
out, err = run_le_auto(le_auto_path, venv_dir, base_url,
|
|
PIP_FIND_LINKS=pip_find_links)
|
|
|
|
self.assertNotIn('Upgrading certbot-auto ', out)
|
|
self.assertIn('Creating virtual environment...', out)
|
|
|
|
def test_openssl_failure(self):
|
|
"""Make sure we stop if the openssl signature check fails."""
|
|
with temp_paths() as (le_auto_path, venv_dir):
|
|
# Serve an unrelated hash signed with the good key (easier than
|
|
# making a bad key, and a mismatch is a mismatch):
|
|
resources = {'': '<a href="certbot/">certbot/</a>',
|
|
'certbot/json': dumps({'releases': {'99.9.9': None}}),
|
|
'v99.9.9/letsencrypt-auto': build_le_auto(version='99.9.9'),
|
|
'v99.9.9/letsencrypt-auto.sig': signed('something else')}
|
|
with serving(resources) as base_url:
|
|
copy(LE_AUTO_PATH, le_auto_path)
|
|
try:
|
|
out, err = run_le_auto(le_auto_path, venv_dir, base_url)
|
|
except CalledProcessError as exc:
|
|
self.assertEqual(exc.returncode, 1)
|
|
self.assertTrue("Couldn't verify signature of downloaded "
|
|
"certbot-auto." in exc.output)
|
|
else:
|
|
print(out)
|
|
self.fail('Signature check on certbot-auto erroneously passed.')
|
|
|
|
def test_pip_failure(self):
|
|
"""Make sure pip stops us if there is a hash mismatch."""
|
|
with temp_paths() as (le_auto_path, venv_dir):
|
|
resources = {'': '<a href="certbot/">certbot/</a>',
|
|
'certbot/json': dumps({'releases': {'99.9.9': None}})}
|
|
with serving(resources) as base_url:
|
|
# Build a le-auto script embedding a bad requirements file:
|
|
install_le_auto(
|
|
build_le_auto(
|
|
version='99.9.9',
|
|
requirements='configobj==5.0.6 --hash=sha256:badbadbadbadbadbadbadbadbadbadbadbadbadbadbadbadbadbadbadbadbadb'),
|
|
le_auto_path)
|
|
try:
|
|
out, err = run_le_auto(le_auto_path, venv_dir, base_url)
|
|
except CalledProcessError as exc:
|
|
self.assertEqual(exc.returncode, 1)
|
|
self.assertTrue("THESE PACKAGES DO NOT MATCH THE HASHES "
|
|
"FROM THE REQUIREMENTS FILE" in exc.output)
|
|
self.assertFalse(
|
|
exists(venv_dir),
|
|
msg="The virtualenv was left around, even though "
|
|
"installation didn't succeed. We shouldn't do "
|
|
"this, as it foils our detection of whether we "
|
|
"need to recreate the virtualenv, which hinges "
|
|
"on the presence of $VENV_BIN/letsencrypt.")
|
|
else:
|
|
self.fail("Pip didn't detect a bad hash and stop the "
|
|
"installation.")
|