1
0
mirror of https://gitlab.isc.org/isc-projects/bind9.git synced 2025-04-18 09:44:09 +03:00

Add support for TSIG in isctest.kasp

For some kasp test we are going to need TSIG based queries to
differentiate between views.
This commit is contained in:
Matthijs Mekking 2025-03-14 11:19:40 +01:00
parent 12e57eb222
commit 9cb287afa0

View File

@ -21,6 +21,7 @@ from typing import Dict, List, Optional, Union
from datetime import datetime, timedelta, timezone
import dns
import dns.tsig
import isctest.log
import isctest.query
@ -29,8 +30,14 @@ DEFAULT_TTL = 300
NEXT_KEY_EVENT_THRESHOLD = 100
def _query(server, qname, qtype):
def _query(server, qname, qtype, tsig=None):
query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
if tsig is not None:
tsigkey = tsig.split(":")
keyring = dns.tsig.Key(tsigkey[1], tsigkey[2], tsigkey[0])
query.use_tsig(keyring)
try:
response = isctest.query.tcp(query, server.ip, server.ports.dns, timeout=3)
except dns.exception.Timeout:
@ -554,14 +561,14 @@ class Key:
return self.path
def check_zone_is_signed(server, zone):
def check_zone_is_signed(server, zone, tsig=None):
addr = server.ip
fqdn = f"{zone}."
# wait until zone is fully signed
signed = False
for _ in range(10):
response = _query(server, fqdn, dns.rdatatype.NSEC)
response = _query(server, fqdn, dns.rdatatype.NSEC, tsig=tsig)
if not isinstance(response, dns.message.Message):
isctest.log.debug(f"no response for {fqdn} NSEC from {addr}")
elif response.rcode() != dns.rcode.NOERROR:
@ -711,13 +718,13 @@ def check_keyrelationships(keys, expected):
assert key.is_metadata_consistent(status, expect.metadata)
def check_dnssec_verify(server, zone):
def check_dnssec_verify(server, zone, tsig=None):
# Check if zone if DNSSEC valid with dnssec-verify.
fqdn = f"{zone}."
verified = False
for _ in range(10):
transfer = _query(server, fqdn, dns.rdatatype.AXFR)
transfer = _query(server, fqdn, dns.rdatatype.AXFR, tsig=tsig)
if not isinstance(transfer, dns.message.Message):
isctest.log.debug(f"no response for {fqdn} AXFR from {server.ip}")
elif transfer.rcode() != dns.rcode.NOERROR:
@ -936,8 +943,8 @@ def check_cds(rrset, keys):
assert numcds == len(cdss)
def _query_rrset(server, fqdn, qtype):
response = _query(server, fqdn, qtype)
def _query_rrset(server, fqdn, qtype, tsig=None):
response = _query(server, fqdn, qtype, tsig=tsig)
assert response.rcode() == dns.rcode.NOERROR
rrs = []
@ -957,46 +964,46 @@ def _query_rrset(server, fqdn, qtype):
return rrs, rrsigs
def check_apex(server, zone, ksks, zsks):
def check_apex(server, zone, ksks, zsks, tsig=None):
# Test the apex of a zone. This checks that the SOA and DNSKEY RRsets
# are signed correctly and with the appropriate keys.
fqdn = f"{zone}."
# test dnskey query
dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY)
dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY, tsig=tsig)
assert len(dnskeys) > 0
check_dnskeys(dnskeys, ksks, zsks)
assert len(rrsigs) > 0
check_signatures(rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks)
# test soa query
soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA)
soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA, tsig=tsig)
assert len(soa) == 1
assert f"{zone}. {DEFAULT_TTL} IN SOA" in soa[0].to_text()
assert len(rrsigs) > 0
check_signatures(rrsigs, dns.rdatatype.SOA, fqdn, ksks, zsks)
# test cdnskey query
cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY)
cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY, tsig=tsig)
check_dnskeys(cdnskeys, ksks, zsks, cdnskey=True)
if len(cdnskeys) > 0:
assert len(rrsigs) > 0
check_signatures(rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks)
# test cds query
cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS)
cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS, tsig=tsig)
check_cds(cds, ksks)
if len(cds) > 0:
assert len(rrsigs) > 0
check_signatures(rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks)
def check_subdomain(server, zone, ksks, zsks):
def check_subdomain(server, zone, ksks, zsks, tsig=None):
# Test an RRset below the apex and verify it is signed correctly.
fqdn = f"{zone}."
qname = f"a.{zone}."
qtype = dns.rdatatype.A
response = _query(server, qname, qtype)
response = _query(server, qname, qtype, tsig=tsig)
assert response.rcode() == dns.rcode.NOERROR
match = f"{qname} {DEFAULT_TTL} IN A 10.0.0.1"