1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-26 07:41:33 +03:00

Fixing up postfix plugin

- Finishing refactor of postconf/postfix command-line utilities
 - Plugin uses starttls_policy plugin to specify per-domain policies

Cleaning up TLS policy code.

Print warning when setting configuration parameter that is overridden by master.

Update client to use new policy API

Cleanup and test fixes

Documentation fix

smaller fixes

Policy is now an enhancement and reverting works

Added a README, and small documentation fixes throughout

Moving testing infra from starttls repo to certbot-postfix

fixing tests and lint

Changes against new policy API

starttls-everywhere => starttls-policy

testing(postfix): Added more varieties of certificates to test against.

Moar fixes against policy API.

Address comments on README and setup.py

Address small comments on postconf and util

Address comments in installer

Python 3 fixes and Postconf tester extends TempDir test class

Mock out postconf calls from tests and test coverage for master overrides

More various fixes. Everything minus testing done

Remove STARTTLS policy enhancement from this branch.

sphinx quickstart

99% test coverage

some cleanup and testfixing

cleanup leftover files

Remove print statement

testfix for python 3.4

Revert dockerfile change

mypy fix

fix(postfix): brad's comments

test(postfix): coverage to 100

test(postfix): mypy

import mypy types

fix(postfix docs): add .rst files and fix build

fix(postfix): tls_only and server_only params behave nicely together

some cleanup

lint

fix more comments

bump version number
This commit is contained in:
Sydney Li
2018-02-14 16:20:16 -08:00
committed by sydneyli
parent 5025b4ea96
commit 4ba153949d
24 changed files with 1438 additions and 938 deletions

View File

@@ -1,2 +1,4 @@
include LICENSE.txt
include README.rst
recursive-include certbot_postfix/testdata *
recursive-include certbot_postfix/docs *

View File

@@ -1 +1,10 @@
==========================
Postfix plugin for Certbot
==========================
To install your certs with this plugin, run:
``certbot install --installer postfix --cert-path <path to cert> --key-path <path to key> -d <MX hostname>``
And there you go! If you'd like to obtain these certificates via certbot, there's more documentation on how to do this `here <https://certbot.eff.org/docs/using.html#getting-certificates-and-choosing-plugins>`_.

View File

@@ -0,0 +1,63 @@
"""Postfix plugin constants."""
# pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import Dict, Tuple, Union
# pylint: enable=unused-import, no-name-in-module
MINIMUM_VERSION = (2, 11,)
# If the value of a default VAR is a tuple, then the values which
# come LATER in the tuple are more strict/more secure.
# Certbot will default to the first value in the tuple, but will
# not override "more secure" settings.
ACCEPTABLE_SERVER_SECURITY_LEVELS = ("may", "encrypt")
ACCEPTABLE_CLIENT_SECURITY_LEVELS = ("may", "encrypt",
"dane", "dane-only",
"fingerprint",
"verify", "secure")
ACCEPTABLE_CIPHER_LEVELS = ("medium", "high")
# Exporting certain ciphers to prevent logjam: https://weakdh.org/sysadmin.html
EXCLUDE_CIPHERS = ("aNULL, eNULL, EXPORT, DES, RC4, MD5, PSK, aECDH, "
"EDH-DSS-DES-CBC3-SHA, EDH-RSA-DES-CBC3-SHA, KRB5-DES, CBC3-SHA",)
TLS_VERSIONS = ("SSLv2", "SSLv3", "TLSv1", "TLSv1.1", "TLSv1.2")
# Should NOT use SSLv2/3.
ACCEPTABLE_TLS_VERSIONS = ("TLSv1", "TLSv1.1", "TLSv1.2")
# Variables associated with enabling opportunistic TLS.
TLS_SERVER_VARS = {
"smtpd_tls_security_level": ACCEPTABLE_SERVER_SECURITY_LEVELS,
} # type:Dict[str, Tuple[str, ...]]
TLS_CLIENT_VARS = {
"smtp_tls_security_level": ACCEPTABLE_CLIENT_SECURITY_LEVELS,
} # type:Dict[str, Tuple[str, ...]]
# Default variables for a secure MTA server [receiver].
DEFAULT_SERVER_VARS = {
"smtpd_tls_auth_only": ("yes",),
"smtpd_tls_mandatory_protocols": ("!SSLv2, !SSLv3",),
"smtpd_tls_protocols": ("!SSLv2, !SSLv3",),
"smtpd_tls_ciphers": ACCEPTABLE_CIPHER_LEVELS,
"smtpd_tls_mandatory_ciphers": ACCEPTABLE_CIPHER_LEVELS,
"smtpd_tls_exclude_ciphers": EXCLUDE_CIPHERS,
"smtpd_tls_eecdh_grade": ("strong",),
} # type:Dict[str, Tuple[str, ...]]
# Default variables for a secure MTA client [sender].
DEFAULT_CLIENT_VARS = {
"smtp_tls_ciphers": ACCEPTABLE_CIPHER_LEVELS,
"smtp_tls_exclude_ciphers": EXCLUDE_CIPHERS,
"smtp_tls_mandatory_ciphers": ACCEPTABLE_CIPHER_LEVELS,
} # type:Dict[str, Tuple[str, ...]]
CLI_DEFAULTS = dict(
config_dir="/etc/postfix",
ctl="postfix",
config_utility="postconf",
tls_only=False,
ignore_master_overrides=False,
server_only=False,
)
"""CLI defaults."""

View File

