diff --git a/certbot-dns-rfc2136/certbot_dns_rfc2136/_internal/tests/dns_rfc2136_test.py b/certbot-dns-rfc2136/certbot_dns_rfc2136/_internal/tests/dns_rfc2136_test.py index 3a82f1b65..d39afccaa 100644 --- a/certbot-dns-rfc2136/certbot_dns_rfc2136/_internal/tests/dns_rfc2136_test.py +++ b/certbot-dns-rfc2136/certbot_dns_rfc2136/_internal/tests/dns_rfc2136_test.py @@ -40,8 +40,19 @@ class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthentic self.mock_client = mock.MagicMock() # _get_rfc2136_client | pylint: disable=protected-access + self.orig_get_client = self.auth._get_rfc2136_client self.auth._get_rfc2136_client = mock.MagicMock(return_value=self.mock_client) + def test_get_client_default_conf_values(self): + # algorithm and sign_query are intentionally absent to test that the default (None) + # value does not crash Certbot. + creds = { "server": SERVER, "port": PORT, "name": NAME, "secret": SECRET } + self.auth.credentials = mock.MagicMock() + self.auth.credentials.conf = lambda key: creds.get(key, None) + client = self.orig_get_client() + assert client.algorithm == self.auth.ALGORITHMS["HMAC-MD5"] + assert client.sign_query == False + @test_util.patch_display_util() def test_perform(self, unused_mock_get_utility): self.auth.perform([self.achall]) @@ -100,7 +111,7 @@ class RFC2136ClientTest(unittest.TestCase): from certbot_dns_rfc2136._internal.dns_rfc2136 import _RFC2136Client self.rfc2136_client = _RFC2136Client(SERVER, PORT, NAME, SECRET, dns.tsig.HMAC_MD5, - TIMEOUT) + False, TIMEOUT) @mock.patch("dns.query.tcp") def test_add_txt_record(self, query_mock): @@ -177,14 +188,17 @@ class RFC2136ClientTest(unittest.TestCase): self.rfc2136_client._find_domain('foo.bar.'+DOMAIN) @mock.patch("dns.query.tcp") - def test_query_soa_found(self, query_mock): + @mock.patch("dns.message.make_query") + def test_query_soa_found(self, mock_make_query, query_mock): query_mock.return_value = mock.MagicMock(answer=[mock.MagicMock()], flags=dns.flags.AA) query_mock.return_value.rcode.return_value = dns.rcode.NOERROR + mock_make_query.return_value = mock.MagicMock() # _query_soa | pylint: disable=protected-access result = self.rfc2136_client._query_soa(DOMAIN) query_mock.assert_called_with(mock.ANY, SERVER, TIMEOUT, PORT) + mock_make_query.return_value.use_tsig.assert_not_called() assert result @mock.patch("dns.query.tcp") @@ -218,6 +232,17 @@ class RFC2136ClientTest(unittest.TestCase): udp_mock.assert_called_with(mock.ANY, SERVER, TIMEOUT, PORT) assert result + @mock.patch("dns.query.tcp") + @mock.patch("dns.message.make_query") + def test_query_soa_signed(self, mock_make_query, unused_mock_query): + mock_make_query.return_value = mock.MagicMock() + self.rfc2136_client.sign_query = True + self.rfc2136_client.algorithm = "alg0" + + self.rfc2136_client._query_soa(DOMAIN) + + mock_make_query.return_value.use_tsig.assert_called_with(mock.ANY, algorithm="alg0") + if __name__ == "__main__": sys.exit(pytest.main(sys.argv[1:] + [__file__])) # pragma: no cover