1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-21 19:01:07 +03:00

unittest and lint cleanup

This commit is contained in:
James Kasten
2015-04-22 23:17:53 -07:00
parent 95ba2730f1
commit 12899d0c38
18 changed files with 146 additions and 115 deletions

View File

@@ -1,3 +1,4 @@
"""Creates ACME accounts for server."""
import logging
import os
import re
@@ -197,5 +198,5 @@ class Account(object):
"""Scrub email address before using it."""
if re.match(cls.EMAIL_REGEX, email):
return bool(not email.startswith(".") and ".." not in email)
logging.warn("Invalid email address: using default address.")
logging.warn("Invalid email address.")
return False

View File

@@ -10,8 +10,8 @@ from letsencrypt.client import achallenges
from letsencrypt.client import constants
from letsencrypt.client import errors
class AuthHandler(object): # pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-instance-attributes, too-few-public-methods
class AuthHandler(object):
"""ACME Authorization Handler for a client.
:ivar dv_auth: Authenticator capable of solving
@@ -108,7 +108,7 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
if self.dv_c:
dv_resp = self.dv_auth.perform(self.dv_c)
# This will catch both specific types of errors.
except errors.AuthorizationError as err:
except errors.AuthorizationError:
logging.critical("Failure in setting up challenges.")
logging.info("Attempting to clean up outstanding challenges...")
self._cleanup_challenges()
@@ -211,12 +211,18 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
return completed, failed
def _get_chall_status(self, authzr, achall):
def _get_chall_status(self, authzr, achall): # pylint: disable=no-self-use
"""Get the status of the challenge.
.. warning:: This assumes only one instance of type of challenge in
each challenge resource.
:param authzr: Authorization Resource
:type authzr: :class:`letsencrypt.acme.messages2.AuthorizationResource`
:param achall: Annotated challenge for which to get status
:type achall: :class:`letsencrypt.client.achallenges.AnnotatedChallenge`
"""
for authzr_challb in authzr.body.challenges:
if type(authzr_challb.chall) is type(achall.challb.chall):

View File

@@ -46,7 +46,7 @@ class Client(object):
"""
def __init__(self, config, account, dv_auth, installer):
def __init__(self, config, account_, dv_auth, installer):
"""Initialize a client.
:param dv_auth: IAuthenticator that can solve the
@@ -56,14 +56,14 @@ class Client(object):
:type dv_auth: :class:`letsencrypt.client.interfaces.IAuthenticator`
"""
self.account = account
self.account = account_
self.installer = installer
# TODO: Allow for other alg types besides RS256
self.network = network2.Network(
"https://%s/acme/new-reg" % config.server,
jwk.JWKRSA.load(account.key.pem))
jwk.JWKRSA.load(self.account.key.pem))
self.config = config
@@ -74,7 +74,7 @@ class Client(object):
else:
self.auth_handler = None
def register(self, save=True):
def register(self):
"""New Registration with the ACME server."""
self.account = self.network.register_from_account(self.account)
if self.account.terms_of_service:
@@ -167,16 +167,18 @@ class Client(object):
if certr.cert_chain_uri:
# TODO: Except
chain_cert = self.network.fetch_chain(certr.cert_chain_uri)
chain_file, act_chain_path = le_util.unique_file(chain_path, 0o644)
try:
chain_file.write(chain_cert.to_pem())
finally:
chain_file.close()
if chain_cert:
chain_file, act_chain_path = le_util.unique_file(
chain_path, 0o644)
try:
chain_file.write(chain_cert.to_pem())
finally:
chain_file.close()
logging.info("Cert chain written to %s", act_chain_path)
logging.info("Cert chain written to %s", act_chain_path)
# This expects a valid chain file
cert_chain_abspath = os.path.abspath(act_chain_path)
# This expects a valid chain file
cert_chain_abspath = os.path.abspath(act_chain_path)
return os.path.abspath(act_cert_path), cert_chain_abspath

View File

