1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-21 19:01:07 +03:00

100% test coverage, account, auth_handler

This commit is contained in:
James Kasten
2015-04-23 19:12:15 -07:00
parent cdfdee2ebc
commit 016e10f415
8 changed files with 260 additions and 71 deletions

View File

@@ -79,7 +79,7 @@ class AuthHandler(object):
self._respond(cont_resp, dv_resp, best_effort)
# Just make sure all decisions are complete.
self._verify_authzr_complete()
self.verify_authzr_complete()
# Only return valid authorizations
return [authzr for authzr in self.authzr.values()
if authzr.body.status == messages2.STATUS_VALID]
@@ -112,8 +112,7 @@ class AuthHandler(object):
logging.critical("Failure in setting up challenges.")
logging.info("Attempting to clean up outstanding challenges...")
self._cleanup_challenges()
raise errors.AuthorizationError(
"Unable to perform challenges")
raise
assert len(cont_resp) == len(self.cont_c)
assert len(dv_resp) == len(self.dv_c)
@@ -159,12 +158,14 @@ class AuthHandler(object):
return active_achalls
def _poll_challenges(self, chall_update, best_effort, min_sleep=3):
def _poll_challenges(
self, chall_update, best_effort, min_sleep=3, max_rounds=15):
"""Wait for all challenge results to be determined."""
dom_to_check = set(chall_update.keys())
comp_domains = set()
rounds = 0
while dom_to_check:
while dom_to_check and rounds < max_rounds:
# TODO: Use retry-after...
time.sleep(min_sleep)
for domain in dom_to_check:
@@ -181,13 +182,13 @@ class AuthHandler(object):
# Right now... just assume a loss and carry on...
if best_effort:
comp_domains.add(domain)
else:
raise errors.AuthorizationError(
"Failed Authorization procedure for %s" % domain)
dom_to_check -= comp_domains
comp_domains.clear()
rounds += 1
def _handle_check(self, domain, achalls):
"""Returns tuple of ('completed', 'failed')."""
@@ -226,7 +227,7 @@ class AuthHandler(object):
"""
for authzr_challb in authzr.body.challenges:
if type(authzr_challb.chall) is type(achall.challb.chall):
return achall.challb.status
return authzr_challb.status
raise errors.AuthorizationError(
"Target challenge not found in authorization resource")
@@ -268,7 +269,13 @@ class AuthHandler(object):
for achall in cont_c:
self.cont_c.remove(achall)
def _verify_authzr_complete(self):
def verify_authzr_complete(self):
"""Verifies that all authorizations have been decided.
:returns: Whether all authzr are complete
:rtype: bool
"""
for authzr in self.authzr.values():
if (authzr.body.status != messages2.STATUS_VALID and
authzr.body.status != messages2.STATUS_INVALID):
@@ -298,34 +305,7 @@ class AuthHandler(object):
challb = self.authzr[domain].body.challenges[index]
chall = challb.chall
if isinstance(chall, challenges.DVSNI):
logging.info(" DVSNI challenge for %s.", domain)
achall = achallenges.DVSNI(
challb=challb, domain=domain, key=self.account.key)
elif isinstance(chall, challenges.SimpleHTTPS):
logging.info(" SimpleHTTPS challenge for %s.", domain)
achall = achallenges.SimpleHTTPS(
challb=challb, domain=domain, key=self.account.key)
elif isinstance(chall, challenges.DNS):
logging.info(" DNS challenge for %s.", domain)
achall = achallenges.DNS(challb=challb, domain=domain)
elif isinstance(chall, challenges.RecoveryToken):
logging.info(" Recovery Token Challenge for %s.", domain)
achall = achallenges.RecoveryToken(challb=challb, domain=domain)
elif isinstance(chall, challenges.RecoveryContact):
logging.info(" Recovery Contact Challenge for %s.", domain)
achall = achallenges.RecoveryContact(
challb=challb, domain=domain)
elif isinstance(chall, challenges.ProofOfPossession):
logging.info(" Proof-of-Possession Challenge for %s", domain)
achall = achallenges.ProofOfPossession(
challb=challb, domain=domain)
else:
raise errors.LetsEncryptClientError(
"Received unsupported challenge of type: %s",
chall.typ)
achall = challb_to_achall(challb, self.account.key, domain)
if isinstance(chall, challenges.ContinuityChallenge):
cont_chall.append(achall)
@@ -335,6 +315,53 @@ class AuthHandler(object):
return cont_chall, dv_chall
def challb_to_achall(challb, key, domain):
"""Converts a ChallengeBody object to an AnnotatedChallenge.
:param challb: ChallengeBody
:type challb: :class:`letsencrypt.acme.messages2.ChallengeBody`
:param key: Key
:type key: :class:`letsencrypt.client.le_util.Key`
:param str domain: Domain of the challb
:returns: Appropriate AnnotatedChallenge
:rtype: :class:`letsencrypt.client.achallenges.AnnotatedChallenge`
"""
chall = challb.chall
if isinstance(chall, challenges.DVSNI):
logging.info(" DVSNI challenge for %s.", domain)
return achallenges.DVSNI(
challb=challb, domain=domain, key=key)
elif isinstance(chall, challenges.SimpleHTTPS):
logging.info(" SimpleHTTPS challenge for %s.", domain)
return achallenges.SimpleHTTPS(
challb=challb, domain=domain, key=key)
elif isinstance(chall, challenges.DNS):
logging.info(" DNS challenge for %s.", domain)
return achallenges.DNS(challb=challb, domain=domain)
elif isinstance(chall, challenges.RecoveryToken):
logging.info(" Recovery Token Challenge for %s.", domain)
return achallenges.RecoveryToken(challb=challb, domain=domain)
elif isinstance(chall, challenges.RecoveryContact):
logging.info(" Recovery Contact Challenge for %s.", domain)
return achallenges.RecoveryContact(
challb=challb, domain=domain)
elif isinstance(chall, challenges.ProofOfPossession):
logging.info(" Proof-of-Possession Challenge for %s", domain)
return achallenges.ProofOfPossession(
challb=challb, domain=domain)
else:
raise errors.LetsEncryptClientError(
"Received unsupported challenge of type: %s",
chall.typ)
def gen_challenge_path(challbs, preferences, combinations):
"""Generate a plan to get authority over the identity.

