mirror of
https://github.com/certbot/certbot.git
synced 2026-01-21 19:01:07 +03:00
Merge pull request #502 from kuba/manual
ManualAuthenticator for SimpleHTTP
This commit is contained in:
5
docs/api/plugins/manual.rst
Normal file
5
docs/api/plugins/manual.rst
Normal file
@@ -0,0 +1,5 @@
|
||||
:mod:`letsencrypt.plugins.manual`
|
||||
---------------------------------
|
||||
|
||||
.. automodule:: letsencrypt.plugins.manual
|
||||
:members:
|
||||
138
letsencrypt/plugins/manual.py
Normal file
138
letsencrypt/plugins/manual.py
Normal file
@@ -0,0 +1,138 @@
|
||||
"""Manual plugin."""
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
import requests
|
||||
import zope.component
|
||||
import zope.interface
|
||||
|
||||
from acme import challenges
|
||||
from acme import jose
|
||||
|
||||
from letsencrypt import interfaces
|
||||
from letsencrypt.plugins import common
|
||||
|
||||
|
||||
class ManualAuthenticator(common.Plugin):
|
||||
"""Manual Authenticator.
|
||||
|
||||
.. todo:: Support for `~.challenges.DVSNI`.
|
||||
|
||||
"""
|
||||
zope.interface.implements(interfaces.IAuthenticator)
|
||||
zope.interface.classProvides(interfaces.IPluginFactory)
|
||||
|
||||
description = "Manual Authenticator"
|
||||
|
||||
MESSAGE_TEMPLATE = """\
|
||||
Make sure your web server displays the following content at
|
||||
{uri} before continuing:
|
||||
|
||||
{achall.token}
|
||||
|
||||
If you don't have HTTP server configured, you can run the following
|
||||
command on the target server (as root):
|
||||
|
||||
{command}
|
||||
"""
|
||||
|
||||
HTTP_TEMPLATE = """\
|
||||
mkdir -p {response.URI_ROOT_PATH}
|
||||
echo -n {achall.token} > {response.URI_ROOT_PATH}/{response.path}
|
||||
# run only once per server:
|
||||
python -m SimpleHTTPServer 80"""
|
||||
"""Non-TLS command template."""
|
||||
|
||||
# https://www.piware.de/2011/01/creating-an-https-server-in-python/
|
||||
HTTPS_TEMPLATE = """\
|
||||
mkdir -p {response.URI_ROOT_PATH} # run only once per server
|
||||
echo -n {achall.token} > {response.URI_ROOT_PATH}/{response.path}
|
||||
# run only once per server:
|
||||
openssl req -new -newkey rsa:4096 -subj "/" -days 1 -nodes -x509 -keyout key.pem -out cert.pem
|
||||
python -c "import BaseHTTPServer, SimpleHTTPServer, ssl; \\
|
||||
s = BaseHTTPServer.HTTPServer(('', 443), SimpleHTTPServer.SimpleHTTPRequestHandler); \\
|
||||
s.socket = ssl.wrap_socket(s.socket, keyfile='key.pem', certfile='cert.pem'); \\
|
||||
s.serve_forever()" """
|
||||
"""TLS command template.
|
||||
|
||||
According to the ACME specification, "the ACME server MUST ignore
|
||||
the certificate provided by the HTTPS server", so the first command
|
||||
generates temporary self-signed certificate. For the same reason
|
||||
``requests.get`` in `_verify` sets ``verify=False``. Python HTTPS
|
||||
server command serves the ``token`` on all URIs.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(ManualAuthenticator, self).__init__(*args, **kwargs)
|
||||
self.template = (self.HTTP_TEMPLATE if self.config.no_simple_http_tls
|
||||
else self.HTTPS_TEMPLATE)
|
||||
|
||||
def prepare(self): # pylint: disable=missing-docstring,no-self-use
|
||||
pass # pragma: no cover
|
||||
|
||||
def more_info(self): # pylint: disable=missing-docstring,no-self-use
|
||||
return """\
|
||||
This plugin requires user's manual intervention in setting up a HTTP
|
||||
server for solving SimpleHTTP challenges and thus does not need to be
|
||||
run as a privilidged process. Alternatively shows instructions on how
|
||||
to use Python's built-in HTTP server and, in case of HTTPS, openssl
|
||||
binary for temporary key/certificate generation.""".replace("\n", "")
|
||||
|
||||
def get_chall_pref(self, domain):
|
||||
# pylint: disable=missing-docstring,no-self-use,unused-argument
|
||||
return [challenges.SimpleHTTP]
|
||||
|
||||
def perform(self, achalls): # pylint: disable=missing-docstring
|
||||
responses = []
|
||||
# TODO: group achalls by the same socket.gethostbyname(_ex)
|
||||
# and prompt only once per server (one "echo -n" per domain)
|
||||
for achall in achalls:
|
||||
responses.append(self._perform_single(achall))
|
||||
return responses
|
||||
|
||||
def _perform_single(self, achall):
|
||||
# same path for each challenge response would be easier for
|
||||
# users, but will not work if multiple domains point at the
|
||||
# same server: default command doesn't support virtual hosts
|
||||
response = challenges.SimpleHTTPResponse(
|
||||
path=jose.b64encode(os.urandom(18)),
|
||||
tls=(not self.config.no_simple_http_tls))
|
||||
assert response.good_path # is encoded os.urandom(18) good?
|
||||
|
||||
self._notify_and_wait(self.MESSAGE_TEMPLATE.format(
|
||||
achall=achall, response=response,
|
||||
uri=response.uri(achall.domain),
|
||||
command=self.template.format(achall=achall, response=response)))
|
||||
|
||||
if self._verify(achall, response):
|
||||
return response
|
||||
else:
|
||||
return None
|
||||
|
||||
def _notify_and_wait(self, message): # pylint: disable=no-self-use
|
||||
# TODO: IDisplay wraps messages, breaking the command
|
||||
#answer = zope.component.getUtility(interfaces.IDisplay).notification(
|
||||
# message=message, height=25, pause=True)
|
||||
sys.stdout.write(message)
|
||||
raw_input("Press ENTER to continue")
|
||||
|
||||
def _verify(self, achall, chall_response): # pylint: disable=no-self-use
|
||||
uri = chall_response.uri(achall.domain)
|
||||
logging.debug("Verifying %s...", uri)
|
||||
try:
|
||||
response = requests.get(uri, verify=False)
|
||||
except requests.exceptions.ConnectionError as error:
|
||||
logging.exception(error)
|
||||
return False
|
||||
|
||||
ret = response.text == achall.token
|
||||
if not ret:
|
||||
logging.error("Unable to verify %s! Expected: %r, returned: %r.",
|
||||
uri, achall.token, response.text)
|
||||
|
||||
return ret
|
||||
|
||||
def cleanup(self, achalls): # pylint: disable=missing-docstring,no-self-use
|
||||
pass # pragma: no cover
|
||||
59
letsencrypt/plugins/manual_test.py
Normal file
59
letsencrypt/plugins/manual_test.py
Normal file
@@ -0,0 +1,59 @@
|
||||
"""Tests for letsencrypt.plugins.manual."""
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
import requests
|
||||
|
||||
from acme import challenges
|
||||
|
||||
from letsencrypt import achallenges
|
||||
from letsencrypt.tests import acme_util
|
||||
|
||||
|
||||
class ManualAuthenticatorTest(unittest.TestCase):
|
||||
"""Tests for letsencrypt.plugins.manual.ManualAuthenticator."""
|
||||
|
||||
def setUp(self):
|
||||
from letsencrypt.plugins.manual import ManualAuthenticator
|
||||
self.config = mock.MagicMock(no_simple_http_tls=True)
|
||||
self.auth = ManualAuthenticator(config=self.config, name="manual")
|
||||
self.achalls = [achallenges.SimpleHTTP(
|
||||
challb=acme_util.SIMPLE_HTTP, domain="foo.com", key=None)]
|
||||
|
||||
def test_more_info(self):
|
||||
self.assertTrue(isinstance(self.auth.more_info(), str))
|
||||
|
||||
def test_get_chall_pref(self):
|
||||
self.assertTrue(all(issubclass(pref, challenges.Challenge)
|
||||
for pref in self.auth.get_chall_pref("foo.com")))
|
||||
|
||||
def test_perform_empty(self):
|
||||
self.assertEqual([], self.auth.perform([]))
|
||||
|
||||
@mock.patch("letsencrypt.plugins.manual.sys.stdout")
|
||||
@mock.patch("letsencrypt.plugins.manual.os.urandom")
|
||||
@mock.patch("letsencrypt.plugins.manual.requests.get")
|
||||
@mock.patch("__builtin__.raw_input")
|
||||
def test_perform(self, mock_raw_input, mock_get, mock_urandom, mock_stdout):
|
||||
mock_urandom.return_value = "foo"
|
||||
mock_get().text = self.achalls[0].token
|
||||
|
||||
self.assertEqual(
|
||||
[challenges.SimpleHTTPResponse(tls=False, path='Zm9v')],
|
||||
self.auth.perform(self.achalls))
|
||||
mock_raw_input.assert_called_once()
|
||||
mock_get.assert_called_with(
|
||||
"http://foo.com/.well-known/acme-challenge/Zm9v", verify=False)
|
||||
|
||||
message = mock_stdout.write.mock_calls[0][1][0]
|
||||
self.assertTrue(self.achalls[0].token in message)
|
||||
self.assertTrue('Zm9v' in message)
|
||||
|
||||
mock_get().text = self.achalls[0].token + '!'
|
||||
self.assertEqual([None], self.auth.perform(self.achalls))
|
||||
|
||||
mock_get.side_effect = requests.exceptions.ConnectionError
|
||||
self.assertEqual([None], self.auth.perform(self.achalls))
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main() # pragma: no cover
|
||||
Reference in New Issue
Block a user