1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-26 07:41:33 +03:00

Refactoring for better logging (#4444)

* Move colored_logging.py to log.py

* Add atexit.register code to util

* Add tests for atexit_register

* Copy except_hook to log

* Add pre_arg_setup

* move setup_log_file_handler to log.py

* Add post_arg_setup

* move changes to main

* Undo changes to MainTest

* s/pre_arg_setup/pre_arg_parse_setup

* s/post_arg_setup/post_arg_parse_setup
This commit is contained in:
Brad Warren
2017-03-30 16:17:57 -07:00
committed by GitHub
parent a542fcd019
commit e194e0dd5f
10 changed files with 486 additions and 399 deletions

View File

@@ -1,45 +0,0 @@
"""A formatter and StreamHandler for colorizing logging output."""
import logging
import sys
from certbot import util
class StreamHandler(logging.StreamHandler):
"""Sends colored logging output to a stream.
If the specified stream is not a tty, the class works like the
standard logging.StreamHandler. Default red_level is logging.WARNING.
:ivar bool colored: True if output should be colored
:ivar bool red_level: The level at which to output
"""
def __init__(self, stream=None):
if sys.version_info < (2, 7):
# pragma: no cover
# pylint: disable=non-parent-init-called
logging.StreamHandler.__init__(self, stream)
else:
super(StreamHandler, self).__init__(stream)
self.colored = (sys.stderr.isatty() if stream is None else
stream.isatty())
self.red_level = logging.WARNING
def format(self, record):
"""Formats the string representation of record.
:param logging.LogRecord record: Record to be formatted
:returns: Formatted, string representation of record
:rtype: str
"""
out = (logging.StreamHandler.format(self, record)
if sys.version_info < (2, 7)
else super(StreamHandler, self).format(record))
if self.colored and record.levelno >= self.red_level:
return ''.join((util.ANSI_SGR_RED, out, util.ANSI_SGR_RESET))
else:
return out

180
certbot/log.py Normal file
View File

@@ -0,0 +1,180 @@
"""Logging utilities for Certbot."""
from __future__ import print_function
import functools
import logging
import os
import sys
import time
import traceback
from acme import messages
from certbot import cli
from certbot import constants
from certbot import errors
from certbot import util
# Logging format
CLI_FMT = "%(message)s"
FILE_FMT = "%(asctime)s:%(levelname)s:%(name)s:%(message)s"
logger = logging.getLogger(__name__)
def pre_arg_parse_setup():
"""Ensures fatal exceptions are logged and reported to the user."""
sys.excepthook = functools.partial(except_hook, config=None)
def post_arg_parse_setup(config):
"""Setup logging after command line arguments are parsed.
:param certbot.interface.IConfig config: Configuration object
"""
file_handler, file_path = setup_log_file_handler(
config, 'letsencrypt.log', FILE_FMT)
if config.quiet:
level = constants.QUIET_LOGGING_LEVEL
else:
level = -config.verbose_count * 10
stderr_handler = ColoredStreamHandler()
stderr_handler.setFormatter(logging.Formatter(CLI_FMT))
stderr_handler.setLevel(level)
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG) # send all records to handlers
root_logger.addHandler(stderr_handler)
root_logger.addHandler(file_handler)
logger.debug('Root logging level set at %d', level)
logger.info('Saving debug log to %s', file_path)
sys.excepthook = functools.partial(except_hook, config=config)
def setup_log_file_handler(config, logfile, fmt):
"""Setup file debug logging.
:param certbot.interface.IConfig config: Configuration object
:param str logfile: basename for the log file
:param str fmt: logging format string
:returns: file handler and absolute path to the log file
:rtype: tuple
"""
# TODO: logs might contain sensitive data such as contents of the
# private key! #525
util.make_or_verify_core_dir(
config.logs_dir, 0o700, os.geteuid(), config.strict_permissions)
log_file_path = os.path.join(config.logs_dir, logfile)
try:
handler = logging.handlers.RotatingFileHandler(
log_file_path, maxBytes=2 ** 20, backupCount=1000)
except IOError as error:
raise errors.Error(util.PERM_ERR_FMT.format(error))
# rotate on each invocation, rollover only possible when maxBytes
# is nonzero and backupCount is nonzero, so we set maxBytes as big
# as possible not to overrun in single CLI invocation (1MB).
handler.doRollover() # TODO: creates empty letsencrypt.log.1 file
handler.setLevel(logging.DEBUG)
handler_formatter = logging.Formatter(fmt=fmt)
handler_formatter.converter = time.gmtime # don't use localtime
handler.setFormatter(handler_formatter)
return handler, log_file_path
class ColoredStreamHandler(logging.StreamHandler):
"""Sends colored logging output to a stream.
If the specified stream is not a tty, the class works like the
standard logging.StreamHandler. Default red_level is logging.WARNING.
:ivar bool colored: True if output should be colored
:ivar bool red_level: The level at which to output
"""
def __init__(self, stream=None):
if sys.version_info < (2, 7):
# pragma: no cover
# pylint: disable=non-parent-init-called
logging.StreamHandler.__init__(self, stream)
else:
super(ColoredStreamHandler, self).__init__(stream)
self.colored = (sys.stderr.isatty() if stream is None else
stream.isatty())
self.red_level = logging.WARNING
def format(self, record):
"""Formats the string representation of record.
:param logging.LogRecord record: Record to be formatted
:returns: Formatted, string representation of record
:rtype: str
"""
out = (logging.StreamHandler.format(self, record)
if sys.version_info < (2, 7)
else super(ColoredStreamHandler, self).format(record))
if self.colored and record.levelno >= self.red_level:
return ''.join((util.ANSI_SGR_RED, out, util.ANSI_SGR_RESET))
else:
return out
def except_hook(exc_type, exc_value, trace, config):
"""Logs exceptions and reports them to the user.
Config is used to determine how to display exceptions to the user. In
general, if config.debug is True, then the full exception and traceback is
shown to the user, otherwise it is suppressed. If config itself is None,
then the traceback and exception is attempted to be written to a logfile.
If this is successful, the traceback is suppressed, otherwise it is shown
to the user. sys.exit is always called with a nonzero status.
"""
tb_str = "".join(traceback.format_exception(exc_type, exc_value, trace))
logger.debug("Exiting abnormally:%s%s", os.linesep, tb_str)
if issubclass(exc_type, Exception) and (config is None or not config.debug):
if config is None:
logfile = "certbot.log"
try:
with open(logfile, "w") as logfd:
traceback.print_exception(
exc_type, exc_value, trace, file=logfd)
assert "--debug" not in sys.argv # config is None if this explodes
except: # pylint: disable=bare-except
sys.exit(tb_str)
if "--debug" in sys.argv:
sys.exit(tb_str)
if issubclass(exc_type, errors.Error):
sys.exit(exc_value)
else:
# Here we're passing a client or ACME error out to the client at the shell
# Tell the user a bit about what happened, without overwhelming
# them with a full traceback
err = traceback.format_exception_only(exc_type, exc_value)[0]
# Typical error from the ACME module:
# acme.messages.Error: urn:ietf:params:acme:error:malformed :: The
# request message was malformed :: Error creating new registration
# :: Validation of contact mailto:none@longrandomstring.biz failed:
# Server failure at resolver
if (messages.is_acme_error(err) and ":: " in err and
config.verbose_count <= cli.flag_default("verbose_count")):
# prune ACME error code, we have a human description
_code, _sep, err = err.partition(":: ")
msg = "An unexpected error occurred:\n" + err + "Please see the "
if config is None:
msg += "logfile '{0}' for more details.".format(logfile)
else:
msg += "logfiles in {0} for more details.".format(config.logs_dir)
sys.exit(msg)
else:
sys.exit(tb_str)

