1
0
mirror of https://github.com/certbot/certbot.git synced 2025-08-06 16:42:41 +03:00

[Windows] Security model for files permissions - STEP 3a (#6964)

This PR implements the filesystem.chmod method from #6497.

* Implement filesystem.chmod

* Conditionally add pywin32 on setuptools versions that support environment markers.

* Update apache plugin requirements

* Use a try/except import approach similar to lock

* Add comments about well-known SIDs

* Add main command

* Call filesystem.chmod in tests, remove one test

* Add test for os module

* Update environment marker

* Ensure we are not building wheels using an old version of setuptools

* Added a link to list of NTFS rights

* Simplify sid comparison

* Enable coverage

* Sometimes, double-quote is the solution

* Add entrypoint

* Add unit tests to filesystem

* Resolve recursively the link, add doc

* Move imports to the top of the file

* Remove string conversion of the ACL, fix setup

* Ensure admins have all permissions

* Simplify dacl comparison

* Conditionally raise for windows temporary workaround

* Add a test to check filesystem.chown is protected against symlink loops
This commit is contained in:
Adrien Ferrand
2019-06-20 19:52:43 +02:00
committed by Brad Warren
parent 5078b58de9
commit e9bcaaa576
20 changed files with 421 additions and 29 deletions

View File

@@ -5,6 +5,7 @@ from acme.magic_typing import List, Set # pylint: disable=unused-import, no-nam
from certbot import errors from certbot import errors
from certbot.compat import os from certbot.compat import os
from certbot.compat import filesystem
from certbot.plugins import common from certbot.plugins import common
from certbot_apache.obj import VirtualHost # pylint: disable=unused-import from certbot_apache.obj import VirtualHost # pylint: disable=unused-import
@@ -169,7 +170,7 @@ class ApacheHttp01(common.TLSSNI01):
def _set_up_challenges(self): def _set_up_challenges(self):
if not os.path.isdir(self.challenge_dir): if not os.path.isdir(self.challenge_dir):
os.makedirs(self.challenge_dir) os.makedirs(self.challenge_dir)
os.chmod(self.challenge_dir, 0o755) filesystem.chmod(self.challenge_dir, 0o755)
responses = [] responses = []
for achall in self.achalls: for achall in self.achalls:
@@ -185,7 +186,7 @@ class ApacheHttp01(common.TLSSNI01):
self.configurator.reverter.register_file_creation(True, name) self.configurator.reverter.register_file_creation(True, name)
with open(name, 'wb') as f: with open(name, 'wb') as f:
f.write(validation.encode()) f.write(validation.encode())
os.chmod(name, 0o644) filesystem.chmod(name, 0o644)
return response return response

View File

@@ -16,6 +16,7 @@ from certbot import achallenges
from certbot import crypto_util from certbot import crypto_util
from certbot import errors from certbot import errors
from certbot.compat import os from certbot.compat import os
from certbot.compat import filesystem
from certbot.tests import acme_util from certbot.tests import acme_util
from certbot.tests import util as certbot_util from certbot.tests import util as certbot_util
@@ -1366,7 +1367,7 @@ class MultipleVhostsTest(util.ApacheTest):
self.config.parser.modules.add("mod_ssl.c") self.config.parser.modules.add("mod_ssl.c")
self.config.parser.modules.add("socache_shmcb_module") self.config.parser.modules.add("socache_shmcb_module")
tmp_path = os.path.realpath(tempfile.mkdtemp("vhostroot")) tmp_path = os.path.realpath(tempfile.mkdtemp("vhostroot"))
os.chmod(tmp_path, 0o755) filesystem.chmod(tmp_path, 0o755)
mock_p = "certbot_apache.configurator.ApacheConfigurator._get_ssl_vhost_path" mock_p = "certbot_apache.configurator.ApacheConfigurator._get_ssl_vhost_path"
mock_a = "certbot_apache.parser.ApacheParser.add_include" mock_a = "certbot_apache.parser.ApacheParser.add_include"

View File

@@ -1,3 +1,3 @@
# Remember to update setup.py to match the package versions below. # Remember to update setup.py to match the package versions below.
acme[dev]==0.29.0 acme[dev]==0.29.0
certbot[dev]==0.34.0 -e .[dev]

View File

@@ -10,7 +10,7 @@ version = '0.36.0.dev0'
# acme/certbot version. # acme/certbot version.
install_requires = [ install_requires = [
'acme>=0.29.0', 'acme>=0.29.0',
'certbot>=0.34.0', 'certbot>=0.36.0.dev0',
'mock', 'mock',
'python-augeas', 'python-augeas',
'setuptools', 'setuptools',

View File

@@ -2,6 +2,38 @@
from __future__ import absolute_import from __future__ import absolute_import
import os # pylint: disable=os-module-forbidden import os # pylint: disable=os-module-forbidden
import stat
try:
import ntsecuritycon # pylint: disable=import-error
import win32security # pylint: disable=import-error
except ImportError:
POSIX_MODE = True
else:
POSIX_MODE = False
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
def chmod(file_path, mode):
# type: (str, int) -> None
"""
Apply a POSIX mode on given file_path:
* for Linux, the POSIX mode will be directly applied using chmod,
* for Windows, the POSIX mode will be translated into a Windows DACL that make sense for
Certbot context, and applied to the file using kernel calls.
The definition of the Windows DACL that correspond to a POSIX mode, in the context of Certbot,
is explained at https://github.com/certbot/certbot/issues/6356 and is implemented by the
method _generate_windows_flags().
:param str file_path: Path of the file
:param int mode: POSIX mode to apply
"""
if POSIX_MODE:
os.chmod(file_path, mode)
else:
_apply_win_mode(file_path, mode)
def replace(src, dst): def replace(src, dst):
@@ -18,3 +50,126 @@ def replace(src, dst):
else: else:
# Otherwise, use os.rename() that behaves like os.replace() on Linux. # Otherwise, use os.rename() that behaves like os.replace() on Linux.
os.rename(src, dst) os.rename(src, dst)
def _apply_win_mode(file_path, mode):
"""
This function converts the given POSIX mode into a Windows ACL list, and applies it to the
file given its path. If the given path is a symbolic link, it will resolved to apply the
mode on the targeted file.
"""
original_path = file_path
inspected_paths = [] # type: List[str]
while os.path.islink(file_path):
link_path = file_path
file_path = os.readlink(file_path)
if not os.path.isabs(file_path):
file_path = os.path.join(os.path.dirname(link_path), file_path)
if file_path in inspected_paths:
raise RuntimeError('Error, link {0} is a loop!'.format(original_path))
inspected_paths.append(file_path)
# Get owner sid of the file
security = win32security.GetFileSecurity(file_path, win32security.OWNER_SECURITY_INFORMATION)
user = security.GetSecurityDescriptorOwner()
# New DACL, that will overwrite existing one (including inherited permissions)
dacl = _generate_dacl(user, mode)
# Apply the new DACL
security.SetSecurityDescriptorDacl(1, dacl, 0)
win32security.SetFileSecurity(file_path, win32security.DACL_SECURITY_INFORMATION, security)
def _generate_dacl(user_sid, mode):
analysis = _analyze_mode(mode)
# Get standard accounts from "well-known" sid
# See the list here:
# https://support.microsoft.com/en-us/help/243330/well-known-security-identifiers-in-windows-operating-systems
system = win32security.ConvertStringSidToSid('S-1-5-18')
admins = win32security.ConvertStringSidToSid('S-1-5-32-544')
everyone = win32security.ConvertStringSidToSid('S-1-1-0')
# New dacl, without inherited permissions
dacl = win32security.ACL()
# If user is already system or admins, any ACE defined here would be superseded by
# the full control ACE that will be added after.
if user_sid not in [system, admins]:
# Handle user rights
user_flags = _generate_windows_flags(analysis['user'])
if user_flags:
dacl.AddAccessAllowedAce(win32security.ACL_REVISION, user_flags, user_sid)
# Handle everybody rights
everybody_flags = _generate_windows_flags(analysis['all'])
if everybody_flags:
dacl.AddAccessAllowedAce(win32security.ACL_REVISION, everybody_flags, everyone)
# Handle administrator rights
full_permissions = _generate_windows_flags({'read': True, 'write': True, 'execute': True})
dacl.AddAccessAllowedAce(win32security.ACL_REVISION, full_permissions, system)
dacl.AddAccessAllowedAce(win32security.ACL_REVISION, full_permissions, admins)
return dacl
def _analyze_mode(mode):
return {
'user': {
'read': mode & stat.S_IRUSR,
'write': mode & stat.S_IWUSR,
'execute': mode & stat.S_IXUSR,
},
'all': {
'read': mode & stat.S_IROTH,
'write': mode & stat.S_IWOTH,
'execute': mode & stat.S_IXOTH,
},
}
def _generate_windows_flags(rights_desc):
# Some notes about how each POSIX right is interpreted.
#
# For the rights read and execute, we have a pretty bijective relation between
# POSIX flags and their generic counterparts on Windows, so we use them directly
# (respectively ntsecuritycon.FILE_GENERIC_READ and ntsecuritycon.FILE_GENERIC_EXECUTE).
#
# But ntsecuritycon.FILE_GENERIC_WRITE does not correspond to what one could expect from a
# write access on Linux: for Windows, FILE_GENERIC_WRITE does not include delete, move or
# rename. This is something that requires ntsecuritycon.FILE_ALL_ACCESS.
# So to reproduce the write right as POSIX, we will apply ntsecuritycon.FILE_ALL_ACCESS
# substracted of the rights corresponding to POSIX read and POSIX execute.
#
# Finally, having read + write + execute gives a ntsecuritycon.FILE_ALL_ACCESS,
# so a "Full Control" on the file.
#
# A complete list of the rights defined on NTFS can be found here:
# https://docs.microsoft.com/en-us/previous-versions/windows/it-pro/windows-server-2003/cc783530(v=ws.10)#permissions-for-files-and-folders
flag = 0
if rights_desc['read']:
flag = flag | ntsecuritycon.FILE_GENERIC_READ
if rights_desc['write']:
flag = flag | (ntsecuritycon.FILE_ALL_ACCESS
^ ntsecuritycon.FILE_GENERIC_READ
^ ntsecuritycon.FILE_GENERIC_EXECUTE
# Despite bit `512` being present in ntsecuritycon.FILE_ALL_ACCESS, it is
# not effectively applied to the file or the directory.
# As _generate_windows_flags is also used to compare two dacls, we remove
# it right now to have flags that contain only the bits effectively applied
# by Windows.
^ 512)
if rights_desc['execute']:
flag = flag | ntsecuritycon.FILE_GENERIC_EXECUTE
return flag
def _compare_dacls(dacl1, dacl2):
"""
This method compare the two given DACLs to check if they are identical.
Identical means here that they contains the same set of ACEs in the same order.
"""
return ([dacl1.GetAce(index) for index in range(0, dacl1.GetAceCount())] ==
[dacl2.GetAce(index) for index in range(0, dacl2.GetAceCount())])

View File

@@ -32,6 +32,25 @@ std_sys.modules[__name__ + '.path'] = path
del ourselves, std_os, std_sys del ourselves, std_os, std_sys
# Chmod is the root of all evil for our security model on Windows. With the default implementation
# of os.chmod on Windows, almost all bits on mode will be ignored, and only a general RO or RW will
# be applied. The DACL, the inner mechanism to control file access on Windows, will stay on its
# default definition, giving effectively at least read permissions to any one, as the default
# permissions on root path will be inherit by the file (as NTFS state), and root path can be read
# by anyone. So the given mode needs to be translated into a secured and not inherited DACL that
# will be applied to this file using filesystem.chmod, calling internally the win32security
# module to construct and apply the DACL. Complete security model to translate a POSIX mode into
# a suitable DACL on Windows for Certbot can be found here:
# https://github.com/certbot/certbot/issues/6356
# Basically, it states that appropriate permissions will be set for the owner, nothing for the
# group, appropriate permissions for the "Everyone" group, and all permissions to the
# "Administrators" group + "System" user, as they can do everything anyway.
def chmod(*unused_args, **unused_kwargs): # pylint: disable=function-redefined
"""Method os.chmod() is forbidden"""
raise RuntimeError('Usage of os.chmod() is forbidden. '
'Use certbot.compat.filesystem.chmod() instead.')
# Because of the blocking strategy on file handlers on Windows, rename does not behave as expected # Because of the blocking strategy on file handlers on Windows, rename does not behave as expected
# with POSIX systems: an exception will be raised if dst already exists. # with POSIX systems: an exception will be raised if dst already exists.
def rename(*unused_args, **unused_kwargs): def rename(*unused_args, **unused_kwargs):

View File

@@ -20,6 +20,7 @@ from certbot import interfaces
from certbot import reverter from certbot import reverter
from certbot import util from certbot import util
from certbot.compat import os from certbot.compat import os
from certbot.compat import filesystem
from certbot.plugins.storage import PluginStorage from certbot.plugins.storage import PluginStorage
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -482,9 +483,9 @@ def dir_setup(test_dir, pkg): # pragma: no cover
config_dir = expanded_tempdir("config") config_dir = expanded_tempdir("config")
work_dir = expanded_tempdir("work") work_dir = expanded_tempdir("work")
os.chmod(temp_dir, constants.CONFIG_DIRS_MODE) filesystem.chmod(temp_dir, constants.CONFIG_DIRS_MODE)
os.chmod(config_dir, constants.CONFIG_DIRS_MODE) filesystem.chmod(config_dir, constants.CONFIG_DIRS_MODE)
os.chmod(work_dir, constants.CONFIG_DIRS_MODE) filesystem.chmod(work_dir, constants.CONFIG_DIRS_MODE)
test_configs = pkg_resources.resource_filename( test_configs = pkg_resources.resource_filename(
pkg, os.path.join("testdata", test_dir)) pkg, os.path.join("testdata", test_dir))

View File

@@ -8,7 +8,7 @@ import six
from acme import challenges from acme import challenges
from certbot import achallenges from certbot import achallenges
from certbot.compat import os from certbot.compat import filesystem
from certbot.tests import acme_util from certbot.tests import acme_util
from certbot.tests import util as test_util from certbot.tests import util as test_util
@@ -60,4 +60,4 @@ def write(values, path):
with open(path, "wb") as f: with open(path, "wb") as f:
config.write(outfile=f) config.write(outfile=f)
os.chmod(path, 0o600) filesystem.chmod(path, 0o600)

View File

@@ -19,6 +19,7 @@ from certbot import achallenges
from certbot import errors from certbot import errors
from certbot.compat import misc from certbot.compat import misc
from certbot.compat import os from certbot.compat import os
from certbot.compat import filesystem
from certbot.display import util as display_util from certbot.display import util as display_util
from certbot.tests import acme_util from certbot.tests import acme_util
from certbot.tests import util as test_util from certbot.tests import util as test_util
@@ -132,14 +133,14 @@ class AuthenticatorTest(unittest.TestCase):
permission_canary = os.path.join(self.path, "rnd") permission_canary = os.path.join(self.path, "rnd")
with open(permission_canary, "w") as f: with open(permission_canary, "w") as f:
f.write("thingimy") f.write("thingimy")
os.chmod(self.path, 0o000) filesystem.chmod(self.path, 0o000)
try: try:
open(permission_canary, "r") open(permission_canary, "r")
print("Warning, running tests as root skips permissions tests...") print("Warning, running tests as root skips permissions tests...")
except IOError: except IOError:
# ok, permissions work, test away... # ok, permissions work, test away...
self.assertRaises(errors.PluginError, self.auth.perform, []) self.assertRaises(errors.PluginError, self.auth.perform, [])
os.chmod(self.path, 0o700) filesystem.chmod(self.path, 0o700)
@test_util.skip_on_windows('On Windows, there is no chown.') @test_util.skip_on_windows('On Windows, there is no chown.')
@mock.patch("certbot.plugins.webroot.os.chown") @mock.patch("certbot.plugins.webroot.os.chown")

View File

@@ -143,7 +143,7 @@ def write_renewal_config(o_filename, n_filename, archive_dir, target, relevant_d
# Copy permissions from the old version of the file, if it exists. # Copy permissions from the old version of the file, if it exists.
if os.path.exists(o_filename): if os.path.exists(o_filename):
current_permissions = stat.S_IMODE(os.lstat(o_filename).st_mode) current_permissions = stat.S_IMODE(os.lstat(o_filename).st_mode)
os.chmod(n_filename, current_permissions) filesystem.chmod(n_filename, current_permissions)
with open(n_filename, "wb") as f: with open(n_filename, "wb") as f:
config.write(outfile=f) config.write(outfile=f)
@@ -1110,7 +1110,7 @@ class RenewableCert(object):
stat.S_IROTH) stat.S_IROTH)
mode = BASE_PRIVKEY_MODE | old_mode mode = BASE_PRIVKEY_MODE | old_mode
os.chown(target["privkey"], -1, os.stat(old_privkey).st_gid) os.chown(target["privkey"], -1, os.stat(old_privkey).st_gid)
os.chmod(target["privkey"], mode) filesystem.chmod(target["privkey"], mode)
# Save everything else # Save everything else
with open(target["cert"], "wb") as f: with open(target["cert"], "wb") as f:

View File

@@ -12,6 +12,7 @@ import certbot.tests.util as test_util
from certbot import account from certbot import account
from certbot import errors from certbot import errors
from certbot.compat import os from certbot.compat import os
from certbot.compat import filesystem
from certbot import util from certbot import util
KEY = test_util.load_vector("rsa512_key.pem") KEY = test_util.load_vector("rsa512_key.pem")
@@ -423,7 +424,7 @@ class ClientTest(ClientTestCommon):
# pylint: disable=too-many-locals # pylint: disable=too-many-locals
certs = ["cert_512.pem", "cert-san_512.pem"] certs = ["cert_512.pem", "cert-san_512.pem"]
tmp_path = tempfile.mkdtemp() tmp_path = tempfile.mkdtemp()
os.chmod(tmp_path, 0o755) # TODO: really?? filesystem.chmod(tmp_path, 0o755) # TODO: really??
cert_pem = test_util.load_vector(certs[0]) cert_pem = test_util.load_vector(certs[0])
chain_pem = (test_util.load_vector(certs[0]) + test_util.load_vector(certs[1])) chain_pem = (test_util.load_vector(certs[0]) + test_util.load_vector(certs[1]))

View File

@@ -1,7 +1,157 @@
"""Tests for certbot.compat.filesystem""" """Tests for certbot.compat.filesystem"""
import unittest
try:
import win32api # pylint: disable=import-error
import win32security # pylint: disable=import-error
import ntsecuritycon # pylint: disable=import-error
POSIX_MODE = False
except ImportError:
POSIX_MODE = True
import certbot.tests.util as test_util import certbot.tests.util as test_util
from certbot.compat import os from certbot.compat import os
from certbot.compat import filesystem from certbot.compat import filesystem
from certbot.tests.util import TempDirTestCase
EVERYBODY_SID = 'S-1-1-0'
SYSTEM_SID = 'S-1-5-18'
ADMINS_SID = 'S-1-5-32-544'
@unittest.skipIf(POSIX_MODE, reason='Test specific to Windows security')
class WindowsChmodTests(TempDirTestCase):
"""Unit tests for Windows chmod function in filesystem module"""
def setUp(self):
super(WindowsChmodTests, self).setUp()
self.probe_path = _create_probe(self.tempdir)
def test_symlink_resolution(self):
link_path = os.path.join(self.tempdir, 'link')
os.symlink(self.probe_path, link_path)
ref_dacl_probe = _get_security_dacl(self.probe_path).GetSecurityDescriptorDacl()
ref_dacl_link = _get_security_dacl(link_path).GetSecurityDescriptorDacl()
filesystem.chmod(link_path, 0o700)
# Assert the real file is impacted, not the link.
cur_dacl_probe = _get_security_dacl(self.probe_path).GetSecurityDescriptorDacl()
cur_dacl_link = _get_security_dacl(link_path).GetSecurityDescriptorDacl()
self.assertFalse(filesystem._compare_dacls(ref_dacl_probe, cur_dacl_probe)) # pylint: disable=protected-access
self.assertTrue(filesystem._compare_dacls(ref_dacl_link, cur_dacl_link)) # pylint: disable=protected-access
def test_symlink_loop_mitigation(self):
link1_path = os.path.join(self.tempdir, 'link1')
link2_path = os.path.join(self.tempdir, 'link2')
link3_path = os.path.join(self.tempdir, 'link3')
os.symlink(link1_path, link2_path)
os.symlink(link2_path, link3_path)
os.symlink(link3_path, link1_path)
with self.assertRaises(RuntimeError) as error:
filesystem.chmod(link1_path, 0o755)
self.assertTrue('link1 is a loop!' in str(error.exception))
def test_world_permission(self):
everybody = win32security.ConvertStringSidToSid(EVERYBODY_SID)
filesystem.chmod(self.probe_path, 0o700)
dacl = _get_security_dacl(self.probe_path).GetSecurityDescriptorDacl()
self.assertFalse([dacl.GetAce(index) for index in range(0, dacl.GetAceCount())
if dacl.GetAce(index)[2] == everybody])
filesystem.chmod(self.probe_path, 0o704)
dacl = _get_security_dacl(self.probe_path).GetSecurityDescriptorDacl()
self.assertTrue([dacl.GetAce(index) for index in range(0, dacl.GetAceCount())
if dacl.GetAce(index)[2] == everybody])
def test_group_permissions_noop(self):
filesystem.chmod(self.probe_path, 0o700)
ref_dacl_probe = _get_security_dacl(self.probe_path).GetSecurityDescriptorDacl()
filesystem.chmod(self.probe_path, 0o740)
cur_dacl_probe = _get_security_dacl(self.probe_path).GetSecurityDescriptorDacl()
self.assertTrue(filesystem._compare_dacls(ref_dacl_probe, cur_dacl_probe)) # pylint: disable=protected-access
def test_admin_permissions(self):
system = win32security.ConvertStringSidToSid(SYSTEM_SID)
admins = win32security.ConvertStringSidToSid(ADMINS_SID)
filesystem.chmod(self.probe_path, 0o400)
dacl = _get_security_dacl(self.probe_path).GetSecurityDescriptorDacl()
system_aces = [dacl.GetAce(index) for index in range(0, dacl.GetAceCount())
if dacl.GetAce(index)[2] == system]
admin_aces = [dacl.GetAce(index) for index in range(0, dacl.GetAceCount())
if dacl.GetAce(index)[2] == admins]
self.assertEqual(len(system_aces), 1)
self.assertEqual(len(admin_aces), 1)
self.assertEqual(system_aces[0][1], ntsecuritycon.FILE_ALL_ACCESS ^ 512)
self.assertEqual(admin_aces[0][1], ntsecuritycon.FILE_ALL_ACCESS ^ 512)
def test_read_flag(self):
self._test_flag(4, ntsecuritycon.FILE_GENERIC_READ)
def test_execute_flag(self):
self._test_flag(1, ntsecuritycon.FILE_GENERIC_EXECUTE)
def test_write_flag(self):
self._test_flag(2, (ntsecuritycon.FILE_ALL_ACCESS
^ ntsecuritycon.FILE_GENERIC_READ
^ ntsecuritycon.FILE_GENERIC_EXECUTE
^ 512))
def test_full_flag(self):
self._test_flag(7, (ntsecuritycon.FILE_ALL_ACCESS
^ 512))
def _test_flag(self, everyone_mode, windows_flag):
# Note that flag is tested against `everyone`, not `user`, because practically these unit
# tests are executed with admin privilege, so current user is effectively the admins group,
# and so will always have all rights.
filesystem.chmod(self.probe_path, 0o700 | everyone_mode)
dacl = _get_security_dacl(self.probe_path).GetSecurityDescriptorDacl()
everybody = win32security.ConvertStringSidToSid(EVERYBODY_SID)
acls_everybody = [dacl.GetAce(index) for index in range(0, dacl.GetAceCount())
if dacl.GetAce(index)[2] == everybody]
self.assertEqual(len(acls_everybody), 1)
acls_everybody = acls_everybody[0]
self.assertEqual(acls_everybody[1], windows_flag)
def test_user_admin_dacl_consistency(self):
# Set ownership of target to authenticated user
authenticated_user, _, _ = win32security.LookupAccountName("", win32api.GetUserName())
security_owner = _get_security_owner(self.probe_path)
_set_owner(self.probe_path, security_owner, authenticated_user)
filesystem.chmod(self.probe_path, 0o700)
security_dacl = _get_security_dacl(self.probe_path)
# We expect three ACE: one for admins, one for system, and one for the user
self.assertEqual(security_dacl.GetSecurityDescriptorDacl().GetAceCount(), 3)
# Set ownership of target to Administrators user group
admin_user = win32security.ConvertStringSidToSid(ADMINS_SID)
security_owner = _get_security_owner(self.probe_path)
_set_owner(self.probe_path, security_owner, admin_user)
filesystem.chmod(self.probe_path, 0o700)
security_dacl = _get_security_dacl(self.probe_path)
# We expect only two ACE: one for admins, one for system,
# since the user is also the admins group
self.assertEqual(security_dacl.GetSecurityDescriptorDacl().GetAceCount(), 2)
class OsReplaceTest(test_util.TempDirTestCase): class OsReplaceTest(test_util.TempDirTestCase):
@@ -19,3 +169,28 @@ class OsReplaceTest(test_util.TempDirTestCase):
self.assertFalse(os.path.exists(src)) self.assertFalse(os.path.exists(src))
self.assertTrue(os.path.exists(dst)) self.assertTrue(os.path.exists(dst))
def _get_security_dacl(target):
return win32security.GetFileSecurity(target, win32security.DACL_SECURITY_INFORMATION)
def _get_security_owner(target):
return win32security.GetFileSecurity(target, win32security.OWNER_SECURITY_INFORMATION)
def _set_owner(target, security_owner, user):
security_owner.SetSecurityDescriptorOwner(user, False)
win32security.SetFileSecurity(
target, win32security.OWNER_SECURITY_INFORMATION, security_owner)
def _create_probe(tempdir):
filesystem.chmod(tempdir, 0o744)
probe_path = os.path.join(tempdir, 'probe')
open(probe_path, 'w').close()
return probe_path
if __name__ == "__main__":
unittest.main() # pragma: no cover

View File

@@ -0,0 +1,15 @@
"""Unit test for os module."""
import unittest
from certbot.compat import os
class OsTest(unittest.TestCase):
"""Unit tests for os module."""
def test_forbidden_methods(self):
for method in ['chmod', 'rename', 'replace']:
self.assertRaises(RuntimeError, getattr(os, method))
if __name__ == "__main__":
unittest.main() # pragma: no cover

View File

@@ -7,6 +7,7 @@ from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-
from certbot import errors from certbot import errors
from certbot.compat import os from certbot.compat import os
from certbot.compat import filesystem
from certbot.tests import util from certbot.tests import util
@@ -65,7 +66,7 @@ class HookTest(util.ConfigTestCase):
"""Common base class for hook tests.""" """Common base class for hook tests."""
@classmethod @classmethod
def _call(cls, *args, **kwargs): def _call(cls, *args, **kwargs): # pragma: no cover
"""Calls the method being tested with the given arguments.""" """Calls the method being tested with the given arguments."""
raise NotImplementedError raise NotImplementedError
@@ -494,7 +495,7 @@ def create_hook(file_path):
""" """
open(file_path, "w").close() open(file_path, "w").close()
os.chmod(file_path, os.stat(file_path).st_mode | stat.S_IXUSR) filesystem.chmod(file_path, os.stat(file_path).st_mode | stat.S_IXUSR)
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -15,6 +15,7 @@ import certbot.tests.util as test_util
from certbot import errors from certbot import errors
from certbot.compat import misc from certbot.compat import misc
from certbot.compat import os from certbot.compat import os
from certbot.compat import filesystem
from certbot.storage import ALL_FOUR from certbot.storage import ALL_FOUR
CERT = test_util.load_cert('cert_512.pem') CERT = test_util.load_cert('cert_512.pem')
@@ -132,7 +133,7 @@ class BaseRenewableCertTest(test_util.ConfigTestCase):
with open(link, "wb") as f: with open(link, "wb") as f:
f.write(kind.encode('ascii') if value is None else value) f.write(kind.encode('ascii') if value is None else value)
if kind == "privkey": if kind == "privkey":
os.chmod(link, 0o600) filesystem.chmod(link, 0o600)
def _write_out_ex_kinds(self): def _write_out_ex_kinds(self):
for kind in ALL_FOUR: for kind in ALL_FOUR:
@@ -572,7 +573,7 @@ class RenewableCertTests(BaseRenewableCertTest):
self.test_rc.update_all_links_to(1) self.test_rc.update_all_links_to(1)
self.assertTrue(misc.compare_file_modes( self.assertTrue(misc.compare_file_modes(
os.stat(self.test_rc.version("privkey", 1)).st_mode, 0o600)) os.stat(self.test_rc.version("privkey", 1)).st_mode, 0o600))
os.chmod(self.test_rc.version("privkey", 1), 0o444) filesystem.chmod(self.test_rc.version("privkey", 1), 0o444)
# If no new key, permissions should be the same (we didn't write any keys) # If no new key, permissions should be the same (we didn't write any keys)
self.test_rc.save_successor(1, b"newcert", None, b"new chain", self.config) self.test_rc.save_successor(1, b"newcert", None, b"new chain", self.config)
self.assertTrue(misc.compare_file_modes( self.assertTrue(misc.compare_file_modes(
@@ -582,7 +583,7 @@ class RenewableCertTests(BaseRenewableCertTest):
self.assertTrue(misc.compare_file_modes( self.assertTrue(misc.compare_file_modes(
os.stat(self.test_rc.version("privkey", 3)).st_mode, 0o644)) os.stat(self.test_rc.version("privkey", 3)).st_mode, 0o644))
# If permissions reverted, next renewal will also revert permissions of new key # If permissions reverted, next renewal will also revert permissions of new key
os.chmod(self.test_rc.version("privkey", 3), 0o400) filesystem.chmod(self.test_rc.version("privkey", 3), 0o400)
self.test_rc.save_successor(3, b"newcert", b"new_privkey", b"new chain", self.config) self.test_rc.save_successor(3, b"newcert", b"new_privkey", b"new chain", self.config)
self.assertTrue(misc.compare_file_modes( self.assertTrue(misc.compare_file_modes(
os.stat(self.test_rc.version("privkey", 4)).st_mode, 0o600)) os.stat(self.test_rc.version("privkey", 4)).st_mode, 0o600))
@@ -776,7 +777,7 @@ class RenewableCertTests(BaseRenewableCertTest):
with open(temp, "w") as f: with open(temp, "w") as f:
f.write("[renewalparams]\nuseful = value # A useful value\n" f.write("[renewalparams]\nuseful = value # A useful value\n"
"useless = value # Not needed\n") "useless = value # Not needed\n")
os.chmod(temp, 0o640) filesystem.chmod(temp, 0o640)
target = {} target = {}
for x in ALL_FOUR: for x in ALL_FOUR:
target[x] = "somewhere" target[x] = "somewhere"

View File

@@ -27,6 +27,7 @@ from certbot import lock
from certbot import storage from certbot import storage
from certbot import util from certbot import util
from certbot.compat import os from certbot.compat import os
from certbot.compat import filesystem
from certbot.display import util as display_util from certbot.display import util as display_util
@@ -340,8 +341,13 @@ class TempDirTestCase(unittest.TestCase):
def handle_rw_files(_, path, __): def handle_rw_files(_, path, __):
"""Handle read-only files, that will fail to be removed on Windows.""" """Handle read-only files, that will fail to be removed on Windows."""
os.chmod(path, stat.S_IWRITE) filesystem.chmod(path, stat.S_IWRITE)
os.remove(path) try:
os.remove(path)
except (IOError, OSError):
# TODO: remote the try/except once all logic from windows file permissions is merged
if os.name != 'nt':
raise
shutil.rmtree(self.tempdir, onerror=handle_rw_files) shutil.rmtree(self.tempdir, onerror=handle_rw_files)

View File

@@ -11,6 +11,7 @@ import certbot.tests.util as test_util
from certbot import errors from certbot import errors
from certbot.compat import misc from certbot.compat import misc
from certbot.compat import os from certbot.compat import os
from certbot.compat import filesystem
class RunScriptTest(unittest.TestCase): class RunScriptTest(unittest.TestCase):
@@ -188,17 +189,19 @@ class CheckPermissionsTest(test_util.TempDirTestCase):
return check_permissions(self.tempdir, mode, self.uid) return check_permissions(self.tempdir, mode, self.uid)
def test_ok_mode(self): def test_ok_mode(self):
os.chmod(self.tempdir, 0o600) filesystem.chmod(self.tempdir, 0o600)
self.assertTrue(self._call(0o600)) self.assertTrue(self._call(0o600))
# TODO: reactivate the test when all logic from windows file permissions is merged.
@test_util.broken_on_windows
def test_wrong_mode(self): def test_wrong_mode(self):
os.chmod(self.tempdir, 0o400) filesystem.chmod(self.tempdir, 0o400)
try: try:
self.assertFalse(self._call(0o600)) self.assertFalse(self._call(0o600))
finally: finally:
# Without proper write permissions, Windows is unable to delete a folder, # Without proper write permissions, Windows is unable to delete a folder,
# even with admin permissions. Write access must be explicitly set first. # even with admin permissions. Write access must be explicitly set first.
os.chmod(self.tempdir, 0o700) filesystem.chmod(self.tempdir, 0o700)
class UniqueFileTest(test_util.TempDirTestCase): class UniqueFileTest(test_util.TempDirTestCase):

View File

@@ -3,7 +3,8 @@ import os
import re import re
import sys import sys
from setuptools import find_packages, setup from distutils.version import StrictVersion
from setuptools import find_packages, setup, __version__ as setuptools_version
from setuptools.command.test import test as TestCommand from setuptools.command.test import test as TestCommand
# Workaround for http://bugs.python.org/issue8876, see # Workaround for http://bugs.python.org/issue8876, see
@@ -52,6 +53,17 @@ install_requires = [
'zope.interface', 'zope.interface',
] ]
# Add pywin32 on Windows platforms to handle low-level system calls.
# This dependency needs to be added using environment markers to avoid its installation on Linux.
# However environment markers are supported only with setuptools >= 36.2.
# So this dependency is not added for old Linux distributions with old setuptools,
# in order to allow these systems to build certbot from sources.
if StrictVersion(setuptools_version) >= StrictVersion('36.2'):
install_requires.append("pywin32 ; sys_platform == 'win32'")
elif 'bdist_wheel' in sys.argv[1:]:
raise RuntimeError('Error, you are trying to build certbot wheels using an old version '
'of setuptools. Version 36.2+ of setuptools is required.')
dev_extras = [ dev_extras = [
'astroid==1.6.5', 'astroid==1.6.5',
'coverage', 'coverage',

View File

@@ -12,7 +12,7 @@ DEFAULT_PACKAGES = [
'certbot_dns_sakuracloud', 'certbot_nginx', 'letshelp_certbot'] 'certbot_dns_sakuracloud', 'certbot_nginx', 'letshelp_certbot']
COVER_THRESHOLDS = { COVER_THRESHOLDS = {
'certbot': {'linux': 98, 'windows': 93}, 'certbot': {'linux': 97, 'windows': 96},
'acme': {'linux': 100, 'windows': 99}, 'acme': {'linux': 100, 'windows': 99},
'certbot_apache': {'linux': 100, 'windows': 100}, 'certbot_apache': {'linux': 100, 'windows': 100},
'certbot_dns_cloudflare': {'linux': 98, 'windows': 98}, 'certbot_dns_cloudflare': {'linux': 98, 'windows': 98},

View File

@@ -238,7 +238,7 @@ commands =
--acme-server={env:ACME_SERVER:pebble} \ --acme-server={env:ACME_SERVER:pebble} \
--cov=acme --cov=certbot --cov=certbot_nginx --cov-report= \ --cov=acme --cov=certbot --cov=certbot_nginx --cov-report= \
--cov-config=certbot-ci/certbot_integration_tests/.coveragerc --cov-config=certbot-ci/certbot_integration_tests/.coveragerc
coverage report --include 'certbot/*' --show-missing --fail-under=67 coverage report --include 'certbot/*' --show-missing --fail-under=66
coverage report --include 'certbot-nginx/*' --show-missing --fail-under=74 coverage report --include 'certbot-nginx/*' --show-missing --fail-under=74
passenv = DOCKER_* passenv = DOCKER_*