1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-26 07:41:33 +03:00

Reuse ACMEv1 accounts for ACMEv2 (#5902)

* Reuse ACMEv1 accounts for ACMEv2

* Correct behavior

* add unit tests

* add _find_all_inner to comply with interface

* acme-staging-v01 --> acme-staging

* only create symlink to previous account if there is one there

* recurse on server path

* update tests and change internal use of load to use server_path

* fail gracefully on corrupted account file by returning [] when rmdir fails

* only reuse accounts in staging for now
This commit is contained in:
ohemorange
2018-06-04 16:04:47 -07:00
committed by Brad Warren
parent 236f9630e0
commit 8e4303af9f
4 changed files with 108 additions and 8 deletions

View File

@@ -16,6 +16,7 @@ import zope.component
from acme import fields as acme_fields
from acme import messages
from certbot import constants
from certbot import errors
from certbot import interfaces
from certbot import util
@@ -142,7 +143,11 @@ class AccountFileStorage(interfaces.AccountStorage):
self.config.strict_permissions)
def _account_dir_path(self, account_id):
return os.path.join(self.config.accounts_dir, account_id)
return self._account_dir_path_for_server_path(account_id, self.config.server_path)
def _account_dir_path_for_server_path(self, account_id, server_path):
accounts_dir = self.config.accounts_dir_for_server_path(server_path)
return os.path.join(accounts_dir, account_id)
@classmethod
def _regr_path(cls, account_dir_path):
@@ -156,22 +161,44 @@ class AccountFileStorage(interfaces.AccountStorage):
def _metadata_path(cls, account_dir_path):
return os.path.join(account_dir_path, "meta.json")
def find_all(self):
def _find_all_for_server_path(self, server_path):
accounts_dir = self.config.accounts_dir_for_server_path(server_path)
try:
candidates = os.listdir(self.config.accounts_dir)
candidates = os.listdir(accounts_dir)
except OSError:
return []
accounts = []
for account_id in candidates:
try:
accounts.append(self.load(account_id))
accounts.append(self._load_for_server_path(account_id, server_path))
except errors.AccountStorageError:
logger.debug("Account loading problem", exc_info=True)
if not accounts and server_path in constants.LE_REUSE_SERVERS:
# find all for the next link down
prev_server_path = constants.LE_REUSE_SERVERS[server_path]
prev_accounts = self._find_all_for_server_path(prev_server_path)
# if we found something, link to that
if prev_accounts:
if os.path.islink(accounts_dir):
os.unlink(accounts_dir)
else:
try:
os.rmdir(accounts_dir)
except OSError:
return []
prev_account_dir = self.config.accounts_dir_for_server_path(prev_server_path)
os.symlink(prev_account_dir, accounts_dir)
accounts = prev_accounts
return accounts
def load(self, account_id):
account_dir_path = self._account_dir_path(account_id)
def find_all(self):
return self._find_all_for_server_path(self.config.server_path)
def _load_for_server_path(self, account_id, server_path):
account_dir_path = self._account_dir_path_for_server_path(account_id, server_path)
if not os.path.isdir(account_dir_path):
raise errors.AccountNotFound(
"Account at %s does not exist" % account_dir_path)
@@ -193,6 +220,9 @@ class AccountFileStorage(interfaces.AccountStorage):
account_id, acc.id))
return acc
def load(self, account_id):
return self._load_for_server_path(account_id, self.config.server_path)
def save(self, account, acme):
self._save(account, acme, regr_only=False)

View File

@@ -65,8 +65,12 @@ class NamespaceConfig(object):
@property
def accounts_dir(self): # pylint: disable=missing-docstring
return self.accounts_dir_for_server_path(self.server_path)
def accounts_dir_for_server_path(self, server_path):
"""Path to accounts directory based on server_path"""
return os.path.join(
self.namespace.config_dir, constants.ACCOUNTS_DIR, self.server_path)
self.namespace.config_dir, constants.ACCOUNTS_DIR, server_path)
@property
def backup_dir(self): # pylint: disable=missing-docstring

View File

@@ -158,6 +158,12 @@ CONFIG_DIRS_MODE = 0o755
ACCOUNTS_DIR = "accounts"
"""Directory where all accounts are saved."""
LE_REUSE_SERVERS = {
'acme-staging-v02.api.letsencrypt.org/directory':
'acme-staging.api.letsencrypt.org/directory'
}
"""Servers that can reuse accounts from other servers."""
BACKUP_DIR = "backups"
"""Directory (relative to `IConfig.work_dir`) where backups are kept."""