View File

@@ -150,6 +150,7 @@ class Network(object):
response.links['terms-of-service']['url']
if 'terms-of-service' in response.links else terms_of_service)
# TODO: Consider removing this check based on spec clarifications #93
if new_authzr_uri is None:
try:
new_authzr_uri = response.links['next']['url']
@@ -321,7 +322,7 @@ class Network(object):
# TODO: Right now Boulder responds with the authorization resource
# instead of a challenge resource... this can be uncommented
# once the error is fixed.
return challb
return None
# raise errors.NetworkError('"up" Link header missing')
challr = messages2.ChallengeResource(
authzr_uri=authzr_uri,

View File

@@ -1,4 +1,5 @@
"""Tests for letsencrypt.client.account."""
import logging
import mock
import os
import pkg_resources
@@ -13,6 +14,7 @@ from letsencrypt.acme import messages2
from letsencrypt.client import account
from letsencrypt.client import configuration
from letsencrypt.client import errors
from letsencrypt.client import le_util
from letsencrypt.client.display import util as display_util
@@ -22,6 +24,8 @@ class AccountTest(unittest.TestCase):
"""Tests letsencrypt.client.account.Account."""
def setUp(self):
logging.disable(logging.CRITICAL)
self.accounts_dir = tempfile.mkdtemp("accounts")
self.account_keys_dir = os.path.join(self.accounts_dir, "keys")
os.makedirs(self.account_keys_dir, 0o700)
@@ -51,6 +55,7 @@ class AccountTest(unittest.TestCase):
def tearDown(self):
shutil.rmtree(self.accounts_dir)
logging.disable(logging.NOTSET)
@mock.patch("letsencrypt.client.account.zope.component.getUtility")
@mock.patch("letsencrypt.client.account.crypto_util.init_save_key")
@@ -67,6 +72,15 @@ class AccountTest(unittest.TestCase):
self.assertEqual(acc.key, self.key)
self.assertEqual(acc.config, self.config)
@mock.patch("letsencrypt.client.account.zope.component.getUtility")
def test_prompts_cancel(self, mock_util):
# displayer = display_util.FileDisplay(sys.stdout)
# zope.component.provideUtility(displayer)
mock_util().input.return_value = (display_util.CANCEL, "")
self.assertTrue(account.Account.from_prompts(self.config) is None)
def test_save_from_existing_account(self):
self.test_account.save()
acc = account.Account.from_existing_account(self.config, self.email)
@@ -84,12 +98,25 @@ class AccountTest(unittest.TestCase):
def test_partial_properties(self):
partial = account.Account(self.config, self.key)
regr_no_authzr_uri = messages2.RegistrationResource(
uri="uri",
new_authzr_uri=None,
terms_of_service="terms_of_service",
body=messages2.Registration(
recovery_token="recovery_token", agreement="agreement")
)
partial2 = account.Account(
self.config, self.key, regr=regr_no_authzr_uri)
self.assertTrue(partial.uri is None)
self.assertTrue(partial.new_authzr_uri is None)
self.assertTrue(partial.terms_of_service is None)
self.assertTrue(partial.recovery_token is None)
self.assertEqual(
partial2.new_authzr_uri,
"https://letsencrypt-demo.org/acme/new-authz")
def test_partial_account_default(self):
partial = account.Account(self.config, self.key)
partial.save()
@@ -115,9 +142,23 @@ class AccountTest(unittest.TestCase):
accs = account.Account.get_accounts(self.config)
self.assertEqual(len(accs), 2)
def test_get_accounts_no_accounts(self):
self.assertEqual(account.Account.get_accounts(
mock.Mock(accounts_dir="non-existant")), [])
def test_failed_existing_account(self):
self.assertRaises(
errors.LetsEncryptClientError,
account.Account.from_existing_account,
self.config, "non-existant@email.org")
class SafeEmailTest(unittest.TestCase):
"""Test safe_email."""
def setUp(self):
logging.disable(logging.CRITICAL)
def tearDown(self):
logging.disable(logging.NOTSET)
@classmethod
def _call(cls, addr):
@@ -131,16 +172,16 @@ class SafeEmailTest(unittest.TestCase):
"abc_def.jdk@hotmail.museum"
]
for addr in addrs:
self.assertTrue(addr, "%s failed." % addr)
self.assertTrue(self._call(addr), "%s failed." % addr)
def test_invalid_emails(self):
addrs = [
"letsencrypt@letsencrypt..org",
".tbd.ade@gmail.com",
"~/abc_def.jdk@hotmail.museum"
"~/abc_def.jdk@hotmail.museum",
]
for addr in addrs:
self.assertTrue(addr, "%s failed." % addr)
self.assertFalse(self._call(addr), "%s failed." % addr)
if __name__ == "__main__":