View File

@@ -1,47 +1,37 @@
"""Certbot main entry point."""
from __future__ import print_function
import atexit
import functools
import logging.handlers
import os
import sys
import time
import traceback
import zope.component
from acme import jose
from acme import messages
from acme import errors as acme_errors
import certbot
from certbot import account
from certbot import cert_manager
from certbot import client
from certbot import cli
from certbot import crypto_util
from certbot import colored_logging
from certbot import client
from certbot import configuration
from certbot import constants
from certbot import crypto_util
from certbot import eff
from certbot import errors
from certbot import hooks
from certbot import interfaces
from certbot import util
from certbot import reporter
from certbot import log
from certbot import renewal
from certbot import reporter
from certbot import util
from certbot.display import util as display_util, ops as display_ops
from certbot.plugins import disco as plugins_disco
from certbot.plugins import selection as plug_sel
_PERM_ERR_FMT = os.linesep.join((
"The following error was encountered:", "{0}",
"If running as non-root, set --config-dir, "
"--work-dir, and --logs-dir to writeable paths."))
USER_CANCELLED = ("User chose to cancel the operation and may "
"reinvoke the client.")
@@ -704,138 +694,11 @@ def renew(config, unused_plugins):
hooks.run_saved_post_hooks()
def setup_log_file_handler(config, logfile, fmt):
"""Setup file debug logging."""
log_file_path = os.path.join(config.logs_dir, logfile)
try:
handler = logging.handlers.RotatingFileHandler(
log_file_path, maxBytes=2 ** 20, backupCount=1000)
except IOError as error:
raise errors.Error(_PERM_ERR_FMT.format(error))
# rotate on each invocation, rollover only possible when maxBytes
# is nonzero and backupCount is nonzero, so we set maxBytes as big
# as possible not to overrun in single CLI invocation (1MB).
handler.doRollover() # TODO: creates empty letsencrypt.log.1 file
handler.setLevel(logging.DEBUG)
handler_formatter = logging.Formatter(fmt=fmt)
handler_formatter.converter = time.gmtime # don't use localtime
handler.setFormatter(handler_formatter)
return handler, log_file_path
def _cli_log_handler(level, fmt):
handler = colored_logging.StreamHandler()
handler.setFormatter(logging.Formatter(fmt))
handler.setLevel(level)
return handler
def setup_logging(config):
"""Sets up logging to logfiles and the terminal.
:param certbot.interface.IConfig config: Configuration object
"""
cli_fmt = "%(message)s"
file_fmt = "%(asctime)s:%(levelname)s:%(name)s:%(message)s"
logfile = "letsencrypt.log"
if config.quiet:
level = constants.QUIET_LOGGING_LEVEL
else:
level = -config.verbose_count * 10
file_handler, log_file_path = setup_log_file_handler(
config, logfile=logfile, fmt=file_fmt)
cli_handler = _cli_log_handler(level, cli_fmt)
# TODO: use fileConfig?
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG) # send all records to handlers
root_logger.addHandler(cli_handler)
root_logger.addHandler(file_handler)
logger.debug("Root logging level set at %d", level)
logger.info("Saving debug log to %s", log_file_path)
def _handle_exception(exc_type, exc_value, trace, config):
"""Logs exceptions and reports them to the user.
Config is used to determine how to display exceptions to the user. In
general, if config.debug is True, then the full exception and traceback is
shown to the user, otherwise it is suppressed. If config itself is None,
then the traceback and exception is attempted to be written to a logfile.
If this is successful, the traceback is suppressed, otherwise it is shown
to the user. sys.exit is always called with a nonzero status.
"""
tb_str = "".join(traceback.format_exception(exc_type, exc_value, trace))
logger.debug("Exiting abnormally:%s%s", os.linesep, tb_str)
if issubclass(exc_type, Exception) and (config is None or not config.debug):
if config is None:
logfile = "certbot.log"
try:
with open(logfile, "w") as logfd:
traceback.print_exception(
exc_type, exc_value, trace, file=logfd)
assert "--debug" not in sys.argv # config is None if this explodes
except: # pylint: disable=bare-except
sys.exit(tb_str)
if "--debug" in sys.argv:
sys.exit(tb_str)
if issubclass(exc_type, errors.Error):
sys.exit(exc_value)
else:
# Here we're passing a client or ACME error out to the client at the shell
# Tell the user a bit about what happened, without overwhelming
# them with a full traceback
err = traceback.format_exception_only(exc_type, exc_value)[0]
# Typical error from the ACME module:
# acme.messages.Error: urn:ietf:params:acme:error:malformed :: The
# request message was malformed :: Error creating new registration
# :: Validation of contact mailto:none@longrandomstring.biz failed:
# Server failure at resolver
if (messages.is_acme_error(err) and ":: " in err and
config.verbose_count <= cli.flag_default("verbose_count")):
# prune ACME error code, we have a human description
_code, _sep, err = err.partition(":: ")
msg = "An unexpected error occurred:\n" + err + "Please see the "
if config is None:
msg += "logfile '{0}' for more details.".format(logfile)
else:
msg += "logfiles in {0} for more details.".format(config.logs_dir)
sys.exit(msg)
else:
sys.exit(tb_str)
def make_or_verify_core_dir(directory, mode, uid, strict):
"""Make sure directory exists with proper permissions.
:param str directory: Path to a directory.
:param int mode: Directory mode.
:param int uid: Directory owner.
:param bool strict: require directory to be owned by current user
:raises .errors.Error: if the directory cannot be made or verified
"""
try:
util.make_or_verify_dir(directory, mode, uid, strict)
except OSError as error:
raise errors.Error(_PERM_ERR_FMT.format(error))
def make_or_verify_needed_dirs(config):
"""Create or verify existence of config, work, or logs directories"""
make_or_verify_core_dir(config.config_dir, constants.CONFIG_DIRS_MODE,
"""Create or verify existence of config and work directories"""
util.make_or_verify_core_dir(config.config_dir, constants.CONFIG_DIRS_MODE,
os.geteuid(), config.strict_permissions)
make_or_verify_core_dir(config.work_dir, constants.CONFIG_DIRS_MODE,
os.geteuid(), config.strict_permissions)
# TODO: logs might contain sensitive data such as contents of the
# private key! #525
make_or_verify_core_dir(config.logs_dir, 0o700,
util.make_or_verify_core_dir(config.work_dir, constants.CONFIG_DIRS_MODE,
os.geteuid(), config.strict_permissions)
@@ -868,7 +731,7 @@ def _post_logging_setup(config, plugins, cli_args):
def main(cli_args=sys.argv[1:]):
"""Command line argument parsing and main script execution."""
sys.excepthook = functools.partial(_handle_exception, config=None)
log.pre_arg_parse_setup()
plugins = plugins_disco.PluginsRegistry.find_all()
# note: arg parser internally handles --help (and exits afterwards)
@@ -878,20 +741,16 @@ def main(cli_args=sys.argv[1:]):
make_or_verify_needed_dirs(config)
# Setup logging ASAP, otherwise "No handlers could be found for
# logger ..." TODO: this should be done before plugins discovery
setup_logging(config)
log.post_arg_parse_setup(config)
_post_logging_setup(config, plugins, cli_args)
sys.excepthook = functools.partial(_handle_exception, config=config)
set_displayer(config)
# Reporter
report = reporter.Reporter(config)
zope.component.provideUtility(report)
atexit.register(report.atexit_print_messages)
util.atexit_register(report.print_messages)
return config.func(config, plugins)

View File

@@ -3,7 +3,6 @@ from __future__ import print_function
import collections
import logging
import os
import sys
import textwrap
@@ -16,11 +15,6 @@ from certbot import util
logger = logging.getLogger(__name__)
# Store the pid of the process that first imported this module so that
# atexit_print_messages side-effects such as error reporting can be limited to
# this process and not any fork()'d children.
INITIAL_PID = os.getpid()
@zope.interface.implementer(interfaces.IReporter)
class Reporter(object):
@@ -60,19 +54,6 @@ class Reporter(object):
self.messages.put(self._msg_type(priority, msg, on_crash))
logger.debug("Reporting to user: %s", msg)
def atexit_print_messages(self, pid=None):
"""Function to be registered with atexit to print messages.
:param int pid: Process ID
"""
if pid is None:
pid = INITIAL_PID
# This ensures that messages are only printed from the process that
# created the Reporter.
if pid == os.getpid():
self.print_messages()
def print_messages(self):
"""Prints messages to the user and clears the message queue.

View File

@@ -1,41 +0,0 @@
"""Tests for certbot.colored_logging."""
import logging
import unittest
import six
from certbot import util
class StreamHandlerTest(unittest.TestCase):
"""Tests for certbot.colored_logging."""
def setUp(self):
from certbot import colored_logging
self.stream = six.StringIO()
self.stream.isatty = lambda: True
self.handler = colored_logging.StreamHandler(self.stream)
self.logger = logging.getLogger()
self.logger.setLevel(logging.DEBUG)
self.logger.addHandler(self.handler)
def test_format(self):
msg = 'I did a thing'
self.logger.debug(msg)
self.assertEqual(self.stream.getvalue(), '{0}\n'.format(msg))
def test_format_and_red_level(self):
msg = 'I did another thing'
self.handler.red_level = logging.DEBUG
self.logger.debug(msg)
self.assertEqual(self.stream.getvalue(),
'{0}{1}{2}\n'.format(util.ANSI_SGR_RED,
msg,
util.ANSI_SGR_RESET))
if __name__ == "__main__":
unittest.main() # pragma: no cover

199
certbot/tests/log_test.py Normal file
View File

@@ -0,0 +1,199 @@
"""Tests for certbot.log."""
import logging
import traceback
import logging.handlers
import os
import sys
import time
import unittest
import mock
import six
from acme import messages
from certbot import constants
from certbot import errors
from certbot import util
from certbot.tests import util as test_util
class PreArgParseSetupTest(unittest.TestCase):
"""Tests for certbot.log.pre_arg_parse_setup."""
@classmethod
def _call(cls, *args, **kwargs):
from certbot.log import pre_arg_parse_setup
return pre_arg_parse_setup(*args, **kwargs)
def test_it(self):
with mock.patch('certbot.log.except_hook') as mock_except_hook:
with mock.patch('certbot.log.sys') as mock_sys:
self._call()
mock_sys.excepthook(1, 2, 3)
mock_except_hook.assert_called_once_with(1, 2, 3, config=None)
class PostArgParseSetupTest(test_util.TempDirTestCase):
"""Tests for certbot.log.post_arg_parse_setup."""
@classmethod
def _call(cls, *args, **kwargs):
from certbot.log import post_arg_parse_setup
return post_arg_parse_setup(*args, **kwargs)
def setUp(self):
super(PostArgParseSetupTest, self).setUp()
self.config = mock.MagicMock(
logs_dir=self.tempdir, quiet=False,
verbose_count=constants.CLI_DEFAULTS['verbose_count'])
self.root_logger = mock.MagicMock()
def test_common(self):
with mock.patch('certbot.log.logging.getLogger') as mock_get_logger:
mock_get_logger.return_value = self.root_logger
with mock.patch('certbot.log.except_hook') as mock_except_hook:
with mock.patch('certbot.log.sys') as mock_sys:
mock_sys.version_info = sys.version_info
self._call(self.config)
self.assertEqual(self.root_logger.addHandler.call_count, 2)
self.assertTrue(os.path.exists(os.path.join(
self.config.logs_dir, 'letsencrypt.log')))
mock_sys.excepthook(1, 2, 3)
mock_except_hook.assert_called_once_with(1, 2, 3, config=self.config)
stderr_handler = self.root_logger.addHandler.call_args_list[0][0][0]
level = stderr_handler.level
if self.config.quiet:
self.assertEqual(level, constants.QUIET_LOGGING_LEVEL)
else:
self.assertEqual(level, -self.config.verbose_count * 10)
def test_quiet(self):
self.config.quiet = True
self.test_common()
class SetupLogFileHandlerTest(test_util.TempDirTestCase):
"""Tests for certbot.log.setup_log_file_handler."""
@classmethod
def _call(cls, *args, **kwargs):
from certbot.log import setup_log_file_handler
return setup_log_file_handler(*args, **kwargs)
def setUp(self):
super(SetupLogFileHandlerTest, self).setUp()
self.config = mock.MagicMock(logs_dir=self.tempdir)
@mock.patch('certbot.main.logging.handlers.RotatingFileHandler')
def test_failure(self, mock_handler):
mock_handler.side_effect = IOError
try:
self._call(self.config, 'test.log', '%(message)s')
except errors.Error as err:
self.assertTrue('--logs-dir' in str(err))
else: # pragma: no cover
self.fail('Error not raised.')
def test_success(self):
log_file = 'test.log'
handler, log_path = self._call(self.config, log_file, '%(message)s')
self.assertEqual(handler.level, logging.DEBUG)
self.assertEqual(handler.formatter.converter, time.gmtime)
expected_path = os.path.join(self.config.logs_dir, log_file)
self.assertEqual(log_path, expected_path)
class ColoredStreamHandlerTest(unittest.TestCase):
"""Tests for certbot.log."""
def setUp(self):
from certbot import log
self.stream = six.StringIO()
self.stream.isatty = lambda: True
self.handler = log.ColoredStreamHandler(self.stream)
self.logger = logging.getLogger()
self.logger.setLevel(logging.DEBUG)
self.logger.addHandler(self.handler)
def test_format(self):
msg = 'I did a thing'
self.logger.debug(msg)
self.assertEqual(self.stream.getvalue(), '{0}\n'.format(msg))
def test_format_and_red_level(self):
msg = 'I did another thing'
self.handler.red_level = logging.DEBUG
self.logger.debug(msg)
self.assertEqual(self.stream.getvalue(),
'{0}{1}{2}\n'.format(util.ANSI_SGR_RED,
msg,
util.ANSI_SGR_RESET))
class ExceptHookTest(unittest.TestCase):
"""Tests for certbot.log.except_hook."""
@classmethod
def _call(cls, *args, **kwargs):
from certbot.log import except_hook
return except_hook(*args, **kwargs)
@mock.patch('certbot.log.sys')
def test_except_hook(self, mock_sys):
config = mock.MagicMock()
mock_open = mock.mock_open()
with mock.patch('certbot.log.open', mock_open, create=True):
exception = Exception('detail')
config.verbose_count = 1
self._call(
Exception, exc_value=exception, trace=None, config=None)
mock_open().write.assert_any_call(''.join(
traceback.format_exception_only(Exception, exception)))
error_msg = mock_sys.exit.call_args_list[0][0][0]
self.assertTrue('unexpected error' in error_msg)
with mock.patch('certbot.log.open', mock_open, create=True):
mock_open.side_effect = [KeyboardInterrupt]
error = errors.Error('detail')
self._call(
errors.Error, exc_value=error, trace=None, config=None)
# assert_any_call used because sys.exit doesn't exit in cli.py
mock_sys.exit.assert_any_call(''.join(
traceback.format_exception_only(errors.Error, error)))
bad_typ = messages.ERROR_PREFIX + 'triffid'
exception = messages.Error(detail='alpha', typ=bad_typ, title='beta')
config = mock.MagicMock(debug=False, verbose_count=-3)
self._call(
messages.Error, exc_value=exception, trace=None, config=config)
error_msg = mock_sys.exit.call_args_list[-1][0][0]
self.assertTrue('unexpected error' in error_msg)
self.assertTrue('acme:error' not in error_msg)
self.assertTrue('alpha' in error_msg)
self.assertTrue('beta' in error_msg)
config = mock.MagicMock(debug=False, verbose_count=1)
self._call(
messages.Error, exc_value=exception, trace=None, config=config)
error_msg = mock_sys.exit.call_args_list[-1][0][0]
self.assertTrue('unexpected error' in error_msg)
self.assertTrue('acme:error' in error_msg)
self.assertTrue('alpha' in error_msg)
interrupt = KeyboardInterrupt('detail')
self._call(
KeyboardInterrupt, exc_value=interrupt, trace=None, config=None)
mock_sys.exit.assert_called_with(''.join(
traceback.format_exception_only(KeyboardInterrupt, interrupt)))
if __name__ == "__main__":
unittest.main() # pragma: no cover

View File

@@ -18,7 +18,6 @@ from acme import jose
from certbot import account
from certbot import cli
from certbot import colored_logging
from certbot import constants
from certbot import configuration
from certbot import crypto_util
@@ -288,81 +287,6 @@ class RevokeTest(test_util.TempDirTestCase):
self.mock_success_revoke.assert_not_called()
class SetupLogFileHandlerTest(test_util.TempDirTestCase):
"""Tests for certbot.main.setup_log_file_handler."""
def setUp(self):
super(SetupLogFileHandlerTest, self).setUp()
self.config = mock.Mock(spec_set=['logs_dir'],
logs_dir=self.tempdir)
def _call(self, *args, **kwargs):
from certbot.main import setup_log_file_handler
return setup_log_file_handler(*args, **kwargs)
@mock.patch('certbot.main.logging.handlers.RotatingFileHandler')
def test_ioerror(self, mock_handler):
mock_handler.side_effect = IOError
self.assertRaises(errors.Error, self._call,
self.config, "test.log", "%s")
class SetupLoggingTest(test_util.TempDirTestCase):
"""Tests for certbot.main.setup_logging."""
def setUp(self):
super(SetupLoggingTest, self).setUp()
self.config = mock.Mock(
logs_dir=self.tempdir,
noninteractive_mode=False, quiet=False,
verbose_count=constants.CLI_DEFAULTS['verbose_count'])
@classmethod
def _call(cls, *args, **kwargs):
from certbot.main import setup_logging
return setup_logging(*args, **kwargs)
@mock.patch('certbot.main.logging.getLogger')
def test_defaults(self, mock_get_logger):
self._call(self.config)
cli_handler = mock_get_logger().addHandler.call_args_list[0][0][0]
self.assertEqual(cli_handler.level, -self.config.verbose_count * 10)
self.assertTrue(
isinstance(cli_handler, colored_logging.StreamHandler))
@mock.patch('certbot.main.logging.getLogger')
def test_quiet_mode(self, mock_get_logger):
self.config.quiet = self.config.noninteractive_mode = True
self._call(self.config)
cli_handler = mock_get_logger().addHandler.call_args_list[0][0][0]
self.assertEqual(cli_handler.level, constants.QUIET_LOGGING_LEVEL)
self.assertTrue(
isinstance(cli_handler, colored_logging.StreamHandler))
class MakeOrVerifyCoreDirTest(test_util.TempDirTestCase):
"""Tests for certbot.main.make_or_verify_core_dir."""
def _call(self, *args, **kwargs):
from certbot.main import make_or_verify_core_dir
return make_or_verify_core_dir(*args, **kwargs)
def test_success(self):
new_dir = os.path.join(self.tempdir, 'new')
self._call(new_dir, 0o700, os.geteuid(), False)
self.assertTrue(os.path.exists(new_dir))
@mock.patch('certbot.main.util.make_or_verify_dir')
def test_failure(self, mock_make_or_verify):
mock_make_or_verify.side_effect = OSError
self.assertRaises(errors.Error, self._call,
self.tempdir, 0o700, os.geteuid(), False)
class DetermineAccountTest(unittest.TestCase):
"""Tests for certbot.main._determine_account."""
@@ -1249,59 +1173,5 @@ class UnregisterTest(unittest.TestCase):
self.assertFalse(acme_client.acme.deactivate_registration.called)
class TestHandleException(unittest.TestCase):
"""Test main._handle_exception"""
@mock.patch('certbot.main.sys')
def test_handle_exception(self, mock_sys):
# pylint: disable=protected-access
from acme import messages
config = mock.MagicMock()
mock_open = mock.mock_open()
with mock.patch('certbot.main.open', mock_open, create=True):
exception = Exception('detail')
config.verbose_count = 1
main._handle_exception(
Exception, exc_value=exception, trace=None, config=None)
mock_open().write.assert_any_call(''.join(
traceback.format_exception_only(Exception, exception)))
error_msg = mock_sys.exit.call_args_list[0][0][0]
self.assertTrue('unexpected error' in error_msg)
with mock.patch('certbot.main.open', mock_open, create=True):
mock_open.side_effect = [KeyboardInterrupt]
error = errors.Error('detail')
main._handle_exception(
errors.Error, exc_value=error, trace=None, config=None)
# assert_any_call used because sys.exit doesn't exit in cli.py
mock_sys.exit.assert_any_call(''.join(
traceback.format_exception_only(errors.Error, error)))
bad_typ = messages.ERROR_PREFIX + 'triffid'
exception = messages.Error(detail='alpha', typ=bad_typ, title='beta')
config = mock.MagicMock(debug=False, verbose_count=-3)
main._handle_exception(
messages.Error, exc_value=exception, trace=None, config=config)
error_msg = mock_sys.exit.call_args_list[-1][0][0]
self.assertTrue('unexpected error' in error_msg)
self.assertTrue('acme:error' not in error_msg)
self.assertTrue('alpha' in error_msg)
self.assertTrue('beta' in error_msg)
config = mock.MagicMock(debug=False, verbose_count=1)
main._handle_exception(
messages.Error, exc_value=exception, trace=None, config=config)
error_msg = mock_sys.exit.call_args_list[-1][0][0]
self.assertTrue('unexpected error' in error_msg)
self.assertTrue('acme:error' in error_msg)
self.assertTrue('alpha' in error_msg)
interrupt = KeyboardInterrupt('detail')
main._handle_exception(
KeyboardInterrupt, exc_value=interrupt, trace=None, config=None)
mock_sys.exit.assert_called_with(''.join(
traceback.format_exception_only(KeyboardInterrupt, interrupt)))
if __name__ == '__main__':
unittest.main() # pragma: no cover

View File

@@ -38,18 +38,6 @@ class ReporterTest(unittest.TestCase):
self.reporter.print_messages()
self.assertEqual(sys.stdout.getvalue(), "")
@mock.patch('certbot.reporter.os.getpid')
def test_atexit_print_messages(self, mock_getpid):
self._add_messages()
mock_getpid.return_value = 42
with mock.patch('certbot.reporter.INITIAL_PID', 42):
self.reporter.atexit_print_messages()
output = sys.stdout.getvalue()
self.assertTrue("IMPORTANT NOTES:" in output)
self.assertTrue("High" in output)
self.assertTrue("Med" in output)
self.assertTrue("Low" in output)
def test_tty_successful_exit(self):
sys.stdout.isatty = lambda: True
self._successful_exit_common()

View File

@@ -73,6 +73,25 @@ class ExeExistsTest(unittest.TestCase):
self.assertFalse(self._call("exe"))
class MakeOrVerifyCoreDirTest(test_util.TempDirTestCase):
"""Tests for certbot.util.make_or_verify_core_dir."""
def _call(self, *args, **kwargs):
from certbot.util import make_or_verify_core_dir
return make_or_verify_core_dir(*args, **kwargs)
def test_success(self):
new_dir = os.path.join(self.tempdir, 'new')
self._call(new_dir, 0o700, os.geteuid(), False)
self.assertTrue(os.path.exists(new_dir))
@mock.patch('certbot.main.util.make_or_verify_dir')
def test_failure(self, mock_make_or_verify):
mock_make_or_verify.side_effect = OSError
self.assertRaises(errors.Error, self._call,
self.tempdir, 0o700, os.geteuid(), False)
class MakeOrVerifyDirTest(test_util.TempDirTestCase):
"""Tests for certbot.util.make_or_verify_dir.
@@ -473,5 +492,37 @@ class OsInfoTest(unittest.TestCase):
("windows", "95"))
class AtexitRegisterTest(unittest.TestCase):
"""Tests for certbot.util.atexit_register."""
def setUp(self):
self.func = mock.MagicMock()
self.args = ('hi',)
self.kwargs = {'answer': 42}
@classmethod
def _call(cls, *args, **kwargs):
from certbot.util import atexit_register
return atexit_register(*args, **kwargs)
def test_called(self):
self._test_common(os.getpid())
self.func.assert_called_with(*self.args, **self.kwargs)
def test_not_called(self):
self._test_common(initial_pid=-1)
self.assertFalse(self.func.called)
def _test_common(self, initial_pid):
with mock.patch('certbot.util._INITIAL_PID', initial_pid):
with mock.patch('certbot.util.atexit') as mock_atexit:
self._call(self.func, *self.args, **self.kwargs)
# _INITAL_PID must be mocked when calling atexit_func
self.assertTrue(mock_atexit.register.called)
args, kwargs = mock_atexit.register.call_args
atexit_func = args[0]
atexit_func(*args[1:], **kwargs) # pylint: disable=star-args
if __name__ == "__main__":
unittest.main() # pragma: no cover

View File

@@ -1,5 +1,6 @@
"""Utilities for all Certbot."""
import argparse
import atexit
import collections
# distutils.version under virtualenv confuses pylint
# For more info, see: https://github.com/PyCQA/pylint/issues/73
@@ -38,6 +39,16 @@ ANSI_SGR_RED = "\033[31m"
ANSI_SGR_RESET = "\033[0m"
PERM_ERR_FMT = os.linesep.join((
"The following error was encountered:", "{0}",
"If running as non-root, set --config-dir, "
"--work-dir, and --logs-dir to writeable paths."))
# Stores importing process ID to be used by atexit_register()
_INITIAL_PID = os.getpid()
def run_script(params, log=logger.error):
"""Run the script with the given params.
@@ -92,6 +103,23 @@ def exe_exists(exe):
return False
def make_or_verify_core_dir(directory, mode, uid, strict):
"""Make sure directory exists with proper permissions.
:param str directory: Path to a directory.
:param int mode: Directory mode.
:param int uid: Directory owner.
:param bool strict: require directory to be owned by current user
:raises .errors.Error: if the directory cannot be made or verified
"""
try:
make_or_verify_dir(directory, mode, uid, strict)
except OSError as error:
raise errors.Error(PERM_ERR_FMT.format(error))
def make_or_verify_dir(directory, mode=0o755, uid=0, strict=False):
"""Make sure directory exists with proper permissions.
@@ -533,3 +561,20 @@ def is_staging(srv):
:rtype bool:
"""
return srv == constants.STAGING_URI or "staging" in srv
def atexit_register(func, *args, **kwargs):
"""Sets func to be called before the program exits.
Special care is taken to ensure func is only called when the process
that first imports this module exits rather than any child processes.
:param function func: function to be called in case of an error
"""
atexit.register(_atexit_call, func, *args, **kwargs)
def _atexit_call(func, *args, **kwargs):
if _INITIAL_PID == os.getpid():
func(*args, **kwargs)