1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-14 23:22:21 +03:00

Prepare for "None" and misconfigured Installers

This commit is contained in:
James Kasten
2015-01-24 02:15:23 -08:00
parent 88d016b8ca
commit f67db5ef43
9 changed files with 246 additions and 50 deletions

View File

@@ -1039,7 +1039,8 @@ def enable_mod(mod_name):
def mod_loaded(module):
"""Checks to see if mod_ssl is loaded
Uses CONFIG.APACHE_CTL to get loaded module list
Uses CONFIG.APACHE_CTL to get loaded module list. This also effectively
serves as a config_test.
:returns: If ssl_module is included and active in Apache
:rtype: bool
@@ -1073,7 +1074,13 @@ def apache_restart():
"""Restarts the Apache Server.
.. todo:: Try to use reload instead. (This caused timing problems before)
.. todo:: This should be written to use the process return code.
.. todo:: On failure, this should be a recovery_routine call with another
restart. This will confuse and inhibit developers from testing code
though. This change should happen after
the ApacheConfigurator has been thoroughly tested. The function will
need to be moved into the class again. Perhaps
this version can live on... for testing purposes.
"""
try:
@@ -1084,7 +1091,7 @@ def apache_restart():
if proc.returncode != 0:
# Enter recovery routine...
logging.error("Configtest failed")
logging.error("Apache Restart Failed!")
logging.error(stdout)
logging.error(stderr)
return False

View File

@@ -342,7 +342,7 @@ class ApacheParser(object):
if os.path.isfile(os.path.join(self.root, name)):
return os.path.join(self.root, name)
raise errors.LetsEncryptConfiguratorError(
raise errors.LetsEncryptNoInstallationError(
"Could not find configuration root")
def _set_user_config_file(self, root):

View File

@@ -8,6 +8,7 @@ from letsencrypt.client import CONFIG
from letsencrypt.client import crypto_util
from letsencrypt.client import le_util
# Authenticator Challenges
DvsniChall = collections.namedtuple("DvsniChall", "domain, r_b64, nonce, key")
SimpleHttpsChall = collections.namedtuple(

View File

@@ -22,7 +22,7 @@ from letsencrypt.client import le_util
from letsencrypt.client import network
# it's weird to point to chocolate servers via raw IPv6 addresses, and
# it's weird to point to ACME servers via raw IPv6 addresses, and
# such addresses can be %SCARY in some contexts, so out of paranoia
# let's disable them by default
ALLOW_RAW_IPV6_SERVER = False
@@ -48,6 +48,7 @@ class Client(object):
zope.interface.implements(interfaces.IAuthenticator)
Key = collections.namedtuple("Key", "file pem")
# Note: form is the type of data, "pem" or "der"
CSR = collections.namedtuple("CSR", "file data form")
def __init__(self, server, authkey, dv_auth, installer):
@@ -65,9 +66,12 @@ class Client(object):
self.installer = installer
client_auth = client_authenticator.ClientAuthenticator(server)
self.auth_handler = auth_handler.AuthHandler(
dv_auth, client_auth, self.network)
if dv_auth is not None:
client_auth = client_authenticator.ClientAuthenticator(server)
self.auth_handler = auth_handler.AuthHandler(
dv_auth, client_auth, self.network)
else:
self.auth_handler = None
def obtain_certificate(self, domains, csr=None,
cert_path=CONFIG.CERT_PATH,
@@ -86,7 +90,12 @@ class Client(object):
:rtype: `tuple` of `str`
"""
if self.auth_handler is None:
logging.warning("Unable to obtain a certificate, because client "
"does not have a valid auth handler.")
sanity_check_names(domains)
# Request Challenges
for name in domains:
self.auth_handler.add_chall_msg(
@@ -180,6 +189,11 @@ class Client(object):
:param str chain_file: chain file path
"""
if self.installer is None:
logging.warning("No installer specified, client is unable to deploy"
"the certificate")
raise errors.LetsEncryptClientError("No installer available")
chain = None if chain_file is None else os.path.abspath(chain_file)
for dom in domains:
@@ -203,7 +217,15 @@ class Client(object):
:param redirect: If traffic should be forwarded from HTTP to HTTPS.
:type redirect: bool or None
:raises :class:`letsencrypt.client.errors.LetsEncryptClientError`: if
no installer is specified in the client.
"""
if self.installer is None:
logging.warning("No installer is specified, there isn't any "
"configuration to enhance.")
raise errors.LetsEncryptClientError("No installer available")
if redirect is None:
redirect = zope.component.getUtility(
interfaces.IDisplay).redirect_by_default()

View File

@@ -14,15 +14,19 @@ class LetsEncryptAuthHandlerError(LetsEncryptClientError):
class LetsEncryptClientAuthError(LetsEncryptAuthHandlerError):
"""Let's Encrypt Client Authenticator Error."""
"""Let's Encrypt Client Authenticator error."""
class LetsEncryptConfiguratorError(LetsEncryptClientError):
"""Let's Encrypt Configurator error."""
class LetsEncryptMisconfigurationError(LetsEncryptClientError):
"""Let's Encrypt Misconfiguration Error."""
class LetsEncryptNoInstallationError(LetsEncryptConfiguratorError):
"""Let's Encrypt No Installation error."""
class LetsEncryptMisconfigurationError(LetsEncryptConfiguratorError):
"""Let's Encrypt Misconfiguration error."""
class LetsEncryptDvsniError(LetsEncryptConfiguratorError):

View File

@@ -8,6 +8,7 @@ from letsencrypt.client import CONFIG
from letsencrypt.client import errors
from letsencrypt.client import le_util
class Reverter(object):
"""Reverter Class - save and revert configuration checkpoints"""
def __init__(self, direc=None):
@@ -76,9 +77,6 @@ class Reverter(object):
All checkpoints are printed to the console.
.. note:: Any 'IN_PROGRESS' checkpoints will be removed by the cleanup
script found in the constructor, before this function would ever be
called.
.. todo:: Decide on a policy for error handling, OSError IOError...
"""
@@ -86,8 +84,8 @@ class Reverter(object):
backups.sort(reverse=True)
if not backups:
logging.info("Letsencrypt has not saved any backups of your "
"configuration")
logging.info("The Let's Encrypt client has not saved any backups "
"of your configuration")
return
# Make sure there isn't anything unexpected in the backup folder
# There should only be timestamped (float) directories
@@ -154,17 +152,11 @@ class Reverter(object):
"""
le_util.make_or_verify_dir(cp_dir, 0o755, os.geteuid())
existing_filepaths = []
filepaths_path = os.path.join(cp_dir, "FILEPATHS")
# Open up FILEPATHS differently depending on if it already exists
if os.path.isfile(filepaths_path):
op_fd = open(filepaths_path, 'r+')
existing_filepaths = op_fd.read().splitlines()
else:
op_fd = open(filepaths_path, 'w')
op_fd, existing_filepaths = self._read_and_append(
os.path.join(cp_dir, "FILEPATHS"))
idx = len(existing_filepaths)
for filename in save_files:
# No need to copy/index already existing files
# The oldest copy already exists in the directory...
@@ -191,6 +183,22 @@ class Reverter(object):
with open(os.path.join(cp_dir, "CHANGES_SINCE"), 'a') as notes_fd:
notes_fd.write(save_notes)
def _read_and_append(self, filepath): # pylint: disable=no-self-use
"""Reads the file lines and returns a fd.
Read the file returning the lines, and a pointer to the end of the file.
"""
# Open up filepath differently depending on if it already exists
if os.path.isfile(filepath):
op_fd = open(filepath, 'r+')
lines = op_fd.read().splitlines()
else:
lines = []
op_fd = open(filepath, 'w')
return op_fd, lines
def _recover_checkpoint(self, cp_dir):
"""Recover a specific checkpoint.
@@ -237,13 +245,13 @@ class Reverter(object):
"""
protected_files = []
# Check modified files
# Get temp modified files
temp_path = os.path.join(self.direc['temp'], "FILEPATHS")
if os.path.isfile(temp_path):
with open(temp_path, 'r') as protected_fd:
protected_files.extend(protected_fd.read().splitlines())
# Check new files
# Get temp new files
new_path = os.path.join(self.direc['temp'], "NEW_FILES")
if os.path.isfile(new_path):
with open(new_path, 'r') as protected_fd:
@@ -285,14 +293,23 @@ class Reverter(object):
cp_dir = self.direc['progress']
le_util.make_or_verify_dir(cp_dir, 0o755, os.geteuid())
# Append all new files (that aren't already registered)
new_fd = None
try:
with open(os.path.join(cp_dir, "NEW_FILES"), 'a') as new_fd:
for file_path in files:
new_fd.write("{0}{1}".format(file_path, os.linesep))
new_fd, ex_files = self._read_and_append(
os.path.join(cp_dir, "NEW_FILES"))
for path in files:
if path not in ex_files:
new_fd.write("{0}{1}".format(path, os.linesep))
except (IOError, OSError):
logging.error("Unable to register file creation(s) - %s", files)
raise errors.LetsEncryptReverterError(
"Unable to register file creation(s) - {0}".format(files))
finally:
if new_fd is not None:
new_fd.close()
def recovery_routine(self):
"""Revert all previously modified files.
@@ -397,14 +414,15 @@ class Reverter(object):
# It is possible save checkpoints faster than 1 per second resulting in
# collisions in the naming convention.
cur_time = time.time()
for i in range(10):
for _ in range(10):
final_dir = os.path.join(self.direc['backup'], str(cur_time))
try:
os.rename(self.direc['progress'], final_dir)
return
except OSError:
# It is possible if the checkpoints are made extremely quickly that
# There will be a naming collision, increment and try again
# It is possible if the checkpoints are made extremely quickly
# that will result in a name collision.
# If so, increment and try again
cur_time += .01
# After 10 attempts... something is probably wrong here...

View File

@@ -0,0 +1,88 @@
"""letsencrypt.scripts.main.py tests."""
import unittest
import mock
import zope.component
class RollbackTest(unittest.TestCase):
"""Test the rollback function."""
def setUp(self):
self.m_install = mock.MagicMock()
self.m_input = mock.MagicMock()
zope.component.getUtility = self.m_input
def _call(self, checkpoints):
from letsencrypt.scripts.main import rollback
rollback(checkpoints)
@mock.patch("letsencrypt.scripts.main.determine_installer")
def test_no_problems(self, mock_det):
mock_det.side_effect = self.m_install
self._call(1)
self.assertEqual(self.m_install().rollback_checkpoints.call_count, 1)
self.assertEqual(self.m_install().restart.call_count, 1)
@mock.patch("letsencrypt.client.reverter.Reverter")
@mock.patch("letsencrypt.scripts.main.determine_installer")
def test_misconfiguration_fixed(self, mock_det, mock_rev):
from letsencrypt.client.errors import LetsEncryptMisconfigurationError
mock_det.side_effect = [LetsEncryptMisconfigurationError,
self.m_install]
self.m_input().generic_yesno.return_value = True
self._call(1)
# Don't rollback twice... (only on one object)
self.assertEqual(self.m_install().rollback_checkpoints.call_count, 0)
self.assertEqual(mock_rev().rollback_checkpoints.call_count, 1)
# Only restart once
self.assertEqual(self.m_install.restart.call_count, 1)
@mock.patch("letsencrypt.scripts.main.logging.warning")
@mock.patch("letsencrypt.client.reverter.Reverter")
@mock.patch("letsencrypt.scripts.main.determine_installer")
def test_misconfiguration_remains(self, mock_det, mock_rev, mock_warn):
from letsencrypt.client.errors import LetsEncryptMisconfigurationError
mock_det.side_effect = LetsEncryptMisconfigurationError
self.m_input().generic_yesno.return_value = True
self._call(1)
# Don't rollback twice... (only on one object)
self.assertEqual(self.m_install().rollback_checkpoints.call_count, 0)
self.assertEqual(mock_rev().rollback_checkpoints.call_count, 1)
# Never call restart because init never succeeds
self.assertEqual(self.m_install().restart.call_count, 0)
# There should be a warning about the remaining problem
self.assertEqual(mock_warn.call_count, 1)
@mock.patch("letsencrypt.client.reverter.Reverter")
@mock.patch("letsencrypt.scripts.main.determine_installer")
def test_user_decides_to_manually_investigate(self, mock_det, mock_rev):
from letsencrypt.client.errors import LetsEncryptMisconfigurationError
mock_det.side_effect = LetsEncryptMisconfigurationError
self.m_input().generic_yesno.return_value = False
self._call(1)
# Neither is ever called
self.assertEqual(self.m_install().rollback_checkpoints.call_count, 0)
self.assertEqual(mock_rev().rollback_checkpoints.call_count, 0)
@mock.patch("letsencrypt.scripts.main.determine_installer")
def test_no_installer(self, mock_det):
mock_det.return_value = None
# Just make sure no exceptions are raised
self._call(1)
if __name__ == '__main__':
unittest.main()

View File

@@ -7,6 +7,7 @@ import unittest
import mock
# pylint: disable=invalid-name,protected-access,too-many-instance-attributes
class ReverterCheckpointLocalTest(unittest.TestCase):
"""Test the Reverter Class."""
@@ -105,6 +106,16 @@ class ReverterCheckpointLocalTest(unittest.TestCase):
self.assertFalse(os.path.isfile(config3))
self.assertFalse(os.path.isfile(config4))
def test_multiple_registration_same_file(self):
self.reverter.register_file_creation(True, self.config1)
self.reverter.register_file_creation(True, self.config1)
self.reverter.register_file_creation(True, self.config1)
self.reverter.register_file_creation(True, self.config1)
files = get_new_files(self.direc['temp'])
self.assertEqual(len(files), 1)
def test_register_file_creation_write_error(self):
from letsencrypt.client.errors import LetsEncryptReverterError
@@ -427,6 +438,11 @@ def get_filepaths(dire):
return read_in(os.path.join(dire, 'FILEPATHS'))
def get_new_files(dire):
"""Get new files."""
return read_in(os.path.join(dire, 'NEW_FILES')).splitlines()
def read_in(path):
"""Read in a file, return the str"""
with open(path, 'r') as file_fd:

View File

@@ -97,6 +97,7 @@ def main():
logging.fatal("Please fix your configuration before proceeding. "
"The Installer exited with the following message: "
"%s", str(err))
sys.exit(1)
# Use the same object if possible
if interfaces.IAuthenticator.providedBy(installer):
@@ -117,9 +118,16 @@ def main():
# Validate the key and csr
client.validate_key_csr(privkey)
cert_file, chain_file = acme.obtain_certificate(domains)
acme.deploy_certificate(domains, privkey, cert_file, chain_file)
acme.enhance_config(domains, args.redirect)
# This more closely mimics the capabilities of the CLI
# It should be possible for reconfig only, install-only, no-install
# I am not sure the best way to handle all of the unimplemented abilities,
# but this code should be safe on all environments.
if auth is not None:
cert_file, chain_file = acme.obtain_certificate(domains)
if installer is not None and cert_file is not None:
acme.deploy_certificate(domains, privkey, cert_file, chain_file)
if installer is not None:
acme.enhance_config(domains, args.redirect)
def display_eula():
@@ -137,8 +145,7 @@ def choose_names(installer):
:type installer: :class:`letsencrypt.client.interfaces.IInstaller`
"""
# This function adds all names
# found within the config to self.names
# This function adds all names found in the installer configuration
# Then filters them based on user selection
code, names = zope.component.getUtility(
interfaces.IDisplay).filter_names(get_all_names(installer))
@@ -175,7 +182,7 @@ def determine_authenticator():
if interfaces.IAuthenticator.implementedBy(
configurator.ApacheConfigurator):
return configurator.ApacheConfigurator()
except errors.LetsEncryptConfiguratorError:
except errors.LetsEncryptNoInstallationError:
logging.info("Unable to determine a way to authenticate the server")
@@ -185,7 +192,7 @@ def determine_installer():
if interfaces.IInstaller.implementedBy(
configurator.ApacheConfigurator):
return configurator.ApacheConfigurator()
except errors.LetsEncryptConfiguratorError:
except errors.LetsEncryptNoInstallationError:
logging.info("Unable to find a way to install the certificate.")
@@ -213,31 +220,62 @@ def rollback(checkpoints):
to do their configuration changes, the correct reverter will have to be
determined.
.. note:: This function restarts the server even if there weren't any
rollbacks. The user may be confused or made an error and simply needs
to restart the server.
.. todo:: This function will have to change depending on the functionality
of future installers. Perhaps the interface should define errors that
are thrown for the various functions.
:param int checkpoints: Number of checkpoints to revert.
"""
# Misconfigurations are only a slight problems... allow the user to rollback
try:
installer = determine_installer()
installer.rollback_checkpoints(checkpoints)
installer.restart()
except errors.LetsEncryptMisconfigurationError:
logging.warn("Installer is misconfigured before rollback.")
logging.info("Rolling back using Reverter module")
# recovery routine has already been run by installer __init__ attempt
reverter.Reverter().rollback_checkpoints(checkpoints)
yes = zope.component.getUtility(interfaces.IDisplay).generic_yesno(
"Oh, no! The web server is currently misconfigured.{0}{0}"
"Would you still like to rollback the "
"configuration?".format(os.linesep))
if not yes:
logging.info("The error message is above.")
logging.info(
"Configuration was not rolled back.".format(os.linesep))
return
logging.info("Rolling back using the Reverter module")
# recovery routine has probably already been run by installer
# in the__init__ attempt, run it again for safety... it shouldn't hurt
# Also... not sure how future installers will handle recovery.
rev = reverter.Reverter()
rev.recovery_routine()
rev.rollback_checkpoints(checkpoints)
# We should try to restart the server
try:
installer = determine_installer()
installer.restart()
logging.info("Rollback solved misconfiguration!")
logging.info("Hooray! Rollback solved the misconfiguration!")
logging.info("Your web server is back up and running.")
except errors.LetsEncryptMisconfigurationError:
logging.warn("Rollback was unable to solve misconfiguration issues")
logging.warning("Rollback was unable to solve the "
"misconfiguration issues")
finally:
return
# No Errors occurred during init... proceed normally
# If installer is None... couldn't find an installer... there shouldn't be
# anything to rollback
if installer is not None:
installer.rollback_checkpoints(checkpoints)
installer.restart()
def revoke(server):
"""Revoke certificates.
:param str server: ACME server client wishes to revoke certificates from
:param str server: ACME server the client wishes to revoke certificates from
"""
# Misconfigurations don't really matter. Determine installer better choose
@@ -245,7 +283,9 @@ def revoke(server):
try:
installer = determine_installer()
except errors.LetsEncryptMisconfigurationError:
logging.warn("Installer is currently misconfigured.")
logging.warning("The web server is currently misconfigured. Some "
"abilities like seeing which certificates are currently"
" installed may not be available at this time.")
revoc = revoker.Revoker(server, installer)
revoc.list_certs_keys()