View File

@@ -72,15 +72,11 @@ def gen_combos(challbs):
def chall_to_challb(chall, status): # pylint: disable=redefined-outer-name
"""Return ChallengeBody from Challenge.
:param str status: "valid", "invalid", "pending"...
"""
"""Return ChallengeBody from Challenge."""
kwargs = {
"chall": chall,
"uri": chall.typ+"_uri",
"status": messages2.Status(status),
"status": status,
}
if status == "valid":
@@ -90,12 +86,12 @@ def chall_to_challb(chall, status): # pylint: disable=redefined-outer-name
# Pending ChallengeBody objects
DVSNI_P = chall_to_challb(DVSNI, "pending")
SIMPLE_HTTPS_P = chall_to_challb(SIMPLE_HTTPS, "pending")
DNS_P = chall_to_challb(DNS, "pending")
RECOVERY_CONTACT_P = chall_to_challb(RECOVERY_CONTACT, "pending")
RECOVERY_TOKEN_P = chall_to_challb(RECOVERY_TOKEN, "pending")
POP_P = chall_to_challb(POP, "pending")
DVSNI_P = chall_to_challb(DVSNI, messages2.STATUS_PENDING)
SIMPLE_HTTPS_P = chall_to_challb(SIMPLE_HTTPS, messages2.STATUS_PENDING)
DNS_P = chall_to_challb(DNS, messages2.STATUS_PENDING)
RECOVERY_CONTACT_P = chall_to_challb(RECOVERY_CONTACT, messages2.STATUS_PENDING)
RECOVERY_TOKEN_P = chall_to_challb(RECOVERY_TOKEN, messages2.STATUS_PENDING)
POP_P = chall_to_challb(POP, messages2.STATUS_PENDING)
CHALLENGES_P = [SIMPLE_HTTPS_P, DVSNI_P, DNS_P,
RECOVERY_CONTACT_P, RECOVERY_TOKEN_P, POP_P]
@@ -110,17 +106,18 @@ CONT_CHALLENGES_P = [
def gen_authzr(authz_status, domain, challs, statuses, combos=True):
"""Generate an authorization resource.
:param str authz_status: "valid", "invalid", "pending"...
:param authz_status: Status object
:type authz_status: :class:`letsencrypt.acme.messages2.Status`
:param list challs: Challenge objects
:param list statuses: status of each challenge object e.g. "valid"...
:param list statuses: status of each challenge object
:param bool combos: Whether or not to add combinations
"""
# pylint: disable=redefined-outer-name
challbs = [
challbs = tuple(
chall_to_challb(chall, status)
for chall, status in itertools.izip(challs, statuses)
]
)
authz_kwargs = {
"identifier": messages2.Identifier(
typ=messages2.IDENTIFIER_FQDN, value=domain),
@@ -128,12 +125,16 @@ def gen_authzr(authz_status, domain, challs, statuses, combos=True):
}
if combos:
authz_kwargs.update({"combinations": gen_combos(challbs)})
if authz_status == "valid":
if authz_status == messages2.STATUS_VALID:
now = datetime.datetime.now()
authz_kwargs.update({
"status": messages2.Status(authz_status),
"status": authz_status,
"expires": datetime.datetime(now.year, now.month+1, now.day),
})
else:
authz_kwargs.update({
"status": authz_status,
})
# pylint: disable=star-args
return messages2.AuthorizationResource(

View File

@@ -6,6 +6,7 @@ import unittest
import mock
from letsencrypt.acme import challenges
from letsencrypt.acme import messages2
from letsencrypt.client import errors
from letsencrypt.client import le_util
@@ -36,7 +37,8 @@ class ChallengeFactoryTest(unittest.TestCase):
self.dom = "test"
self.handler.authzr[self.dom] = acme_util.gen_authzr(
"pending", self.dom, acme_util.CHALLENGES, ["pending"]*6, False)
messages2.STATUS_PENDING, self.dom, acme_util.CHALLENGES,
[messages2.STATUS_PENDING]*6, False)
def test_all(self):
cont_c, dv_c = self.handler._challenge_factory(self.dom, range(0, 6))
@@ -55,8 +57,9 @@ class ChallengeFactoryTest(unittest.TestCase):
def test_unrecognized(self):
self.handler.authzr["failure.com"] = acme_util.gen_authzr(
"pending", "failure.com",
[mock.Mock(chall="chall", typ="unrecognized")], ["pending"])
messages2.STATUS_PENDING, "failure.com",
[mock.Mock(chall="chall", typ="unrecognized")],
[messages2.STATUS_PENDING])
self.assertRaises(errors.LetsEncryptClientError,
self.handler._challenge_factory, "failure.com", [0])
@@ -145,6 +148,14 @@ class GetAuthorizationsTest(unittest.TestCase):
self.assertEqual(len(authzr), 3)
def test_perform_failure(self):
self.mock_net.request_domain_challenges.side_effect = functools.partial(
gen_dom_authzr, challs=acme_util.CHALLENGES)
self.mock_dv_auth.perform.side_effect = errors.AuthorizationError
self.assertRaises(errors.AuthorizationError,
self.handler.get_authorizations, ["0"])
def _get_exp_response(self, domain, path, challs):
# pylint: disable=no-self-use
exp_resp = [None] * len(challs)
@@ -157,28 +168,129 @@ class GetAuthorizationsTest(unittest.TestCase):
for dom in self.handler.authzr.keys():
azr = self.handler.authzr[dom]
self.handler.authzr[dom] = acme_util.gen_authzr(
"valid", dom, [challb.chall for challb in azr.body.challenges],
["valid"]*len(azr.body.challenges), azr.body.combinations)
messages2.STATUS_VALID,
dom,
[challb.chall for challb in azr.body.challenges],
[messages2.STATUS_VALID]*len(azr.body.challenges),
azr.body.combinations)
class PollChallengesTest(unittest.TestCase):
# pylint: disable=protected-access
"""Test poll challenges."""
def setUp(self):
from letsencrypt.client.auth_handler import challb_to_achall
from letsencrypt.client.auth_handler import AuthHandler
# Account is mocked...
# Account and network are mocked...
self.mock_net = mock.MagicMock()
self.handler = AuthHandler(
None, None, None, mock.Mock(key="mock_key"))
None, None, self.mock_net, mock.Mock(key="mock_key"))
self.doms = ["0", "1", "2"]
self.handler.authzr[self.doms[0]] = acme_util.gen_authzr(
"pending", self.doms[0], acme_util.CHALLENGES, ["pending"]*6, False)
messages2.STATUS_PENDING, self.doms[0],
acme_util.DV_CHALLENGES, [messages2.STATUS_PENDING]*3, False)
self.handler.authzr[self.doms[1]] = acme_util.gen_authzr(
"pending", self.doms[1], acme_util.CHALLENGES, ["pending"]*6, False)
messages2.STATUS_PENDING, self.doms[1],
acme_util.DV_CHALLENGES, [messages2.STATUS_PENDING]*3, False)
self.handler.authzr[self.doms[2]] = acme_util.gen_authzr(
"pending", self.doms[2], acme_util.CHALLENGES, ["pending"]*6, False)
messages2.STATUS_PENDING, self.doms[2],
acme_util.DV_CHALLENGES, [messages2.STATUS_PENDING]*3, False)
self.chall_update = {}
for dom in self.doms:
self.chall_update[dom] = [
challb_to_achall(challb, "dummy_key", dom)
for challb in self.handler.authzr[dom].body.challenges]
@mock.patch("letsencrypt.client.auth_handler.time")
def test_poll_challenges(self, unused_mock_time):
self.mock_net.poll.side_effect = self._mock_poll_solve_one_valid
self.handler._poll_challenges(self.chall_update, False)
for authzr in self.handler.authzr.values():
self.assertEqual(authzr.body.status, messages2.STATUS_VALID)
@mock.patch("letsencrypt.client.auth_handler.time")
def test_poll_challenges_failure_best_effort(self, unused_mock_time):
self.mock_net.poll.side_effect = self._mock_poll_solve_one_invalid
self.handler._poll_challenges(self.chall_update, True)
for authzr in self.handler.authzr.values():
self.assertEqual(authzr.body.status, messages2.STATUS_PENDING)
@mock.patch("letsencrypt.client.auth_handler.time")
def test_poll_challenges_failure(self, unused_mock_time):
self.mock_net.poll.side_effect = self._mock_poll_solve_one_invalid
self.assertRaises(errors.AuthorizationError,
self.handler._poll_challenges,
self.chall_update, False)
@mock.patch("letsencrypt.client.auth_handler.time")
def test_unable_to_find_challenge_status(self, unused_mock_time):
from letsencrypt.client.auth_handler import challb_to_achall
self.mock_net.poll.side_effect = self._mock_poll_solve_one_valid
self.chall_update[self.doms[0]].append(
challb_to_achall(acme_util.RECOVERY_CONTACT_P, "key", self.doms[0]))
self.assertRaises(
errors.AuthorizationError,
self.handler._poll_challenges, self.chall_update, False)
def test_verify_authzr_failure(self):
self.assertRaises(
errors.AuthorizationError, self.handler.verify_authzr_complete)
def _mock_poll_solve_one_valid(self, authzr):
# Pending here because my dummy script won't change the full status.
# Basically it didn't raise an error and it stopped earlier than
# Making all challenges invalid which would make mock_poll_solve_one
# change authzr to invalid
return self._mock_poll_solve_one_chall(authzr, messages2.STATUS_VALID)
def _mock_poll_solve_one_invalid(self, authzr):
return self._mock_poll_solve_one_chall(authzr, messages2.STATUS_INVALID)
def _mock_poll_solve_one_chall(self, authzr, desired_status):
# pylint: disable=no-self-use
"""Dummy method that solves one chall at a time to desired_status.
When all are solved.. it changes authzr.status to desired_status
"""
new_challbs = authzr.body.challenges
for challb in authzr.body.challenges:
if challb.status != desired_status:
new_challbs = tuple(
challb_temp if challb_temp != challb
else acme_util.chall_to_challb(challb.chall, desired_status)
for challb_temp in authzr.body.challenges
)
break
if all(test_challb.status == desired_status
for test_challb in new_challbs):
status_ = desired_status
else:
status_ = authzr.body.status
new_authzr = messages2.AuthorizationResource(
uri=authzr.uri,
new_cert_uri=authzr.new_cert_uri,
body=messages2.Authorization(
identifier=authzr.body.identifier,
challenges=new_challbs,
combinations=authzr.body.combinations,
key=authzr.body.key,
contact=authzr.body.contact,
status=status_,
),
)
return (new_authzr, "response")
class GenChallengePathTest(unittest.TestCase):
"""Tests for letsencrypt.client.auth_handler.gen_challenge_path.
@@ -328,7 +440,8 @@ def gen_auth_resp(chall_list):
def gen_dom_authzr(domain, unused_new_authzr_uri, challs):
"""Generates new authzr for domains."""
return acme_util.gen_authzr(
"pending", domain, challs, ["pending"]*len(challs))
messages2.STATUS_PENDING, domain, challs,
[messages2.STATUS_PENDING]*len(challs))
def gen_path(required, challs):

View File

@@ -85,7 +85,6 @@ class ChooseAccountTest(unittest.TestCase):
@mock.patch("letsencrypt.client.display.ops.util")
def test_one(self, mock_util):
print self.acc1
mock_util().menu.return_value = (display_util.OK, 0)
self.assertEqual(self._call([self.acc1]), self.acc1)

View File

@@ -233,6 +233,12 @@ class NetworkTest(unittest.TestCase):
self.assertRaises(
errors.UnexpectedUpdate, self.net.update_registration, self.regr)
def test_agree_to_tos(self):
self.net.update_registration = mock.Mock()
self.net.agree_to_tos(self.regr)
regr = self.net.update_registration.call_args[0][0]
self.assertEqual(self.regr.terms_of_service, regr.body.agreement)
def test_request_challenges(self):
self.response.status_code = httplib.CREATED
self.response.headers['Location'] = self.authzr.uri
@@ -280,6 +286,7 @@ class NetworkTest(unittest.TestCase):
@unittest.skip("Skip til challenge_resource boulder issue is resolved")
def test_answer_challenge_missing_next(self):
# TODO: Change once acme-spec #93 is resolved/boulder issue
self._mock_post_get()
self.assertRaises(errors.NetworkError, self.net.answer_challenge,
self.challr.body, challenges.DNSResponse())

View File

@@ -19,7 +19,7 @@ setenv =
basepython = python2.7
commands =
pip install -e .[testing]
python setup.py nosetests --with-coverage --cover-min-percentage=87
python setup.py nosetests --with-coverage --cover-min-percentage=89
[testenv:lint]
# recent versions of pylint do not support Python 2.6 (#97, #187)