@@ -470,6 +470,15 @@ class Network(object):
return self.request_issuance(csr, updated_authzrs), updated_authzrs
def _get_cert(self, uri):
"""Returns certificate from URI.
:param str uri: URI of certificate
:returns: tuple of the form
(response, :class:`letsencrypt.acme.jose.ComparableX509`)
:rtype: tuple
"""
content_type = self.DER_CONTENT_TYPE # TODO: make it a param
response = self._get(uri, headers={'Accept': content_type},
content_type=content_type)
@@ -521,7 +530,8 @@ class Network(object):
"""
if certr.cert_chain_uri is not None:
return self._get_cert(certr.cert_chain_uri)
_, cert = self._get_cert(certr.cert_chain_uri)
return cert
def revoke(self, certr, when=messages2.Revocation.NOW):
"""Revoke certificate.

View File

@@ -1008,7 +1008,8 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
sni_response = apache_dvsni.perform()
if sni_response:
# Must restart in order to activate the challenges.
# Handled here because we may be able to load up other challenge types
# Handled here because we may be able to load up other challenge
# types
self.restart()
# Go through all of the challenges and assign them to the proper

View File

@@ -18,6 +18,8 @@ from letsencrypt.client.plugins.apache import parser
from letsencrypt.client.plugins.apache.tests import util
from letsencrypt.client.tests import acme_util
class TwoVhost80Test(util.ApacheTest):
"""Test two standard well configured HTTP vhosts."""
@@ -157,14 +159,18 @@ class TwoVhost80Test(util.ApacheTest):
# Note: As more challenges are offered this will have to be expanded
auth_key = le_util.Key(self.rsa256_file, self.rsa256_pem)
achall1 = achallenges.DVSNI(
chall=challenges.DVSNI(
r="jIq_Xy1mXGN37tb4L6Xj_es58fW571ZNyXekdZzhh7Q",
nonce="37bc5eb75d3e00a19b4f6355845e5a18"),
challb=acme_util.chall_to_challb(
challenges.DVSNI(
r="jIq_Xy1mXGN37tb4L6Xj_es58fW571ZNyXekdZzhh7Q",
nonce="37bc5eb75d3e00a19b4f6355845e5a18"),
"pending"),
domain="encryption-example.demo", key=auth_key)
achall2 = achallenges.DVSNI(
chall=challenges.DVSNI(
r="uqnaPzxtrndteOqtrXb0Asl5gOJfWAnnx6QJyvcmlDU",
nonce="59ed014cac95f77057b1d7a1b2c596ba"),
challb=acme_util.chall_to_challb(
challenges.DVSNI(
r="uqnaPzxtrndteOqtrXb0Asl5gOJfWAnnx6QJyvcmlDU",
nonce="59ed014cac95f77057b1d7a1b2c596ba"),
"pending"),
domain="letsencrypt.demo", key=auth_key)
dvsni_ret_val = [

View File

@@ -14,6 +14,8 @@ from letsencrypt.client.plugins.apache.obj import Addr
from letsencrypt.client.plugins.apache.tests import util
from letsencrypt.client.tests import acme_util
class DvsniPerformTest(util.ApacheTest):
"""Test the ApacheDVSNI challenge."""
@@ -39,18 +41,22 @@ class DvsniPerformTest(util.ApacheTest):
auth_key = le_util.Key(rsa256_file, rsa256_pem)
self.achalls = [
achallenges.DVSNI(
chall=challenges.DVSNI(
r="\x8c\x8a\xbf_-f\\cw\xee\xd6\xf8/\xa5\xe3\xfd\xeb9\xf1"
"\xf5\xb9\xefVM\xc9w\xa4u\x9c\xe1\x87\xb4",
nonce="7\xbc^\xb7]>\x00\xa1\x9bOcU\x84^Z\x18",
), domain="encryption-example.demo", key=auth_key),
challb=acme_util.chall_to_challb(
challenges.DVSNI(
r="\x8c\x8a\xbf_-f\\cw\xee\xd6\xf8/\xa5\xe3\xfd\xeb9"
"\xf1\xf5\xb9\xefVM\xc9w\xa4u\x9c\xe1\x87\xb4",
nonce="7\xbc^\xb7]>\x00\xa1\x9bOcU\x84^Z\x18",
), "pending"),
domain="encryption-example.demo", key=auth_key),
achallenges.DVSNI(
chall=challenges.DVSNI(
r="\xba\xa9\xda?<m\xaewmx\xea\xad\xadv\xf4\x02\xc9y\x80"
"\xe2_X\t\xe7\xc7\xa4\t\xca\xf7&\x945",
nonce="Y\xed\x01L\xac\x95\xf7pW\xb1\xd7"
"\xa1\xb2\xc5\x96\xba",
), domain="letsencrypt.demo", key=auth_key),
challb=acme_util.chall_to_challb(
challenges.DVSNI(
r="\xba\xa9\xda?<m\xaewmx\xea\xad\xadv\xf4\x02\xc9y\x80"
"\xe2_X\t\xe7\xc7\xa4\t\xca\xf7&\x945",
nonce="Y\xed\x01L\xac\x95\xf7pW\xb1\xd7\xa1\xb2\xc5"
"\x96\xba",
), "pending"),
domain="letsencrypt.demo", key=auth_key),
]
def tearDown(self):

