
chg: test: Rewrite kasp system test to pytest (1)

Move test code that can be reused by the pytest-based kasp system test into the shared isctest modules.

Merge branch 'matthijs-pytest-rewrite-kasp-system-test-1' into 'main'

See merge request isc-projects/bind9!10252
This commit is contained in:
Matthijs Mekking 2025-04-10 21:17:49 +00:00
commit b571c084db
4 changed files with 644 additions and 78 deletions
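
As an illustration of how the relocated helpers are meant to be consumed, a pytest-based kasp test could look roughly like the sketch below. The zone name, server fixture, key directory, and policy values are hypothetical and only show the intended call flow; the config values would have to mirror the dnssec-policy actually under test.

from datetime import timedelta

import isctest


def test_kasp_sketch(servers):  # hypothetical test and fixture names
    server = servers["ns3"]
    zone = "default.kasp"
    # Hypothetical policy intervals, expressed as timedeltas.
    config = {
        "dnskey-ttl": timedelta(hours=1),
        "ds-ttl": timedelta(hours=2),
        "max-zone-ttl": timedelta(days=1),
        "parent-propagation-delay": timedelta(hours=1),
        "publish-safety": timedelta(hours=1),
        "retire-safety": timedelta(hours=1),
        "signatures-refresh": timedelta(days=5),
        "signatures-validity": timedelta(days=14),
        "zone-propagation-delay": timedelta(minutes=5),
    }

    # Wait for the zone to be signed and verify it with dnssec-verify.
    isctest.kasp.check_zone_is_signed(server, zone)
    isctest.kasp.check_dnssec_verify(server, zone)

    # Collect the key files from the server's directory (assumed location)
    # and match them against one default CSK.
    keys = isctest.kasp.keydir_to_keylist(zone, server.identifier)
    expected = [isctest.kasp.KeyProperties.default()]
    isctest.kasp.verify_keys(zone, keys, expected)

    # Compute and check the expected key timing metadata.
    for expect in expected:
        expect.set_expected_keytimes(config, offset=timedelta(0))
    isctest.kasp.check_keytimes(keys, expected)

    # With a single CSK, the same key signs the apex and the zone data.
    isctest.kasp.check_apex(server, zone, keys, keys)
    isctest.kasp.check_subdomain(server, zone, keys, keys)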

View File

@ -9,6 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import difflib
import shutil
from typing import Optional
@ -128,3 +129,24 @@ def is_response_to(response: dns.message.Message, query: dns.message.Message) ->
single_question(response)
single_question(query)
assert query.is_response(response), str(response)
def file_contents_equal(file1, file2):
def normalize_line(line):
# remove leading/trailing whitespace and collapse internal whitespace
return " ".join(line.split())
def read_lines(file_path):
with open(file_path, "r", encoding="utf-8") as file:
return [normalize_line(line) for line in file.readlines()]
lines1 = read_lines(file1)
lines2 = read_lines(file2)
differ = difflib.Differ()
diff = differ.compare(lines1, lines2)
for line in diff:
assert not line.startswith("+ ") and not line.startswith(
"- "
), f'file contents of "{file1}" and "{file2}" differ'

View File

@ -10,24 +10,34 @@
# information regarding copyright ownership.
from functools import total_ordering
import glob
import os
from pathlib import Path
import re
import subprocess
import time
from typing import Optional, Union
from typing import Dict, List, Optional, Union
from datetime import datetime, timedelta, timezone
import dns
import dns.tsig
import isctest.log
import isctest.query
DEFAULT_TTL = 300
NEXT_KEY_EVENT_THRESHOLD = 100
def _query(server, qname, qtype):
def _query(server, qname, qtype, tsig=None):
query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
if tsig is not None:
tsigkey = tsig.split(":")
keyring = dns.tsig.Key(tsigkey[1], tsigkey[2], tsigkey[0])
query.use_tsig(keyring)
try:
response = isctest.query.tcp(query, server.ip, server.ports.dns, timeout=3)
except dns.exception.Timeout:
@ -100,6 +110,165 @@ class KeyTimingMetadata:
return result
class KeyProperties:
"""
Represent the (expected) properties a key should have.
"""
def __init__(
self,
name: str,
properties: dict,
metadata: dict,
timing: Dict[str, KeyTimingMetadata],
):
self.name = name
self.key = None
self.properties = properties
self.metadata = metadata
self.timing = timing
def __repr__(self):
return self.name
def __str__(self) -> str:
return self.name
@staticmethod
def default(with_state=True) -> "KeyProperties":
properties = {
"expect": True,
"private": True,
"legacy": False,
"role": "csk",
"role_full": "key-signing",
"dnskey_ttl": 3600,
"flags": 257,
}
metadata = {
"Algorithm": isctest.vars.algorithms.ECDSAP256SHA256.number,
"Length": 256,
"Lifetime": 0,
"KSK": "yes",
"ZSK": "yes",
}
timing: Dict[str, KeyTimingMetadata] = {}
result = KeyProperties(
name="DEFAULT", properties=properties, metadata=metadata, timing=timing
)
result.name = "DEFAULT"
result.key = None
if with_state:
result.metadata["GoalState"] = "omnipresent"
result.metadata["DNSKEYState"] = "rumoured"
result.metadata["KRRSIGState"] = "rumoured"
result.metadata["ZRRSIGState"] = "rumoured"
result.metadata["DSState"] = "hidden"
return result
def Ipub(self, config):
ipub = timedelta(0)
if self.key.get_metadata("Predecessor", must_exist=False) != "undefined":
# Ipub = Dprp + TTLkey
ipub = (
config["dnskey-ttl"]
+ config["zone-propagation-delay"]
+ config["publish-safety"]
)
self.timing["Active"] = self.timing["Published"] + ipub
def IpubC(self, config):
if not self.key.is_ksk():
return
ttl1 = config["dnskey-ttl"] + config["publish-safety"]
ttl2 = timedelta(0)
if self.key.get_metadata("Predecessor", must_exist=False) == "undefined":
# If this is the first key, we also need to wait until the zone
# signatures are omnipresent. Use max-zone-ttl instead of
# dnskey-ttl, and no publish-safety (because we are looking at
# signatures here, not the public key).
ttl2 = config["max-zone-ttl"]
# IpubC = DprpC + TTLkey
ipubc = config["zone-propagation-delay"] + max(ttl1, ttl2)
self.timing["PublishCDS"] = self.timing["Published"] + ipubc
if self.metadata["Lifetime"] != 0:
self.timing["DeleteCDS"] = (
self.timing["PublishCDS"] + self.metadata["Lifetime"]
)
def Iret(self, config):
if self.metadata["Lifetime"] == 0:
return
sign_delay = config["signatures-validity"] - config["signatures-refresh"]
safety_interval = config["retire-safety"]
iretKSK = timedelta(0)
iretZSK = timedelta(0)
if self.key.is_ksk():
# Iret = DprpP + TTLds
iretKSK = (
config["parent-propagation-delay"] + config["ds-ttl"] + safety_interval
)
if self.key.is_zsk():
# Iret = Dsgn + Dprp + TTLsig
iretZSK = (
sign_delay
+ config["zone-propagation-delay"]
+ config["max-zone-ttl"]
+ safety_interval
)
self.timing["Removed"] = self.timing["Retired"] + max(iretKSK, iretZSK)
def set_expected_keytimes(self, config, offset=None, pregenerated=False):
if self.key is None:
raise ValueError("KeyProperties must be attached to a Key")
if self.properties["legacy"]:
return
if offset is None:
offset = self.properties["offset"]
self.timing["Generated"] = self.key.get_timing("Created")
self.timing["Published"] = self.timing["Generated"]
if pregenerated:
self.timing["Published"] = self.key.get_timing("Publish")
self.timing["Published"] = self.timing["Published"] + offset
self.Ipub(config)
# Set Retired timing metadata if key has lifetime.
if self.metadata["Lifetime"] != 0:
self.timing["Retired"] = self.timing["Active"] + self.metadata["Lifetime"]
self.IpubC(config)
self.Iret(config)
# Key state change times must exist, but since we cannot reliably tell
# when named made the actual state change, we don't care what the
# value is. Setting it to None verifies that the metadata exists
# without actually checking the value.
self.timing["DNSKEYChange"] = None
if self.key.is_ksk():
self.timing["DSChange"] = None
self.timing["KRRSIGChange"] = None
if self.key.is_zsk():
self.timing["ZRRSIGChange"] = None
@total_ordering
class Key:
"""
@ -117,6 +286,7 @@ class Key:
else:
self.keydir = Path(keydir)
self.path = str(self.keydir / name)
self.privatefile = f"{self.path}.private"
self.keyfile = f"{self.path}.key"
self.statefile = f"{self.path}.state"
self.tag = int(self.name[-5:])
@ -139,21 +309,43 @@ class Key:
)
return None
def get_metadata(self, metadata: str, must_exist=True) -> str:
def get_metadata(
self, metadata: str, file=None, comment=False, must_exist=True
) -> str:
if file is None:
file = self.statefile
value = "undefined"
regex = rf"{metadata}:\s+(.*)"
with open(self.statefile, "r", encoding="utf-8") as file:
for line in file:
regex = rf"{metadata}:\s+(\S+).*"
if comment:
# The expected metadata is prefixed with a ';'.
regex = rf";\s+{metadata}:\s+(\S+).*"
with open(file, "r", encoding="utf-8") as fp:
for line in fp:
match = re.match(regex, line)
if match is not None:
value = match.group(1)
break
if must_exist and value == "undefined":
raise ValueError(
'state metadata "{metadata}" for key "{self.name}" undefined'
f'metadata "{metadata}" for key "{self.name}" in file "{file}" undefined'
)
return value
def ttl(self) -> int:
with open(self.keyfile, "r", encoding="utf-8") as file:
for line in file:
if line.startswith(";"):
continue
return int(line.split()[1])
return 0
def dnskey(self):
with open(self.keyfile, "r", encoding="utf-8") as file:
for line in file:
if "DNSKEY" in line:
return line.strip()
return "undefined"
def is_ksk(self) -> bool:
return self.get_metadata("KSK") == "yes"
@ -187,7 +379,7 @@ class Key:
dsfromkey_command = [
os.environ.get("DSFROMKEY"),
"-T",
"3600",
str(self.ttl()),
"-a",
alg,
"-C",
@ -216,6 +408,149 @@ class Key:
return digest_fromfile == digest_fromwire
def is_metadata_consistent(self, key, metadata, checkval=True):
"""
If 'key' exists in 'metadata' then it must also exist in the state
metadata. Otherwise, it must not exist in the state metadata.
If 'checkval' is True, the metadata values must also match.
"""
if key in metadata:
if checkval:
value = self.get_metadata(key)
if value != f"{metadata[key]}":
isctest.log.debug(
f"{self.name} {key} METADATA MISMATCH: {value} - {metadata[key]}"
)
return value == f"{metadata[key]}"
return self.get_metadata(key) != "undefined"
value = self.get_metadata(key, must_exist=False)
if value != "undefined":
isctest.log.debug(f"{self.name} {key} METADATA UNEXPECTED: {value}")
return value == "undefined"
def is_timing_consistent(self, key, timing, file, comment=False):
"""
If 'key' exists in 'timing' then it must match the value in the state
timing data. Otherwise, it must not exist in the state timing data.
"""
if key in timing:
value = self.get_metadata(key, file=file, comment=comment)
if value != str(timing[key]):
isctest.log.debug(
f"{self.name} {key} TIMING MISMATCH: {value} - {timing[key]}"
)
return value == str(timing[key])
value = self.get_metadata(key, file=file, comment=comment, must_exist=False)
if value != "undefined":
isctest.log.debug(f"{self.name} {key} TIMING UNEXPECTED: {value}")
return value == "undefined"
def match_properties(self, zone, properties):
"""
Check whether the key matches the given properties.
"""
if not properties.properties["expect"]:
return False
# Check file existence.
# No-op: if a file is missing, the get_metadata calls below will fail.
# Check the public key file.
role = properties.properties["role_full"]
comment = f"This is a {role} key, keyid {self.tag}, for {zone}."
if not isctest.util.file_contents_contain(self.keyfile, comment):
isctest.log.debug(f"{self.name} COMMENT MISMATCH: expected '{comment}'")
return False
ttl = properties.properties["dnskey_ttl"]
flags = properties.properties["flags"]
alg = properties.metadata["Algorithm"]
dnskey = f"{zone}. {ttl} IN DNSKEY {flags} 3 {alg}"
if not isctest.util.file_contents_contain(self.keyfile, dnskey):
isctest.log.debug(f"{self.name} DNSKEY MISMATCH: expected '{dnskey}'")
return False
# Now check the private key file.
if properties.properties["private"]:
# Retrieve creation date.
created = self.get_metadata("Generated")
pval = self.get_metadata("Created", file=self.privatefile)
if pval != created:
isctest.log.debug(
f"{self.name} Created METADATA MISMATCH: {pval} - {created}"
)
return False
pval = self.get_metadata("Private-key-format", file=self.privatefile)
if pval != "v1.3":
isctest.log.debug(
f"{self.name} Private-key-format METADATA MISMATCH: {pval} - v1.3"
)
return False
pval = self.get_metadata("Algorithm", file=self.privatefile)
if pval != f"{alg}":
isctest.log.debug(
f"{self.name} Algorithm METADATA MISMATCH: {pval} - {alg}"
)
return False
# Now check the key state file.
if properties.properties["legacy"]:
return True
comment = f"This is the state of key {self.tag}, for {zone}."
if not isctest.util.file_contents_contain(self.statefile, comment):
isctest.log.debug(f"{self.name} COMMENT MISMATCH: expected '{comment}'")
return False
attributes = [
"Lifetime",
"Algorithm",
"Length",
"KSK",
"ZSK",
"GoalState",
"DNSKEYState",
"KRRSIGState",
"ZRRSIGState",
"DSState",
]
for key in attributes:
if not self.is_metadata_consistent(key, properties.metadata):
return False
# A match is found.
return True
def match_timingmetadata(self, timings, file=None, comment=False):
if file is None:
file = self.statefile
attributes = [
"Generated",
"Created",
"Published",
"Publish",
"PublishCDS",
"SyncPublish",
"Active",
"Activate",
"Retired",
"Inactive",
"Revoked",
"Removed",
"Delete",
]
for key in attributes:
if not self.is_timing_consistent(key, timings, file, comment=comment):
isctest.log.debug(f"{self.name} TIMING METADATA MISMATCH: {key}")
return False
return True
def __lt__(self, other: "Key"):
return self.name < other.name
@ -226,14 +561,14 @@ class Key:
return self.path
def check_zone_is_signed(server, zone):
def check_zone_is_signed(server, zone, tsig=None):
addr = server.ip
fqdn = f"{zone}."
# wait until zone is fully signed
signed = False
for _ in range(10):
response = _query(server, fqdn, dns.rdatatype.NSEC)
response = _query(server, fqdn, dns.rdatatype.NSEC, tsig=tsig)
if not isinstance(response, dns.message.Message):
isctest.log.debug(f"no response for {fqdn} NSEC from {addr}")
elif response.rcode() != dns.rcode.NOERROR:
@ -277,13 +612,119 @@ def check_zone_is_signed(server, zone):
assert signed
def check_dnssec_verify(server, zone):
def verify_keys(zone, keys, expected):
"""
Check the keys for a configured zone. This verifies that:
1. 'keys' contains the expected number of keys.
2. The keys match the expected properties.
"""
def _verify_keys():
# check number of keys matches expected.
if len(keys) != len(expected):
return False
if len(keys) == 0:
return True
for expect in expected:
expect.key = None
for key in keys:
found = False
i = 0
while not found and i < len(expected):
if expected[i].key is None:
found = key.match_properties(zone, expected[i])
if found:
key.external = expected[i].properties["legacy"]
expected[i].key = key
i += 1
if not found:
return False
return True
isctest.run.retry_with_timeout(_verify_keys, timeout=10)
def check_keytimes(keys, expected):
"""
Check the key timing metadata for all keys in 'keys'.
"""
assert len(keys) == len(expected)
if len(keys) == 0:
return
for key in keys:
for expect in expected:
if expect.properties["legacy"]:
continue
if not key is expect.key:
continue
synonyms = {}
if "Generated" in expect.timing:
synonyms["Created"] = expect.timing["Generated"]
if "Published" in expect.timing:
synonyms["Publish"] = expect.timing["Published"]
if "PublishCDS" in expect.timing:
synonyms["SyncPublish"] = expect.timing["PublishCDS"]
if "Active" in expect.timing:
synonyms["Activate"] = expect.timing["Active"]
if "Retired" in expect.timing:
synonyms["Inactive"] = expect.timing["Retired"]
if "DeleteCDS" in expect.timing:
synonyms["SyncDelete"] = expect.timing["DeleteCDS"]
if "Revoked" in expect.timing:
synonyms["Revoked"] = expect.timing["Revoked"]
if "Removed" in expect.timing:
synonyms["Delete"] = expect.timing["Removed"]
assert key.match_timingmetadata(synonyms, file=key.keyfile, comment=True)
if expect.properties["private"]:
assert key.match_timingmetadata(synonyms, file=key.privatefile)
if not expect.properties["legacy"]:
assert key.match_timingmetadata(expect.timing)
state_changes = [
"DNSKEYChange",
"KRRSIGChange",
"ZRRSIGChange",
"DSChange",
]
for change in state_changes:
assert key.is_metadata_consistent(
change, expect.timing, checkval=False
)
def check_keyrelationships(keys, expected):
"""
Check the key relationships (Successor and Predecessor metadata).
"""
for key in keys:
for expect in expected:
if expect.properties["legacy"]:
continue
if not key is expect.key:
continue
relationship_status = ["Predecessor", "Successor"]
for status in relationship_status:
assert key.is_metadata_consistent(status, expect.metadata)
def check_dnssec_verify(server, zone, tsig=None):
# Check if the zone is DNSSEC valid with dnssec-verify.
fqdn = f"{zone}."
verified = False
for _ in range(10):
transfer = _query(server, fqdn, dns.rdatatype.AXFR)
transfer = _query(server, fqdn, dns.rdatatype.AXFR, tsig=tsig)
if not isinstance(transfer, dns.message.Message):
isctest.log.debug(f"no response for {fqdn} AXFR from {server.ip}")
elif transfer.rcode() != dns.rcode.NOERROR:
@ -415,9 +856,9 @@ def _check_dnskeys(dnskeys, keys, cdnskey=False):
delete_md = f"Sync{delete_md}"
for key in keys:
publish = key.get_timing(publish_md)
publish = key.get_timing(publish_md, must_exist=False)
delete = key.get_timing(delete_md, must_exist=False)
published = now >= publish
published = publish is not None and now >= publish
removed = delete is not None and delete <= now
if not published or removed:
@ -502,8 +943,8 @@ def check_cds(rrset, keys):
assert numcds == len(cdss)
def _query_rrset(server, fqdn, qtype):
response = _query(server, fqdn, qtype)
def _query_rrset(server, fqdn, qtype, tsig=None):
response = _query(server, fqdn, qtype, tsig=tsig)
assert response.rcode() == dns.rcode.NOERROR
rrs = []
@ -523,46 +964,46 @@ def _query_rrset(server, fqdn, qtype):
return rrs, rrsigs
def check_apex(server, zone, ksks, zsks):
def check_apex(server, zone, ksks, zsks, tsig=None):
# Test the apex of a zone. This checks that the SOA and DNSKEY RRsets
# are signed correctly and with the appropriate keys.
fqdn = f"{zone}."
# test dnskey query
dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY)
dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY, tsig=tsig)
assert len(dnskeys) > 0
check_dnskeys(dnskeys, ksks, zsks)
assert len(rrsigs) > 0
check_signatures(rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks)
# test soa query
soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA)
soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA, tsig=tsig)
assert len(soa) == 1
assert f"{zone}. {DEFAULT_TTL} IN SOA" in soa[0].to_text()
assert len(rrsigs) > 0
check_signatures(rrsigs, dns.rdatatype.SOA, fqdn, ksks, zsks)
# test cdnskey query
cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY)
cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY, tsig=tsig)
check_dnskeys(cdnskeys, ksks, zsks, cdnskey=True)
if len(cdnskeys) > 0:
assert len(rrsigs) > 0
check_signatures(rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks)
# test cds query
cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS)
cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS, tsig=tsig)
check_cds(cds, ksks)
if len(cds) > 0:
assert len(rrsigs) > 0
check_signatures(rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks)
def check_subdomain(server, zone, ksks, zsks):
def check_subdomain(server, zone, ksks, zsks, tsig=None):
# Test an RRset below the apex and verify it is signed correctly.
fqdn = f"{zone}."
qname = f"a.{zone}."
qtype = dns.rdatatype.A
response = _query(server, qname, qtype)
response = _query(server, qname, qtype, tsig=tsig)
assert response.rcode() == dns.rcode.NOERROR
match = f"{qname} {DEFAULT_TTL} IN A 10.0.0.1"
@ -577,3 +1018,86 @@ def check_subdomain(server, zone, ksks, zsks):
assert len(rrsigs) > 0
check_signatures(rrsigs, qtype, fqdn, ksks, zsks)
def next_key_event_equals(server, zone, next_event):
if next_event is None:
# No next key event check.
return True
val = int(next_event.total_seconds())
if val == 3600:
waitfor = rf".*zone {zone}.*: next key event in (.*) seconds"
else:
# Exclude the default loadkeys interval (3600 seconds).
waitfor = rf".*zone {zone}.*: next key event in (?!3600$)(.*) seconds"
with server.watch_log_from_start() as watcher:
watcher.wait_for_line(re.compile(waitfor))
# WMM: The code below extracts the line the watcher was waiting for.
# If `WatchLog.wait_for_line()` returned the matched string, we could
# use it directly with `re.match`.
next_found = False
minval = val - NEXT_KEY_EVENT_THRESHOLD
maxval = val + NEXT_KEY_EVENT_THRESHOLD
with open(f"{server.identifier}/named.run", "r", encoding="utf-8") as fp:
for line in fp:
match = re.match(waitfor, line)
if match is not None:
nextval = int(match.group(1))
if minval <= nextval <= maxval:
next_found = True
break
isctest.log.debug(
f"check next key event: expected {val} in: {line.strip()}"
)
return next_found
def keydir_to_keylist(
zone: Optional[str], keydir: Optional[str] = None, in_use: bool = False
) -> List[Key]:
"""
Retrieve all keys from the key files in a directory. If 'zone' is None,
retrieve all keys in the directory, otherwise only those matching the
zone name. If 'keydir' is None, search the current directory.
"""
if zone is None:
zone = ""
all_keys = []
if keydir is None:
regex = rf"(K{zone}\.\+.*\+.*)\.key"
for filename in glob.glob(f"K{zone}.+*+*.key"):
match = re.match(regex, filename)
if match is not None:
all_keys.append(Key(match.group(1)))
else:
regex = rf"{keydir}/(K{zone}\.\+.*\+.*)\.key"
for filename in glob.glob(f"{keydir}/K{zone}.+*+*.key"):
match = re.match(regex, filename)
if match is not None:
all_keys.append(Key(match.group(1), keydir))
states = ["GoalState", "DNSKEYState", "KRRSIGState", "ZRRSIGState", "DSState"]
def used(kk):
if not in_use:
return True
for state in states:
val = kk.get_metadata(state, must_exist=False)
if val not in ["undefined", "hidden"]:
isctest.log.debug(f"key {kk} in use")
return True
return False
return [k for k in all_keys if used(k)]
def keystr_to_keylist(keystr: str, keydir: Optional[str] = None) -> List[Key]:
return [Key(name, keydir) for name in keystr.split()]
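
For reference, the Ipub/IpubC/Iret helpers in KeyProperties above reduce to simple timedelta arithmetic over the policy intervals. A worked sketch with hypothetical policy values (not taken from any test configuration):

from datetime import timedelta

# Hypothetical policy values, chosen only to make the arithmetic concrete.
dnskey_ttl = timedelta(hours=1)
zone_propagation_delay = timedelta(minutes=5)
publish_safety = timedelta(hours=1)
retire_safety = timedelta(hours=1)
max_zone_ttl = timedelta(days=1)
ds_ttl = timedelta(hours=2)
parent_propagation_delay = timedelta(hours=1)
signatures_validity = timedelta(days=14)
signatures_refresh = timedelta(days=5)

# Ipub = Dprp + TTLkey (+ publish-safety): delay until the published
# DNSKEY can be considered omnipresent.
ipub = dnskey_ttl + zone_propagation_delay + publish_safety
assert ipub == timedelta(hours=2, minutes=5)

# Iret for a KSK = DprpP + TTLds (+ retire-safety).
iret_ksk = parent_propagation_delay + ds_ttl + retire_safety
assert iret_ksk == timedelta(hours=4)

# Iret for a ZSK = Dsgn + Dprp + TTLsig (+ retire-safety),
# where Dsgn = signatures-validity - signatures-refresh.
sign_delay = signatures_validity - signatures_refresh
iret_zsk = sign_delay + zone_propagation_delay + max_zone_ttl + retire_safety
assert iret_zsk == timedelta(days=10, hours=1, minutes=5)

# A key is removed max(iret_ksk, iret_zsk) after it is retired;
# for this set of values the ZSK interval dominates.
assert max(iret_ksk, iret_zsk) == iret_zsk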

View File

@ -0,0 +1,42 @@
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import dns.zone
def zone_contains(
zone: dns.zone.Zone, rrset: dns.rrset.RRset, compare_ttl=False
) -> bool:
"""Check if a zone contains RRset"""
def compare_rrs(rr1, rrset):
rr2 = next((other_rr for other_rr in rrset if rr1 == other_rr), None)
if rr2 is None:
return False
if compare_ttl:
return rr1.ttl == rr2.ttl
return True
for _, node in zone.nodes.items():
for rdataset in node:
for rr in rdataset:
if compare_rrs(rr, rrset):
return True
return False
def file_contents_contain(file, substr):
with open(file, "r", encoding="utf-8") as fp:
for line in fp:
if f"{substr}" in line:
return True
return False
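
A brief usage sketch for these helpers, assuming the code runs inside the system test environment where the isctest package is importable; the zone data and file name are hypothetical:

import dns.rrset
import dns.zone

import isctest.util

# Load a toy zone and check whether it contains a given A record.
zone = dns.zone.from_text(
    "example. 300 IN SOA ns1.example. admin.example. 1 3600 900 604800 300\n"
    "example. 300 IN NS ns1.example.\n"
    "a.example. 300 IN A 10.0.0.1\n",
    origin="example.",
    relativize=False,
)
rrset = dns.rrset.from_text("a.example.", 600, "IN", "A", "10.0.0.1")
# With the default compare_ttl=False only the rdata must match;
# passing compare_ttl=True additionally requires matching TTLs.
assert isctest.util.zone_contains(zone, rrset)

# file_contents_contain() scans a text file line by line for a substring,
# e.g. isctest.util.file_contents_contain("ns1/named.conf", "dnssec-policy").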

View File

@ -10,19 +10,14 @@
# information regarding copyright ownership.
from datetime import timedelta
import difflib
import os
import shutil
import time
from typing import List, Optional
import pytest
import isctest
from isctest.kasp import (
Key,
KeyTimingMetadata,
)
from isctest.kasp import KeyTimingMetadata
pytestmark = pytest.mark.extra_artifacts(
[
@ -89,31 +84,6 @@ def between(value, start, end):
return start < value < end
def check_file_contents_equal(file1, file2):
def normalize_line(line):
# remove trailing&leading whitespace and replace multiple whitespaces
return " ".join(line.split())
def read_lines(file_path):
with open(file_path, "r", encoding="utf-8") as file:
return [normalize_line(line) for line in file.readlines()]
lines1 = read_lines(file1)
lines2 = read_lines(file2)
differ = difflib.Differ()
diff = differ.compare(lines1, lines2)
for line in diff:
assert not line.startswith("+ ") and not line.startswith(
"- "
), f'file contents of "{file1}" and "{file2}" differ'
def keystr_to_keylist(keystr: str, keydir: Optional[str] = None) -> List[Key]:
return [Key(name, keydir) for name in keystr.split()]
def ksr(zone, policy, action, options="", raise_on_exception=True):
ksr_command = [
os.environ.get("KSR"),
@ -515,14 +485,14 @@ def test_ksr_common(servers):
# create ksk
kskdir = "ns1/offline"
out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o")
ksks = keystr_to_keylist(out, kskdir)
ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
assert len(ksks) == 1
check_keys(ksks, None)
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
out, _ = ksr(zone, policy, "keygen", options="-i now -e +1y")
zsks = keystr_to_keylist(out)
zsks = isctest.kasp.keystr_to_keylist(out)
assert len(zsks) == 2
lifetime = timedelta(days=31 * 6)
@ -532,7 +502,7 @@ def test_ksr_common(servers):
# in the given key directory
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y")
zsks = keystr_to_keylist(out, zskdir)
zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
assert len(zsks) == 2
lifetime = timedelta(days=31 * 6)
@ -575,18 +545,22 @@ def test_ksr_common(servers):
# check that 'dnssec-ksr keygen' selects pregenerated keys for
# the same time bundle
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +1y")
selected_zsks = keystr_to_keylist(out, zskdir)
selected_zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
assert len(selected_zsks) == 2
for index, key in enumerate(selected_zsks):
assert zsks[index] == key
check_file_contents_equal(f"{key.path}.private", f"{key.path}.private.backup")
check_file_contents_equal(f"{key.path}.key", f"{key.path}.key.backup")
check_file_contents_equal(f"{key.path}.state", f"{key.path}.state.backup")
isctest.check.file_contents_equal(
f"{key.path}.private", f"{key.path}.private.backup"
)
isctest.check.file_contents_equal(f"{key.path}.key", f"{key.path}.key.backup")
isctest.check.file_contents_equal(
f"{key.path}.state", f"{key.path}.state.backup"
)
# check that 'dnssec-ksr keygen' generates only necessary keys for
# overlapping time bundle
out, err = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y -v 1")
overlapping_zsks = keystr_to_keylist(out, zskdir)
overlapping_zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
assert len(overlapping_zsks) == 4
verbose = err.split()
@ -606,15 +580,19 @@ def test_ksr_common(servers):
for index, key in enumerate(overlapping_zsks):
if index < 2:
assert zsks[index] == key
check_file_contents_equal(
isctest.check.file_contents_equal(
f"{key.path}.private", f"{key.path}.private.backup"
)
check_file_contents_equal(f"{key.path}.key", f"{key.path}.key.backup")
check_file_contents_equal(f"{key.path}.state", f"{key.path}.state.backup")
isctest.check.file_contents_equal(
f"{key.path}.key", f"{key.path}.key.backup"
)
isctest.check.file_contents_equal(
f"{key.path}.state", f"{key.path}.state.backup"
)
# run 'dnssec-ksr keygen' again with verbosity 0
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y")
overlapping_zsks2 = keystr_to_keylist(out, zskdir)
overlapping_zsks2 = isctest.kasp.keystr_to_keylist(out, zskdir)
assert len(overlapping_zsks2) == 4
check_keys(overlapping_zsks2, lifetime)
for index, key in enumerate(overlapping_zsks2):
@ -709,7 +687,7 @@ def test_ksr_lastbundle(servers):
kskdir = "ns1/offline"
offset = -timedelta(days=365)
out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i -1y -e +1d -o")
ksks = keystr_to_keylist(out, kskdir)
ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
assert len(ksks) == 1
check_keys(ksks, None, offset=offset)
@ -717,7 +695,7 @@ def test_ksr_lastbundle(servers):
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1d")
zsks = keystr_to_keylist(out, zskdir)
zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
assert len(zsks) == 2
lifetime = timedelta(days=31 * 6)
@ -788,7 +766,7 @@ def test_ksr_inthemiddle(servers):
kskdir = "ns1/offline"
offset = -timedelta(days=365)
out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i -1y -e +1y -o")
ksks = keystr_to_keylist(out, kskdir)
ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
assert len(ksks) == 1
check_keys(ksks, None, offset=offset)
@ -796,7 +774,7 @@ def test_ksr_inthemiddle(servers):
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1y")
zsks = keystr_to_keylist(out, zskdir)
zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
assert len(zsks) == 4
lifetime = timedelta(days=31 * 6)
@ -868,13 +846,13 @@ def check_ksr_rekey_logs_error(server, zone, policy, offset, end):
then = now + offset
until = now + end
out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i {then} -e {until} -o")
ksks = keystr_to_keylist(out, kskdir)
ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
assert len(ksks) == 1
# key generation
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {then} -e {until}")
zsks = keystr_to_keylist(out, zskdir)
zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
assert len(zsks) == 2
# create request
@ -941,7 +919,7 @@ def test_ksr_unlimited(servers):
# create ksk
kskdir = "ns1/offline"
out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +2y -o")
ksks = keystr_to_keylist(out, kskdir)
ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
assert len(ksks) == 1
check_keys(ksks, None)
@ -949,7 +927,7 @@ def test_ksr_unlimited(servers):
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +2y")
zsks = keystr_to_keylist(out, zskdir)
zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
assert len(zsks) == 1
lifetime = None
@ -1058,7 +1036,7 @@ def test_ksr_twotone(servers):
# create ksk
kskdir = "ns1/offline"
out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o")
ksks = keystr_to_keylist(out, kskdir)
ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
assert len(ksks) == 2
ksks_defalg = []
@ -1082,7 +1060,7 @@ def test_ksr_twotone(servers):
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y")
zsks = keystr_to_keylist(out, zskdir)
zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
# First algorithm keys have a lifetime of 3 months, so there should
# be 4 created keys. Second algorithm keys have a lifetime of 5
# months, so there should be 3 created keys. While only two time
@ -1176,7 +1154,7 @@ def test_ksr_kskroll(servers):
# create ksk
kskdir = "ns1/offline"
out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o")
ksks = keystr_to_keylist(out, kskdir)
ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
assert len(ksks) == 2
lifetime = timedelta(days=31 * 6)
@ -1185,7 +1163,7 @@ def test_ksr_kskroll(servers):
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y")
zsks = keystr_to_keylist(out, zskdir)
zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
assert len(zsks) == 1
check_keys(zsks, None)