1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-26 07:41:33 +03:00

Remove the need for route53:ListResourceRecordSets

* add test_change_txt_record_delete
This commit is contained in:
Brad Warren
2018-03-05 18:36:03 -08:00
parent fe682e779b
commit 7bc45121a1
2 changed files with 31 additions and 47 deletions

View File

@@ -1,4 +1,5 @@
"""Certbot Route53 authenticator plugin."""
import collections
import logging
import time
@@ -33,6 +34,7 @@ class Authenticator(dns_common.DNSAuthenticator):
def __init__(self, *args, **kwargs):
super(Authenticator, self).__init__(*args, **kwargs)
self.r53 = boto3.client("route53")
self._resource_records = collections.defaultdict(list)
def more_info(self): # pylint: disable=missing-docstring,no-self-use
return "Solve a DNS01 challenge using AWS Route53"
@@ -85,28 +87,22 @@ class Authenticator(dns_common.DNSAuthenticator):
zones.sort(key=lambda z: len(z[0]), reverse=True)
return zones[0][1]
def _get_validation_rrset(self, zone_id, validation_domain_name):
validation_domain_name += "."
records = self.r53.list_resource_record_sets(HostedZoneId=zone_id)
for record in records["ResourceRecordSets"]:
if record["Name"] == validation_domain_name and record["Type"] == "TXT":
return record["ResourceRecords"]
return []
def _change_txt_record(self, action, validation_domain_name, validation):
zone_id = self._find_zone_id_for_domain(validation_domain_name)
rrecords = self._get_validation_rrset(zone_id, validation_domain_name)
rrecords = self._resource_records[validation_domain_name]
challenge = {"Value": '"{0}"'.format(validation)}
if action == "DELETE":
if len(rrecords) > 1:
# Remove the record being deleted from the list of tracked records
rrecords.remove(challenge)
if rrecords:
# Need to update instead, as we're not deleting the rrset
action = "UPSERT"
# Remove the record being deleted from the list
rrecords = [rr for rr in rrecords if rr != challenge]
else:
# Create a new list containing the record to use with DELETE
rrecords = [challenge]
else:
if challenge not in rrecords:
rrecords.append(challenge)
rrecords.append(challenge)
response = self.r53.change_resource_record_sets(
HostedZoneId=zone_id,

View File

@@ -178,9 +178,6 @@ class ClientTest(unittest.TestCase):
def test_change_txt_record(self):
self.client._find_zone_id_for_domain = mock.MagicMock()
self.client._get_validation_rrset = mock.MagicMock(
return_value=[]
)
self.client.r53.change_resource_record_sets = mock.MagicMock(
return_value={"ChangeInfo": {"Id": 1}})
@@ -189,10 +186,30 @@ class ClientTest(unittest.TestCase):
call_count = self.client.r53.change_resource_record_sets.call_count
self.assertEqual(call_count, 1)
def test_change_txt_record_delete(self):
self.client._find_zone_id_for_domain = mock.MagicMock()
self.client.r53.change_resource_record_sets = mock.MagicMock(
return_value={"ChangeInfo": {"Id": 1}})
validation = "some-value"
validation_record = {"Value": '"{0}"'.format(validation)}
self.client._resource_records[DOMAIN] = [validation_record]
self.client._change_txt_record("DELETE", DOMAIN, validation)
call_count = self.client.r53.change_resource_record_sets.call_count
self.assertEqual(call_count, 1)
call_args = self.client.r53.change_resource_record_sets.call_args_list[0][1]
call_args_batch = call_args["ChangeBatch"]["Changes"][0]
self.assertEqual(call_args_batch["Action"], "DELETE")
self.assertEqual(
call_args_batch["ResourceRecordSet"]["ResourceRecords"],
[validation_record])
def test_change_txt_record_multirecord(self):
self.client._find_zone_id_for_domain = mock.MagicMock()
self.client._get_validation_rrset = mock.MagicMock()
self.client._get_validation_rrset.return_value = [
self.client._resource_records[DOMAIN] = [
{"Value": "\"pre-existing-value\""},
{"Value": "\"pre-existing-value-two\""},
]
@@ -211,35 +228,6 @@ class ClientTest(unittest.TestCase):
self.assertEqual(call_count, 1)
def test_get_validation_rrset(self):
self.client.r53.list_resource_record_sets = mock.MagicMock(
return_value={"ResourceRecordSets": [
{"Name": "_acme-challenge.example.org.",
"Type": "TXT",
"ResourceRecords": [
{"Value": "\"validation-token\""},
{"Value": "\"another-validation-token\""},
],
},
{"Name": "_acme-challenge.example.org.",
"Type": "NS",
"ResourceRecords": [
{"Value": "ns1.example.com"},
],
}
]})
rrset = self.client._get_validation_rrset("zoneid",
"_acme-challenge.example.org")
self.assertEquals(len(rrset), 2)
self.assertTrue({"Value": "\"another-validation-token\""} in rrset)
def test_get_validation_rrset_empty(self):
self.client.r53.list_resource_record_sets = mock.MagicMock(
return_value={"ResourceRecordSets": []})
rrset = self.client._get_validation_rrset("zoneid",
"_acme-challenge.example.org")
self.assertEquals(rrset, [])
def test_wait_for_change(self):
self.client.r53.get_change = mock.MagicMock(
side_effect=[{"ChangeInfo": {"Status": "PENDING"}},