mirror of
https://github.com/certbot/certbot.git
synced 2026-01-26 07:41:33 +03:00
Make Certbot runnable on Windows (#6296)
* Add and use a compatibility layer to allow certbot to be run on windows. * Fix path comparison * Corrections on compat and util for tests * Less intrusive way to parse prefix in webroot plugin working for both linux and windows. * Disable pylint import-error for some optional imports in compat.py * Ensure path is normalized before prefixes are generated in webroot plugin * Same prefixes in linux and windows, in fact root path is not needed in webroot plugin * Check that user has administrative rights before continuing on windows (necessary for symlink creation) * More straightforward way to test administrative rights on windows * Try to resolve import error in travis ci * OK. We go for full introspection to trick the ci. * Move the administrative rights control to the certbot entrypoint * Add comment for a really non trivial code. * Allow some commands to be run on a shell without admin rights * Avoid races conditions on windows for lock files * Add sphinx doc to the compat functions. * Remove irrelevant Windows error in the lock mechanism. * Some corrections on compat
This commit is contained in:
committed by
Brad Warren
parent
5d1c6d28d5
commit
85a859d63f
@@ -17,6 +17,7 @@ import zope.component
|
||||
from acme import fields as acme_fields
|
||||
from acme import messages
|
||||
|
||||
from certbot import compat
|
||||
from certbot import constants
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
@@ -140,7 +141,7 @@ class AccountFileStorage(interfaces.AccountStorage):
|
||||
"""
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
util.make_or_verify_dir(config.accounts_dir, 0o700, os.geteuid(),
|
||||
util.make_or_verify_dir(config.accounts_dir, 0o700, compat.os_geteuid(),
|
||||
self.config.strict_permissions)
|
||||
|
||||
def _account_dir_path(self, account_id):
|
||||
@@ -323,7 +324,7 @@ class AccountFileStorage(interfaces.AccountStorage):
|
||||
|
||||
def _save(self, account, acme, regr_only):
|
||||
account_dir_path = self._account_dir_path(account.id)
|
||||
util.make_or_verify_dir(account_dir_path, 0o700, os.geteuid(),
|
||||
util.make_or_verify_dir(account_dir_path, 0o700, compat.os_geteuid(),
|
||||
self.config.strict_permissions)
|
||||
try:
|
||||
with open(self._regr_path(account_dir_path), "w") as regr_file:
|
||||
|
||||
@@ -8,6 +8,7 @@ import traceback
|
||||
import zope.component
|
||||
|
||||
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
|
||||
from certbot import compat
|
||||
from certbot import crypto_util
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
@@ -104,7 +105,7 @@ def lineage_for_certname(cli_config, certname):
|
||||
"""Find a lineage object with name certname."""
|
||||
configs_dir = cli_config.renewal_configs_dir
|
||||
# Verify the directory is there
|
||||
util.make_or_verify_dir(configs_dir, mode=0o755, uid=os.geteuid())
|
||||
util.make_or_verify_dir(configs_dir, mode=0o755, uid=compat.os_geteuid())
|
||||
try:
|
||||
renewal_file = storage.renewal_file_for_certname(cli_config, certname)
|
||||
except errors.CertStorageError:
|
||||
@@ -374,7 +375,7 @@ def _search_lineages(cli_config, func, initial_rv, *args):
|
||||
"""
|
||||
configs_dir = cli_config.renewal_configs_dir
|
||||
# Verify the directory is there
|
||||
util.make_or_verify_dir(configs_dir, mode=0o755, uid=os.geteuid())
|
||||
util.make_or_verify_dir(configs_dir, mode=0o755, uid=compat.os_geteuid())
|
||||
|
||||
rv = initial_rv
|
||||
for renewal_file in storage.renewal_conf_files(cli_config):
|
||||
|
||||
@@ -24,6 +24,7 @@ import certbot
|
||||
from certbot import account
|
||||
from certbot import auth_handler
|
||||
from certbot import cli
|
||||
from certbot import compat
|
||||
from certbot import constants
|
||||
from certbot import crypto_util
|
||||
from certbot import eff
|
||||
@@ -447,7 +448,7 @@ class Client(object):
|
||||
"""
|
||||
for path in cert_path, chain_path, fullchain_path:
|
||||
util.make_or_verify_dir(
|
||||
os.path.dirname(path), 0o755, os.geteuid(),
|
||||
os.path.dirname(path), 0o755, compat.os_geteuid(),
|
||||
self.config.strict_permissions)
|
||||
|
||||
|
||||
|
||||
140
certbot/compat.py
Normal file
140
certbot/compat.py
Normal file
@@ -0,0 +1,140 @@
|
||||
"""
|
||||
Compatibility layer to run certbot both on Linux and Windows.
|
||||
|
||||
The approach used here is similar to Modernizr for Web browsers.
|
||||
We do not check the plateform type to determine if a particular logic is supported.
|
||||
Instead, we apply a logic, and then fallback to another logic if first logic
|
||||
is not supported at runtime.
|
||||
|
||||
Then logic chains are abstracted into single functions to be exposed to certbot.
|
||||
"""
|
||||
import os
|
||||
import select
|
||||
import sys
|
||||
import errno
|
||||
import ctypes
|
||||
|
||||
from certbot import errors
|
||||
|
||||
try:
|
||||
# Linux specific
|
||||
import fcntl # pylint: disable=import-error
|
||||
except ImportError:
|
||||
# Windows specific
|
||||
import msvcrt # pylint: disable=import-error
|
||||
|
||||
UNPRIVILEGED_SUBCOMMANDS_ALLOWED = [
|
||||
'certificates', 'enhance', 'revoke', 'delete',
|
||||
'register', 'unregister', 'config_changes', 'plugins']
|
||||
def raise_for_non_administrative_windows_rights(subcommand):
|
||||
"""
|
||||
On Windows, raise if current shell does not have the administrative rights.
|
||||
Do nothing on Linux.
|
||||
|
||||
:param str subcommand: The subcommand (like 'certonly') passed to the certbot client.
|
||||
|
||||
:raises .errors.Error: If the provided subcommand must be run on a shell with
|
||||
administrative rights, and current shell does not have these rights.
|
||||
|
||||
"""
|
||||
# Why not simply try ctypes.windll.shell32.IsUserAnAdmin() and catch AttributeError ?
|
||||
# Because windll exists only on a Windows runtime, and static code analysis engines
|
||||
# do not like at all non existent objects when run from Linux (even if we handle properly
|
||||
# all the cases in the code).
|
||||
# So we access windll only by reflection to trick theses engines.
|
||||
if hasattr(ctypes, 'windll') and subcommand not in UNPRIVILEGED_SUBCOMMANDS_ALLOWED:
|
||||
windll = getattr(ctypes, 'windll')
|
||||
if windll.shell32.IsUserAnAdmin() == 0:
|
||||
raise errors.Error(
|
||||
'Error, "{0}" subcommand must be run on a shell with administrative rights.'
|
||||
.format(subcommand))
|
||||
|
||||
def os_geteuid():
|
||||
"""
|
||||
Get current user uid
|
||||
|
||||
:returns: The current user uid.
|
||||
:rtype: int
|
||||
|
||||
"""
|
||||
try:
|
||||
# Linux specific
|
||||
return os.geteuid()
|
||||
except AttributeError:
|
||||
# Windows specific
|
||||
return 0
|
||||
|
||||
def readline_with_timeout(timeout, prompt):
|
||||
"""
|
||||
Read user input to return the first line entered, or raise after specified timeout.
|
||||
|
||||
:param float timeout: The timeout in seconds given to the user.
|
||||
:param str prompt: The prompt message to display to the user.
|
||||
|
||||
:returns: The first line entered by the user.
|
||||
:rtype: str
|
||||
|
||||
"""
|
||||
try:
|
||||
# Linux specific
|
||||
#
|
||||
# Call to select can only be done like this on UNIX
|
||||
rlist, _, _ = select.select([sys.stdin], [], [], timeout)
|
||||
if not rlist:
|
||||
raise errors.Error(
|
||||
"Timed out waiting for answer to prompt '{0}'".format(prompt))
|
||||
return rlist[0].readline()
|
||||
except OSError:
|
||||
# Windows specific
|
||||
#
|
||||
# No way with select to make a timeout to the user input on Windows,
|
||||
# as select only supports socket in this case.
|
||||
# So no timeout on Windows for now.
|
||||
return sys.stdin.readline()
|
||||
|
||||
def lock_file(fd):
|
||||
"""
|
||||
Lock the file linked to the specified file descriptor.
|
||||
|
||||
:param int fd: The file descriptor of the file to lock.
|
||||
|
||||
"""
|
||||
if 'fcntl' in sys.modules:
|
||||
# Linux specific
|
||||
fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
else:
|
||||
# Windows specific
|
||||
msvcrt.locking(fd, msvcrt.LK_NBLCK, 1)
|
||||
|
||||
def release_locked_file(fd, path):
|
||||
"""
|
||||
Remove, close, and release a lock file specified by its file descriptor and its path.
|
||||
|
||||
:param int fd: The file descriptor of the lock file.
|
||||
:param str path: The path of the lock file.
|
||||
|
||||
"""
|
||||
# Linux specific
|
||||
#
|
||||
# It is important the lock file is removed before it's released,
|
||||
# otherwise:
|
||||
#
|
||||
# process A: open lock file
|
||||
# process B: release lock file
|
||||
# process A: lock file
|
||||
# process A: check device and inode
|
||||
# process B: delete file
|
||||
# process C: open and lock a different file at the same path
|
||||
try:
|
||||
os.remove(path)
|
||||
except OSError as err:
|
||||
if err.errno == errno.EACCES:
|
||||
# Windows specific
|
||||
# We will not be able to remove a file before closing it.
|
||||
# To avoid race conditions described for Linux, we will not delete the lockfile,
|
||||
# just close it to be reused on the next Certbot call.
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
finally:
|
||||
os.close(fd)
|
||||
@@ -25,6 +25,7 @@ from OpenSSL import SSL # type: ignore
|
||||
|
||||
from acme import crypto_util as acme_crypto_util
|
||||
from acme.magic_typing import IO # pylint: disable=unused-import, no-name-in-module
|
||||
from certbot import compat
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
from certbot import util
|
||||
@@ -60,7 +61,7 @@ def init_save_key(key_size, key_dir, keyname="key-certbot.pem"):
|
||||
|
||||
config = zope.component.getUtility(interfaces.IConfig)
|
||||
# Save file
|
||||
util.make_or_verify_dir(key_dir, 0o700, os.geteuid(),
|
||||
util.make_or_verify_dir(key_dir, 0o700, compat.os_geteuid(),
|
||||
config.strict_permissions)
|
||||
key_f, key_path = util.unique_file(
|
||||
os.path.join(key_dir, keyname), 0o600, "wb")
|
||||
@@ -91,7 +92,7 @@ def init_save_csr(privkey, names, path):
|
||||
privkey.pem, names, must_staple=config.must_staple)
|
||||
|
||||
# Save CSR
|
||||
util.make_or_verify_dir(path, 0o755, os.geteuid(),
|
||||
util.make_or_verify_dir(path, 0o755, compat.os_geteuid(),
|
||||
config.strict_permissions)
|
||||
csr_f, csr_filename = util.unique_file(
|
||||
os.path.join(path, "csr-certbot.pem"), 0o644, "wb")
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
"""Certbot display."""
|
||||
import logging
|
||||
import os
|
||||
import select
|
||||
import sys
|
||||
import textwrap
|
||||
|
||||
import zope.interface
|
||||
|
||||
from certbot import compat
|
||||
from certbot import constants
|
||||
from certbot import interfaces
|
||||
from certbot import errors
|
||||
@@ -79,13 +79,8 @@ def input_with_timeout(prompt=None, timeout=36000.0):
|
||||
sys.stdout.write(prompt)
|
||||
sys.stdout.flush()
|
||||
|
||||
# select can only be used like this on UNIX
|
||||
rlist, _, _ = select.select([sys.stdin], [], [], timeout)
|
||||
if not rlist:
|
||||
raise errors.Error(
|
||||
"Timed out waiting for answer to prompt '{0}'".format(prompt))
|
||||
line = compat.readline_with_timeout(timeout, prompt)
|
||||
|
||||
line = rlist[0].readline()
|
||||
if not line:
|
||||
raise EOFError
|
||||
return line.rstrip('\n')
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
"""Implements file locks for locking files and directories in UNIX."""
|
||||
import errno
|
||||
import fcntl
|
||||
import logging
|
||||
import os
|
||||
|
||||
from certbot import compat
|
||||
from certbot import errors
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -74,7 +74,7 @@ class LockFile(object):
|
||||
|
||||
"""
|
||||
try:
|
||||
fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
compat.lock_file(fd)
|
||||
except IOError as err:
|
||||
if err.errno in (errno.EACCES, errno.EAGAIN):
|
||||
logger.debug(
|
||||
@@ -118,22 +118,7 @@ class LockFile(object):
|
||||
|
||||
def release(self):
|
||||
"""Remove, close, and release the lock file."""
|
||||
# It is important the lock file is removed before it's released,
|
||||
# otherwise:
|
||||
#
|
||||
# process A: open lock file
|
||||
# process B: release lock file
|
||||
# process A: lock file
|
||||
# process A: check device and inode
|
||||
# process B: delete file
|
||||
# process C: open and lock a different file at the same path
|
||||
#
|
||||
# Calling os.remove on a file that's in use doesn't work on
|
||||
# Windows, but neither does locking with fcntl.
|
||||
try:
|
||||
os.remove(self._path)
|
||||
compat.release_locked_file(self._fd, self._path)
|
||||
finally:
|
||||
try:
|
||||
os.close(self._fd)
|
||||
finally:
|
||||
self._fd = None
|
||||
self._fd = None
|
||||
|
||||
@@ -23,6 +23,7 @@ import traceback
|
||||
|
||||
from acme import messages
|
||||
|
||||
from certbot import compat
|
||||
from certbot import constants
|
||||
from certbot import errors
|
||||
from certbot import util
|
||||
@@ -133,7 +134,7 @@ def setup_log_file_handler(config, logfile, fmt):
|
||||
# TODO: logs might contain sensitive data such as contents of the
|
||||
# private key! #525
|
||||
util.set_up_core_dir(
|
||||
config.logs_dir, 0o700, os.geteuid(), config.strict_permissions)
|
||||
config.logs_dir, 0o700, compat.os_geteuid(), config.strict_permissions)
|
||||
log_file_path = os.path.join(config.logs_dir, logfile)
|
||||
try:
|
||||
handler = logging.handlers.RotatingFileHandler(
|
||||
|
||||
@@ -19,6 +19,7 @@ from certbot import account
|
||||
from certbot import cert_manager
|
||||
from certbot import cli
|
||||
from certbot import client
|
||||
from certbot import compat
|
||||
from certbot import configuration
|
||||
from certbot import constants
|
||||
from certbot import crypto_util
|
||||
@@ -1289,16 +1290,16 @@ def make_or_verify_needed_dirs(config):
|
||||
|
||||
"""
|
||||
util.set_up_core_dir(config.config_dir, constants.CONFIG_DIRS_MODE,
|
||||
os.geteuid(), config.strict_permissions)
|
||||
compat.os_geteuid(), config.strict_permissions)
|
||||
util.set_up_core_dir(config.work_dir, constants.CONFIG_DIRS_MODE,
|
||||
os.geteuid(), config.strict_permissions)
|
||||
compat.os_geteuid(), config.strict_permissions)
|
||||
|
||||
hook_dirs = (config.renewal_pre_hooks_dir,
|
||||
config.renewal_deploy_hooks_dir,
|
||||
config.renewal_post_hooks_dir,)
|
||||
for hook_dir in hook_dirs:
|
||||
util.make_or_verify_dir(hook_dir,
|
||||
uid=os.geteuid(),
|
||||
uid=compat.os_geteuid(),
|
||||
strict=config.strict_permissions)
|
||||
|
||||
|
||||
@@ -1333,6 +1334,7 @@ def main(cli_args=sys.argv[1:]):
|
||||
:raises errors.Error: error if plugin command is not supported
|
||||
|
||||
"""
|
||||
|
||||
log.pre_arg_parse_setup()
|
||||
|
||||
plugins = plugins_disco.PluginsRegistry.find_all()
|
||||
@@ -1346,6 +1348,10 @@ def main(cli_args=sys.argv[1:]):
|
||||
config = configuration.NamespaceConfig(args)
|
||||
zope.component.provideUtility(config)
|
||||
|
||||
# On windows, shell without administrative right cannot create symlinks required by certbot.
|
||||
# So we check the rights before continuing.
|
||||
compat.raise_for_non_administrative_windows_rights(config.verb)
|
||||
|
||||
try:
|
||||
log.post_arg_parse_setup(config)
|
||||
make_or_verify_needed_dirs(config)
|
||||
|
||||
@@ -9,18 +9,19 @@ logger = logging.getLogger(__name__)
|
||||
def get_prefixes(path):
|
||||
"""Retrieves all possible path prefixes of a path, in descending order
|
||||
of length. For instance,
|
||||
/a/b/c/ => ['/a/b/c/', '/a/b/c', '/a/b', '/a', '/']
|
||||
(linux) /a/b/c returns ['/a/b/c', '/a/b', '/a', '/']
|
||||
(windows) C:\\a\\b\\c returns ['C:\\a\\b\\c', 'C:\\a\\b', 'C:\\a', 'C:']
|
||||
:param str path: the path to break into prefixes
|
||||
|
||||
:returns: all possible path prefixes of given path in descending order
|
||||
:rtype: `list` of `str`
|
||||
"""
|
||||
prefix = path
|
||||
prefix = os.path.normpath(path)
|
||||
prefixes = []
|
||||
while len(prefix) > 0:
|
||||
prefixes.append(prefix)
|
||||
prefix, _ = os.path.split(prefix)
|
||||
# break once we hit '/'
|
||||
# break once we hit the root path
|
||||
if prefix == prefixes[-1]:
|
||||
break
|
||||
return prefixes
|
||||
|
||||
@@ -9,9 +9,9 @@ class GetPrefixTest(unittest.TestCase):
|
||||
"""Tests for certbot.plugins.get_prefixes."""
|
||||
def test_get_prefix(self):
|
||||
from certbot.plugins.util import get_prefixes
|
||||
self.assertEqual(get_prefixes("/a/b/c/"), ['/a/b/c/', '/a/b/c', '/a/b', '/a', '/'])
|
||||
self.assertEqual(get_prefixes("/"), ["/"])
|
||||
self.assertEqual(get_prefixes("a"), ["a"])
|
||||
self.assertEqual(get_prefixes('/a/b/c'), ['/a/b/c', '/a/b', '/a', '/'])
|
||||
self.assertEqual(get_prefixes('/'), ['/'])
|
||||
self.assertEqual(get_prefixes('a'), ['a'])
|
||||
|
||||
class PathSurgeryTest(unittest.TestCase):
|
||||
"""Tests for certbot.plugins.path_surgery."""
|
||||
|
||||
@@ -170,7 +170,9 @@ to serve all files under specified web root ({0})."""
|
||||
old_umask = os.umask(0o022)
|
||||
try:
|
||||
stat_path = os.stat(path)
|
||||
for prefix in sorted(util.get_prefixes(self.full_roots[name]), key=len):
|
||||
# We ignore the last prefix in the next iteration,
|
||||
# as it does not correspond to a folder path ('/' or 'C:')
|
||||
for prefix in sorted(util.get_prefixes(self.full_roots[name])[:-1], key=len):
|
||||
try:
|
||||
# This is coupled with the "umask" call above because
|
||||
# os.mkdir's "mode" parameter may not always work:
|
||||
@@ -180,7 +182,7 @@ to serve all files under specified web root ({0})."""
|
||||
# Set owner as parent directory if possible
|
||||
try:
|
||||
os.chown(prefix, stat_path.st_uid, stat_path.st_gid)
|
||||
except OSError as exception:
|
||||
except (OSError, AttributeError) as exception:
|
||||
logger.info("Unable to change owner and uid of webroot directory")
|
||||
logger.debug("Error was: %s", exception)
|
||||
except OSError as exception:
|
||||
|
||||
@@ -10,6 +10,7 @@ import traceback
|
||||
import six
|
||||
import zope.component
|
||||
|
||||
from certbot import compat
|
||||
from certbot import constants
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
@@ -65,7 +66,7 @@ class Reverter(object):
|
||||
self.config = config
|
||||
|
||||
util.make_or_verify_dir(
|
||||
config.backup_dir, constants.CONFIG_DIRS_MODE, os.geteuid(),
|
||||
config.backup_dir, constants.CONFIG_DIRS_MODE, compat.os_geteuid(),
|
||||
self.config.strict_permissions)
|
||||
|
||||
def revert_temporary_config(self):
|
||||
@@ -219,7 +220,7 @@ class Reverter(object):
|
||||
|
||||
"""
|
||||
util.make_or_verify_dir(
|
||||
cp_dir, constants.CONFIG_DIRS_MODE, os.geteuid(),
|
||||
cp_dir, constants.CONFIG_DIRS_MODE, compat.os_geteuid(),
|
||||
self.config.strict_permissions)
|
||||
|
||||
op_fd, existing_filepaths = self._read_and_append(
|
||||
@@ -433,7 +434,7 @@ class Reverter(object):
|
||||
cp_dir = self.config.in_progress_dir
|
||||
|
||||
util.make_or_verify_dir(
|
||||
cp_dir, constants.CONFIG_DIRS_MODE, os.geteuid(),
|
||||
cp_dir, constants.CONFIG_DIRS_MODE, compat.os_geteuid(),
|
||||
self.config.strict_permissions)
|
||||
|
||||
return cp_dir
|
||||
|
||||
@@ -48,18 +48,23 @@ class NamespaceConfigTest(test_util.ConfigTestCase):
|
||||
mock_constants.TEMP_CHECKPOINT_DIR = 't'
|
||||
|
||||
self.assertEqual(
|
||||
self.config.accounts_dir, os.path.join(
|
||||
self.config.config_dir, 'acc/acme-server.org:443/new'))
|
||||
os.path.normpath(self.config.accounts_dir),
|
||||
os.path.normpath(os.path.join(self.config.config_dir, 'acc/acme-server.org:443/new')))
|
||||
self.assertEqual(
|
||||
self.config.backup_dir, os.path.join(self.config.work_dir, 'backups'))
|
||||
os.path.normpath(self.config.backup_dir),
|
||||
os.path.normpath(os.path.join(self.config.work_dir, 'backups')))
|
||||
self.assertEqual(
|
||||
self.config.csr_dir, os.path.join(self.config.config_dir, 'csr'))
|
||||
os.path.normpath(self.config.csr_dir),
|
||||
os.path.normpath(os.path.join(self.config.config_dir, 'csr')))
|
||||
self.assertEqual(
|
||||
self.config.in_progress_dir, os.path.join(self.config.work_dir, '../p'))
|
||||
os.path.normpath(self.config.in_progress_dir),
|
||||
os.path.normpath(os.path.join(self.config.work_dir, '../p')))
|
||||
self.assertEqual(
|
||||
self.config.key_dir, os.path.join(self.config.config_dir, 'keys'))
|
||||
os.path.normpath(self.config.key_dir),
|
||||
os.path.normpath(os.path.join(self.config.config_dir, 'keys')))
|
||||
self.assertEqual(
|
||||
self.config.temp_checkpoint_dir, os.path.join(self.config.work_dir, 't'))
|
||||
os.path.normpath(self.config.temp_checkpoint_dir),
|
||||
os.path.normpath(os.path.join(self.config.work_dir, 't')))
|
||||
|
||||
def test_absolute_paths(self):
|
||||
from certbot.configuration import NamespaceConfig
|
||||
|
||||
@@ -34,7 +34,7 @@ class InputWithTimeoutTest(unittest.TestCase):
|
||||
def test_input(self, prompt=None):
|
||||
expected = "foo bar"
|
||||
stdin = six.StringIO(expected + "\n")
|
||||
with mock.patch("certbot.display.util.select.select") as mock_select:
|
||||
with mock.patch("certbot.compat.select.select") as mock_select:
|
||||
mock_select.return_value = ([stdin], [], [],)
|
||||
self.assertEqual(self._call(prompt), expected)
|
||||
|
||||
@@ -321,11 +321,7 @@ class FileOutputDisplayTest(unittest.TestCase):
|
||||
|
||||
|
||||
class NoninteractiveDisplayTest(unittest.TestCase):
|
||||
"""Test non-interactive display.
|
||||
|
||||
These tests are pretty easy!
|
||||
|
||||
"""
|
||||
"""Test non-interactive display. These tests are pretty easy!"""
|
||||
def setUp(self):
|
||||
super(NoninteractiveDisplayTest, self).setUp()
|
||||
self.mock_stdout = mock.MagicMock()
|
||||
|
||||
@@ -89,7 +89,7 @@ class LockFileTest(test_util.TempDirTestCase):
|
||||
lock_file.release()
|
||||
self.assertFalse(os.path.exists(self.lock_path))
|
||||
|
||||
@mock.patch('certbot.lock.fcntl.lockf')
|
||||
@mock.patch('certbot.compat.fcntl.lockf')
|
||||
def test_unexpected_lockf_err(self, mock_lockf):
|
||||
msg = 'hi there'
|
||||
mock_lockf.side_effect = IOError(msg)
|
||||
|
||||
@@ -19,6 +19,7 @@ from six.moves import reload_module # pylint: disable=import-error
|
||||
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
|
||||
from certbot import account
|
||||
from certbot import cli
|
||||
from certbot import compat
|
||||
from certbot import constants
|
||||
from certbot import configuration
|
||||
from certbot import crypto_util
|
||||
@@ -1577,7 +1578,7 @@ class MakeOrVerifyNeededDirs(test_util.ConfigTestCase):
|
||||
for core_dir in (self.config.config_dir, self.config.work_dir,):
|
||||
mock_util.set_up_core_dir.assert_any_call(
|
||||
core_dir, constants.CONFIG_DIRS_MODE,
|
||||
os.geteuid(), self.config.strict_permissions
|
||||
compat.os_geteuid(), self.config.strict_permissions
|
||||
)
|
||||
|
||||
hook_dirs = (self.config.renewal_pre_hooks_dir,
|
||||
@@ -1586,7 +1587,7 @@ class MakeOrVerifyNeededDirs(test_util.ConfigTestCase):
|
||||
for hook_dir in hook_dirs:
|
||||
# default mode of 755 is used
|
||||
mock_util.make_or_verify_dir.assert_any_call(
|
||||
hook_dir, uid=os.geteuid(),
|
||||
hook_dir, uid=compat.os_geteuid(),
|
||||
strict=self.config.strict_permissions)
|
||||
|
||||
|
||||
|
||||
@@ -362,7 +362,6 @@ def lock_and_call(func, lock_path):
|
||||
child.join()
|
||||
assert child.exitcode == 0
|
||||
|
||||
|
||||
def hold_lock(cv, lock_path): # pragma: no cover
|
||||
"""Acquire a file lock at lock_path and wait to release it.
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import mock
|
||||
import six
|
||||
from six.moves import reload_module # pylint: disable=import-error
|
||||
|
||||
from certbot import compat
|
||||
from certbot import errors
|
||||
import certbot.tests.util as test_util
|
||||
|
||||
@@ -116,7 +117,7 @@ class SetUpCoreDirTest(test_util.TempDirTestCase):
|
||||
@mock.patch('certbot.util.lock_dir_until_exit')
|
||||
def test_success(self, mock_lock):
|
||||
new_dir = os.path.join(self.tempdir, 'new')
|
||||
self._call(new_dir, 0o700, os.geteuid(), False)
|
||||
self._call(new_dir, 0o700, compat.os_geteuid(), False)
|
||||
self.assertTrue(os.path.exists(new_dir))
|
||||
self.assertEqual(mock_lock.call_count, 1)
|
||||
|
||||
@@ -124,7 +125,7 @@ class SetUpCoreDirTest(test_util.TempDirTestCase):
|
||||
def test_failure(self, mock_make_or_verify):
|
||||
mock_make_or_verify.side_effect = OSError
|
||||
self.assertRaises(errors.Error, self._call,
|
||||
self.tempdir, 0o700, os.geteuid(), False)
|
||||
self.tempdir, 0o700, compat.os_geteuid(), False)
|
||||
|
||||
|
||||
class MakeOrVerifyDirTest(test_util.TempDirTestCase):
|
||||
|
||||
Reference in New Issue
Block a user