View File

@@ -15,6 +15,8 @@ from letsencrypt.acme import challenges
from letsencrypt.client import achallenges
from letsencrypt.client import le_util
from letsencrypt.client.tests import acme_util
# Classes based on to allow interrupting infinite loop under test
# after one iteration, based on.
@@ -68,7 +70,7 @@ class SNICallbackTest(unittest.TestCase):
"letsencrypt.client.tests", "testdata/rsa256_key.pem")
key = le_util.Key("foo", test_key)
self.cert = achallenges.DVSNI(
chall=challenges.DVSNI(r="x"*32, nonce="abcdef"),
challb=acme_util.DVSNI_P,
domain="example.com", key=key).gen_cert_and_response()[0]
private_key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, key.pem)
@@ -301,10 +303,12 @@ class PerformTest(unittest.TestCase):
self.key = le_util.Key("something", test_key)
self.achall1 = achallenges.DVSNI(
chall=challenges.DVSNI(r="whee", nonce="foo"),
challb=acme_util.chall_to_challb(
challenges.DVSNI(r="whee", nonce="foo"), "pending"),
domain="foo.example.com", key=self.key)
self.achall2 = achallenges.DVSNI(
chall=challenges.DVSNI(r="whee", nonce="bar"),
challb=acme_util.chall_to_challb(
challenges.DVSNI(r="whee", nonce="bar"), "pending"),
domain="bar.example.com", key=self.key)
bad_achall = ("This", "Represents", "A Non-DVSNI", "Challenge")
self.achalls = [self.achall1, self.achall2, bad_achall]
@@ -350,12 +354,12 @@ class PerformTest(unittest.TestCase):
def test_perform_with_pending_tasks(self):
self.authenticator.tasks = {"foononce.acme.invalid": "cert_data"}
extra_achall = achallenges.DVSNI(chall="a", domain="b", key="c")
extra_achall = acme_util.DVSNI_P
self.assertRaises(
ValueError, self.authenticator.perform, [extra_achall])
def test_perform_without_challenge_list(self):
extra_achall = achallenges.DVSNI(chall="a", domain="b", key="c")
extra_achall = acme_util.DVSNI_P
# This is wrong because a challenge must be specified.
self.assertRaises(ValueError, self.authenticator.perform, [])
# This is wrong because it must be a list, not a bare challenge.
@@ -466,7 +470,8 @@ class DoChildProcessTest(unittest.TestCase):
key = le_util.Key("foo", test_key)
self.key = key
self.cert = achallenges.DVSNI(
chall=challenges.DVSNI(r="x"*32, nonce="abcdef"),
challb=acme_util.chall_to_challb(
challenges.DVSNI(r="x"*32, nonce="abcdef"), "pending"),
domain="example.com", key=key).gen_cert_and_response()[0]
private_key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, key.pem)
@@ -559,7 +564,8 @@ class CleanupTest(unittest.TestCase):
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator(None)
self.achall = achallenges.DVSNI(
chall=challenges.DVSNI(r="whee", nonce="foononce"),
challb=acme_util.chall_to_challb(
challenges.DVSNI(r="whee", nonce="foononce"), "pending"),
domain="foo.example.com", key="key")
self.authenticator.tasks = {self.achall.nonce_domain: "stuff"}
self.authenticator.child_pid = 12345
@@ -579,7 +585,8 @@ class CleanupTest(unittest.TestCase):
def test_bad_cleanup(self):
self.assertRaises(
ValueError, self.authenticator.cleanup, [achallenges.DVSNI(
chall=challenges.DVSNI(r="whee", nonce="badnonce"),
challb=acme_util.chall_to_challb(
challenges.DVSNI(r="whee", nonce="badnonce"), "pending"),
domain="bad.example.com", key="key")])

