1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-23 07:20:55 +03:00

Finished authenticator tests

This commit is contained in:
Brad Warren
2015-07-21 18:14:57 -07:00
parent d1653399f5
commit 780d5fcbb9
9 changed files with 191 additions and 28 deletions

View File

@@ -935,7 +935,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
subprocess.check_call([self.conf("enmod"), mod_name],
stdout=open("/dev/null", "w"),
stderr=open("/dev/null", "w"))
apache_restart(self.conf("init"))
apache_restart(self.conf("init-script"))
except (OSError, subprocess.CalledProcessError):
logger.exception("Error enabling mod_%s", mod_name)
raise errors.MisconfigurationError(

1
tests/MANIFEST.in Normal file
View File

@@ -0,0 +1 @@
include compatibility/testdata/rsa1024_key.pem

View File

@@ -12,4 +12,6 @@ ENV APACHE_RUN_USER=daemon \
COPY tests/compatibility/configurators/apache/a2enmod.sh /usr/local/bin/
CMD [ "httpd-foreground" ]
# Note: this only exposes the port to other docker containers. You
# still have to bind to 443@host at runtime.
EXPOSE 443

View File

@@ -1,5 +1,9 @@
"""Proxies ApacheConfigurator for Apache 2.4 tests"""
import zope.interface
from tests.compatibility import errors
from tests.compatibility import interfaces
from tests.compatibility.configurators.apache import common as apache_common
@@ -33,10 +37,14 @@ SHARED_MODULES = {
class Proxy(apache_common.Proxy):
"""Wraps the ApacheConfigurator for Apache 2.4 tests"""
zope.interface.implements(interfaces.IConfiguratorProxy)
def __init__(self, args):
"""Initializes the plugin with the given command line args"""
super(Proxy, self).__init__(args)
self.start_docker("bradmw/apache2.4")
# Running init isn't ideal, but the Docker container needs to survive
# Apache restarts
self.start_docker("bradmw/apache2.4", "init")
def preprocess_config(self, server_root):
"""Prepares the configuration for use in the Docker"""

View File

@@ -48,6 +48,14 @@ class Proxy(configurators_common.Proxy):
def load_config(self):
"""Loads the next configuration for the plugin to test"""
if hasattr(self.le_config, "apache_init_script"):
try:
self.check_call_in_docker(
[self.le_config.apache_init_script, "stop"])
except errors.Error:
raise errors.Error(
"Failed to stop previous apache config from running")
config = super(Proxy, self).load_config()
self.modules = _get_modules(config)
self.version = _get_version(config)
@@ -63,7 +71,7 @@ class Proxy(configurators_common.Proxy):
try:
self.check_call_in_docker(
"apachectl -d {0} -f {1} -k restart".format(
"apachectl -d {0} -f {1} -k start".format(
server_root, config_file))
except errors.Error:
raise errors.Error(
@@ -93,7 +101,7 @@ class Proxy(configurators_common.Proxy):
self.le_config.apache_ctl = "apachectl -d {0} -f {1}".format(
server_root, config_file)
self.le_config.apache_enmod = "a2enmod.sh {0}".format(server_root)
self.le_config.apache_init = self.le_config.apache_ctl + " -k"
self.le_config.apache_init_script = self.le_config.apache_ctl + " -k"
self._apache_configurator = configurator.ApacheConfigurator(
config=configuration.NamespaceConfig(self.le_config),
@@ -119,6 +127,13 @@ class Proxy(configurators_common.Proxy):
else:
raise errors.Error("No configuration file loaded")
def deploy_cert(self, domain, cert_path, key_path, chain_path=None):
"""Installs cert"""
cert_path, key_path, chain_path = self.copy_certs_and_keys(
cert_path, key_path, chain_path)
self._apache_configurator.deploy_cert(
domain, cert_path, key_path, chain_path)
def _create_test_conf(server_root, apache_config):
"""Creates a test config file and adds it to the Apache config"""

View File

@@ -1,6 +1,7 @@
"""Provides a common base for configurator proxies"""
import logging
import os
import shutil
import tempfile
import threading
@@ -34,10 +35,12 @@ class Proxy(object):
def __init__(self, args):
"""Initializes the plugin with the given command line args"""
temp_dir = tempfile.mkdtemp()
self.le_config = util.create_le_config(temp_dir)
self._config_dir = util.extract_configs(args.configs, temp_dir)
self._configs = os.listdir(self._config_dir)
self._temp_dir = tempfile.mkdtemp()
self.le_config = util.create_le_config(self._temp_dir)
config_dir = util.extract_configs(args.configs, self._temp_dir)
self._configs = [
os.path.join(config_dir, config)
for config in os.listdir(config_dir)]
self.args = args
self._docker_client = docker.Client(
@@ -58,21 +61,22 @@ class Proxy(object):
def load_config(self):
"""Returns the next config directory to be tested"""
return os.path.join(self._config_dir, self._configs.pop())
return self._configs.pop()
def start_docker(self, image_name):
def start_docker(self, image_name, command):
"""Creates and runs a Docker container with the specified image"""
logger.info("Pulling Docker image. This may take a minute.")
for line in self._docker_client.pull(image_name, stream=True):
logger.debug(line)
host_config = docker.utils.create_host_config(
binds={
self._config_dir : {"bind" : self._config_dir, "mode" : "rw"}},
self._temp_dir : {"bind" : self._temp_dir, "mode" : "rw"}},
port_bindings={
80 : ("127.0.0.1", self.http_port),
443 : ("127.0.0.1", self.https_port)},)
container = self._docker_client.create_container(
image_name, ports=[80, 443], volumes=self._config_dir,
image_name, command, ports=[80, 443], volumes=self._temp_dir,
host_config=host_config)
if container["Warnings"]:
logger.warning(container["Warnings"])
@@ -123,8 +127,25 @@ class Proxy(object):
def execute_in_docker(self, command):
"""Executes command inside the running docker image"""
logger.info("Executing '%s'", command)
logger.debug("Executing '%s'", command)
exec_id = self._docker_client.exec_create(self._container_id, command)
output = self._docker_client.exec_start(exec_id)
returncode = self._docker_client.exec_inspect(exec_id)["ExitCode"]
return returncode, output
def copy_certs_and_keys(self, cert_path, key_path, chain_path=None):
"""Copies certs and keys into the temporary directory"""
cert_and_key_dir = os.path.join(self._temp_dir, "certs_and_keys")
os.mkdir(cert_and_key_dir)
cert = os.path.join(cert_and_key_dir, "cert")
shutil.copy(cert_path, cert)
key = os.path.join(cert_and_key_dir, "key")
shutil.copy(key_path, key)
if chain_path:
chain = os.path.join(cert_and_key_dir, "chain")
shutil.copy(chain_path, chain)
else:
chain = None
return cert, key, chain

View File

@@ -1,8 +1,20 @@
"""Tests Let's Encrypt plugins against different server configurations."""
import argparse
import filecmp
import logging
import os
import shutil
import tempfile
import OpenSSL
from acme import challenges
from acme import crypto_util
from acme import messages
from letsencrypt import achallenges
from letsencrypt.tests import acme_util
from tests.compatibility import errors
from tests.compatibility import util
from tests.compatibility.configurators.apache import apache24
@@ -20,6 +32,93 @@ PLUGINS = {"apache" : apache24.Proxy}
logger = logging.getLogger(__name__)
def test_authenticator(plugin, config, temp_dir):
"""Tests plugin as an authenticator"""
backup = os.path.join(temp_dir, "backup")
shutil.copytree(config, backup, symlinks=True)
achalls = _create_achalls(plugin)
if achalls:
try:
responses = plugin.perform(achalls)
for i in xrange(len(responses)):
if not responses[i]:
raise errors.Error(
"Plugin returned 'None' or 'False' response to "
"challenge")
elif isinstance(responses[i], challenges.DVSNIResponse):
if responses[i].simple_verify(achalls[i],
achalls[i].domain,
util.JWK.key.public_key(),
host="127.0.0.1",
port=plugin.https_port):
logger.info(
"Verification of DVSNI response for %s succeeded",
achalls[i].domain)
else:
raise errors.Error(
"Verification of DVSNI response for {0} "
"failed".format(achalls[i].domain))
finally:
plugin.cleanup(achalls)
if _dirs_are_unequal(config, backup):
raise errors.Error("Challenge cleanup failed")
else:
logger.info("Challenge cleanup succeeded")
def _create_achalls(plugin):
"""Returns a list of annotated challenges to test on plugin"""
achalls = list()
names = plugin.get_testable_domain_names()
for domain in names:
prefs = plugin.get_chall_pref(domain)
for chall_type in prefs:
if chall_type == challenges.DVSNI:
chall = challenges.DVSNI(
r=os.urandom(challenges.DVSNI.R_SIZE),
nonce=os.urandom(challenges.DVSNI.NONCE_SIZE))
challb = acme_util.chall_to_challb(
chall, messages.STATUS_PENDING)
achall = achallenges.DVSNI(
challb=challb, domain=domain, key=util.JWK)
achalls.append(achall)
return achalls
def test_installer(plugin, config, temp_dir):
"""Tests plugin as an installer"""
backup = os.path.join(temp_dir, "backup")
shutil.copytree(config, backup, symlinks=True)
if plugin.get_all_names() != plugin.get_all_names_answer():
raise errors.Error("get_all_names test failed")
else:
logging.info("get_all_names test succeeded")
domains = list(plugin.get_testable_domain_names())
cert = crypto_util.gen_ss_cert(util.KEY, domains)
cert_path = os.path.join(temp_dir, "cert.pem")
with open(cert_path, "w") as f:
f.write(OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, cert))
for domain in domains:
plugin.deploy_cert(domain, cert_path, util.KEY_PATH)
plugin.save()
plugin.restart()
def _dirs_are_unequal(dir1, dir2):
"""Returns True if dir1 and dir2 are equal"""
dircmp = filecmp.dircmp(dir1, dir2)
return (dircmp.left_only or dircmp.right_only or
dircmp.diff_files or dircmp.funny_files)
def get_args():
"""Returns parsed command line arguments."""
parser = argparse.ArgumentParser(
@@ -62,17 +161,10 @@ def setup_logging(args):
handler = logging.StreamHandler()
root_logger = logging.getLogger()
root_logger.setLevel(logging.WARNING - args.verbose_count * 10)
root_logger.setLevel(logging.INFO - args.verbose_count * 10)
root_logger.addHandler(handler)
def test_installer(plugin):
"""Tests plugin as an installer"""
if plugin.get_all_names() != plugin.get_all_names_answer():
raise errors.Error(
"Names found by plugin don't match names found by the wrapper")
def main():
"""Main test script execution."""
args = get_args()
@@ -81,17 +173,20 @@ def main():
if args.plugin not in PLUGINS:
raise errors.Error("Unknown plugin {0}".format(args.plugin))
temp_dir = tempfile.mkdtemp()
plugin = PLUGINS[args.plugin](args)
try:
plugin.execute_in_docker("mkdir -p /var/log/apache2")
while plugin.has_more_configs():
try:
print "Loaded configuration: {0}".format(plugin.load_config())
if args.install:
test_installer(plugin)
config = plugin.load_config()
logger.info("Loaded configuration: %s", config)
if args.auth:
test_authenticator(plugin, config, temp_dir)
#if args.install:
#test_installer(plugin, temp_dir)
except errors.Error as error:
print "Test failed"
print error
logger.warning("Test failed: %s", error)
finally:
plugin.cleanup_from_tests()

View File

@@ -0,0 +1,15 @@
-----BEGIN RSA PRIVATE KEY-----
MIICXAIBAAKBgQCsREbM+UcfsgDy2w56AVGyxsO0HVsbEZHHoEzv7qksIwFgRYMp
rowwIxD450RQQqjvw9IoXlMVmr1t5szn5KXn9JRO9T5KNCCy3VPx75WBcp6kzd9Q
2HS1OEOtpilNnDkZ+TJfdgFWPUBYj2o4Md1hPmcvagiIJY5U6speka2bjwIDAQAB
AoGANCMZ9pF/mDUsmlP4Rq69hkkoFAxKdZ/UqkF256so4mXZ1cRUFTpxzWPfkCWW
hGAYdzCiG3uo08IYkPmojIqkN1dk5Hcq5eQAmshaPkQHQCHjmPjjcNvgjIXQoGUf
TpDU2hbY4UAlJlj4ZLh+jGP5Zq8/WrNi8RsI3v9Nagfp/FECQQDgi2q8p1gX0TNh
d1aEKmSXkR3bxkyFk6oS+pBrAG3+yX27ZayN6Rx6DOs/FcBsOu7fX3PYBziDeEWe
Lkf1P743AkEAxGYT/LY3puglSz4iJZZzWmRCrVOg41yhfQ+F1BRX43/2vtoU5GyM
2lUn1vQ2e/rfmnAvfJxc90GeZCIHB1ihaQJBALH8UMLxMtbOMJgVbDKfF9U8ZhqK
+KT5A1q/2jG2yXmoZU1hroFeQgBMtTvwFfK0VBwjIUQflSBA+Y4EyW0Q9ckCQGvd
jHitM1+N/H2YwHRYbz5j9mLvnVuCEod3MQ9LpQGj1Eb5y6OxIqL/RgQ+2HW7UXem
yc3sqvp5pZ5lOesE+JECQETPI64gqxlTIs3nErNMpMynUuTWpaElOcIJTT6icLzB
Xix67kKXjROO5D58GEYkM0Yi5k7YdUPoQBW7MoIrSIA=
-----END RSA PRIVATE KEY-----

View File

@@ -8,10 +8,16 @@ import shutil
import socket
import tarfile
from acme import jose
from acme import test_util
from letsencrypt import constants
from tests.compatibility import errors
_KEY_BASE = "rsa1024_key.pem"
KEY_PATH = test_util.vector_path(_KEY_BASE)
KEY = test_util.load_pyopenssl_private_key(_KEY_BASE)
JWK = jose.JWKRSA(key=test_util.load_rsa_private_key(_KEY_BASE))
IP_REGEX = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")