mirror of
https://github.com/certbot/certbot.git
synced 2026-01-26 07:41:33 +03:00
Implement a consistent realpath function in certbot.compat.filesystem (#7242)
Fixes #7115 This PR creates a `realpath` method in `filesystem`, whose goal is to replace any call to `os.path.realpath` in Certbot. The reason is that `os.path.realpath` is broken on some versions of Python for Windows. See https://bugs.python.org/issue9949. The function created here works consistently across Linux and Windows. As for the other forbidden functions in `os` module, our `certbot.compat.os` will raise an exception if its `path.realpath` function is invoked, and using the `os` module from Python is forbidden from the pylint check implemented in our CI. Every call to `os.path.realpath` is corrected in `certbot` and `certbot-apache` modules. * Forbid os.path.realpath * Finish implementation * Use filesystem.realpath * Control symlink loops also for Linux * Add a test for forbidden method * Import a new object from os.path module * Use same approach of wrapping than certbot.compat.os * Correct errors * Fix dependencies * Make path module internal
This commit is contained in:
committed by
Brad Warren
parent
41a17f913e
commit
71ff47daad
@@ -23,6 +23,7 @@ from certbot import interfaces
|
||||
from certbot import util
|
||||
|
||||
from certbot.achallenges import KeyAuthorizationAnnotatedChallenge # pylint: disable=unused-import
|
||||
from certbot.compat import filesystem
|
||||
from certbot.compat import os
|
||||
from certbot.plugins import common
|
||||
from certbot.plugins.util import path_surgery
|
||||
@@ -895,7 +896,7 @@ class ApacheConfigurator(common.Installer):
|
||||
if not new_vhost:
|
||||
continue
|
||||
internal_path = apache_util.get_internal_aug_path(new_vhost.path)
|
||||
realpath = os.path.realpath(new_vhost.filep)
|
||||
realpath = filesystem.realpath(new_vhost.filep)
|
||||
if realpath not in file_paths:
|
||||
file_paths[realpath] = new_vhost.filep
|
||||
internal_paths[realpath].add(internal_path)
|
||||
@@ -1221,11 +1222,11 @@ class ApacheConfigurator(common.Installer):
|
||||
"""
|
||||
|
||||
if self.conf("vhost-root") and os.path.exists(self.conf("vhost-root")):
|
||||
fp = os.path.join(os.path.realpath(self.option("vhost_root")),
|
||||
fp = os.path.join(filesystem.realpath(self.option("vhost_root")),
|
||||
os.path.basename(non_ssl_vh_fp))
|
||||
else:
|
||||
# Use non-ssl filepath
|
||||
fp = os.path.realpath(non_ssl_vh_fp)
|
||||
fp = filesystem.realpath(non_ssl_vh_fp)
|
||||
|
||||
if fp.endswith(".conf"):
|
||||
return fp[:-(len(".conf"))] + self.option("le_vhost_ext")
|
||||
|
||||
@@ -7,6 +7,7 @@ import zope.interface
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
from certbot import util
|
||||
from certbot.compat import filesystem
|
||||
from certbot.compat import os
|
||||
|
||||
from certbot_apache import apache_util
|
||||
@@ -65,7 +66,7 @@ class DebianConfigurator(configurator.ApacheConfigurator):
|
||||
try:
|
||||
os.symlink(vhost.filep, enabled_path)
|
||||
except OSError as err:
|
||||
if os.path.islink(enabled_path) and os.path.realpath(
|
||||
if os.path.islink(enabled_path) and filesystem.realpath(
|
||||
enabled_path) == vhost.filep:
|
||||
# Already in shape
|
||||
vhost.enabled = True
|
||||
|
||||
@@ -4,6 +4,7 @@ import unittest
|
||||
import mock
|
||||
|
||||
from certbot import errors
|
||||
from certbot.compat import filesystem
|
||||
from certbot.compat import os
|
||||
|
||||
from certbot_apache import obj
|
||||
@@ -160,7 +161,7 @@ class MultipleVhostsTestCentOS(util.ApacheTest):
|
||||
"""Make sure we read the sysconfig OPTIONS variable correctly"""
|
||||
# Return nothing for the process calls
|
||||
mock_cfg.return_value = ""
|
||||
self.config.parser.sysconfig_filep = os.path.realpath(
|
||||
self.config.parser.sysconfig_filep = filesystem.realpath(
|
||||
os.path.join(self.config.parser.root, "../sysconfig/httpd"))
|
||||
self.config.parser.variables = {}
|
||||
|
||||
|
||||
@@ -675,8 +675,7 @@ class MultipleVhostsTest(util.ApacheTest):
|
||||
def test_make_vhost_ssl_nonexistent_vhost_path(self):
|
||||
ssl_vhost = self.config.make_vhost_ssl(self.vh_truth[1])
|
||||
self.assertEqual(os.path.dirname(ssl_vhost.filep),
|
||||
os.path.dirname(os.path.realpath(
|
||||
self.vh_truth[1].filep)))
|
||||
os.path.dirname(filesystem.realpath(self.vh_truth[1].filep)))
|
||||
|
||||
def test_make_vhost_ssl(self):
|
||||
ssl_vhost = self.config.make_vhost_ssl(self.vh_truth[0])
|
||||
@@ -1336,7 +1335,7 @@ class MultipleVhostsTest(util.ApacheTest):
|
||||
self.config.parser.modules.add("ssl_module")
|
||||
self.config.parser.modules.add("mod_ssl.c")
|
||||
self.config.parser.modules.add("socache_shmcb_module")
|
||||
tmp_path = os.path.realpath(tempfile.mkdtemp("vhostroot"))
|
||||
tmp_path = filesystem.realpath(tempfile.mkdtemp("vhostroot"))
|
||||
filesystem.chmod(tmp_path, 0o755)
|
||||
mock_p = "certbot_apache.configurator.ApacheConfigurator._get_ssl_vhost_path"
|
||||
mock_a = "certbot_apache.parser.ApacheParser.add_include"
|
||||
|
||||
@@ -79,9 +79,9 @@ class MultipleVhostsTestDebian(util.ApacheTest):
|
||||
|
||||
def test_enable_site_failure(self):
|
||||
self.config.parser.root = "/tmp/nonexistent"
|
||||
with mock.patch("os.path.isdir") as mock_dir:
|
||||
with mock.patch("certbot.compat.os.path.isdir") as mock_dir:
|
||||
mock_dir.return_value = True
|
||||
with mock.patch("os.path.islink") as mock_link:
|
||||
with mock.patch("certbot.compat.os.path.islink") as mock_link:
|
||||
mock_link.return_value = False
|
||||
self.assertRaises(
|
||||
errors.NotSupportedError,
|
||||
|
||||
@@ -4,6 +4,7 @@ import unittest
|
||||
import mock
|
||||
|
||||
from certbot import errors
|
||||
from certbot.compat import filesystem
|
||||
from certbot.compat import os
|
||||
|
||||
from certbot_apache import obj
|
||||
@@ -160,7 +161,7 @@ class MultipleVhostsTestFedora(util.ApacheTest):
|
||||
"""Make sure we read the sysconfig OPTIONS variable correctly"""
|
||||
# Return nothing for the process calls
|
||||
mock_cfg.return_value = ""
|
||||
self.config.parser.sysconfig_filep = os.path.realpath(
|
||||
self.config.parser.sysconfig_filep = filesystem.realpath(
|
||||
os.path.join(self.config.parser.root, "../sysconfig/httpd"))
|
||||
self.config.parser.variables = {}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import unittest
|
||||
import mock
|
||||
|
||||
from certbot import errors
|
||||
from certbot.compat import filesystem
|
||||
from certbot.compat import os
|
||||
|
||||
from certbot_apache import obj
|
||||
@@ -81,7 +82,7 @@ class MultipleVhostsTestGentoo(util.ApacheTest):
|
||||
"""Make sure we read the Gentoo APACHE2_OPTS variable correctly"""
|
||||
defines = ['DEFAULT_VHOST', 'INFO',
|
||||
'SSL', 'SSL_DEFAULT_VHOST', 'LANGUAGE']
|
||||
self.config.parser.apacheconfig_filep = os.path.realpath(
|
||||
self.config.parser.apacheconfig_filep = filesystem.realpath(
|
||||
os.path.join(self.config.parser.root, "../conf.d/apache2"))
|
||||
self.config.parser.variables = {}
|
||||
with mock.patch("certbot_apache.override_gentoo.GentooParser.update_modules"):
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
# Remember to update setup.py to match the package versions below.
|
||||
acme[dev]==0.29.0
|
||||
certbot[dev]==0.36.0
|
||||
-e .[dev]
|
||||
|
||||
@@ -10,7 +10,7 @@ version = '0.37.0.dev0'
|
||||
# acme/certbot version.
|
||||
install_requires = [
|
||||
'acme>=0.29.0',
|
||||
'certbot>=0.36.0',
|
||||
'certbot>=0.37.0.dev0',
|
||||
'mock',
|
||||
'python-augeas',
|
||||
'setuptools',
|
||||
|
||||
31
certbot/compat/_path.py
Normal file
31
certbot/compat/_path.py
Normal file
@@ -0,0 +1,31 @@
|
||||
"""This compat module wraps os.path to forbid some functions."""
|
||||
# pylint: disable=function-redefined
|
||||
from __future__ import absolute_import
|
||||
|
||||
# First round of wrapping: we import statically all public attributes exposed by the os.path
|
||||
# module. This allows in particular to have pylint, mypy, IDEs be aware that most of os.path
|
||||
# members are available in certbot.compat.path.
|
||||
from os.path import * # type: ignore # pylint: disable=wildcard-import,unused-wildcard-import,redefined-builtin,os-module-forbidden
|
||||
|
||||
# Second round of wrapping: we import dynamically all attributes from the os.path module that have
|
||||
# not yet been imported by the first round (static star import).
|
||||
import os.path as std_os_path # pylint: disable=os-module-forbidden
|
||||
import sys as std_sys
|
||||
|
||||
ourselves = std_sys.modules[__name__]
|
||||
for attribute in dir(std_os_path):
|
||||
# Check if the attribute does not already exist in our module. It could be internal attributes
|
||||
# of the module (__name__, __doc__), or attributes from standard os.path already imported with
|
||||
# `from os.path import *`.
|
||||
if not hasattr(ourselves, attribute):
|
||||
setattr(ourselves, attribute, getattr(std_os_path, attribute))
|
||||
|
||||
# Clean all remaining importables that are not from the core os.path module.
|
||||
del ourselves, std_os_path, std_sys
|
||||
|
||||
|
||||
# Function os.path.realpath is broken on some versions of Python for Windows.
|
||||
def realpath(*unused_args, **unused_kwargs):
|
||||
"""Method os.path.realpath() is forbidden"""
|
||||
raise RuntimeError('Usage of os.path.realpath() is forbidden. '
|
||||
'Use certbot.compat.filesystem.realpath() instead.')
|
||||
@@ -207,13 +207,22 @@ def replace(src, dst):
|
||||
os.rename(src, dst)
|
||||
|
||||
|
||||
def _apply_win_mode(file_path, mode):
|
||||
def realpath(file_path):
|
||||
"""
|
||||
This function converts the given POSIX mode into a Windows ACL list, and applies it to the
|
||||
file given its path. If the given path is a symbolic link, it will resolved to apply the
|
||||
mode on the targeted file.
|
||||
Find the real path for the given path. This method resolves symlinks, including
|
||||
recursive symlinks, and is protected against symlinks that creates an infinite loop.
|
||||
"""
|
||||
original_path = file_path
|
||||
|
||||
if POSIX_MODE:
|
||||
path = os.path.realpath(file_path)
|
||||
if os.path.islink(path):
|
||||
# If path returned by realpath is still a link, it means that it failed to
|
||||
# resolve the symlink because of a loop.
|
||||
# See realpath code: https://github.com/python/cpython/blob/master/Lib/posixpath.py
|
||||
raise RuntimeError('Error, link {0} is a loop!'.format(original_path))
|
||||
return path
|
||||
|
||||
inspected_paths = [] # type: List[str]
|
||||
while os.path.islink(file_path):
|
||||
link_path = file_path
|
||||
@@ -223,6 +232,17 @@ def _apply_win_mode(file_path, mode):
|
||||
if file_path in inspected_paths:
|
||||
raise RuntimeError('Error, link {0} is a loop!'.format(original_path))
|
||||
inspected_paths.append(file_path)
|
||||
|
||||
return os.path.abspath(file_path)
|
||||
|
||||
|
||||
def _apply_win_mode(file_path, mode):
|
||||
"""
|
||||
This function converts the given POSIX mode into a Windows ACL list, and applies it to the
|
||||
file given its path. If the given path is a symbolic link, it will resolved to apply the
|
||||
mode on the targeted file.
|
||||
"""
|
||||
file_path = realpath(file_path)
|
||||
# Get owner sid of the file
|
||||
security = win32security.GetFileSecurity(file_path, win32security.OWNER_SECURITY_INFORMATION)
|
||||
user = security.GetSecurityDescriptorOwner()
|
||||
|
||||
@@ -26,7 +26,9 @@ for attribute in dir(std_os):
|
||||
if not hasattr(ourselves, attribute):
|
||||
setattr(ourselves, attribute, getattr(std_os, attribute))
|
||||
|
||||
# Similar to os.path, allow certbot.compat.os.path to behave as a module
|
||||
# Import our internal path module, then allow certbot.compat.os.path
|
||||
# to behave as a module (similarly to os.path).
|
||||
from certbot.compat import _path as path # type: ignore # pylint: disable=wrong-import-position
|
||||
std_sys.modules[__name__ + '.path'] = path
|
||||
|
||||
# Clean all remaining importables that are not from the core os module.
|
||||
|
||||
@@ -31,6 +31,7 @@ from certbot import reporter
|
||||
from certbot import storage
|
||||
from certbot import updater
|
||||
from certbot import util
|
||||
from certbot.compat import filesystem
|
||||
from certbot.compat import misc
|
||||
from certbot.compat import os
|
||||
from certbot.display import util as display_util, ops as display_ops
|
||||
@@ -841,12 +842,12 @@ def _populate_from_certname(config):
|
||||
return config
|
||||
|
||||
def _check_certificate_and_key(config):
|
||||
if not os.path.isfile(os.path.realpath(config.cert_path)):
|
||||
if not os.path.isfile(filesystem.realpath(config.cert_path)):
|
||||
raise errors.ConfigurationError("Error while reading certificate from path "
|
||||
"{0}".format(config.cert_path))
|
||||
if not os.path.isfile(os.path.realpath(config.key_path)):
|
||||
"{0}".format(config.cert_path))
|
||||
if not os.path.isfile(filesystem.realpath(config.key_path)):
|
||||
raise errors.ConfigurationError("Error while reading private key from path "
|
||||
"{0}".format(config.key_path))
|
||||
"{0}".format(config.key_path))
|
||||
def plugins_cmd(config, plugins):
|
||||
"""List server software plugins.
|
||||
|
||||
|
||||
@@ -486,7 +486,7 @@ def dir_setup(test_dir, pkg): # pragma: no cover
|
||||
link, (ex: OS X) such plugins will be confused. This function prevents
|
||||
such a case.
|
||||
"""
|
||||
return os.path.realpath(tempfile.mkdtemp(prefix))
|
||||
return filesystem.realpath(tempfile.mkdtemp(prefix))
|
||||
|
||||
temp_dir = expanded_tempdir("temp")
|
||||
config_dir = expanded_tempdir("config")
|
||||
|
||||
@@ -31,7 +31,7 @@ class PluginStorageTest(test_util.ConfigTestCase):
|
||||
self.plugin.storage.storagepath = os.path.join(self.config.config_dir,
|
||||
".pluginstorage.json")
|
||||
with mock.patch("six.moves.builtins.open", mock_open):
|
||||
with mock.patch('os.path.isfile', return_value=True):
|
||||
with mock.patch('certbot.compat.os.path.isfile', return_value=True):
|
||||
with mock.patch("certbot.reverter.util"):
|
||||
self.assertRaises(errors.PluginStorageError,
|
||||
self.plugin.storage._load) # pylint: disable=protected-access
|
||||
|
||||
@@ -97,8 +97,8 @@ class UpdateLiveSymlinksTest(BaseCertManagerTest):
|
||||
for kind in ALL_FOUR:
|
||||
os.chdir(os.path.dirname(self.config_files[domain][kind]))
|
||||
self.assertEqual(
|
||||
os.path.realpath(os.readlink(self.config_files[domain][kind])),
|
||||
os.path.realpath(archive_paths[domain][kind]))
|
||||
filesystem.realpath(os.readlink(self.config_files[domain][kind])),
|
||||
filesystem.realpath(archive_paths[domain][kind]))
|
||||
finally:
|
||||
os.chdir(prev_dir)
|
||||
|
||||
|
||||
@@ -48,18 +48,6 @@ class WindowsChmodTests(TempDirTestCase):
|
||||
self.assertFalse(filesystem._compare_dacls(ref_dacl_probe, cur_dacl_probe)) # pylint: disable=protected-access
|
||||
self.assertTrue(filesystem._compare_dacls(ref_dacl_link, cur_dacl_link)) # pylint: disable=protected-access
|
||||
|
||||
def test_symlink_loop_mitigation(self):
|
||||
link1_path = os.path.join(self.tempdir, 'link1')
|
||||
link2_path = os.path.join(self.tempdir, 'link2')
|
||||
link3_path = os.path.join(self.tempdir, 'link3')
|
||||
os.symlink(link1_path, link2_path)
|
||||
os.symlink(link2_path, link3_path)
|
||||
os.symlink(link3_path, link1_path)
|
||||
|
||||
with self.assertRaises(RuntimeError) as error:
|
||||
filesystem.chmod(link1_path, 0o755)
|
||||
self.assertTrue('link1 is a loop!' in str(error.exception))
|
||||
|
||||
def test_world_permission(self):
|
||||
everybody = win32security.ConvertStringSidToSid(EVERYBODY_SID)
|
||||
|
||||
@@ -320,7 +308,6 @@ class CopyOwnershipTest(test_util.TempDirTestCase):
|
||||
|
||||
class OsReplaceTest(test_util.TempDirTestCase):
|
||||
"""Test to ensure consistent behavior of rename method"""
|
||||
|
||||
def test_os_replace_to_existing_file(self):
|
||||
"""Ensure that replace will effectively rename src into dst for all platforms."""
|
||||
src = os.path.join(self.tempdir, 'src')
|
||||
@@ -335,6 +322,46 @@ class OsReplaceTest(test_util.TempDirTestCase):
|
||||
self.assertTrue(os.path.exists(dst))
|
||||
|
||||
|
||||
class RealpathTest(test_util.TempDirTestCase):
|
||||
"""Tests for realpath method"""
|
||||
def setUp(self):
|
||||
super(RealpathTest, self).setUp()
|
||||
self.probe_path = _create_probe(self.tempdir)
|
||||
|
||||
def test_symlink_resolution(self):
|
||||
# Absolute resolution
|
||||
link_path = os.path.join(self.tempdir, 'link_abs')
|
||||
os.symlink(self.probe_path, link_path)
|
||||
|
||||
self.assertEqual(self.probe_path, filesystem.realpath(self.probe_path))
|
||||
self.assertEqual(self.probe_path, filesystem.realpath(link_path))
|
||||
|
||||
# Relative resolution
|
||||
curdir = os.getcwd()
|
||||
link_path = os.path.join(self.tempdir, 'link_rel')
|
||||
probe_name = os.path.basename(self.probe_path)
|
||||
try:
|
||||
os.chdir(os.path.dirname(self.probe_path))
|
||||
os.symlink(probe_name, link_path)
|
||||
|
||||
self.assertEqual(self.probe_path, filesystem.realpath(probe_name))
|
||||
self.assertEqual(self.probe_path, filesystem.realpath(link_path))
|
||||
finally:
|
||||
os.chdir(curdir)
|
||||
|
||||
def test_symlink_loop_mitigation(self):
|
||||
link1_path = os.path.join(self.tempdir, 'link1')
|
||||
link2_path = os.path.join(self.tempdir, 'link2')
|
||||
link3_path = os.path.join(self.tempdir, 'link3')
|
||||
os.symlink(link1_path, link2_path)
|
||||
os.symlink(link2_path, link3_path)
|
||||
os.symlink(link3_path, link1_path)
|
||||
|
||||
with self.assertRaises(RuntimeError) as error:
|
||||
filesystem.realpath(link1_path)
|
||||
self.assertTrue('link1 is a loop!' in str(error.exception))
|
||||
|
||||
|
||||
def _get_security_dacl(target):
|
||||
return win32security.GetFileSecurity(target, win32security.DACL_SECURITY_INFORMATION)
|
||||
|
||||
|
||||
@@ -7,8 +7,12 @@ from certbot.compat import os
|
||||
class OsTest(unittest.TestCase):
|
||||
"""Unit tests for os module."""
|
||||
def test_forbidden_methods(self):
|
||||
# Checks for os module
|
||||
for method in ['chmod', 'chown', 'open', 'mkdir', 'makedirs', 'rename', 'replace']:
|
||||
self.assertRaises(RuntimeError, getattr(os, method))
|
||||
# Checks for os.path module
|
||||
for method in ['realpath']:
|
||||
self.assertRaises(RuntimeError, getattr(os.path, method))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -542,7 +542,7 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
||||
return True
|
||||
return orig_open(fn)
|
||||
|
||||
with mock.patch("os.path.isfile") as mock_if:
|
||||
with mock.patch("certbot.compat.os.path.isfile") as mock_if:
|
||||
mock_if.side_effect = mock_isfile
|
||||
with mock.patch('certbot.main.client') as client:
|
||||
ret, stdout, stderr = self._call_no_clientmock(args, stdout)
|
||||
|
||||
@@ -528,7 +528,7 @@ class OsInfoTest(unittest.TestCase):
|
||||
from certbot.util import (get_os_info, get_systemd_os_info,
|
||||
get_os_info_ua)
|
||||
|
||||
with mock.patch('os.path.isfile', return_value=True):
|
||||
with mock.patch('certbot.compat.os.path.isfile', return_value=True):
|
||||
self.assertEqual(get_os_info(
|
||||
test_util.vector_path("os-release"))[0], 'systemdos')
|
||||
self.assertEqual(get_os_info(
|
||||
@@ -536,13 +536,13 @@ class OsInfoTest(unittest.TestCase):
|
||||
self.assertEqual(get_systemd_os_info(os.devnull), ("", ""))
|
||||
self.assertEqual(get_os_info_ua(
|
||||
test_util.vector_path("os-release")), "SystemdOS")
|
||||
with mock.patch('os.path.isfile', return_value=False):
|
||||
with mock.patch('certbot.compat.os.path.isfile', return_value=False):
|
||||
self.assertEqual(get_systemd_os_info(), ("", ""))
|
||||
|
||||
def test_systemd_os_release_like(self):
|
||||
from certbot.util import get_systemd_os_like
|
||||
|
||||
with mock.patch('os.path.isfile', return_value=True):
|
||||
with mock.patch('certbot.compat.os.path.isfile', return_value=True):
|
||||
id_likes = get_systemd_os_like(test_util.vector_path(
|
||||
"os-release"))
|
||||
self.assertEqual(len(id_likes), 3)
|
||||
@@ -552,7 +552,7 @@ class OsInfoTest(unittest.TestCase):
|
||||
def test_non_systemd_os_info(self, popen_mock):
|
||||
from certbot.util import (get_os_info, get_python_os_info,
|
||||
get_os_info_ua)
|
||||
with mock.patch('os.path.isfile', return_value=False):
|
||||
with mock.patch('certbot.compat.os.path.isfile', return_value=False):
|
||||
with mock.patch('platform.system_alias',
|
||||
return_value=('NonSystemD', '42', '42')):
|
||||
self.assertEqual(get_os_info()[0], 'nonsystemd')
|
||||
|
||||
Reference in New Issue
Block a user