From e1e52a9d56f684cf2cb289a0661da65c132843c7 Mon Sep 17 00:00:00 2001 From: Brad Warren Date: Thu, 11 Feb 2016 17:47:15 -0800 Subject: [PATCH] Verify both symlink and target --- letsencrypt/storage.py | 37 ++++++++++++++++++++----------- letsencrypt/tests/storage_test.py | 4 ++++ 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/letsencrypt/storage.py b/letsencrypt/storage.py index 6f7f54646..6786ac745 100644 --- a/letsencrypt/storage.py +++ b/letsencrypt/storage.py @@ -112,6 +112,21 @@ def update_configuration(lineagename, target, cli_config): return configobj.ConfigObj(config_filename) +def get_link_target(link): + """Get an absolute path to the target of link. + + :param str link: Path to a symbolic link + + :returns: Absolute path to the target of link + :rtype: str + + """ + target = os.readlink(link) + if not os.path.isabs(target): + target = os.path.join(os.path.dirname(link), target) + return os.path.abspath(target) + + class RenewableCert(object): # pylint: disable=too-many-instance-attributes """Renewable certificate. @@ -194,13 +209,15 @@ class RenewableCert(object): # pylint: disable=too-many-instance-attributes def _check_symlinks(self): """Raises an exception if a symlink doesn't exist""" - def check(link): - """Checks if symlink points to a file that exists""" - return os.path.exists(os.path.realpath(link)) for kind in ALL_FOUR: - if not check(getattr(self, kind)): + link = getattr(self, kind) + if not os.path.islink(link): raise errors.CertStorageError( - "link: {0} does not exist".format(getattr(self, kind))) + "expected {0} to be a symlink".format(link)) + target = get_link_target(link) + if not os.path.exists(target): + raise errors.CertStorageError("target {0} of symlink {1} does " + "not exist".format(target, link)) def _consistent(self): """Are the files associated with this lineage self-consistent? @@ -225,10 +242,7 @@ class RenewableCert(object): # pylint: disable=too-many-instance-attributes return False for kind in ALL_FOUR: link = getattr(self, kind) - where = os.path.dirname(link) - target = os.readlink(link) - if not os.path.isabs(target): - target = os.path.join(where, target) + target = get_link_target(link) # Each element's link must point within the cert lineage's # directory within the official archive directory @@ -343,10 +357,7 @@ class RenewableCert(object): # pylint: disable=too-many-instance-attributes logger.debug("Expected symlink %s for %s does not exist.", link, kind) return None - target = os.readlink(link) - if not os.path.isabs(target): - target = os.path.join(os.path.dirname(link), target) - return os.path.abspath(target) + return get_link_target(link) def current_version(self, kind): """Returns numerical version of the specified item. diff --git a/letsencrypt/tests/storage_test.py b/letsencrypt/tests/storage_test.py index 9d402089c..071d6d7bb 100644 --- a/letsencrypt/tests/storage_test.py +++ b/letsencrypt/tests/storage_test.py @@ -687,6 +687,10 @@ class RenewableCertTests(BaseRenewableCertTest): self.assertRaises(errors.CertStorageError, storage.RenewableCert, self.config.filename, self.cli_config) + os.symlink("missing", self.config[ALL_FOUR[0]]) + self.assertRaises(errors.CertStorageError, + storage.RenewableCert, + self.config.filename, self.cli_config) if __name__ == "__main__":