mirror of
https://github.com/certbot/certbot.git
synced 2026-01-26 07:41:33 +03:00
Reuse ACMEv1 accounts for ACMEv2 (#5902)
* Reuse ACMEv1 accounts for ACMEv2 * Correct behavior * add unit tests * add _find_all_inner to comply with interface * acme-staging-v01 --> acme-staging * only create symlink to previous account if there is one there * recurse on server path * update tests and change internal use of load to use server_path * fail gracefully on corrupted account file by returning [] when rmdir fails * only reuse accounts in staging for now
This commit is contained in:
@@ -16,6 +16,7 @@ import zope.component
|
||||
from acme import fields as acme_fields
|
||||
from acme import messages
|
||||
|
||||
from certbot import constants
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
from certbot import util
|
||||
@@ -142,7 +143,11 @@ class AccountFileStorage(interfaces.AccountStorage):
|
||||
self.config.strict_permissions)
|
||||
|
||||
def _account_dir_path(self, account_id):
|
||||
return os.path.join(self.config.accounts_dir, account_id)
|
||||
return self._account_dir_path_for_server_path(account_id, self.config.server_path)
|
||||
|
||||
def _account_dir_path_for_server_path(self, account_id, server_path):
|
||||
accounts_dir = self.config.accounts_dir_for_server_path(server_path)
|
||||
return os.path.join(accounts_dir, account_id)
|
||||
|
||||
@classmethod
|
||||
def _regr_path(cls, account_dir_path):
|
||||
@@ -156,22 +161,44 @@ class AccountFileStorage(interfaces.AccountStorage):
|
||||
def _metadata_path(cls, account_dir_path):
|
||||
return os.path.join(account_dir_path, "meta.json")
|
||||
|
||||
def find_all(self):
|
||||
def _find_all_for_server_path(self, server_path):
|
||||
accounts_dir = self.config.accounts_dir_for_server_path(server_path)
|
||||
try:
|
||||
candidates = os.listdir(self.config.accounts_dir)
|
||||
candidates = os.listdir(accounts_dir)
|
||||
except OSError:
|
||||
return []
|
||||
|
||||
accounts = []
|
||||
for account_id in candidates:
|
||||
try:
|
||||
accounts.append(self.load(account_id))
|
||||
accounts.append(self._load_for_server_path(account_id, server_path))
|
||||
except errors.AccountStorageError:
|
||||
logger.debug("Account loading problem", exc_info=True)
|
||||
|
||||
|
||||
if not accounts and server_path in constants.LE_REUSE_SERVERS:
|
||||
# find all for the next link down
|
||||
prev_server_path = constants.LE_REUSE_SERVERS[server_path]
|
||||
prev_accounts = self._find_all_for_server_path(prev_server_path)
|
||||
# if we found something, link to that
|
||||
if prev_accounts:
|
||||
if os.path.islink(accounts_dir):
|
||||
os.unlink(accounts_dir)
|
||||
else:
|
||||
try:
|
||||
os.rmdir(accounts_dir)
|
||||
except OSError:
|
||||
return []
|
||||
prev_account_dir = self.config.accounts_dir_for_server_path(prev_server_path)
|
||||
os.symlink(prev_account_dir, accounts_dir)
|
||||
accounts = prev_accounts
|
||||
return accounts
|
||||
|
||||
def load(self, account_id):
|
||||
account_dir_path = self._account_dir_path(account_id)
|
||||
def find_all(self):
|
||||
return self._find_all_for_server_path(self.config.server_path)
|
||||
|
||||
def _load_for_server_path(self, account_id, server_path):
|
||||
account_dir_path = self._account_dir_path_for_server_path(account_id, server_path)
|
||||
if not os.path.isdir(account_dir_path):
|
||||
raise errors.AccountNotFound(
|
||||
"Account at %s does not exist" % account_dir_path)
|
||||
@@ -193,6 +220,9 @@ class AccountFileStorage(interfaces.AccountStorage):
|
||||
account_id, acc.id))
|
||||
return acc
|
||||
|
||||
def load(self, account_id):
|
||||
return self._load_for_server_path(account_id, self.config.server_path)
|
||||
|
||||
def save(self, account, acme):
|
||||
self._save(account, acme, regr_only=False)
|
||||
|
||||
|
||||
@@ -65,8 +65,12 @@ class NamespaceConfig(object):
|
||||
|
||||
@property
|
||||
def accounts_dir(self): # pylint: disable=missing-docstring
|
||||
return self.accounts_dir_for_server_path(self.server_path)
|
||||
|
||||
def accounts_dir_for_server_path(self, server_path):
|
||||
"""Path to accounts directory based on server_path"""
|
||||
return os.path.join(
|
||||
self.namespace.config_dir, constants.ACCOUNTS_DIR, self.server_path)
|
||||
self.namespace.config_dir, constants.ACCOUNTS_DIR, server_path)
|
||||
|
||||
@property
|
||||
def backup_dir(self): # pylint: disable=missing-docstring
|
||||
|
||||
@@ -158,6 +158,12 @@ CONFIG_DIRS_MODE = 0o755
|
||||
ACCOUNTS_DIR = "accounts"
|
||||
"""Directory where all accounts are saved."""
|
||||
|
||||
LE_REUSE_SERVERS = {
|
||||
'acme-staging-v02.api.letsencrypt.org/directory':
|
||||
'acme-staging.api.letsencrypt.org/directory'
|
||||
}
|
||||
"""Servers that can reuse accounts from other servers."""
|
||||
|
||||
BACKUP_DIR = "backups"
|
||||
"""Directory (relative to `IConfig.work_dir`) where backups are kept."""
|
||||
|
||||
|
||||
@@ -95,6 +95,7 @@ class AccountMemoryStorageTest(unittest.TestCase):
|
||||
|
||||
class AccountFileStorageTest(test_util.ConfigTestCase):
|
||||
"""Tests for certbot.account.AccountFileStorage."""
|
||||
#pylint: disable=too-many-public-methods
|
||||
|
||||
def setUp(self):
|
||||
super(AccountFileStorageTest, self).setUp()
|
||||
@@ -159,7 +160,8 @@ class AccountFileStorageTest(test_util.ConfigTestCase):
|
||||
self.assertEqual([], self.storage.find_all())
|
||||
|
||||
def test_find_all_load_skips(self):
|
||||
self.storage.load = mock.MagicMock(
|
||||
# pylint: disable=protected-access
|
||||
self.storage._load_for_server_path = mock.MagicMock(
|
||||
side_effect=["x", errors.AccountStorageError, "z"])
|
||||
with mock.patch("certbot.account.os.listdir") as mock_listdir:
|
||||
mock_listdir.return_value = ["x", "y", "z"]
|
||||
@@ -175,6 +177,64 @@ class AccountFileStorageTest(test_util.ConfigTestCase):
|
||||
self.assertRaises(errors.AccountStorageError, self.storage.load,
|
||||
"x" + self.acc.id)
|
||||
|
||||
def _set_server(self, server):
|
||||
self.config.server = server
|
||||
from certbot.account import AccountFileStorage
|
||||
self.storage = AccountFileStorage(self.config)
|
||||
|
||||
def test_find_all_neither_exists(self):
|
||||
self._set_server('https://acme-staging-v02.api.letsencrypt.org/directory')
|
||||
self.assertEqual([], self.storage.find_all())
|
||||
self.assertEqual([], self.storage.find_all())
|
||||
self.assertFalse(os.path.islink(self.config.accounts_dir))
|
||||
|
||||
def test_find_all_find_before_save(self):
|
||||
self._set_server('https://acme-staging-v02.api.letsencrypt.org/directory')
|
||||
self.assertEqual([], self.storage.find_all())
|
||||
self.storage.save(self.acc, self.mock_client)
|
||||
self.assertEqual([self.acc], self.storage.find_all())
|
||||
self.assertEqual([self.acc], self.storage.find_all())
|
||||
self.assertFalse(os.path.islink(self.config.accounts_dir))
|
||||
# we shouldn't have created a v1 account
|
||||
prev_server_path = 'https://acme-staging.api.letsencrypt.org/directory'
|
||||
self.assertFalse(os.path.isdir(self.config.accounts_dir_for_server_path(prev_server_path)))
|
||||
|
||||
def test_find_all_save_before_find(self):
|
||||
self._set_server('https://acme-staging-v02.api.letsencrypt.org/directory')
|
||||
self.storage.save(self.acc, self.mock_client)
|
||||
self.assertEqual([self.acc], self.storage.find_all())
|
||||
self.assertEqual([self.acc], self.storage.find_all())
|
||||
self.assertFalse(os.path.islink(self.config.accounts_dir))
|
||||
self.assertTrue(os.path.isdir(self.config.accounts_dir))
|
||||
prev_server_path = 'https://acme-staging.api.letsencrypt.org/directory'
|
||||
self.assertFalse(os.path.isdir(self.config.accounts_dir_for_server_path(prev_server_path)))
|
||||
|
||||
def test_find_all_server_downgrade(self):
|
||||
# don't use v2 accounts with a v1 url
|
||||
self._set_server('https://acme-staging-v02.api.letsencrypt.org/directory')
|
||||
self.assertEqual([], self.storage.find_all())
|
||||
self.storage.save(self.acc, self.mock_client)
|
||||
self.assertEqual([self.acc], self.storage.find_all())
|
||||
self._set_server('https://acme-staging.api.letsencrypt.org/directory')
|
||||
self.assertEqual([], self.storage.find_all())
|
||||
|
||||
def test_upgrade_version(self):
|
||||
self._set_server('https://acme-staging.api.letsencrypt.org/directory')
|
||||
self.storage.save(self.acc, self.mock_client)
|
||||
self._set_server('https://acme-staging-v02.api.letsencrypt.org/directory')
|
||||
self.assertEqual([self.acc], self.storage.find_all())
|
||||
|
||||
@mock.patch('os.rmdir')
|
||||
def test_corrupted_account(self, mock_rmdir):
|
||||
# pylint: disable=protected-access
|
||||
self._set_server('https://acme-staging.api.letsencrypt.org/directory')
|
||||
self.storage.save(self.acc, self.mock_client)
|
||||
mock_rmdir.side_effect = OSError
|
||||
self.storage._load_for_server_path = mock.MagicMock(
|
||||
side_effect=errors.AccountStorageError)
|
||||
self._set_server('https://acme-staging-v02.api.letsencrypt.org/directory')
|
||||
self.assertEqual([], self.storage.find_all())
|
||||
|
||||
def test_load_ioerror(self):
|
||||
self.storage.save(self.acc, self.mock_client)
|
||||
mock_open = mock.mock_open()
|
||||
|
||||
Reference in New Issue
Block a user