1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-26 07:41:33 +03:00

Script plugin (#3521)

* Script plugin initial commit

* Fix auth script path

* Return correct responses

* Added DNS-01 support

* Report the challenge pref correctly

* Use config root from certbot constants rather than hardcoded

* Remove prehook and rename posthook to cleanup for clarity

* Refactoring

* Docs

* Refactoring

* Refactoring continued, working now

* Use global preferred-challenges argument in favor of local

* Added http-01 as fallback challenge if not defined

* Do not continue if auth script not defined

* Skip unnecessary steps when running

* Read config values from correct places

* Tests and minor fixes

* Make Python 2.6 happy again

* Added CERTBOT_AUTH_OUTPUT and better tests

* Lint & Py3 fixes

* Make Python 2.6 happy again

* Doc changes

* Refactor hook execute and reuse in script plugin

* Refactored hook validation

* Added long_description for plugin help text

* Refactored env var writing
This commit is contained in:
Joona Hoikkala
2016-11-08 01:22:48 +02:00
committed by Peter Eckersley
parent d197b5aa05
commit d741e684d0
8 changed files with 381 additions and 10 deletions

View File

@@ -80,6 +80,7 @@ USAGE = SHORT_USAGE + """Choice of server plugins for obtaining and installing c
--standalone Run a standalone webserver for authentication
%s
--webroot Place files in a server's webroot folder for authentication
--script User provided shell scripts for authentication
OR use different plugins to obtain (authenticate) the cert and then install it:
@@ -92,7 +93,7 @@ More detailed help:
all, automation, paths, security, testing, or any of the subcommands or
plugins (certonly, renew, install, register, nginx, apache, standalone,
webroot, etc.)
webroot, script, etc.)
"""
@@ -587,7 +588,8 @@ class HelpfulArgumentParser(object):
"""
for name, plugin_ep in six.iteritems(plugins):
parser_or_group = self.add_group(name, description=plugin_ep.description)
parser_or_group = self.add_group(name,
description=plugin_ep.long_description)
plugin_ep.plugin_cls.inject_parser_options(parser_or_group, name)
def determine_help_topics(self, chosen_topic):
@@ -989,6 +991,8 @@ def _plugins_parsing(helpful, plugins):
help="Obtain and install certs using Nginx")
helpful.add(["plugins", "certonly"], "--standalone", action="store_true",
help='Obtain certs using a "standalone" webserver.')
helpful.add(["plugins", "certonly"], "--script", action="store_true",
help='Obtain certs using shell script(s)')
helpful.add(["plugins", "certonly"], "--manual", action="store_true",
help='Provide laborious manual instructions for obtaining a cert')
helpful.add(["plugins", "certonly"], "--webroot", action="store_true",

View File

@@ -12,16 +12,16 @@ logger = logging.getLogger(__name__)
def validate_hooks(config):
"""Check hook commands are executable."""
_validate_hook(config.pre_hook, "pre")
_validate_hook(config.post_hook, "post")
_validate_hook(config.renew_hook, "renew")
validate_hook(config.pre_hook, "pre")
validate_hook(config.post_hook, "post")
validate_hook(config.renew_hook, "renew")
def _prog(shell_cmd):
"""Extract the program run by a shell command"""
cmd = _which(shell_cmd)
return os.path.basename(cmd) if cmd else None
def _validate_hook(shell_cmd, hook_name):
def validate_hook(shell_cmd, hook_name):
"""Check that a command provided as a hook is plausibly executable.
:raises .errors.HookCommandNotFound: if the command is not found
@@ -69,17 +69,30 @@ def renew_hook(config, domains, lineage_path):
else:
logger.warning("Dry run: skipping renewal hook command: %s", config.renew_hook)
def _run_hook(shell_cmd):
"""Run a hook command.
:returns: stderr if there was any"""
cmd = Popen(shell_cmd, shell=True, stdout=PIPE, stderr=PIPE, stdin=PIPE)
_out, err = cmd.communicate()
err, _ = execute(shell_cmd)
return err
def execute(shell_cmd):
"""Run a command.
:returns: `tuple` (`str` stderr, `str` stdout)"""
cmd = Popen(shell_cmd, shell=True, stdout=PIPE, stderr=PIPE)
out, err = cmd.communicate()
if cmd.returncode != 0:
logger.error('Hook command "%s" returned error code %d', shell_cmd, cmd.returncode)
logger.error('Hook command "%s" returned error code %d',
shell_cmd, cmd.returncode)
if err:
logger.error('Error output from %s:\n%s', _prog(shell_cmd), err)
return (err, out)
def _is_exe(fpath):
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

View File

@@ -53,6 +53,14 @@ class PluginEntryPoint(object):
"""Description with name. Handy for UI."""
return "{0} ({1})".format(self.description, self.name)
@property
def long_description(self):
"""Long description of the plugin."""
try:
return self.plugin_cls.long_description
except AttributeError:
return self.description
@property
def hidden(self):
"""Should this plugin be hidden from UI?"""

View File

@@ -63,6 +63,18 @@ class PluginEntryPointTest(unittest.TestCase):
self.assertEqual(
"Desc (sa)", self.plugin_ep.description_with_name)
def test_long_description(self):
self.plugin_ep.plugin_cls = mock.MagicMock(
long_description="Long desc")
self.assertEqual(
"Long desc", self.plugin_ep.long_description)
def test_long_description_nonexistent(self):
self.plugin_ep.plugin_cls = mock.MagicMock(
description="Long desc not found", spec=["description"])
self.assertEqual(
"Long desc not found", self.plugin_ep.long_description)
def test_ifaces(self):
self.assertTrue(self.plugin_ep.ifaces((interfaces.IAuthenticator,)))
self.assertFalse(self.plugin_ep.ifaces((interfaces.IInstaller,)))

161
certbot/plugins/script.py Normal file
View File

@@ -0,0 +1,161 @@
"""Script-based Authenticator."""
import logging
import os
import sys
import zope.interface
from acme import challenges
from certbot import errors
from certbot import interfaces
from certbot import hooks
from certbot.plugins import common
logger = logging.getLogger(__name__)
CHALLENGES = ["http-01", "dns-01"]
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(common.Plugin):
"""Script authenticator
calls user defined script to perform authentication and
optionally cleanup.
"""
description = "Authenticate using user provided script(s)"
long_description = ("Authenticate using user provided script(s). " +
"Authenticator script has the following environment " +
"variables available for it: " +
"CERTBOT_DOMAIN - The domain being authenticated " +
"CERTBOT_VALIDATION - The validation string " +
"CERTBOT_TOKEN - Resource name part of HTTP-01 " +
"challenge (HTTP-01 only). " +
"Cleanup script has all the above, and additional " +
"var: CERTBOT_AUTH_OUTPUT - stdout output from the " +
"authenticator"
)
def __init__(self, *args, **kwargs):
super(Authenticator, self).__init__(*args, **kwargs)
self.cleanup_script = None
self.auth_script = None
self.challenges = []
@classmethod
def add_parser_arguments(cls, add):
add("auth", default=None, required=False,
help="path or command for the authentication script")
add("cleanup", default=None, required=False,
help="path or command for the cleanup script")
@property
def supported_challenges(self):
"""Challenges supported by this plugin."""
return self.challenges
def more_info(self): # pylint: disable=missing-docstring
return("This authenticator enables user to perform authentication " +
"using shell script(s).")
def prepare(self):
"""Prepare script plugin, check challenge, scripts and register them"""
pref_challenges = self.config.pref_challs
for c in pref_challenges:
if c.typ in CHALLENGES:
self.challenges.append(c)
if not self.challenges and len(pref_challenges):
# Challenges requested, but not supported
raise errors.PluginError(
"Unfortunately script plugin doesn't yet support " +
"the requested challenges")
# Challenge not defined on cli, set default
if not self.challenges:
self.challenges.append(challenges.Challenge.TYPES["http-01"])
if not self.conf("auth"):
raise errors.PluginError("Parameter --script-auth is required " +
"for script plugin")
self._prepare_scripts()
def _prepare_scripts(self):
"""Helper method for prepare, to take care of validating scripts"""
script_path = self.conf("auth")
cleanup_path = self.conf("cleanup")
if self.config.validate_hooks:
hooks.validate_hook(script_path, "script_auth")
self.auth_script = script_path
if cleanup_path:
if self.config.validate_hooks:
hooks.validate_hook(cleanup_path, "script_cleanup")
self.cleanup_script = cleanup_path
def get_chall_pref(self, domain):
"""Return challenge(s) we're answering to """
# pylint: disable=unused-argument
return self.challenges
def perform(self, achalls):
"""Perform the authentication per challenge"""
mapping = {"http-01": self._setup_env_http,
"dns-01": self._setup_env_dns}
responses = []
for achall in achalls:
response, validation = achall.response_and_validation()
# Setup env vars
mapping[achall.typ](achall, validation)
output = self.execute(self.auth_script)
if output:
self._write_auth_output(output)
responses.append(response)
return responses
def _setup_env_http(self, achall, validation):
"""Write environment variables for http challenge"""
ev = dict()
ev["CERTBOT_TOKEN"] = achall.chall.encode("token")
ev["CERTBOT_VALIDATION"] = validation
ev["CERTBOT_DOMAIN"] = achall.domain
os.environ.update(ev)
def _setup_env_dns(self, achall, validation):
"""Write environment variables for dns challenge"""
ev = dict()
ev["CERTBOT_VALIDATION"] = validation
ev["CERTBOT_DOMAIN"] = achall.domain
os.environ.update(ev)
def _write_auth_output(self, out):
"""Write output from auth script to env var for
cleanup to act upon"""
os.environ.update({"CERTBOT_AUTH_OUTPUT": out.strip()})
def _normalize_string(self, value):
"""Return string instead of bytestring for Python3.
Helper function for writing env vars, as os.environ needs str"""
if isinstance(value, bytes):
value = value.decode(sys.getdefaultencoding())
return str(value)
def execute(self, shell_cmd):
"""Run a script.
:param str shell_cmd: Command to run
:returns: `str` stdout output"""
_, out = hooks.execute(shell_cmd)
return self._normalize_string(out)
def cleanup(self, achalls): # pylint: disable=unused-argument
"""Run cleanup.sh """
if self.cleanup_script:
self.execute(self.cleanup_script)

View File

@@ -0,0 +1,170 @@
"""Tests for certbot.plugins.manual."""
import os
import tempfile
import unittest
import mock
from acme import challenges
from acme import jose
from certbot import achallenges
from certbot import errors
from certbot.tests import acme_util
from certbot.tests import test_util
KEY = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))
class AuthenticatorTest(unittest.TestCase):
"""Tests for certbot.plugins.script.Authenticator."""
def setUp(self):
from certbot.plugins.script import Authenticator
self.auth_return_value = "return from auth\n"
self.script_nonexec = create_script(b'# empty')
self.script_exec = create_script_exec(b'echo "return from auth\n"')
self.config = mock.MagicMock(
script_auth=self.script_exec,
script_cleanup=self.script_exec,
pref_challs=[challenges.Challenge.TYPES["http-01"],
challenges.Challenge.TYPES["dns-01"],
challenges.Challenge.TYPES["tls-sni-01"]])
self.tlssni_config = mock.MagicMock(
script_auth=self.script_exec,
script_cleanup=self.script_exec,
pref_challs=[challenges.Challenge.TYPES["tls-sni-01"]])
self.nochall_config = mock.MagicMock(
script_auth=self.script_exec,
script_cleanup=self.script_exec,
)
self.default = Authenticator(config=self.config, name="script")
self.onlytlssni = Authenticator(config=self.tlssni_config,
name="script")
self.nochall = Authenticator(config=self.nochall_config,
name="script")
self.http01 = achallenges.KeyAuthorizationAnnotatedChallenge(
challb=acme_util.HTTP01_P, domain="foo.com", account_key=KEY)
self.dns01 = achallenges.KeyAuthorizationAnnotatedChallenge(
challb=acme_util.DNS01_P, domain="foo.com", account_key=KEY)
self.achalls = [self.http01, self.dns01]
def tearDown(self):
os.remove(self.script_exec)
os.remove(self.script_nonexec)
def test_prepare_normal(self):
"""Test prepare with typical configuration"""
from certbot.plugins.script import Authenticator
# Erroring combinations in from of (auth_script, cleanup_script, error)
for v in [("/NONEXISTENT/script.sh", "/NONEXISTENT/script.sh",
errors.HookCommandNotFound),
(self.script_nonexec, "/NONEXISTENT/script.sh",
errors.HookCommandNotFound),
(self.script_exec, "/NONEXISTENT/script.sh",
errors.HookCommandNotFound),
("/NONEXISTENT/script.sh", self.script_nonexec,
errors.HookCommandNotFound),
("/NONEXISTENT/script.sh", self.script_exec,
errors.HookCommandNotFound),
(None, self.script_exec,
errors.PluginError)]:
testconf = mock.MagicMock(
script_auth=v[0],
script_cleanup=v[1],
pref_challs=[challenges.Challenge.TYPES["http-01"]])
testauth = Authenticator(config=testconf, name="script")
self.assertRaises(v[2], testauth.prepare)
# This should not error
self.default.prepare()
self.assertEqual(len(self.default.challenges), 2)
def test_prepare_tlssni(self):
"""Test for provided, but unsupported challenge type"""
self.assertRaises(errors.PluginError, self.onlytlssni.prepare)
def test_prepare_nochall(self):
"""Test for default challenge"""
self.nochall.prepare()
self.assertEqual(len(self.nochall.challenges), 1)
def test_more_info(self):
self.assertTrue(isinstance(self.default.more_info(), str))
def test_get_chall_pref(self):
self.default.prepare()
self.assertTrue(all(issubclass(pref, challenges.Challenge)
for pref in self.default.get_chall_pref(
"foo.com")))
def test_get_supported_challenges(self):
self.default.prepare()
self.assertTrue(all(issubclass(sup, challenges.Challenge)
for sup in self.default.supported_challenges))
def test_perform(self):
resp_http = self.http01.response(KEY)
resp_dns = self.dns01.response(KEY)
self.default.prepare()
# Check for the env vars prior to the run
self.assertFalse("CERTBOT_VALIDATION" in os.environ.keys())
self.assertFalse("CERTBOT_DOMAIN" in os.environ.keys())
self.assertFalse("CERTBOT_AUTH_OUTPUT" in os.environ.keys())
pref_resp = self.default.perform(self.achalls)
self.assertEqual([resp_http, resp_dns], pref_resp)
# Check for the env vars post run
self.assertTrue("CERTBOT_VALIDATION" in os.environ.keys())
self.assertTrue("CERTBOT_DOMAIN" in os.environ.keys())
self.assertTrue("CERTBOT_AUTH_OUTPUT" in os.environ.keys())
self.assertEqual(os.environ["CERTBOT_AUTH_OUTPUT"],
self.auth_return_value.strip())
@mock.patch('certbot.plugins.script.Authenticator.execute')
def test_cleanup(self, mock_exec):
mock_exec.return_value = (0, None, None)
self.default.prepare()
self.default.cleanup(self.achalls)
self.assertEqual(mock_exec.call_count, 1)
@mock.patch('certbot.hooks.Popen')
def test_execute(self, mock_popen):
proc = mock.Mock()
# tuple values: stdout, stderr, errorcode, num_of_logger_calls
for t in [("", "", 0, 0),
(self.auth_return_value, "", 0, 0),
(None, "stderr_output", 0, 1),
("whatever", "stderr_output", 1, 2),
(b'bytestring outval', "", 0, 0)]:
proc = mock.Mock()
attrs = {'communicate.return_value': (t[0], t[1]),
'returncode': t[2]}
proc.configure_mock(**attrs) # pylint: disable=star-args
mock_popen.return_value = proc
with mock.patch('certbot.hooks.logger.error') as mock_log:
output = self.default.execute(self.script_exec)
self.assertEqual(mock_log.call_count, t[3])
self.assertTrue(isinstance(output, str))
def create_script(contents):
""" Helper to create temporary file """
f = tempfile.NamedTemporaryFile(delete=False, prefix='.sh')
f.write(contents)
f.close()
return f.name
def create_script_exec(contents):
""" Helper to create temporary file with exec permissions"""
fname = create_script(contents)
os.chmod(fname, 0o700)
return fname

View File

@@ -131,7 +131,7 @@ def choose_plugin(prepared, question):
else:
return None
noninstaller_plugins = ["webroot", "manual", "standalone"]
noninstaller_plugins = ["webroot", "manual", "standalone", "script"]
def record_chosen_plugins(config, plugins, auth, inst):
"Update the config entries to reflect the plugins we actually selected."
@@ -236,6 +236,8 @@ def cli_plugin_requests(config):
req_auth = set_configurator(req_auth, "webroot")
if config.manual:
req_auth = set_configurator(req_auth, "manual")
if config.script:
req_auth = set_configurator(req_auth, "script")
logger.debug("Requested authenticator %s and installer %s", req_auth, req_inst)
return req_auth, req_inst

View File

@@ -132,6 +132,7 @@ setup(
'null = certbot.plugins.null:Installer',
'standalone = certbot.plugins.standalone:Authenticator',
'webroot = certbot.plugins.webroot:Authenticator',
'script = certbot.plugins.script:Authenticator',
],
},
)