View File

@@ -95,6 +95,7 @@ class AccountMemoryStorageTest(unittest.TestCase):
class AccountFileStorageTest(test_util.ConfigTestCase):
"""Tests for certbot.account.AccountFileStorage."""
#pylint: disable=too-many-public-methods
def setUp(self):
super(AccountFileStorageTest, self).setUp()
@@ -159,7 +160,8 @@ class AccountFileStorageTest(test_util.ConfigTestCase):
self.assertEqual([], self.storage.find_all())
def test_find_all_load_skips(self):
self.storage.load = mock.MagicMock(
# pylint: disable=protected-access
self.storage._load_for_server_path = mock.MagicMock(
side_effect=["x", errors.AccountStorageError, "z"])
with mock.patch("certbot.account.os.listdir") as mock_listdir:
mock_listdir.return_value = ["x", "y", "z"]
@@ -175,6 +177,64 @@ class AccountFileStorageTest(test_util.ConfigTestCase):
self.assertRaises(errors.AccountStorageError, self.storage.load,
"x" + self.acc.id)
def _set_server(self, server):
self.config.server = server
from certbot.account import AccountFileStorage
self.storage = AccountFileStorage(self.config)
def test_find_all_neither_exists(self):
self._set_server('https://acme-staging-v02.api.letsencrypt.org/directory')
self.assertEqual([], self.storage.find_all())
self.assertEqual([], self.storage.find_all())
self.assertFalse(os.path.islink(self.config.accounts_dir))
def test_find_all_find_before_save(self):
self._set_server('https://acme-staging-v02.api.letsencrypt.org/directory')
self.assertEqual([], self.storage.find_all())
self.storage.save(self.acc, self.mock_client)
self.assertEqual([self.acc], self.storage.find_all())
self.assertEqual([self.acc], self.storage.find_all())
self.assertFalse(os.path.islink(self.config.accounts_dir))
# we shouldn't have created a v1 account
prev_server_path = 'https://acme-staging.api.letsencrypt.org/directory'
self.assertFalse(os.path.isdir(self.config.accounts_dir_for_server_path(prev_server_path)))
def test_find_all_save_before_find(self):
self._set_server('https://acme-staging-v02.api.letsencrypt.org/directory')
self.storage.save(self.acc, self.mock_client)
self.assertEqual([self.acc], self.storage.find_all())
self.assertEqual([self.acc], self.storage.find_all())
self.assertFalse(os.path.islink(self.config.accounts_dir))
self.assertTrue(os.path.isdir(self.config.accounts_dir))
prev_server_path = 'https://acme-staging.api.letsencrypt.org/directory'
self.assertFalse(os.path.isdir(self.config.accounts_dir_for_server_path(prev_server_path)))
def test_find_all_server_downgrade(self):
# don't use v2 accounts with a v1 url
self._set_server('https://acme-staging-v02.api.letsencrypt.org/directory')
self.assertEqual([], self.storage.find_all())
self.storage.save(self.acc, self.mock_client)
self.assertEqual([self.acc], self.storage.find_all())
self._set_server('https://acme-staging.api.letsencrypt.org/directory')
self.assertEqual([], self.storage.find_all())
def test_upgrade_version(self):
self._set_server('https://acme-staging.api.letsencrypt.org/directory')
self.storage.save(self.acc, self.mock_client)
self._set_server('https://acme-staging-v02.api.letsencrypt.org/directory')
self.assertEqual([self.acc], self.storage.find_all())
@mock.patch('os.rmdir')
def test_corrupted_account(self, mock_rmdir):
# pylint: disable=protected-access
self._set_server('https://acme-staging.api.letsencrypt.org/directory')
self.storage.save(self.acc, self.mock_client)
mock_rmdir.side_effect = OSError
self.storage._load_for_server_path = mock.MagicMock(
side_effect=errors.AccountStorageError)
self._set_server('https://acme-staging-v02.api.letsencrypt.org/directory')
self.assertEqual([], self.storage.find_all())
def test_load_ioerror(self):
self.storage.save(self.acc, self.mock_client)
mock_open = mock.mock_open()