@@ -1,127 +1,139 @@
"""Certbot installer plugin for Postfix."""
"""certbot installer plugin for postfix."""
import logging
import os
import subprocess
import zope.interface
import zope.component
import six
from certbot import errors
from certbot import interfaces
from certbot import util as certbot_util
from certbot.plugins import common as plugins_common
from certbot.plugins import util as plugins_util
from certbot_postfix import constants
from certbot_postfix import postconf
from certbot_postfix import util
# pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import Callable, Dict, List
# pylint: enable=unused-import, no-name-in-module
logger = logging.getLogger(__name__)
@zope.interface.implementer(interfaces.IInstaller)
@zope.interface.provider(interfaces.IPluginFactory)
class Installer(plugins_common.Installer):
"""Certbot installer plugin for Postfix.
:ivar str config_dir: Postfix configuration directory to modify
:ivar dict proposed_changes: configuration parameters and values to
be written to the Postfix config when save() is called
:ivar list save_notes: documentation for proposed changes. This is
cleared and stored in Certbot checkpoints when save() is called
:ivar postconf: Wrapper for Postfix configuration command-line tool.
:type postconf: :class: `certbot_postfix.postconf.ConfigMain`
:ivar postfix: Wrapper for Postfix command-line tool.
:type postfix: :class: `certbot_postfix.util.PostfixUtil`
"""
description = "Configure TLS with the Postfix MTA"
@classmethod
def add_parser_arguments(cls, add):
add("ctl", default="postfix",
add("ctl", default=constants.CLI_DEFAULTS["ctl"],
help="Path to the 'postfix' control program.")
add("config-dir", help="Path to the directory containing the "
# This directory points to Postfix's configuration directory.
add("config-dir", default=constants.CLI_DEFAULTS["config_dir"],
help="Path to the directory containing the "
"Postfix main.cf file to modify instead of using the "
"default configuration paths.")
add("config-utility", default="postconf",
add("config-utility", default=constants.CLI_DEFAULTS["config_utility"],
help="Path to the 'postconf' executable.")
add("tls-only", action="store_true", default=constants.CLI_DEFAULTS["tls_only"],
help="Only set params to enable opportunistic TLS and install certificates.")
add("server-only", action="store_true", default=constants.CLI_DEFAULTS["server_only"],
help="Only set server params (prefixed with smtpd*)")
add("ignore-master-overrides", action="store_true",
default=constants.CLI_DEFAULTS["ignore_master_overrides"],
help="Ignore errors reporting overridden TLS parameters in master.cf.")
def __init__(self, *args, **kwargs):
super(Installer, self).__init__(*args, **kwargs)
self.config_dir = None
self.proposed_changes = {}
self.save_notes = []
# Wrapper around postconf commands
self.postfix = None
self.postconf = None
# Files to save
self.save_notes = [] # type: List[str]
self._enhance_func = {} # type: Dict[str, Callable[[str, str], None]]
# Since we only need to enable TLS once for all domains,
# keep track of whether this enhancement was already called.
self._tls_enabled = False
def prepare(self):
"""Prepare the installer.
Finish up any additional initialization.
:raises errors.PluginError: when an unexpected error occurs
:raises errors.MisconfigurationError: when the config is invalid
:raises errors.NoInstallationError: when can't find installation
:raises errors.NotSupportedError: when version is not supported
"""
# Verify postfix and postconf are installed
for param in ("ctl", "config_utility",):
self._verify_executable_is_available(param)
self._set_config_dir()
self._check_version()
self.config_test()
self._lock_config_dir()
def _verify_executable_is_available(self, config_name):
"""Asserts the program in the specified config param is found.
:param str config_name: name of the config param
:raises .NoInstallationError: when the executable isn't found
"""
if not certbot_util.exe_exists(self.conf(config_name)):
if not plugins_util.path_surgery(self.conf(config_name)):
raise errors.NoInstallationError(
util.verify_exe_exists(self.conf(param),
"Cannot find executable '{0}'. You can provide the "
"path to this command with --{1}".format(
self.conf(config_name),
self.option_name(config_name)))
self.conf(param),
self.option_name(param)))
def _set_config_dir(self):
"""Ensure self.config_dir is set to the correct path.
# Set up CLI tools
self.postfix = util.PostfixUtil(self.conf('config-dir'))
self.postconf = postconf.ConfigMain(self.conf('config-utility'),
self.conf('ignore-master-overrides'),
self.conf('config-dir'))
If the configuration directory to use was set by the user, we'll
use that value, otherwise, we'll find the default path using
'postconf'.
# Ensure current configuration is valid.
self.config_test()
# Check Postfix version
self._check_version()
self._lock_config_dir()
self.install_ssl_dhparams()
def config_test(self):
"""Test to see that the current Postfix configuration is valid.
:raises errors.MisconfigurationError: If the configuration is invalid.
"""
if self.conf("config-dir") is None:
self.config_dir = self._get_config_var("config_directory")
else:
self.config_dir = self.conf("config-dir")
self.postfix.test()
def _check_version(self):
"""Verifies that the installed Postfix version is supported.
:raises errors.NotSupportedError: if the version is unsupported
"""
if self._get_version() < (2, 6,):
raise errors.NotSupportedError('Postfix version is too old')
if self._get_version() < constants.MINIMUM_VERSION:
version_string = '.'.join([str(n) for n in constants.MINIMUM_VERSION])
raise errors.NotSupportedError('Postfix version must be at least %s' % version_string)
def _lock_config_dir(self):
"""Stop two Postfix plugins from modifying the config at once.
:raises .PluginError: if unable to acquire the lock
"""
try:
certbot_util.lock_dir_until_exit(self.config_dir)
certbot_util.lock_dir_until_exit(self.conf('config-dir'))
except (OSError, errors.LockError):
logger.debug("Encountered error:", exc_info=True)
raise errors.PluginError(
"Unable to lock %s", self.config_dir)
"Unable to lock %s" % self.conf('config-dir'))
def more_info(self):
"""Human-readable string to help the user.
Should describe the steps taken and any relevant info to help the user
decide which plugin to use.
:rtype str:
"""Human-readable string to help the user. Describes steps taken and any relevant
info to help the user decide which plugin to use.
:rtype: str
"""
return (
"Configures Postfix to try to authenticate mail servers, use "
@@ -129,22 +141,19 @@ class Installer(plugins_common.Installer):
"Server root: {root}{0}"
"Version: {version}".format(
os.linesep,
root=self.config_dir,
root=self.conf('config-dir'),
version='.'.join([str(i) for i in self._get_version()]))
)
def _get_version(self):
"""Return the mail version of Postfix.
Version is returned as a tuple. (e.g. '2.11.3' is (2, 11, 3))
"""Return the version of Postfix, as a tuple. (e.g. '2.11.3' is (2, 11, 3))
:returns: version
:rtype: tuple
:raises .PluginError: Unable to find Postfix version.
:raises errors.PluginError: Unable to find Postfix version.
"""
mail_version = self._get_config_default("mail_version")
mail_version = self.postconf.get_default("mail_version")
return tuple(int(i) for i in mail_version.split('.'))
def get_all_names(self):
@@ -153,9 +162,34 @@ class Installer(plugins_common.Installer):
:rtype: `set` of `str`
"""
return set(self._get_config_var(var)
return certbot_util.get_filtered_names(self.postconf.get(var)
for var in ('mydomain', 'myhostname', 'myorigin',))
def _set_vars(self, var_dict):
"""Sets all parameters in var_dict to config file. If current value is already set
as more secure (acceptable), then don't set/overwrite it.
"""
for param, acceptable in six.iteritems(var_dict):
if not util.is_acceptable_value(param, self.postconf.get(param), acceptable):
self.postconf.set(param, acceptable[0], acceptable)
def _confirm_changes(self):
"""Confirming outstanding updates for configuration parameters.
:raises errors.PluginError: when user rejects the configuration changes.
"""
updates = self.postconf.get_changes()
output_string = "Postfix TLS configuration parameters to update in main.cf:\n"
for name, value in six.iteritems(updates):
output_string += "{0} = {1}\n".format(name, value)
output_string += "Is this okay?\n"
if not zope.component.getUtility(interfaces.IDisplay).yesno(output_string,
force_interactive=True, default=True):
raise errors.PluginError(
"Manually rejected configuration changes.\n"
"Try using --tls-only or --server-only to change a particular"
"subset of configuration parameters.")
def deploy_cert(self, domain, cert_path,
key_path, chain_path, fullchain_path):
"""Configure the Postfix SMTP server to use the given TLS cert.
@@ -171,22 +205,26 @@ class Installer(plugins_common.Installer):
"""
# pylint: disable=unused-argument
if self._tls_enabled:
return
self._tls_enabled = True
self.save_notes.append("Configuring TLS for {0}".format(domain))
self._set_config_var("smtpd_tls_cert_file", fullchain_path)
self._set_config_var("smtpd_tls_key_file", key_path)
self._set_config_var("smtpd_tls_mandatory_protocols", "!SSLv2, !SSLv3")
self._set_config_var("smtpd_tls_protocols", "!SSLv2, !SSLv3")
# Don't configure opportunistic TLS if it's currently mandatory
if self._get_config_var("smtpd_tls_security_level") != "encrypt":
self._set_config_var("smtpd_tls_security_level", "may")
self.postconf.set("smtpd_tls_cert_file", cert_path)
self.postconf.set("smtpd_tls_key_file", key_path)
self._set_vars(constants.TLS_SERVER_VARS)
if not self.conf('server_only'):
self._set_vars(constants.TLS_CLIENT_VARS)
if not self.conf('tls_only'):
self._set_vars(constants.DEFAULT_SERVER_VARS)
if not self.conf('server_only'):
self._set_vars(constants.DEFAULT_CLIENT_VARS)
# Despite the name, this option also supports 2048-bit DH params.
# http://www.postfix.org/FORWARD_SECRECY_README.html#server_fs
self.postconf.set("smtpd_tls_dh1024_param_file", self.ssl_dhparams)
self._confirm_changes()
def enhance(self, domain, enhancement, options=None):
"""Raises an exception for request for unsupported enhancement.
:raises .PluginError: this is always raised as no enhancements
are currently supported
"""Raises an exception since this installer doesn't support any enhancements.
"""
# pylint: disable=unused-argument
raise errors.PluginError(
@@ -211,248 +249,40 @@ class Installer(plugins_common.Installer):
be quickly reversed in the future (challenges)
:raises errors.PluginError: when save is unsuccessful
"""
if self.proposed_changes:
save_files = set((os.path.join(self.config_dir, "main.cf"),))
self.add_to_checkpoint(save_files,
"\n".join(self.save_notes), temporary)
self._write_config_changes()
self.proposed_changes.clear()
save_files = set((os.path.join(self.conf('config-dir'), "main.cf"),))
self.add_to_checkpoint(save_files,
"\n".join(self.save_notes), temporary)
self.postconf.flush()
del self.save_notes[:]
if title and not temporary:
self.finalize_checkpoint(title)
def config_test(self):
"""Make sure the configuration is valid.
def recovery_routine(self):
super(Installer, self).recovery_routine()
self.postconf = postconf.ConfigMain(self.conf('config-utility'),
self.conf('ignore-master-overrides'),
self.conf('config-dir'))
:raises .MisconfigurationError: if the config is invalid
def rollback_checkpoints(self, rollback=1):
"""Rollback saved checkpoints.
:param int rollback: Number of checkpoints to revert
:raises .errors.PluginError: If there is a problem with the input or
the function is unable to correctly revert the configuration
"""
try:
self._run_postfix_subcommand("check")
except subprocess.CalledProcessError:
raise errors.MisconfigurationError(
"Postfix failed internal configuration check.")
super(Installer, self).rollback_checkpoints(rollback)
self.postconf = postconf.ConfigMain(self.conf('config-utility'),
self.conf('ignore-master-overrides'),
self.conf('config-dir'))
def restart(self):
"""Restart or refresh the server content.
:raises .PluginError: when server cannot be restarted
"""
logger.info("Reloading Postfix configuration...")
if self._is_postfix_running():
self._reload()
else:
self._start()
self.postfix.restart()
def _is_postfix_running(self):
"""Is Postfix currently running?
Uses the 'postfix status' command to determine if Postfix is
currently running using the specified configuration files.
:returns: True if Postfix is running, otherwise, False
:rtype: bool
"""
try:
self._run_postfix_subcommand("status")
except subprocess.CalledProcessError:
return False
return True
def _reload(self):
"""Instructions Postfix to reload its configuration.
If Postfix isn't currently running, this method will fail.
:raises .PluginError: when Postfix cannot reload
"""
try:
self._run_postfix_subcommand("reload")
except subprocess.CalledProcessError:
raise errors.PluginError(
"Postfix failed to reload its configuration.")
def _start(self):
"""Instructions Postfix to start running.
:raises .PluginError: when Postfix cannot start
"""
try:
self._run_postfix_subcommand("start")
except subprocess.CalledProcessError:
raise errors.PluginError("Postfix failed to start")
def _run_postfix_subcommand(self, subcommand):
"""Runs a subcommand of the 'postfix' control program.
If the command fails, the exception is logged at the DEBUG
level.
:param str subcommand: subcommand to run
:raises subprocess.CalledProcessError: if the command fails
"""
cmd = [self.conf("ctl")]
if self.conf("config-dir") is not None:
cmd.extend(("-c", self.conf("config-dir"),))
cmd.append(subcommand)
util.check_call(cmd)
def _get_config_default(self, name):
"""Return the default value of the specified config parameter.
:param str name: name of the Postfix config default to return
:returns: default for the specified configuration parameter if it
exists, otherwise, None
:rtype: str or types.NoneType
:raises errors.PluginError: if an error occurs while running postconf
or parsing its output
"""
try:
return self._get_value_from_postconf(("-d", name,))
except (subprocess.CalledProcessError, errors.PluginError):
raise errors.PluginError("Unable to determine the default value of"
" the Postfix parameter {0}".format(name))
def _get_config_var(self, name):
"""Return the value of the specified Postfix config parameter.
If there is an unsaved change modifying the value of the
specified config parameter, the value after this proposed change
is returned rather than the current value. If the value is
unset, `None` is returned.
:param str name: name of the Postfix config parameter to return
:returns: value of the parameter included in postconf_args
:rtype: str or types.NoneType
:raises errors.PluginError: if an error occurs while running postconf
or parsing its output
"""
if name in self.proposed_changes:
return self.proposed_changes[name]
try:
return self._get_value_from_postconf((name,))
except (subprocess.CalledProcessError, errors.PluginError):
raise errors.PluginError("Unable to determine the value of"
" the Postfix parameter {0}".format(name))
def _set_config_var(self, name, value):
"""Set the Postfix config parameter name to value.
This method only stores the requested change in memory. The
Postfix configuration is not modified until save() is called.
If there's already an identical in progress change or the
Postfix configuration parameter already has the specified value,
no changes are made.
:param str name: name of the Postfix config parameter
:param str value: value to set the Postfix config parameter to
"""
if self._get_config_var(name) != value:
self.proposed_changes[name] = value
self.save_notes.append("\t* Set {0} to {1}".format(name, value))
def _write_config_changes(self):
"""Write proposed changes to the Postfix config.
:raises errors.PluginError: if an error occurs
"""
try:
self._run_postconf_command(
"{0}={1}".format(name, value)
for name, value in self.proposed_changes.items())
except subprocess.CalledProcessError:
raise errors.PluginError(
"An error occurred while updating your Postfix config.")
def _get_value_from_postconf(self, postconf_args):
"""Runs postconf and extracts the specified config value.
It is assumed that the name of the Postfix config parameter to
parse from the output is the last value in postconf_args. If the
value is unset, `None` is returned. If an error occurs, the
relevant information is logged before an exception is raised.
:param collections.Iterable args: arguments to postconf
:returns: value of the parameter included in postconf_args
:rtype: str or types.NoneType
:raises errors.PluginError: if unable to parse postconf output
:raises subprocess.CalledProcessError: if postconf fails
"""
name = postconf_args[-1]
output = self._run_postconf_command(postconf_args)
try:
return self._parse_postconf_output(output, name)
except errors.PluginError:
logger.debug("An error occurred while parsing postconf output",
exc_info=True)
raise
def _run_postconf_command(self, args):
"""Runs a postconf command using the selected config.
If postconf exits with a nonzero status, the error is logged
before an exception is raised.
:param collections.Iterable args: additional arguments to postconf
:returns: stdout output of postconf
:rtype: str
:raises subprocess.CalledProcessError: if the command fails
"""
cmd = [self.conf("config-utility")]
if self.conf("config-dir") is not None:
cmd.extend(("-c", self.conf("config-dir"),))
cmd.extend(args)
return util.check_output(cmd)
def _parse_postconf_output(self, output, name):
"""Parses postconf output and returns the specified value.
If the specified Postfix parameter is unset, `None` is returned.
It is assumed that most one configuration parameter will be
included in the given output.
:param str output: output from postconf
:param str name: name of the Postfix config parameter to obtain
:returns: value of the parameter included in postconf_args
:rtype: str or types.NoneType
:raises errors.PluginError: if unable to parse postconf ouput
"""
expected_prefix = name + " ="
if output.count("\n") != 1 or not output.startswith(expected_prefix):
raise errors.PluginError(
"Unexpected output '{0}' from postconf".format(output))
value = output[len(expected_prefix):].strip()
return value if value else None

View File

@@ -1,351 +0,0 @@
"""Tests for certbot_postfix.installer."""
import functools
import os
import subprocess
import unittest
import mock
from certbot import errors
from certbot.tests import util as certbot_test_util
class InstallerTest(certbot_test_util.ConfigTestCase):
# pylint: disable=too-many-public-methods
def setUp(self):
super(InstallerTest, self).setUp()
self.config.postfix_ctl = "postfix"
self.config.postfix_config_dir = self.tempdir
self.config.postfix_config_utility = "postconf"
self.mock_postfix = MockPostfix(self.tempdir,
{"mail_version": "3.1.4"})
def test_add_parser_arguments(self):
options = set(('ctl', 'config-dir', 'config-utility',))
mock_add = mock.MagicMock()
from certbot_postfix import installer
installer.Installer.add_parser_arguments(mock_add)
for call in mock_add.call_args_list:
self.assertTrue(call[0][0] in options)
def test_no_postconf_prepare(self):
installer = self._create_installer()
installer_path = "certbot_postfix.installer"
exe_exists_path = installer_path + ".certbot_util.exe_exists"
path_surgery_path = installer_path + ".plugins_util.path_surgery"
with mock.patch(path_surgery_path, return_value=False):
with mock.patch(exe_exists_path, return_value=False):
self.assertRaises(errors.NoInstallationError,
installer.prepare)
def test_postconf_error(self):
installer = self._create_installer()
check_output_path = "certbot_postfix.installer.util.check_output"
exe_exists_path = "certbot_postfix.installer.certbot_util.exe_exists"
with mock.patch(check_output_path) as mock_check_output:
mock_check_output.side_effect = subprocess.CalledProcessError(42,
"a")
with mock.patch(exe_exists_path, return_value=True):
self.assertRaises(errors.PluginError, installer.prepare)
def test_unexpected_postconf(self):
installer = self._create_installer()
check_output_path = "certbot_postfix.installer.util.check_output"
exe_exists_path = "certbot_postfix.installer.certbot_util.exe_exists"
with mock.patch(check_output_path) as mock_check_output:
mock_check_output.return_value = "foobar"
with mock.patch(exe_exists_path, return_value=True):
self.assertRaises(errors.PluginError, installer.prepare)
def test_set_config_dir(self):
self.config.postfix_config_dir = os.path.join(self.tempdir, "subdir")
os.mkdir(self.config.postfix_config_dir)
installer = self._create_installer()
expected = self.config.postfix_config_dir
self.config.postfix_config_dir = None
self.mock_postfix.set_value("config_directory", expected)
exe_exists_path = "certbot_postfix.installer.certbot_util.exe_exists"
with mock.patch(exe_exists_path, return_value=True):
self._mock_postfix_and_call(installer.prepare)
self.assertEqual(installer.config_dir, expected)
@mock.patch("certbot_postfix.installer.certbot_util.exe_exists")
def test_old_version(self, mock_exe_exists):
installer = self._create_installer()
mock_exe_exists.return_value = True
self.mock_postfix.set_value("mail_version", "0.0.1")
self._mock_postfix_and_call(
self.assertRaises, errors.NotSupportedError, installer.prepare)
def test_lock_error(self):
assert_raises = functools.partial(self.assertRaises,
errors.PluginError,
self._create_prepared_installer)
certbot_test_util.lock_and_call(assert_raises, self.tempdir)
def test_more_info(self):
installer = self._create_prepared_installer()
version = "3.1.2"
self.mock_postfix.set_value("mail_version", version)
output = self._mock_postfix_and_call(installer.more_info)
self.assertTrue("Postfix" in output)
self.assertTrue(self.tempdir in output)
self.assertTrue(version in output)
def test_get_all_names(self):
config = {"mydomain": "example.org",
"myhostname": "mail.example.org",
"myorigin": "example.org"}
for name, value in config.items():
self.mock_postfix.set_value(name, value)
installer = self._create_prepared_installer()
result = self._mock_postfix_and_call(installer.get_all_names)
self.assertEqual(result, set(config.values()))
def test_deploy(self):
installer = self._create_prepared_installer()
def deploy_cert(domain):
"""Calls deploy_cert for the given domain.
:param str domain: domain to deploy cert for
"""
installer.deploy_cert(domain, "foo", "bar", "baz", "qux")
self._mock_postfix_and_call(deploy_cert, "example.org")
# No calls to postconf are expected so mock isn't needed
deploy_cert("mail.example.org")
def test_deploy_and_save(self):
self._test_deploy_and_save_common({"smtpd_tls_security_level": "may"})
def test_deploy_and_save2(self):
self.mock_postfix.set_value("smtpd_tls_security_level", "encrypt")
self._test_deploy_and_save_common({"smtpd_tls_security_level":
"encrypt"})
def _test_deploy_and_save_common(self, expected_config):
key_path = "key_path"
fullchain_path = "fullchain_path"
installer = self._create_prepared_installer()
for i, domain in enumerate(("example.org", "mail.example.org",)):
self._mock_postfix_and_call(
installer.deploy_cert, domain, "unused",
key_path, "unused", fullchain_path)
if i:
# No mock because Postfix utilities aren't expected to be used
installer.save("noop")
else:
self._mock_postfix_and_call(installer.save, "real save")
expected_config.setdefault("smtpd_tls_cert_file", fullchain_path)
expected_config.setdefault("smtpd_tls_key_file", key_path)
for key, value in expected_config.items():
self.assertEqual(self.mock_postfix.get_value(key), value)
def test_save_error(self):
installer = self._create_prepared_installer()
self._mock_postfix_and_call(
installer.deploy_cert, "example.org", "foo", "bar", "baz", "qux")
check_call_path = "certbot_postfix.installer.util.check_output"
with mock.patch(check_call_path) as mock_check_call:
mock_check_call.side_effect = subprocess.CalledProcessError(42,
"foo")
self.assertRaises(errors.PluginError, installer.save)
def test_enhance(self):
self.assertRaises(errors.PluginError,
self._create_prepared_installer().enhance,
"example.org", "redirect")
def test_supported_enhancements(self):
self.assertEqual(
self._create_prepared_installer().supported_enhancements(), [])
@mock.patch("certbot_postfix.installer.subprocess.check_call")
def test_config_test_failure(self, mock_check_call):
installer = self._create_prepared_installer()
mock_check_call.side_effect = subprocess.CalledProcessError(42, "foo")
self.assertRaises(errors.MisconfigurationError, installer.config_test)
@mock.patch("certbot_postfix.installer.subprocess.check_call")
def test_postfix_reload_failure(self, mock_check_call):
installer = self._create_prepared_installer()
mock_check_call.side_effect = [
None, subprocess.CalledProcessError(42, "foo")
]
self.assertRaises(errors.PluginError, installer.restart)
def test_postfix_reload_success(self):
with mock.patch("certbot_postfix.installer.subprocess.check_call"):
installer = self._create_prepared_installer()
installer.restart()
@mock.patch("certbot_postfix.installer.subprocess.check_call")
def test_postfix_start_failure(self, mock_check_call):
installer = self._create_prepared_installer()
mock_check_call.side_effect = subprocess.CalledProcessError(42, "foo")
self.assertRaises(errors.PluginError, installer.restart)
@mock.patch("certbot_postfix.installer.subprocess.check_call")
def test_postfix_start_success(self, mock_check_call):
installer = self._create_prepared_installer()
mock_check_call.side_effect = [
subprocess.CalledProcessError(42, "foo"), None
]
installer.restart()
def _create_prepared_installer(self):
"""Creates and returns a new prepared Postfix Installer.
Calls in prepare() are mocked out so the Postfix version check
is successful.
:returns: a prepared Postfix installer
:rtype: certbot_postfix.installer.Installer
"""
installer = self._create_installer()
exe_exists_path = "certbot_postfix.installer.certbot_util.exe_exists"
with mock.patch(exe_exists_path, return_value=True):
self._mock_postfix_and_call(installer.prepare)
return installer
def _create_installer(self):
"""Creates and returns a new Postfix Installer.
:returns: a new Postfix installer
:rtype: certbot_postfix.installer.Installer
"""
name = "postfix"
from certbot_postfix import installer
return installer.Installer(self.config, name)
def _mock_postfix_and_call(self, func, *args, **kwargs):
"""Calls func with mocked responses from Postfix utilities.
:param callable func: function to call with mocked args
:param tuple args: positional arguments to func
:param dict kwargs: keyword arguments to func
:returns: the return value of func
"""
check_call_path = "certbot_postfix.installer.subprocess.check_call"
check_output_path = "certbot_postfix.installer.util.check_output"
with mock.patch(check_call_path) as mock_check_call:
mock_check_call.side_effect = self.mock_postfix
with mock.patch(check_output_path) as mock_check_output:
mock_check_output.side_effect = self.mock_postfix
return func(*args, **kwargs)
class MockPostfix(object):
"""A callable to mimic Postfix command line utilities.
This is best used a side effect to a mock object. All calls to
'postfix' are noops. For calls to 'postconf', values that are set in
the constructor or through mocked out runs of postconf are
remembered and properly returned if the installer attempts to fetch
the value. If the Postfix installer attempts to obtain a value that
hasn't yet been set, a dummy value is returned.
:ivar str config_path: path to Postfix main.cf file
"""
def __init__(self, config_dir, initial_values):
"""Create Postfix configuration.
:param str config_dir: path for Postfix config dir
:param dict initial_values: initial Postfix config values
"""
initial_values["config_directory"] = config_dir
self.config_path = os.path.join(config_dir, "main.cf")
self._write_config(initial_values)
def __call__(self, args, *unused_args, **unused_kwargs):
cmd = os.path.basename(args[0])
if cmd == "postfix":
return
elif cmd != "postconf": # pragma: no cover
assert False, "Unexpected command '{0}'".format(''.join(args))
output = []
skip = False
for arg in args[1:]:
if skip:
skip = False
elif arg[0] == "-":
if arg == "-c":
skip = True
elif "=" in arg:
name, _, value = arg.partition("=")
self.set_value(name, value)
else:
output.append("{0} = {1}\n".format(arg, self.get_value(arg)))
return "\n".join(output)
def get_value(self, name):
"""Returns the value for the Postfix config parameter name.
If the value isn't set, an empty string is returned.
:param str name: name of the Postfix config parameter
:returns: value of the named parameter
:rtype: str
"""
return self._read_config().get(name, "")
def set_value(self, name, value):
"""Sets the value for a Postfix config parameter.
:param str name: name of the Postfix config parameter
:param str value: value ot set the parameter to
"""
config = self._read_config()
config[name] = value
self._write_config(config)
def _read_config(self):
config = {}
with open(self.config_path) as f:
for line in f:
key, _, value = line.strip().partition(" = ")
config[key] = value
return config
def _write_config(self, config):
with open(self.config_path, "w") as f:
f.writelines("{0} = {1}\n".format(key, value)
for key, value in config.items())
if __name__ == '__main__':
unittest.main() # pragma: no cover

View File

@@ -1,62 +1,152 @@
"""Classes that wrap the postconf command line utility.
These classes allow you to interact with a Postfix config like it is a
dictionary, with the getting and setting of values in the config being
handled automatically by the class.
"""
import collections
import six
from certbot import errors
from certbot_postfix import util
# pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import Dict, List, Tuple
# pylint: enable=unused-import, no-name-in-module
class ReadOnlyMainMap(util.PostfixUtilBase, collections.Mapping):
"""A read-only view of a Postfix main.cf file."""
class ConfigMain(util.PostfixUtilBase):
"""A parser for Postfix's main.cf file."""
_modifiers = None
"""An iterable containing additional CLI flags for postconf."""
def __getitem__(self, name):
return next(_parse_main_output(self._get_output([name])))[1]
def __iter__(self):
for name, _ in _parse_main_output(self._get_output()):
yield name
def __len__(self):
return sum(1 for _ in _parse_main_output(self._get_output()))
def _call(self, extra_args=None):
"""Runs Postconf and returns the result.
If self._modifiers is set, it is provided on the command line to
postconf before any values in extra_args.
:param list extra_args: additional arguments for the command
:returns: data written to stdout and stderr
:rtype: `tuple` of `str`
:raises subprocess.CalledProcessError: if the command fails
def __init__(self, executable, ignore_master_overrides=False, config_dir=None):
super(ConfigMain, self).__init__(executable, config_dir)
# Whether to ignore overrides from master.
self._ignore_master_overrides = ignore_master_overrides
# List of all current Postfix parameters, from `postconf` command.
self._db = {} # type: Dict[str, str]
# List of current master.cf overrides from Postfix config. Dictionary
# of parameter name => list of tuples (service name, paramter value)
# Note: We should never modify master without explicit permission.
self._master_db = {} # type: Dict[str, List[Tuple[str, str]]]
# List of all changes requested to the Postfix parameters as they are now
# in _db. These changes are flushed to `postconf` on `flush`.
self._updated = {} # type: Dict[str, str]
self._read_from_conf()
def _read_from_conf(self):
"""Reads initial parameter state from `main.cf` into this object.
"""
all_extra_args = []
for args_list in (self._modifiers, extra_args,):
if args_list is not None:
all_extra_args.extend(args_list)
out = self._get_output()
for name, value in _parse_main_output(out):
self._db[name] = value
out = self._get_output_master()
for name, value in _parse_main_output(out):
service, param_name = name.rsplit("/", 1)
if param_name not in self._master_db:
self._master_db[param_name] = []
self._master_db[param_name].append((service, value))
return super(ReadOnlyMainMap, self)._call(all_extra_args)
def _get_output_master(self):
"""Retrieves output for `master.cf` parameters."""
return self._get_output('-P')
def get_default(self, name):
"""Retrieves default value of parameter `name` from postfix parameters.
:param str name: The name of the parameter to fetch.
:returns: The default value of parameter `name`.
:rtype: str
"""
out = self._get_output(['-d', name])
_, value = next(_parse_main_output(out), (None, None))
return value
def get(self, name):
"""Retrieves working value of parameter `name` from postfix parameters.
:param str name: The name of the parameter to fetch.
:returns: The value of parameter `name`.
:rtype: str
"""
if name in self._updated:
return self._updated[name]
return self._db[name]
def get_master_overrides(self, name):
"""Retrieves list of overrides for parameter `name` in postfix's Master config
file.
:returns: List of tuples (service, value), meaning that parameter `name`
is overridden as `value` for `service`.
:rtype: `list` of `tuple` of `str`
"""
if name in self._master_db:
return self._master_db[name]
return None
def set(self, name, value, acceptable_overrides=None):
"""Sets parameter `name` to `value`. If `name` is overridden by a particular service in
`master.cf`, reports any of these parameter conflicts as long as
`ignore_master_overrides` was not set.
.. note:: that this function does not flush these parameter values to main.cf;
To do that, use `flush`.
:param str name: The name of the parameter to set.
:param str value: The value of the parameter.
:param tuple acceptable_overrides: If the master configuration file overrides `value`
with a value in acceptable_overrides.
"""
if name not in self._db:
raise KeyError("Parameter name %s is not a valid Postfix parameter name.", name)
# Check to see if this parameter is overridden by master.
overrides = self.get_master_overrides(name)
if not self._ignore_master_overrides and overrides is not None:
util.report_master_overrides(name, overrides, acceptable_overrides)
if value != self._db[name]:
# _db contains the "original" state of parameters. We only care about
# writes if they cause a delta from the original state.
self._updated[name] = value
elif name in self._updated:
# If this write reverts a previously updated parameter back to the
# original DB's state, we don't have to keep track of it in _updated.
del self._updated[name]
def flush(self):
"""Flushes all parameter changes made using `self.set`, to `main.cf`
:raises error.PluginError: When flush to main.cf fails for some reason.
"""
if len(self._updated) == 0:
return
args = ['-e']
for name, value in six.iteritems(self._updated):
args.append('{0}={1}'.format(name, value))
try:
self._get_output(args)
except IOError as e:
raise errors.PluginError("Unable to save to Postfix config: %v", e)
for name, value in six.iteritems(self._updated):
self._db[name] = value
self._updated = {}
def get_changes(self):
""" Return queued changes to main.cf.
:rtype: dict[str, str]
"""
return self._updated
def _parse_main_output(output):
"""Parses the raw output from Postconf about main.cf.
Expects the output to look like:
.. code-block:: none
name1 = value1
name2 = value2
:param str output: data postconf wrote to stdout about main.cf
:returns: generator providing key-value pairs from main.cf
:rtype: generator
:rtype: Iterator[tuple(str, str)]
"""
for line in output.splitlines():
name, _, value = line.partition(" =")
yield name, value.strip()

View File

@@ -0,0 +1 @@
""" Certbot Postfix Tests """

View File

@@ -0,0 +1,314 @@
"""Tests for certbot_postfix.installer."""
from contextlib import contextmanager
import copy
import functools
import os
import pkg_resources
import six
import unittest
import mock
from certbot import errors
from certbot.tests import util as certbot_test_util
# pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import Dict, Tuple, Union
# pylint: enable=unused-import, no-name-in-module
DEFAULT_MAIN_CF = {
"smtpd_tls_cert_file": "",
"smtpd_tls_key_file": "",
"smtpd_tls_dh1024_param_file": "",
"smtpd_tls_security_level": "none",
"smtpd_tls_auth_only": "",
"smtpd_tls_mandatory_protocols": "",
"smtpd_tls_protocols": "",
"smtpd_tls_ciphers": "",
"smtpd_tls_exclude_ciphers": "",
"smtpd_tls_mandatory_ciphers": "",
"smtpd_tls_eecdh_grade": "medium",
"smtp_tls_security_level": "",
"smtp_tls_ciphers": "",
"smtp_tls_exclude_ciphers": "",
"smtp_tls_mandatory_ciphers": "",
"mail_version": "3.2.3"
}
def _main_cf_with(obj):
main_cf = copy.copy(DEFAULT_MAIN_CF)
main_cf.update(obj)
return main_cf
class InstallerTest(certbot_test_util.ConfigTestCase):
# pylint: disable=too-many-public-methods
def setUp(self):
super(InstallerTest, self).setUp()
_config_file = pkg_resources.resource_filename("certbot_postfix.tests",
os.path.join("testdata", "config.json"))
self.config.postfix_ctl = "postfix"
self.config.postfix_config_dir = self.tempdir
self.config.postfix_config_utility = "postconf"
self.config.postfix_tls_only = False
self.config.postfix_server_only = False
self.config.config_dir = self.tempdir
@mock.patch("certbot_postfix.installer.util.is_acceptable_value")
def test_set_vars(self, mock_is_acceptable_value):
mock_is_acceptable_value.return_value = True
with create_installer(self.config) as installer:
installer.prepare()
mock_is_acceptable_value.return_value = False
@mock.patch("certbot_postfix.installer.util.is_acceptable_value")
def test_acceptable_value(self, mock_is_acceptable_value):
mock_is_acceptable_value.return_value = True
with create_installer(self.config) as installer:
installer.prepare()
mock_is_acceptable_value.return_value = False
@certbot_test_util.patch_get_utility()
def test_confirm_changes_no_raises_error(self, mock_util):
mock_util().yesno.return_value = False
with create_installer(self.config) as installer:
installer.prepare()
self.assertRaises(errors.PluginError, installer.deploy_cert,
"example.com", "cert_path", "key_path",
"chain_path", "fullchain_path")
@certbot_test_util.patch_get_utility()
def test_save(self, mock_util):
mock_util().yesno.return_value = True
with create_installer(self.config) as installer:
installer.prepare()
installer.postconf.flush = mock.Mock()
installer.reverter = mock.Mock()
installer.deploy_cert("example.com", "cert_path", "key_path",
"chain_path", "fullchain_path")
installer.save()
self.assertEqual(installer.save_notes, [])
self.assertEqual(installer.postconf.flush.call_count, 1)
self.assertEqual(installer.reverter.add_to_checkpoint.call_count, 1)
@certbot_test_util.patch_get_utility()
def test_save_with_title(self, mock_util):
mock_util().yesno.return_value = True
with create_installer(self.config) as installer:
installer.prepare()
installer.postconf.flush = mock.Mock()
installer.reverter = mock.Mock()
installer.deploy_cert("example.com", "cert_path", "key_path",
"chain_path", "fullchain_path")
installer.save(title="new_file!")
self.assertEqual(installer.reverter.finalize_checkpoint.call_count, 1)
@certbot_test_util.patch_get_utility()
def test_rollback_checkpoints_resets_postconf(self, mock_util):
mock_util().yesno.return_value = True
with create_installer(self.config) as installer:
installer.prepare()
installer.deploy_cert("example.com", "cert_path", "key_path",
"chain_path", "fullchain_path")
installer.rollback_checkpoints()
self.assertEqual(installer.postconf.get_changes(), {})
@certbot_test_util.patch_get_utility()
def test_recovery_routine_resets_postconf(self, mock_util):
mock_util().yesno.return_value = True
with create_installer(self.config) as installer:
installer.prepare()
installer.deploy_cert("example.com", "cert_path", "key_path",
"chain_path", "fullchain_path")
installer.recovery_routine()
self.assertEqual(installer.postconf.get_changes(), {})
def test_restart(self):
with create_installer(self.config) as installer:
installer.prepare()
installer.restart()
self.assertEqual(installer.postfix.restart.call_count, 1)
def test_add_parser_arguments(self):
options = set(("ctl", "config-dir", "config-utility",
"tls-only", "server-only", "ignore-master-overrides"))
mock_add = mock.MagicMock()
from certbot_postfix import installer
installer.Installer.add_parser_arguments(mock_add)
for call in mock_add.call_args_list:
self.assertTrue(call[0][0] in options)
def test_no_postconf_prepare(self):
with create_installer(self.config) as installer:
installer_path = "certbot_postfix.installer"
exe_exists_path = installer_path + ".certbot_util.exe_exists"
path_surgery_path = "certbot_postfix.util.plugins_util.path_surgery"
with mock.patch(path_surgery_path, return_value=False):
with mock.patch(exe_exists_path, return_value=False):
self.assertRaises(errors.NoInstallationError,
installer.prepare)
def test_old_version(self):
with create_installer(self.config, main_cf=_main_cf_with({"mail_version": "0.0.1"}))\
as installer:
self.assertRaises(errors.NotSupportedError, installer.prepare)
def test_lock_error(self):
with create_installer(self.config) as installer:
assert_raises = functools.partial(self.assertRaises,
errors.PluginError,
installer.prepare)
certbot_test_util.lock_and_call(assert_raises, self.tempdir)
@mock.patch('certbot.util.lock_dir_until_exit')
def test_dir_locked(self, lock_dir):
with create_installer(self.config) as installer:
lock_dir.side_effect = errors.LockError
self.assertRaises(errors.PluginError, installer.prepare)
def test_more_info(self):
with create_installer(self.config) as installer:
installer.prepare()
output = installer.more_info()
self.assertTrue("Postfix" in output)
self.assertTrue(self.tempdir in output)
self.assertTrue(DEFAULT_MAIN_CF["mail_version"] in output)
def test_get_all_names(self):
config = {"mydomain": "example.org",
"myhostname": "mail.example.org",
"myorigin": "example.org"}
with create_installer(self.config, main_cf=_main_cf_with(config)) as installer:
installer.prepare()
result = installer.get_all_names()
self.assertEqual(result, set(config.values()))
@certbot_test_util.patch_get_utility()
def test_deploy(self, mock_util):
mock_util().yesno.return_value = True
from certbot_postfix import constants
with create_installer(self.config) as installer:
installer.prepare()
# pylint: disable=protected-access
installer.deploy_cert("example.com", "cert_path", "key_path",
"chain_path", "fullchain_path")
changes = installer.postconf.get_changes()
expected = {} # type: Dict[str, Tuple[str, ...]]
expected.update(constants.TLS_SERVER_VARS)
expected.update(constants.DEFAULT_SERVER_VARS)
expected.update(constants.DEFAULT_CLIENT_VARS)
self.assertEqual(changes["smtpd_tls_key_file"], "key_path")
self.assertEqual(changes["smtpd_tls_cert_file"], "cert_path")
for name, value in six.iteritems(expected):
self.assertEqual(changes[name], value[0])
@certbot_test_util.patch_get_utility()
def test_tls_only(self, mock_util):
mock_util().yesno.return_value = True
with create_installer(self.config) as installer:
installer.prepare()
installer.conf = lambda x: x == "tls_only"
installer.postconf.set = mock.Mock()
installer.deploy_cert("example.com", "cert_path", "key_path",
"chain_path", "fullchain_path")
self.assertEqual(installer.postconf.set.call_count, 4)
@certbot_test_util.patch_get_utility()
def test_server_only(self, mock_util):
mock_util().yesno.return_value = True
with create_installer(self.config) as installer:
installer.prepare()
installer.conf = lambda x: x == "server_only"
installer.postconf.set = mock.Mock()
installer.deploy_cert("example.com", "cert_path", "key_path",
"chain_path", "fullchain_path")
self.assertEqual(installer.postconf.set.call_count, 11)
@certbot_test_util.patch_get_utility()
def test_tls_and_server_only(self, mock_util):
mock_util().yesno.return_value = True
with create_installer(self.config) as installer:
installer.prepare()
installer.conf = lambda x: True
installer.postconf.set = mock.Mock()
installer.deploy_cert("example.com", "cert_path", "key_path",
"chain_path", "fullchain_path")
self.assertEqual(installer.postconf.set.call_count, 3)
@certbot_test_util.patch_get_utility()
def test_deploy_twice(self, mock_util):
# Deploying twice on the same installer shouldn't do anything!
mock_util().yesno.return_value = True
with create_installer(self.config) as installer:
installer.prepare()
from certbot_postfix.postconf import ConfigMain
with mock.patch.object(ConfigMain, "set", wraps=installer.postconf.set) as fake_set:
installer.deploy_cert("example.com", "cert_path", "key_path",
"chain_path", "fullchain_path")
self.assertEqual(fake_set.call_count, 15)
fake_set.reset_mock()
installer.deploy_cert("example.com", "cert_path", "key_path",
"chain_path", "fullchain_path")
fake_set.assert_not_called()
@certbot_test_util.patch_get_utility()
def test_deploy_already_secure(self, mock_util):
# Should not overwrite "more-secure" parameters
mock_util().yesno.return_value = True
more_secure = {
"smtpd_tls_security_level": "encrypt",
"smtpd_tls_protocols": "!SSLv3, !SSLv2, !TLSv1",
"smtpd_tls_eecdh_grade": "strong"
}
with create_installer(self.config,\
main_cf=_main_cf_with(more_secure)) as installer:
installer.prepare()
installer.deploy_cert("example.com", "cert_path", "key_path",
"chain_path", "fullchain_path")
for param in more_secure.keys():
self.assertFalse(param in installer.postconf.get_changes())
def test_enhance(self):
with create_installer(self.config) as installer:
installer.prepare()
self.assertRaises(errors.PluginError,
installer.enhance,
"example.org", "redirect")
def test_supported_enhancements(self):
with create_installer(self.config) as installer:
installer.prepare()
self.assertEqual(installer.supported_enhancements(), [])
@contextmanager
def create_installer(config, main_cf=DEFAULT_MAIN_CF):
# pylint: disable=dangerous-default-value
"""Creates a Postfix installer with calls to `postconf` and `postfix` mocked out.
In particular, creates a ConfigMain object that does regular things, but seeds it
with values from `main_cf` and `master_cf` dicts.
"""
from certbot_postfix.postconf import ConfigMain
from certbot_postfix import installer
def _mock_init_postconf(postconf, executable, ignore_master_overrides=False, config_dir=None):
# pylint: disable=protected-access,unused-argument
postconf._ignore_master_overrides = ignore_master_overrides
postconf._db = main_cf
postconf._master_db = {}
postconf._updated = {}
# override get_default to get from main
postconf.get_default = lambda name: main_cf[name]
with mock.patch.object(ConfigMain, "__init__", _mock_init_postconf):
exe_exists_path = "certbot_postfix.installer.certbot_util.exe_exists"
with mock.patch(exe_exists_path, return_value=True):
with mock.patch("certbot_postfix.installer.util.PostfixUtil",
return_value=mock.Mock()):
yield installer.Installer(config, "postfix")
if __name__ == "__main__":
unittest.main() # pragma: no cover

View File

@@ -0,0 +1,107 @@
"""Tests for certbot_postfix.postconf."""
import mock
import unittest
from certbot import errors
class PostConfTest(unittest.TestCase):
"""Tests for certbot_postfix.util.PostConf."""
def setUp(self):
from certbot_postfix.postconf import ConfigMain
super(PostConfTest, self).setUp()
with mock.patch('certbot_postfix.util.PostfixUtilBase._get_output') as mock_call:
with mock.patch('certbot_postfix.postconf.ConfigMain._get_output_master') as \
mock_master_call:
with mock.patch('certbot_postfix.postconf.util.verify_exe_exists') as verify_exe:
verify_exe.return_value = True
mock_call.return_value = ('default_parameter = value\n'
'extra_param =\n'
'overridden_by_master = default\n')
mock_master_call.return_value = (
'service/type/overridden_by_master = master_value\n'
'service2/type/overridden_by_master = master_value2\n'
)
self.config = ConfigMain('postconf', False)
@mock.patch('certbot_postfix.util.PostfixUtilBase._get_output')
@mock.patch('certbot_postfix.postconf.util.verify_exe_exists')
def test_get_output_master(self, mock_verify_exe, mock_get_output):
from certbot_postfix.postconf import ConfigMain
mock_verify_exe.return_value = True
ConfigMain('postconf', lambda x, y, z: None)
mock_get_output.assert_called_with('-P')
@mock.patch('certbot_postfix.util.PostfixUtilBase._get_output')
def test_read_default(self, mock_get_output):
mock_get_output.return_value = 'param = default_value'
self.assertEqual(self.config.get_default('param'), 'default_value')
@mock.patch('certbot_postfix.util.PostfixUtilBase._call')
def test_set(self, mock_call):
self.config.set('extra_param', 'other_value')
self.assertEqual(self.config.get('extra_param'), 'other_value')
self.config.flush()
mock_call.assert_called_with(['-e', 'extra_param=other_value'])
def test_set_bad_param_name(self):
self.assertRaises(KeyError, self.config.set, 'nonexistent_param', 'some_value')
@mock.patch('certbot_postfix.util.PostfixUtilBase._call')
def test_write_revert(self, mock_call):
self.config.set('default_parameter', 'fake_news')
# revert config set
self.config.set('default_parameter', 'value')
self.config.flush()
mock_call.assert_not_called()
@mock.patch('certbot_postfix.util.PostfixUtilBase._call')
def test_write_default(self, mock_call):
self.config.set('default_parameter', 'value')
self.config.flush()
mock_call.assert_not_called()
def test_master_overrides(self):
self.assertEqual(self.config.get_master_overrides('overridden_by_master'),
[('service/type', 'master_value'),
('service2/type', 'master_value2')])
def test_set_check_override(self):
self.assertRaises(errors.PluginError, self.config.set,
'overridden_by_master', 'new_value')
def test_ignore_check_override(self):
# pylint: disable=protected-access
self.config._ignore_master_overrides = True
self.config.set('overridden_by_master', 'new_value')
def test_check_acceptable_overrides(self):
self.config.set('overridden_by_master', 'new_value',
('master_value', 'master_value2'))
@mock.patch('certbot_postfix.util.PostfixUtilBase._get_output')
def test_flush(self, mock_out):
self.config.set('default_parameter', 'new_value')
self.config.set('extra_param', 'another_value')
self.config.flush()
arguments = mock_out.call_args_list[-1][0][0]
self.assertEquals('-e', arguments[0])
self.assertTrue('default_parameter=new_value' in arguments)
self.assertTrue('extra_param=another_value' in arguments)
@mock.patch('certbot_postfix.util.PostfixUtilBase._get_output')
def test_flush_updates_object(self, mock_out):
self.config.set('default_parameter', 'new_value')
self.config.flush()
mock_out.reset_mock()
self.config.set('default_parameter', 'new_value')
mock_out.assert_not_called()
@mock.patch('certbot_postfix.util.PostfixUtilBase._get_output')
def test_flush_throws_error_on_fail(self, mock_out):
mock_out.side_effect = [IOError("oh no!")]
self.config.set('default_parameter', 'new_value')
self.assertRaises(errors.PluginError, self.config.flush)
if __name__ == '__main__': # pragma: no cover
unittest.main()

View File

@@ -0,0 +1,205 @@
"""Tests for certbot_postfix.util."""
import subprocess
import unittest
import mock
from certbot import errors
class PostfixUtilBaseTest(unittest.TestCase):
"""Tests for certbot_postfix.util.PostfixUtilBase."""
@classmethod
def _create_object(cls, *args, **kwargs):
from certbot_postfix.util import PostfixUtilBase
return PostfixUtilBase(*args, **kwargs)
@mock.patch('certbot_postfix.util.verify_exe_exists')
def test_no_exe(self, mock_verify):
expected_error = errors.NoInstallationError
mock_verify.side_effect = expected_error
self.assertRaises(expected_error, self._create_object, 'nonexistent')
def test_object_creation(self):
with mock.patch('certbot_postfix.util.verify_exe_exists'):
self._create_object('existent')
@mock.patch('certbot_postfix.util.check_all_output')
def test_call_extends_args(self, mock_output):
# pylint: disable=protected-access
with mock.patch('certbot_postfix.util.verify_exe_exists'):
mock_output.return_value = 'expected'
postfix = self._create_object('executable')
postfix._call(['many', 'extra', 'args'])
mock_output.assert_called_with(['executable', 'many', 'extra', 'args'])
postfix._call()
mock_output.assert_called_with(['executable'])
def test_create_with_config(self):
# pylint: disable=protected-access
with mock.patch('certbot_postfix.util.verify_exe_exists'):
postfix = self._create_object('exec', 'config_dir')
self.assertEqual(postfix._base_command, ['exec', '-c', 'config_dir'])
class PostfixUtilTest(unittest.TestCase):
def setUp(self):
# pylint: disable=protected-access
from certbot_postfix.util import PostfixUtil
with mock.patch('certbot_postfix.util.verify_exe_exists'):
self.postfix = PostfixUtil()
self.postfix._call = mock.Mock()
self.mock_call = self.postfix._call
def test_test(self):
self.postfix.test()
self.mock_call.assert_called_with(['check'])
def test_test_raises_error_when_check_fails(self):
self.mock_call.side_effect = [subprocess.CalledProcessError(1, "")]
self.assertRaises(errors.MisconfigurationError, self.postfix.test)
self.mock_call.assert_called_with(['check'])
def test_restart_while_running(self):
self.mock_call.side_effect = [subprocess.CalledProcessError(1, ""), None]
self.postfix.restart()
self.mock_call.assert_called_with(['start'])
def test_restart_while_not_running(self):
self.postfix.restart()
self.mock_call.assert_called_with(['reload'])
def test_restart_raises_error_when_reload_fails(self):
self.mock_call.side_effect = [None, subprocess.CalledProcessError(1, "")]
self.assertRaises(errors.PluginError, self.postfix.restart)
self.mock_call.assert_called_with(['reload'])
def test_restart_raises_error_when_start_fails(self):
self.mock_call.side_effect = [
subprocess.CalledProcessError(1, ""),
subprocess.CalledProcessError(1, "")]
self.assertRaises(errors.PluginError, self.postfix.restart)
self.mock_call.assert_called_with(['start'])
class CheckAllOutputTest(unittest.TestCase):
"""Tests for certbot_postfix.util.check_all_output."""
@classmethod
def _call(cls, *args, **kwargs):
from certbot_postfix.util import check_all_output
return check_all_output(*args, **kwargs)
@mock.patch('certbot_postfix.util.logger')
@mock.patch('certbot_postfix.util.subprocess.Popen')
def test_command_error(self, mock_popen, mock_logger):
command = 'foo'
retcode = 42
output = 'bar'
err = 'baz'
mock_popen().communicate.return_value = (output, err)
mock_popen().poll.return_value = 42
self.assertRaises(subprocess.CalledProcessError, self._call, command)
log_args = mock_logger.debug.call_args[0]
for value in (command, retcode, output, err,):
self.assertTrue(value in log_args)
@mock.patch('certbot_postfix.util.subprocess.Popen')
def test_success(self, mock_popen):
command = 'foo'
expected = ('bar', '')
mock_popen().communicate.return_value = expected
mock_popen().poll.return_value = 0
self.assertEqual(self._call(command), expected)
def test_stdout_error(self):
self.assertRaises(ValueError, self._call, stdout=None)
def test_stderr_error(self):
self.assertRaises(ValueError, self._call, stderr=None)
def test_universal_newlines_error(self):
self.assertRaises(ValueError, self._call, universal_newlines=False)
class VerifyExeExistsTest(unittest.TestCase):
"""Tests for certbot_postfix.util.verify_exe_exists."""
@classmethod
def _call(cls, *args, **kwargs):
from certbot_postfix.util import verify_exe_exists
return verify_exe_exists(*args, **kwargs)
@mock.patch('certbot_postfix.util.certbot_util.exe_exists')
@mock.patch('certbot_postfix.util.plugins_util.path_surgery')
def test_failure(self, mock_exe_exists, mock_path_surgery):
mock_exe_exists.return_value = mock_path_surgery.return_value = False
self.assertRaises(errors.NoInstallationError, self._call, 'foo')
@mock.patch('certbot_postfix.util.certbot_util.exe_exists')
def test_simple_success(self, mock_exe_exists):
mock_exe_exists.return_value = True
self._call('foo')
@mock.patch('certbot_postfix.util.certbot_util.exe_exists')
@mock.patch('certbot_postfix.util.plugins_util.path_surgery')
def test_successful_surgery(self, mock_exe_exists, mock_path_surgery):
mock_exe_exists.return_value = False
mock_path_surgery.return_value = True
self._call('foo')
class TestUtils(unittest.TestCase):
""" Testing random utility functions in util.py
"""
def test_report_master_overrides(self):
from certbot_postfix.util import report_master_overrides
self.assertRaises(errors.PluginError, report_master_overrides, 'name',
[('service/type', 'value')])
# Shouldn't raise error
report_master_overrides('name', [('service/type', 'value')],
acceptable_overrides=('value',))
def test_no_acceptable_value(self):
from certbot_postfix.util import is_acceptable_value
self.assertFalse(is_acceptable_value('name', 'value', None))
def test_is_acceptable_value(self):
from certbot_postfix.util import is_acceptable_value
self.assertTrue(is_acceptable_value('name', 'value', ('value',)))
self.assertFalse(is_acceptable_value('name', 'bad', ('value',)))
def test_is_acceptable_tuples(self):
from certbot_postfix.util import is_acceptable_value
self.assertTrue(is_acceptable_value('name', 'value', ('value', 'value1')))
self.assertFalse(is_acceptable_value('name', 'bad', ('value', 'value1')))
def test_is_acceptable_protocols(self):
from certbot_postfix.util import is_acceptable_value
# SSLv2 and SSLv3 are both not supported, unambiguously
self.assertFalse(is_acceptable_value('tls_mandatory_protocols_lol',
'SSLv2, SSLv3', None))
self.assertFalse(is_acceptable_value('tls_protocols_lol',
'SSLv2, SSLv3', None))
self.assertFalse(is_acceptable_value('tls_protocols_lol',
'!SSLv2, !TLSv1', None))
self.assertFalse(is_acceptable_value('tls_protocols_lol',
'!SSLv2, SSLv3, !SSLv3, ', None))
self.assertTrue(is_acceptable_value('tls_protocols_lol',
'!SSLv2, !SSLv3', None))
self.assertTrue(is_acceptable_value('tls_protocols_lol',
'!SSLv3, !TLSv1, !SSLv2', None))
# TLSv1.2 is supported unambiguously
self.assertFalse(is_acceptable_value('tls_protocols_lol',
'TLSv1, TLSv1.1,', None))
self.assertFalse(is_acceptable_value('tls_protocols_lol',
'TLSv1.2, !TLSv1.2,', None))
self.assertTrue(is_acceptable_value('tls_protocols_lol',
'TLSv1.2, ', None))
self.assertTrue(is_acceptable_value('tls_protocols_lol',
'TLSv1, TLSv1.1, TLSv1.2', None))
if __name__ == '__main__': # pragma: no cover
unittest.main()

View File

@@ -1,14 +1,16 @@
"""Utility functions for use in the Postfix installer."""
import logging
import re
import subprocess
from certbot import errors
from certbot import util as certbot_util
from certbot.plugins import util as plugins_util
from certbot_postfix import constants
logger = logging.getLogger(__name__)
COMMAND = "postfix"
class PostfixUtilBase(object):
"""A base class for wrapping Postfix command line utilities."""
@@ -22,9 +24,13 @@ class PostfixUtilBase(object):
:raises .NoInstallationError: when the executable isn't found
"""
self.executable = executable
verify_exe_exists(executable)
self._set_base_command(config_dir)
self.config_dir = None
self._base_command = [executable]
def _set_base_command(self, config_dir):
self._base_command = [self.executable]
if config_dir is not None:
self._base_command.extend(('-c', config_dir,))
@@ -59,6 +65,77 @@ class PostfixUtilBase(object):
"""
return self._call(extra_args)[0]
class PostfixUtil(PostfixUtilBase):
"""Wrapper around Postfix CLI tool.
"""
def __init__(self, config_dir=None):
super(PostfixUtil, self).__init__(COMMAND, config_dir)
def test(self):
"""Make sure the configuration is valid.
:raises .MisconfigurationError: if the config is invalid
"""
try:
self._call(["check"])
except subprocess.CalledProcessError as e:
logger.debug("Could not check postfix configuration:\n%s", e)
raise errors.MisconfigurationError(
"Postfix failed internal configuration check.")
def restart(self):
"""Restart or refresh the server content.
:raises .PluginError: when server cannot be restarted
"""
logger.info("Reloading Postfix configuration...")
if self._is_running():
self._reload()
else:
self._start()
def _is_running(self):
"""Is Postfix currently running?
Uses the 'postfix status' command to determine if Postfix is
currently running using the specified configuration files.
:returns: True if Postfix is running, otherwise, False
:rtype: bool
"""
try:
self._call(["status"])
except subprocess.CalledProcessError:
return False
return True
def _start(self):
"""Instructions Postfix to start running.
:raises .PluginError: when Postfix cannot start
"""
try:
self._call(["start"])
except subprocess.CalledProcessError:
raise errors.PluginError("Postfix failed to start")
def _reload(self):
"""Instructs Postfix to reload its configuration.
If Postfix isn't currently running, this method will fail.
:raises .PluginError: when Postfix cannot reload
"""
try:
self._call(["reload"])
except subprocess.CalledProcessError:
raise errors.PluginError(
"Postfix failed to reload its configuration")
def check_all_output(*args, **kwargs):
"""A version of subprocess.check_output that also captures stderr.
@@ -107,7 +184,7 @@ def check_all_output(*args, **kwargs):
return (output, err)
def verify_exe_exists(exe):
def verify_exe_exists(exe, message=None):
"""Ensures an executable with the given name is available.
If an executable isn't found for the given path or name, extra
@@ -115,83 +192,101 @@ def verify_exe_exists(exe):
utilities that may not be available in the default cron PATH.
:param str exe: executable path or name
:param str message: Error message to print.
:raises .NoInstallationError: when the executable isn't found
"""
if message is None:
message = "Cannot find executable '{0}'.".format(exe)
if not (certbot_util.exe_exists(exe) or plugins_util.path_surgery(exe)):
raise errors.NoInstallationError(
"Cannot find executable '{0}'.".format(exe))
raise errors.NoInstallationError(message)
def report_master_overrides(name, overrides, acceptable_overrides=None):
"""If the value for a parameter `name` is overridden by other services,
report a warning to notify the user. If `parameter` is a TLS version parameter
(i.e., `parameter` contains 'tls_protocols' or 'tls_mandatory_protocols'), then
`acceptable_overrides` isn't used each value in overrides is inspected for secure TLS
versions.
def check_call(*args, **kwargs):
"""A simple wrapper of subprocess.check_call that logs errors.
:param tuple args: positional arguments to subprocess.check_call
:param dict kargs: keyword arguments to subprocess.check_call
:raises subprocess.CalledProcessError: if the call fails
:param str name: The name of the parameter that is being overridden.
:param list overrides: The values that other services are setting for `name`.
Each override is a tuple: (service name, value)
:param tuple acceptable_overrides: Override values that are acceptable. For instance, if
another service is overriding our parameter with a more secure option, we don't have
to warn. If this is set to None, errors are raised for *any* overrides of `name`!
"""
try:
subprocess.check_call(*args, **kwargs)
except subprocess.CalledProcessError:
cmd = _get_cmd(*args, **kwargs)
logger.debug("%s exited with a non-zero status.",
"".join(cmd), exc_info=True)
raise
error_string = ""
for override in overrides:
service, value = override
# If this override is acceptable:
if acceptable_overrides is not None and \
is_acceptable_value(name, value, acceptable_overrides):
continue
error_string += " {0}: {1}\n".format(service, value)
if error_string:
raise errors.PluginError("{0} is overridden with less secure options by the "
"following services in master.cf:\n".format(name) + error_string)
def check_output(*args, **kwargs):
"""Backported version of subprocess.check_output for Python 2.6+.
This is the same as subprocess.check_output from newer versions of
Python, except:
1. The return value is a string rather than a byte string. To
accomplish this, the caller cannot set the parameter
universal_newlines.
2. If the command exits with a nonzero status, output is not
included in the raised subprocess.CalledProcessError because
subprocess.CalledProcessError on Python 2.6 does not support this.
Instead, the failure including the output is logged.
:param tuple args: positional arguments for Popen
:param dict kwargs: keyword arguments for Popen
:returns: data printed to stdout
:rtype: str
:raises ValueError: if arguments are invalid
:raises subprocess.CalledProcessError: if the command fails
def is_acceptable_value(parameter, value, acceptable=None):
""" Returns whether the `value` for this `parameter` is acceptable,
given a tuple of `acceptable` values. If `parameter` is a TLS version parameter
(i.e., `parameter` contains 'tls_protocols' or 'tls_mandatory_protocols'), then
`acceptable` isn't used and `value` is inspected for secure TLS versions.
:param str parameter: The name of the parameter being set.
:param str value: Proposed new value for parameter.
:param tuple acceptable: List of acceptable values for parameter.
"""
for keyword in ('stdout', 'universal_newlines',):
if keyword in kwargs:
raise ValueError(
keyword + ' argument not allowed, it will be overridden.')
kwargs['stdout'] = subprocess.PIPE
kwargs['universal_newlines'] = True
process = subprocess.Popen(*args, **kwargs)
output, unused_err = process.communicate()
retcode = process.poll()
if retcode:
cmd = _get_cmd(*args, **kwargs)
logger.debug(
"'%s' exited with %d. Output was:\n%s",
cmd, retcode, output, exc_info=True)
raise subprocess.CalledProcessError(retcode, cmd)
return output
# Check if param value is a comma-separated list of protocols.
# Otherwise, just check whether the value is in the acceptable list.
if 'tls_protocols' in parameter or 'tls_mandatory_protocols' in parameter:
return _has_acceptable_tls_versions(value)
if acceptable is not None:
return value in acceptable
return False
def _get_cmd(*args, **kwargs):
"""Return the command from Popen args.
:param tuple args: Popen args
:param dict kwargs: Popen kwargs
def _has_acceptable_tls_versions(parameter_string):
"""
cmd = kwargs.get('args')
return args[0] if cmd is None else cmd
Checks to see if the list of TLS protocols is acceptable.
This requires that TLSv1.2 is supported, and neither SSLv2 nor SSLv3 are supported.
Should be a string of protocol names delimited by commas, spaces, or colons.
Postfix's documents suggest listing protocols to exclude, like "!SSLv2, !SSLv3".
Listing the protocols to include, like "TLSv1, TLSv1.1, TLSv1.2" is okay as well,
though not recommended
When these two modes are interspersed, the presence of a single non-negated protocol name
(i.e. "TLSv1" rather than "!TLSv1") automatically excludes all other unnamed protocols.
In addition, the presence of both a protocol name inclusion and exclusion isn't explicitly
documented, so this method should return False if it encounters contradicting statements
about TLSv1.2, SSLv2, or SSLv3. (for instance, "SSLv3, !SSLv3").
"""
if not parameter_string:
return False
bad_versions = list(constants.TLS_VERSIONS)
for version in constants.ACCEPTABLE_TLS_VERSIONS:
del bad_versions[bad_versions.index(version)]
supported_version_list = re.split("[, :]+", parameter_string)
# The presence of any non-"!" protocol listing excludes the others by default.
inclusion_list = False
for version in supported_version_list:
if not version:
continue
if version in bad_versions: # short-circuit if we recognize any bad version
return False
if version[0] != "!":
inclusion_list = True
if inclusion_list: # For any inclusion list, we still require TLS 1.2.
if "TLSv1.2" not in supported_version_list or "!TLSv1.2" in supported_version_list:
return False
else:
for bad_version in bad_versions:
if "!" + bad_version not in supported_version_list:
return False
return True

View File

@@ -1,165 +0,0 @@
"""Tests for certbot_postfix.util."""
import subprocess
import unittest
import mock
from certbot import errors
class PostfixUtilBaseTest(unittest.TestCase):
"""Tests for certbot_postfix.util.PostfixUtilBase."""
@classmethod
def _create_object(cls, *args, **kwargs):
from certbot_postfix.util import PostfixUtilBase
return PostfixUtilBase(*args, **kwargs)
@mock.patch('certbot_postfix.util.verify_exe_exists')
def test_no_exe(self, mock_verify):
expected_error = errors.NoInstallationError
mock_verify.side_effect = expected_error
self.assertRaises(expected_error, self._create_object, 'nonexistent')
def test_object_creation(self):
with mock.patch('certbot_postfix.util.verify_exe_exists'):
self._create_object('existent')
class CheckAllOutputTest(unittest.TestCase):
"""Tests for certbot_postfix.util.check_all_output."""
@classmethod
def _call(cls, *args, **kwargs):
from certbot_postfix.util import check_all_output
return check_all_output(*args, **kwargs)
@mock.patch('certbot_postfix.util.logger')
@mock.patch('certbot_postfix.util.subprocess.Popen')
def test_command_error(self, mock_popen, mock_logger):
command = 'foo'
retcode = 42
output = 'bar'
err = 'baz'
mock_popen().communicate.return_value = (output, err)
mock_popen().poll.return_value = 42
self.assertRaises(subprocess.CalledProcessError, self._call, command)
log_args = mock_logger.debug.call_args[0]
for value in (command, retcode, output, err,):
self.assertTrue(value in log_args)
@mock.patch('certbot_postfix.util.subprocess.Popen')
def test_success(self, mock_popen):
command = 'foo'
expected = ('bar', '')
mock_popen().communicate.return_value = expected
mock_popen().poll.return_value = 0
self.assertEqual(self._call(command), expected)
def test_stdout_error(self):
self.assertRaises(ValueError, self._call, stdout=None)
def test_stderr_error(self):
self.assertRaises(ValueError, self._call, stderr=None)
def test_universal_newlines_error(self):
self.assertRaises(ValueError, self._call, universal_newlines=False)
class VerifyExeExistsTest(unittest.TestCase):
"""Tests for certbot_postfix.util.verify_exe_exists."""
@classmethod
def _call(cls, *args, **kwargs):
from certbot_postfix.util import verify_exe_exists
return verify_exe_exists(*args, **kwargs)
@mock.patch('certbot_postfix.util.certbot_util.exe_exists')
@mock.patch('certbot_postfix.util.plugins_util.path_surgery')
def test_failure(self, mock_exe_exists, mock_path_surgery):
mock_exe_exists.return_value = mock_path_surgery.return_value = False
self.assertRaises(errors.NoInstallationError, self._call, 'foo')
@mock.patch('certbot_postfix.util.certbot_util.exe_exists')
def test_simple_success(self, mock_exe_exists):
mock_exe_exists.return_value = True
self._call('foo')
@mock.patch('certbot_postfix.util.certbot_util.exe_exists')
@mock.patch('certbot_postfix.util.plugins_util.path_surgery')
def test_successful_surgery(self, mock_exe_exists, mock_path_surgery):
mock_exe_exists.return_value = False
mock_path_surgery.return_value = True
self._call('foo')
class CheckCallTest(unittest.TestCase):
"""Tests for certbot_postfix.util.check_call."""
@classmethod
def _call(cls, *args, **kwargs):
from certbot_postfix.util import check_call
return check_call(*args, **kwargs)
@mock.patch('certbot_postfix.util.logger')
@mock.patch('certbot_postfix.util.subprocess.check_call')
def test_failure(self, mock_check_call, mock_logger):
cmd = "postconf smtpd_use_tls=yes".split()
mock_check_call.side_effect = subprocess.CalledProcessError(42, cmd)
self.assertRaises(subprocess.CalledProcessError, self._call, cmd)
self.assertTrue(mock_logger.method_calls)
@mock.patch('certbot_postfix.util.subprocess.check_call')
def test_success(self, mock_check_call):
cmd = "postconf smtpd_use_tls=yes".split()
self._call(cmd)
mock_check_call.assert_called_once_with(cmd)
class CheckOutputTest(unittest.TestCase):
"""Tests for certbot_postfix.util.check_output."""
@classmethod
def _call(cls, *args, **kwargs):
from certbot_postfix.util import check_output
return check_output(*args, **kwargs)
@mock.patch('certbot_postfix.util.logger')
@mock.patch('certbot_postfix.util.subprocess.Popen')
def test_command_error(self, mock_popen, mock_logger):
command = 'foo'
retcode = 42
output = 'bar'
mock_popen().communicate.return_value = (output, '')
mock_popen().poll.return_value = 42
self.assertRaises(subprocess.CalledProcessError, self._call, command)
log_args = mock_logger.debug.call_args[0]
self.assertTrue(command in log_args)
self.assertTrue(retcode in log_args)
self.assertTrue(output in log_args)
@mock.patch('certbot_postfix.util.subprocess.Popen')
def test_success(self, mock_popen):
command = 'foo'
output = 'bar'
mock_popen().communicate.return_value = (output, '')
mock_popen().poll.return_value = 0
self.assertEqual(self._call(command), output)
def test_stdout_error(self):
self.assertRaises(ValueError, self._call, stdout=None)
def test_universal_newlines_error(self):
self.assertRaises(ValueError, self._call, universal_newlines=False)
if __name__ == '__main__': # pragma: no cover
unittest.main()

1
certbot-postfix/docs/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
/_build/

View File

@@ -0,0 +1,20 @@
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build
SPHINXPROJ = certbot-postfix
SOURCEDIR = .
BUILDDIR = _build
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

View File

@@ -0,0 +1,8 @@
=================
API Documentation
=================
.. toctree::
:glob:
api/**

View File

@@ -0,0 +1,5 @@
:mod:`certbot_postfix.installer`
--------------------------------------
.. automodule:: certbot_postfix.installer
:members:

View File

@@ -0,0 +1,5 @@
:mod:`certbot_postfix.postconf`
-------------------------------
.. automodule:: certbot_postfix.postconf
:members:

View File

@@ -0,0 +1,190 @@
# -*- coding: utf-8 -*-
#
# Configuration file for the Sphinx documentation builder.
#
# This file does only contain a selection of the most common options. For a
# full list see the documentation:
# http://www.sphinx-doc.org/en/master/config
# -- Path setup --------------------------------------------------------------
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
import os
# import sys
# sys.path.insert(0, os.path.abspath('.'))
# -- Project information -----------------------------------------------------
project = u'certbot-postfix'
copyright = u'2018, Certbot Project'
author = u'Certbot Project'
# The short X.Y version
version = u'0'
# The full version, including alpha/beta/rc tags
release = u'0'
# -- General configuration ---------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#
needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.intersphinx',
'sphinx.ext.todo',
'sphinx.ext.coverage',
'sphinx.ext.viewcode',
]
autodoc_member_order = 'bysource'
autodoc_default_flags = ['show-inheritance', 'private-members']
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# The suffix(es) of source filenames.
# You can specify multiple suffix as a list of string:
#
# source_suffix = ['.rst', '.md']
source_suffix = '.rst'
# The master toctree document.
master_doc = 'index'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#
# This is also used if you do content translation via gettext catalogs.
# Usually you set "language" from the command line for these cases.
language = u'en'
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path .
exclude_patterns = [u'_build', 'Thumbs.db', '.DS_Store']
default_role = 'py:obj'
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# -- Options for HTML output -------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
# http://docs.readthedocs.org/en/latest/theme.html#how-do-i-use-this-locally-and-on-read-the-docs
# on_rtd is whether we are on readthedocs.org
on_rtd = os.environ.get('READTHEDOCS', None) == 'True'
if not on_rtd: # only import and set the theme if we're building docs locally
import sphinx_rtd_theme
html_theme = 'sphinx_rtd_theme'
html_theme_path = [sphinx_rtd_theme.get_html_theme_path()]
# otherwise, readthedocs.org uses their theme by default, so no need to specify it
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#
# html_theme_options = {}
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
# html_static_path = ['_static']
# Custom sidebar templates, must be a dictionary that maps document names
# to template names.
#
# The default sidebars (for documents that don't match any pattern) are
# defined by theme itself. Builtin themes are using these templates by
# default: ``['localtoc.html', 'relations.html', 'sourcelink.html',
# 'searchbox.html']``.
#
# html_sidebars = {}
# -- Options for HTMLHelp output ---------------------------------------------
# Output file base name for HTML help builder.
htmlhelp_basename = 'certbot-postfixdoc'
# -- Options for LaTeX output ------------------------------------------------
latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
#
# 'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#
# 'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#
# 'preamble': '',
# Latex figure (float) alignment
#
# 'figure_align': 'htbp',
}
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
(master_doc, 'certbot-postfix.tex', u'certbot-postfix Documentation',
u'Certbot Project', 'manual'),
]
# -- Options for manual page output ------------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
(master_doc, 'certbot-postfix', u'certbot-postfix Documentation',
[author], 1)
]
# -- Options for Texinfo output ----------------------------------------------
# Grouping the document tree into Texinfo files. List of tuples
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
(master_doc, 'certbot-postfix', u'certbot-postfix Documentation',
author, 'certbot-postfix', 'One line description of project.',
'Miscellaneous'),
]
# -- Extension configuration -------------------------------------------------
# -- Options for intersphinx extension ---------------------------------------
# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {
'python': ('https://docs.python.org/', None),
'acme': ('https://acme-python.readthedocs.org/en/latest/', None),
'certbot': ('https://certbot.eff.org/docs/', None),
}
# -- Options for todo extension ----------------------------------------------
# If true, `todo` and `todoList` produce output, else they produce nothing.
todo_include_todos = True

View File

@@ -0,0 +1,28 @@
.. certbot-postfix documentation master file, created by
sphinx-quickstart on Wed May 2 16:01:06 2018.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Welcome to certbot-postfix's documentation!
===========================================
.. toctree::
:maxdepth: 2
:caption: Contents:
.. automodule:: certbot_postfix
:members:
.. toctree::
:maxdepth: 1
api
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`

View File

@@ -0,0 +1,36 @@
@ECHO OFF
pushd %~dp0
REM Command file for Sphinx documentation
if "%SPHINXBUILD%" == "" (
set SPHINXBUILD=sphinx-build
)
set SOURCEDIR=.
set BUILDDIR=_build
set SPHINXPROJ=certbot-postfix
if "%1" == "" goto help
%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
echo.
echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
echo.installed, then set the SPHINXBUILD environment variable to point
echo.to the full path of the 'sphinx-build' executable. Alternatively you
echo.may add the Sphinx directory to PATH.
echo.
echo.If you don't have Sphinx installed, grab it from
echo.http://sphinx-doc.org/
exit /b 1
)
%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS%
goto end
:help
%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS%
:end
popd

View File

@@ -0,0 +1,2 @@
acme[dev]==0.25.0
certbot[dev]==0.23.0

View File

@@ -1,21 +1,23 @@
import sys
from setuptools import setup
from setuptools import find_packages
version = '0.18.0.dev0'
version = '0.26.0.dev0'
install_requires = [
'acme=={0}'.format(version),
'certbot=={0}'.format(version),
# For pkg_resources. >=1.0 so pip resolves it to a version cryptography
# will tolerate; see #2599:
'setuptools>=1.0',
'acme>=0.25.0',
'certbot>0.23.0',
'setuptools',
'six',
'zope.component',
'zope.interface',
]
docs_extras = [
'Sphinx>=1.0', # autodoc_member_order = 'bysource', autodoc_default_flags
'sphinx_rtd_theme',
]
setup(
name='certbot-postfix',
version=version,
@@ -24,6 +26,7 @@ setup(
author="Certbot Project",
author_email='client-dev@letsencrypt.org',
license='Apache License 2.0',
python_requires='>=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*',
classifiers=[
'Development Status :: 3 - Alpha',
'Environment :: Plugins',
@@ -32,10 +35,8 @@ setup(
'Operating System :: POSIX :: Linux',
'Programming Language :: Python',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.6',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
@@ -50,6 +51,9 @@ setup(
packages=find_packages(),
include_package_data=True,
install_requires=install_requires,
extras_require={
'docs': docs_extras,
},
entry_points={
'certbot.plugins': [
'postfix = certbot_postfix:Installer',

View File

@@ -64,7 +64,7 @@ RENEWER_DEFAULTS = dict(
"""Defaults for renewer script."""
ENHANCEMENTS = ["redirect", "http-header", "ocsp-stapling", "spdy"]
ENHANCEMENTS = ["redirect", "ensure-http-header", "ocsp-stapling", "spdy", "starttls-policy"]
"""List of possible :class:`certbot.interfaces.IInstaller`
enhancements.

View File

@@ -39,6 +39,7 @@ class PluginEntryPoint(object):
"certbot-dns-rfc2136",
"certbot-dns-route53",
"certbot-nginx",
"certbot-postfix",
]
"""Distributions for which prefix will be omitted."""