View File

@@ -1,3 +1,4 @@
"""Tests for letsencrypt.client.account."""
import mock
import os
import pkg_resources
@@ -66,10 +67,6 @@ class AccountTest(unittest.TestCase):
self.assertEqual(acc.key, self.key)
self.assertEqual(acc.config, self.config)
def test_save(self):
self.test_account.save()
self._read_out_config(self.email)
def test_save_from_existing_account(self):
self.test_account.save()
acc = account.Account.from_existing_account(self.config, self.email)
@@ -118,9 +115,6 @@ class AccountTest(unittest.TestCase):
accs = account.Account.get_accounts(self.config)
self.assertEqual(len(accs), 2)
def _read_out_config(self, filep):
print open(os.path.join(self.accounts_dir, filep)).read()
class SafeEmailTest(unittest.TestCase):
"""Test safe_email."""

View File

@@ -5,23 +5,24 @@ import re
import unittest
import M2Crypto
import mock
from letsencrypt.acme import challenges
from letsencrypt.client import le_util
from letsencrypt.client.tests import acme_util
class DVSNITest(unittest.TestCase):
"""Tests for letsencrypt.client.achallenges.DVSNI."""
def setUp(self):
self.chall = challenges.DVSNI(r="r_value", nonce="12345ABCDE")
self.chall = acme_util.chall_to_challb(
challenges.DVSNI(r="r_value", nonce="12345ABCDE"), "pending")
self.response = challenges.DVSNIResponse()
key = le_util.Key("path", pkg_resources.resource_string(
__name__, os.path.join("testdata", "rsa256_key.pem")))
from letsencrypt.client.achallenges import DVSNI
self.achall = DVSNI(chall=self.chall, domain="example.com", key=key)
self.achall = DVSNI(challb=self.chall, domain="example.com", key=key)
def test_proxy(self):
self.assertEqual(self.chall.r, self.achall.r)
@@ -41,22 +42,5 @@ class DVSNITest(unittest.TestCase):
)
class IndexedTest(unittest.TestCase):
"""Tests for letsencrypt.client.achallenges.Indexed."""
def setUp(self):
from letsencrypt.client.achallenges import Indexed
self.achall = mock.MagicMock()
self.ichall = Indexed(achall=self.achall, index=0)
def test_attributes(self):
self.assertEqual(self.achall, self.ichall.achall)
self.assertEqual(0, self.ichall.index)
def test_proxy(self):
self.assertEqual(self.achall.foo, self.ichall.foo)
if __name__ == "__main__":
unittest.main()

View File

