mirror of
https://github.com/certbot/certbot.git
synced 2026-01-21 19:01:07 +03:00
Incorporated feedback and added extra error checking to POP
This commit is contained in:
@@ -33,15 +33,18 @@ class ProofOfPossession(object): # pylint: disable=too-few-public-methods
|
||||
or False
|
||||
|
||||
"""
|
||||
if (not isinstance(achall.challb.hints.jwk, achall.challb.alg.kty) or
|
||||
achall.challb.alg in [jose.HS256, jose.HS384, jose.HS512]):
|
||||
if (achall.alg in [jose.HS256, jose.HS384, jose.HS512] or
|
||||
not isinstance(achall.hints.jwk, achall.alg.kty)):
|
||||
return None
|
||||
|
||||
# This will work regardless of how JWKES is implemented
|
||||
for cert, key, _ in self.installer.get_all_certs_keys():
|
||||
der_cert_key = M2Crypto.X509.load_cert(cert).get_pubkey().as_der()
|
||||
cert_key = achall.challb.alg.kty.load(der_cert_key)
|
||||
if cert_key == achall.challb.hints.jwk:
|
||||
try:
|
||||
cert_key = achall.alg.kty.load(der_cert_key)
|
||||
# If JWKES.load raises other exceptions, they should be caught here
|
||||
except (IndexError, ValueError, TypeError):
|
||||
continue
|
||||
if cert_key == achall.hints.jwk:
|
||||
return self._gen_response(achall, key)
|
||||
|
||||
# Is there are different prompt we should give the user?
|
||||
@@ -54,29 +57,29 @@ class ProofOfPossession(object): # pylint: disable=too-few-public-methods
|
||||
# If we get here, the key wasn't found
|
||||
return False
|
||||
|
||||
def _gen_response(self, challb, key_path): # pylint: disable=no-self-use
|
||||
def _gen_response(self, achall, key_path): # pylint: disable=no-self-use
|
||||
"""Create the response to the Proof of Possession Challenge.
|
||||
|
||||
:param challb: Proof of Possession Challenge
|
||||
:type challb: :class:`letsencrypt.acme.challenges.ProofOfPossession`
|
||||
:param achall: Proof of Possession Challenge
|
||||
:type achall: :class:`letsencrypt.client.achallenges.ProofOfPossession`
|
||||
|
||||
:param str key_path: Path to the key corresponding to the hinted to
|
||||
public key.
|
||||
|
||||
:returns: Response or None/False if the challenge cannot be completed
|
||||
:returns: Response or False if the challenge cannot be completed
|
||||
:rtype: :class:`letsencrypt.acme.challenges.ProofOfPossessionResponse`
|
||||
or False
|
||||
|
||||
"""
|
||||
|
||||
if os.path.isfile(key_path):
|
||||
with open(key_path, 'rb') as key:
|
||||
try:
|
||||
jwk = challb.alg.kty.load(key.read())
|
||||
except (IndexError, ValueError, TypeError):
|
||||
# Needs to be changed if JWKES doesn't have a key attribute
|
||||
jwk = achall.alg.kty.load(key.read())
|
||||
sig = other.Signature.from_msg(achall.nonce, jwk.key,
|
||||
alg=achall.alg)
|
||||
except (IndexError, ValueError, TypeError, jose.errors.Error):
|
||||
return False
|
||||
# If JWKES doesn't have a key attribute, this needs to be modified
|
||||
sig = other.Signature.from_msg(challb.nonce, jwk.key,
|
||||
alg=challb.alg)
|
||||
return challenges.ProofOfPossessionResponse(nonce=challb.nonce,
|
||||
return challenges.ProofOfPossessionResponse(nonce=achall.nonce,
|
||||
signature=sig)
|
||||
return False
|
||||
|
||||
@@ -8,6 +8,7 @@ import mock
|
||||
|
||||
from letsencrypt.acme import challenges
|
||||
from letsencrypt.acme import jose
|
||||
from letsencrypt.acme import messages2
|
||||
from letsencrypt.client import achallenges
|
||||
from letsencrypt.client import proof_of_possession
|
||||
from letsencrypt.client.display import util as display_util
|
||||
@@ -19,46 +20,53 @@ CERT0_PATH = pkg_resources.resource_filename(
|
||||
CERT1_PATH = pkg_resources.resource_filename(
|
||||
BASE_PACKAGE, os.path.join("testdata", "cert-san.pem"))
|
||||
CERT2_PATH = pkg_resources.resource_filename(
|
||||
BASE_PACKAGE, os.path.join("testdata", "dsa_cert.pem"))
|
||||
CERT2_KEY_PATH = pkg_resources.resource_filename(
|
||||
BASE_PACKAGE, os.path.join("testdata", "dsa512_key.pem"))
|
||||
CERT3_PATH = pkg_resources.resource_filename(
|
||||
BASE_PACKAGE, os.path.join("testdata", "matching_cert.pem"))
|
||||
KEY_PATH = pkg_resources.resource_filename(
|
||||
CERT3_KEY_PATH = pkg_resources.resource_filename(
|
||||
BASE_PACKAGE, os.path.join("testdata", "rsa512_key.pem"))
|
||||
KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
|
||||
CERT3_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
|
||||
BASE_PACKAGE, os.path.join('testdata', 'rsa512_key.pem'))).publickey()
|
||||
|
||||
|
||||
class ProofOfPossessionTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.installer = mock.MagicMock()
|
||||
certs = [CERT0_PATH, CERT1_PATH, CERT2_PATH, CERT3_PATH]
|
||||
keys = [None, None, CERT2_KEY_PATH, CERT3_KEY_PATH]
|
||||
self.installer.get_all_certs_keys.return_value = zip(
|
||||
[CERT0_PATH, CERT1_PATH, CERT2_PATH], 3 * [KEY_PATH], 3 * [None])
|
||||
certs, keys, 4 * [None])
|
||||
self.proof_of_pos = proof_of_possession.ProofOfPossession(
|
||||
self.installer)
|
||||
|
||||
hints = challenges.ProofOfPossession.Hints(
|
||||
jwk=jose.JWKRSA(key=KEY), cert_fingerprints=(),
|
||||
jwk=jose.JWKRSA(key=CERT3_KEY), cert_fingerprints=(),
|
||||
certs=(), serial_numbers=(), subject_key_identifiers=(),
|
||||
issuers=(), authorized_for=())
|
||||
challenge = challenges.ProofOfPossession(
|
||||
chall = challenges.ProofOfPossession(
|
||||
alg=jose.RS256, nonce='zczv4HMLVe_0kimJ25Juig', hints=hints)
|
||||
challb = messages2.ChallengeBody(
|
||||
chall=chall, uri="http://example", status=messages2.STATUS_PENDING)
|
||||
self.achall = achallenges.ProofOfPossession(
|
||||
challb=challenge, domain="example.com")
|
||||
challb=challb, domain="example.com")
|
||||
|
||||
def test_perform_bad_challenge(self):
|
||||
hints = challenges.ProofOfPossession.Hints(
|
||||
jwk=jose.jwk.JWKOct(key=KEY), cert_fingerprints=(),
|
||||
jwk=jose.jwk.JWKOct(key=CERT3_KEY), cert_fingerprints=(),
|
||||
certs=(), serial_numbers=(), subject_key_identifiers=(),
|
||||
issuers=(), authorized_for=())
|
||||
challenge = challenges.ProofOfPossession(
|
||||
chall = challenges.ProofOfPossession(
|
||||
alg=jose.HS512, nonce='zczv4HMLVe_0kimJ25Juig', hints=hints)
|
||||
challb = messages2.ChallengeBody(
|
||||
chall=chall, uri="http://example", status=messages2.STATUS_PENDING)
|
||||
self.achall = achallenges.ProofOfPossession(
|
||||
challb=challenge, domain="example.com")
|
||||
|
||||
response = self.proof_of_pos.perform(self.achall)
|
||||
self.assertEqual(response, None)
|
||||
challb=challb, domain="example.com")
|
||||
self.assertEqual(self.proof_of_pos.perform(self.achall), None)
|
||||
|
||||
def test_perform_no_input(self):
|
||||
response = self.proof_of_pos.perform(self.achall)
|
||||
self.assertTrue(response.verify())
|
||||
self.assertTrue(self.proof_of_pos.perform(self.achall).verify())
|
||||
|
||||
@mock.patch("letsencrypt.client.recovery_token.zope.component.getUtility")
|
||||
def test_perform_with_input(self, mock_input):
|
||||
@@ -66,16 +74,12 @@ class ProofOfPossessionTest(unittest.TestCase):
|
||||
self.installer.get_all_certs_keys.return_value.pop()
|
||||
mock_input().input.side_effect = [(display_util.CANCEL, ""),
|
||||
(display_util.OK, CERT0_PATH),
|
||||
(display_util.OK, KEY_PATH)]
|
||||
|
||||
response = self.proof_of_pos.perform(self.achall)
|
||||
self.assertFalse(response)
|
||||
|
||||
response = self.proof_of_pos.perform(self.achall)
|
||||
self.assertFalse(response)
|
||||
|
||||
response = self.proof_of_pos.perform(self.achall)
|
||||
self.assertTrue(response.verify())
|
||||
(display_util.OK, "imaginary_file"),
|
||||
(display_util.OK, CERT3_KEY_PATH)]
|
||||
self.assertFalse(self.proof_of_pos.perform(self.achall))
|
||||
self.assertFalse(self.proof_of_pos.perform(self.achall))
|
||||
self.assertFalse(self.proof_of_pos.perform(self.achall))
|
||||
self.assertTrue(self.proof_of_pos.perform(self.achall).verify())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
14
letsencrypt/client/tests/testdata/dsa512_key.pem
vendored
Normal file
14
letsencrypt/client/tests/testdata/dsa512_key.pem
vendored
Normal file
@@ -0,0 +1,14 @@
|
||||
-----BEGIN DSA PARAMETERS-----
|
||||
MIGdAkEAwebEoGBfokKQeALHHnAZMQwYU35ILEBdV8oUmzv7qpSVUoHihyqfn6GC
|
||||
OixAKSP8EJYcTilIqPbFbfFyOPlbLwIVANoFHEDiQgknAvKrG78pHzAJdQSPAkEA
|
||||
qfka5Bnl+CeEMpzVZGrOVqZE/LFdZK9eT6YtWjzqtIkf3hwXUVxJsTnBG4xmrfvl
|
||||
41pgNJpgu99YOYqPpS0g7A==
|
||||
-----END DSA PARAMETERS-----
|
||||
-----BEGIN DSA PRIVATE KEY-----
|
||||
MIH5AgEAAkEAwebEoGBfokKQeALHHnAZMQwYU35ILEBdV8oUmzv7qpSVUoHihyqf
|
||||
n6GCOixAKSP8EJYcTilIqPbFbfFyOPlbLwIVANoFHEDiQgknAvKrG78pHzAJdQSP
|
||||
AkEAqfka5Bnl+CeEMpzVZGrOVqZE/LFdZK9eT6YtWjzqtIkf3hwXUVxJsTnBG4xm
|
||||
rfvl41pgNJpgu99YOYqPpS0g7AJATQ2LUzjGQSM6UljcPY5I2OD9THkUR9kH2tth
|
||||
zZd70UoI9btrVaTizgqYShuok94glSQNK0H92JgUk3scJPaAkAIVAMDn61h6vrCE
|
||||
mNv063So6E+eYaIN
|
||||
-----END DSA PRIVATE KEY-----
|
||||
17
letsencrypt/client/tests/testdata/dsa_cert.pem
vendored
Normal file
17
letsencrypt/client/tests/testdata/dsa_cert.pem
vendored
Normal file
@@ -0,0 +1,17 @@
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIICuDCCAnWgAwIBAgIJAPjmErVMzwVLMAsGCWCGSAFlAwQDAjB3MQswCQYDVQQG
|
||||
EwJVUzERMA8GA1UECAwITWljaGlnYW4xEjAQBgNVBAcMCUFubiBBcmJvcjErMCkG
|
||||
A1UECgwiVW5pdmVyc2l0eSBvZiBNaWNoaWdhbiBhbmQgdGhlIEVGRjEUMBIGA1UE
|
||||
AwwLZXhhbXBsZS5jb20wHhcNMTUwNTEyMTUzOTQzWhcNMTUwNjExMTUzOTQzWjB3
|
||||
MQswCQYDVQQGEwJVUzERMA8GA1UECAwITWljaGlnYW4xEjAQBgNVBAcMCUFubiBB
|
||||
cmJvcjErMCkGA1UECgwiVW5pdmVyc2l0eSBvZiBNaWNoaWdhbiBhbmQgdGhlIEVG
|
||||
RjEUMBIGA1UEAwwLZXhhbXBsZS5jb20wgfEwgakGByqGSM44BAEwgZ0CQQDB5sSg
|
||||
YF+iQpB4AscecBkxDBhTfkgsQF1XyhSbO/uqlJVSgeKHKp+foYI6LEApI/wQlhxO
|
||||
KUio9sVt8XI4+VsvAhUA2gUcQOJCCScC8qsbvykfMAl1BI8CQQCp+RrkGeX4J4Qy
|
||||
nNVkas5WpkT8sV1kr15Ppi1aPOq0iR/eHBdRXEmxOcEbjGat++XjWmA0mmC731g5
|
||||
io+lLSDsA0MAAkBNDYtTOMZBIzpSWNw9jkjY4P1MeRRH2Qfa22HNl3vRSgj1u2tV
|
||||
pOLOCphKG6iT3iCVJA0rQf3YmBSTexwk9oCQo1AwTjAdBgNVHQ4EFgQUZ2DlTDGU
|
||||
PMwTUt0KztM6IyX61BcwHwYDVR0jBBgwFoAUZ2DlTDGUPMwTUt0KztM6IyX61Bcw
|
||||
DAYDVR0TBAUwAwEB/zALBglghkgBZQMEAwIDMAAwLQIVAIbMgGx+KwBr4rgqZ2Lh
|
||||
AAO8TegHAhQsuxpIIIphiReoWEtEJk4TqEIz/A==
|
||||
-----END CERTIFICATE-----
|
||||
Reference in New Issue
Block a user