@@ -71,7 +71,7 @@ def gen_combos(challbs):
for i in dv_chall for j in cont_chall)
def chall_to_challb(chall, status):
def chall_to_challb(chall, status): # pylint: disable=redefined-outer-name
"""Return ChallengeBody from Challenge.
:param str status: "valid", "invalid", "pending"...
@@ -86,7 +86,7 @@ def chall_to_challb(chall, status):
if status == "valid":
kwargs.update({"validated": datetime.datetime.now()})
return messages2.ChallengeBody(**kwargs)
return messages2.ChallengeBody(**kwargs) # pylint: disable=star-args
# Pending ChallengeBody objects
@@ -116,6 +116,7 @@ def gen_authzr(authz_status, domain, challs, statuses, combos=True):
:param bool combos: Whether or not to add combinations
"""
# pylint: disable=redefined-outer-name
challbs = [
chall_to_challb(chall, status)
for chall, status in itertools.izip(challs, statuses)
@@ -123,7 +124,7 @@ def gen_authzr(authz_status, domain, challs, statuses, combos=True):
authz_kwargs = {
"identifier": messages2.Identifier(
typ=messages2.IDENTIFIER_FQDN, value=domain),
"challenges": challbs,
"challenges": challbs,
}
if combos:
authz_kwargs.update({"combinations": gen_combos(challbs)})
@@ -134,8 +135,9 @@ def gen_authzr(authz_status, domain, challs, statuses, combos=True):
"expires": datetime.datetime(now.year, now.month+1, now.day),
})
# pylint: disable=star-args
return messages2.AuthorizationResource(
uri="https://trusted.ca/new-authz-resource",
new_cert_uri="https://trusted.ca/new-cert",
body=messages2.Authorization(**authz_kwargs)
)
)

View File

@@ -6,10 +6,7 @@ import unittest
import mock
from letsencrypt.acme import challenges
from letsencrypt.acme import messages2
from letsencrypt.client import account
from letsencrypt.client import achallenges
from letsencrypt.client import errors
from letsencrypt.client import le_util
from letsencrypt.client import network2
@@ -156,7 +153,7 @@ class GetAuthorizationsTest(unittest.TestCase):
return exp_resp
def _validate_all(self, unused1, unused2):
def _validate_all(self, unused_1, unused_2):
for dom in self.handler.authzr.keys():
azr = self.handler.authzr[dom]
self.handler.authzr[dom] = acme_util.gen_authzr(
@@ -164,6 +161,25 @@ class GetAuthorizationsTest(unittest.TestCase):
["valid"]*len(azr.body.challenges), azr.body.combinations)
class PollChallengesTest(unittest.TestCase):
"""Test poll challenges."""
def setUp(self):
from letsencrypt.client.auth_handler import AuthHandler
# Account is mocked...
self.handler = AuthHandler(
None, None, None, mock.Mock(key="mock_key"))
self.doms = ["0", "1", "2"]
self.handler.authzr[self.doms[0]] = acme_util.gen_authzr(
"pending", self.doms[0], acme_util.CHALLENGES, ["pending"]*6, False)
self.handler.authzr[self.doms[1]] = acme_util.gen_authzr(
"pending", self.doms[1], acme_util.CHALLENGES, ["pending"]*6, False)
self.handler.authzr[self.doms[2]] = acme_util.gen_authzr(
"pending", self.doms[2], acme_util.CHALLENGES, ["pending"]*6, False)
class GenChallengePathTest(unittest.TestCase):
"""Tests for letsencrypt.client.auth_handler.gen_challenge_path.
@@ -208,11 +224,11 @@ class GenChallengePathTest(unittest.TestCase):
def test_full_cont_server(self):
challbs = (acme_util.RECOVERY_TOKEN_P,
acme_util.RECOVERY_CONTACT_P,
acme_util.POP_P,
acme_util.DVSNI_P,
acme_util.SIMPLE_HTTPS_P,
acme_util.DNS_P)
acme_util.RECOVERY_CONTACT_P,
acme_util.POP_P,
acme_util.DVSNI_P,
acme_util.SIMPLE_HTTPS_P,
acme_util.DNS_P)
# Typical webserver client that can do everything except DNS
# Attempted to make the order realistic
prefs = [challenges.RecoveryToken,
@@ -295,7 +311,8 @@ class IsPreferredTest(unittest.TestCase):
def test_mutually_exclusvie(self):
self.assertFalse(
self._call(acme_util.DVSNI_P, frozenset([acme_util.SIMPLE_HTTPS_P])))
self._call(
acme_util.DVSNI_P, frozenset([acme_util.SIMPLE_HTTPS_P])))
def test_mutually_exclusive_same_type(self):
self.assertTrue(

View File

@@ -15,12 +15,12 @@ from letsencrypt.client import le_util
class DetermineAccountTest(unittest.TestCase):
def setUp(self):
self.accounts_dir = tempfile.mkdtemp("accounts")
self.account_keys_dir = os.path.join(self.accounts_dir, "keys")
os.makedirs(self.account_keys_dir, 0o700)
account_keys_dir = os.path.join(self.accounts_dir, "keys")
os.makedirs(account_keys_dir, 0o700)
self.config = mock.MagicMock(
spec=configuration.NamespaceConfig, accounts_dir=self.accounts_dir,
account_keys_dir=self.account_keys_dir, rsa_key_size=2048,
account_keys_dir=account_keys_dir, rsa_key_size=2048,
server="letsencrypt-demo.org")
def tearDown(self):
@@ -29,6 +29,7 @@ class DetermineAccountTest(unittest.TestCase):
@mock.patch("letsencrypt.client.client.account.Account.from_prompts")
@mock.patch("letsencrypt.client.client.display_ops.choose_account")
def determine_account(self, mock_op, mock_prompt):
"""Test determine account"""
from letsencrypt.client import client
key = le_util.Key("file", "pem")
@@ -46,7 +47,7 @@ class DetermineAccountTest(unittest.TestCase):
# Test multiple
self.assertFalse(mock_op.called)
acc2 = account.Account(self.config, self.key)
acc2 = account.Account(self.config, key)
acc2.save()
chosen_acc = client.determine_account(self.config)
self.assertTrue(mock_op.called)

View File

@@ -21,14 +21,14 @@ class PerformTest(unittest.TestCase):
name="rec_token_perform", side_effect=gen_client_resp)
def test_rec_token1(self):
token = achallenges.RecoveryToken(chall=None, domain="0")
token = achallenges.RecoveryToken(challb=None, domain="0")
responses = self.auth.perform([token])
self.assertEqual(responses, ["RecoveryToken0"])
def test_rec_token5(self):
tokens = []
for i in xrange(5):
tokens.append(achallenges.RecoveryToken(chall=None, domain=str(i)))
tokens.append(achallenges.RecoveryToken(challb=None, domain=str(i)))
responses = self.auth.perform(tokens)
@@ -39,7 +39,7 @@ class PerformTest(unittest.TestCase):
def test_unexpected(self):
self.assertRaises(
errors.LetsEncryptContAuthError, self.auth.perform, [
achallenges.DVSNI(chall=None, domain="0", key="invalid_key")])
achallenges.DVSNI(challb=None, domain="0", key="invalid_key")])
def test_chall_pref(self):
self.assertEqual(
@@ -58,8 +58,8 @@ class CleanupTest(unittest.TestCase):
self.auth.rec_token.cleanup = self.mock_cleanup
def test_rec_token2(self):
token1 = achallenges.RecoveryToken(chall=None, domain="0")
token2 = achallenges.RecoveryToken(chall=None, domain="1")
token1 = achallenges.RecoveryToken(challb=None, domain="0")
token2 = achallenges.RecoveryToken(challb=None, domain="1")
self.auth.cleanup([token1, token2])
@@ -67,8 +67,8 @@ class CleanupTest(unittest.TestCase):
[mock.call(token1), mock.call(token2)])
def test_unexpected(self):
token = achallenges.RecoveryToken(chall=None, domain="0")
unexpected = achallenges.DVSNI(chall=None, domain="0", key="dummy_key")
token = achallenges.RecoveryToken(challb=None, domain="0")
unexpected = achallenges.DVSNI(challb=None, domain="0", key="dummy_key")
self.assertRaises(errors.LetsEncryptContAuthError,
self.auth.cleanup, [token, unexpected])

View File

@@ -128,9 +128,9 @@ class NcursesDisplayTest(DisplayT):
self.displayer.checklist("message", self.tags)
choices = [
(self.tags[0], "", False),
(self.tags[1], "", False),
(self.tags[2], "", False)
(self.tags[0], "", True),
(self.tags[1], "", True),
(self.tags[2], "", True),
]
mock_checklist.assert_called_with(
"message", width=display_util.WIDTH, height=display_util.HEIGHT,

View File

@@ -253,19 +253,12 @@ class NetworkTest(unittest.TestCase):
self.assertRaises(errors.UnexpectedUpdate, self.net.answer_challenge,
self.challr.body.update(uri='foo'), chall_response)
@unittest.skip("Skip til challenge_resource boulder issue is resolved")
def test_answer_challenge_missing_next(self):
self._mock_post_get()
self.assertRaises(errors.NetworkError, self.net.answer_challenge,
self.challr.body, challenges.DNSResponse())
def test_answer_challenges(self):
self.net.answer_challenge = mock.MagicMock()
self.assertEqual(
[self.net.answer_challenge(
self.challr.body, challenges.DNSResponse())],
self.net.answer_challenges(
[self.challr.body], [challenges.DNSResponse()]))
def test_retry_after_date(self):
self.response.headers['Retry-After'] = 'Fri, 31 Dec 1999 23:59:59 GMT'
self.assertEqual(

View File

@@ -39,22 +39,23 @@ class RecoveryTokenTest(unittest.TestCase):
self.assertFalse(self.rec_token.requires_human("example3.com"))
self.rec_token.cleanup(achallenges.RecoveryToken(
chall=None, domain="example3.com"))
challb=challenges.RecoveryToken(), domain="example3.com"))
self.assertTrue(self.rec_token.requires_human("example3.com"))
# Shouldn't throw an error
self.rec_token.cleanup(achallenges.RecoveryToken(
chall=None, domain="example4.com"))
challb=None, domain="example4.com"))
# SHOULD throw an error (OSError other than nonexistent file)
self.assertRaises(
OSError, self.rec_token.cleanup,
achallenges.RecoveryToken(chall=None, domain="a"+"r"*10000+".com"))
achallenges.RecoveryToken(challb=None, domain="a"+"r"*10000+".com"))
def test_perform_stored(self):
self.rec_token.store_token("example4.com", 444)
response = self.rec_token.perform(
achallenges.RecoveryToken(chall=None, domain="example4.com"))
achallenges.RecoveryToken(
challb=challenges.RecoveryToken(), domain="example4.com"))
self.assertEqual(
response, challenges.RecoveryTokenResponse(token="444"))
@@ -63,12 +64,14 @@ class RecoveryTokenTest(unittest.TestCase):
def test_perform_not_stored(self, mock_input):
mock_input().input.side_effect = [(0, "555"), (1, "000")]
response = self.rec_token.perform(
achallenges.RecoveryToken(chall=None, domain="example5.com"))
achallenges.RecoveryToken(
challb=challenges.RecoveryToken(), domain="example5.com"))
self.assertEqual(
response, challenges.RecoveryTokenResponse(token="555"))
response = self.rec_token.perform(
achallenges.RecoveryToken(chall=None, domain="example6.com"))
achallenges.RecoveryToken(
challb=challenges.RecoveryToken(), domain="example6.com"))
self.assertTrue(response is None)

View File

@@ -18,10 +18,8 @@ import letsencrypt
from letsencrypt.client import configuration
from letsencrypt.client import client
from letsencrypt.client import crypto_util
from letsencrypt.client import errors
from letsencrypt.client import interfaces
from letsencrypt.client import le_util
from letsencrypt.client import log
from letsencrypt.client.display import util as display_util
from letsencrypt.client.display import ops as display_ops