mirror of
https://gitlab.isc.org/isc-projects/bind9.git
synced 2025-04-18 09:44:09 +03:00
Introduce class KeyProperties
In isctest.kasp, introduce a new class 'KeyProperties' that can be used to check if a Key matches expected properties. Properties are for the time being divided in three parts: 'properties' that contain some attributes of the expected properties (such as are we dealing with a legacy key, is the private key available, and other things that do not fit the metadata exactly), 'metadata' that contains expected metadata (such as 'Algorithm', 'Lifetime', 'Length'), and 'timing', which is metadata of the class KeyTimingMetadata. The 'default()' method fills in the expected properties for the default DNSSEC policy. The 'set_expected_times()' sets the expected timing metadata, derived from when the key was created. This method can take an offset to push the expected timing metadata a duration in the future or back into the past. If 'pregenerated=True', derive the expected timing metadata from the 'Publish' metadata derived from the keyfile, rather than from the 'Created' metadata. The calculations in the 'Ipub', 'IpubC' and 'Iret' methods are derived from RFC 7583 DNSSEC Key Rollover Timing Considerations.
This commit is contained in:
parent
ee8e9f1ded
commit
0b9fbca18e
@ -15,7 +15,7 @@ from pathlib import Path
|
||||
import re
|
||||
import subprocess
|
||||
import time
|
||||
from typing import List, Optional, Union
|
||||
from typing import Dict, List, Optional, Union
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
@ -100,6 +100,165 @@ class KeyTimingMetadata:
|
||||
return result
|
||||
|
||||
|
||||
class KeyProperties:
|
||||
"""
|
||||
Represent the (expected) properties a key should have.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
properties: dict,
|
||||
metadata: dict,
|
||||
timing: Dict[str, KeyTimingMetadata],
|
||||
):
|
||||
self.name = name
|
||||
self.key = None
|
||||
self.properties = properties
|
||||
self.metadata = metadata
|
||||
self.timing = timing
|
||||
|
||||
def __repr__(self):
|
||||
return self.name
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.name
|
||||
|
||||
@staticmethod
|
||||
def default(with_state=True) -> "KeyProperties":
|
||||
properties = {
|
||||
"expect": True,
|
||||
"private": True,
|
||||
"legacy": False,
|
||||
"role": "csk",
|
||||
"role_full": "key-signing",
|
||||
"dnskey_ttl": 3600,
|
||||
"flags": 257,
|
||||
}
|
||||
metadata = {
|
||||
"Algorithm": isctest.vars.algorithms.ECDSAP256SHA256.number,
|
||||
"Length": 256,
|
||||
"Lifetime": 0,
|
||||
"KSK": "yes",
|
||||
"ZSK": "yes",
|
||||
}
|
||||
timing: Dict[str, KeyTimingMetadata] = {}
|
||||
|
||||
result = KeyProperties(
|
||||
name="DEFAULT", properties=properties, metadata=metadata, timing=timing
|
||||
)
|
||||
result.name = "DEFAULT"
|
||||
result.key = None
|
||||
if with_state:
|
||||
result.metadata["GoalState"] = "omnipresent"
|
||||
result.metadata["DNSKEYState"] = "rumoured"
|
||||
result.metadata["KRRSIGState"] = "rumoured"
|
||||
result.metadata["ZRRSIGState"] = "rumoured"
|
||||
result.metadata["DSState"] = "hidden"
|
||||
|
||||
return result
|
||||
|
||||
def Ipub(self, config):
|
||||
ipub = timedelta(0)
|
||||
|
||||
if self.key.get_metadata("Predecessor", must_exist=False) != "undefined":
|
||||
# Ipub = Dprp + TTLkey
|
||||
ipub = (
|
||||
config["dnskey-ttl"]
|
||||
+ config["zone-propagation-delay"]
|
||||
+ config["publish-safety"]
|
||||
)
|
||||
|
||||
self.timing["Active"] = self.timing["Published"] + ipub
|
||||
|
||||
def IpubC(self, config):
|
||||
if not self.key.is_ksk():
|
||||
return
|
||||
|
||||
ttl1 = config["dnskey-ttl"] + config["publish-safety"]
|
||||
ttl2 = timedelta(0)
|
||||
|
||||
if self.key.get_metadata("Predecessor", must_exist=False) == "undefined":
|
||||
# If this is the first key, we also need to wait until the zone
|
||||
# signatures are omnipresent. Use max-zone-ttl instead of
|
||||
# dnskey-ttl, and no publish-safety (because we are looking at
|
||||
# signatures here, not the public key).
|
||||
ttl2 = config["max-zone-ttl"]
|
||||
|
||||
# IpubC = DprpC + TTLkey
|
||||
ipubc = config["zone-propagation-delay"] + max(ttl1, ttl2)
|
||||
|
||||
self.timing["PublishCDS"] = self.timing["Published"] + ipubc
|
||||
|
||||
if self.metadata["Lifetime"] != 0:
|
||||
self.timing["DeleteCDS"] = (
|
||||
self.timing["PublishCDS"] + self.metadata["Lifetime"]
|
||||
)
|
||||
|
||||
def Iret(self, config):
|
||||
if self.metadata["Lifetime"] == 0:
|
||||
return
|
||||
|
||||
sign_delay = config["signatures-validity"] - config["signatures-refresh"]
|
||||
safety_interval = config["retire-safety"]
|
||||
|
||||
iretKSK = timedelta(0)
|
||||
iretZSK = timedelta(0)
|
||||
if self.key.is_ksk():
|
||||
# Iret = DprpP + TTLds
|
||||
iretKSK = (
|
||||
config["parent-propagation-delay"] + config["ds-ttl"] + safety_interval
|
||||
)
|
||||
if self.key.is_zsk():
|
||||
# Iret = Dsgn + Dprp + TTLsig
|
||||
iretZSK = (
|
||||
sign_delay
|
||||
+ config["zone-propagation-delay"]
|
||||
+ config["max-zone-ttl"]
|
||||
+ safety_interval
|
||||
)
|
||||
|
||||
self.timing["Removed"] = self.timing["Retired"] + max(iretKSK, iretZSK)
|
||||
|
||||
def set_expected_keytimes(self, config, offset=None, pregenerated=False):
|
||||
if self.key is None:
|
||||
raise ValueError("KeyProperties must be attached to a Key")
|
||||
|
||||
if self.properties["legacy"]:
|
||||
return
|
||||
|
||||
if offset is None:
|
||||
offset = self.properties["offset"]
|
||||
|
||||
self.timing["Generated"] = self.key.get_timing("Created")
|
||||
|
||||
self.timing["Published"] = self.timing["Generated"]
|
||||
if pregenerated:
|
||||
self.timing["Published"] = self.key.get_timing("Publish")
|
||||
self.timing["Published"] = self.timing["Published"] + offset
|
||||
self.Ipub(config)
|
||||
|
||||
# Set Retired timing metadata if key has lifetime.
|
||||
if self.metadata["Lifetime"] != 0:
|
||||
self.timing["Retired"] = self.timing["Active"] + self.metadata["Lifetime"]
|
||||
|
||||
self.IpubC(config)
|
||||
self.Iret(config)
|
||||
|
||||
# Key state change times must exist, but since we cannot reliably tell
|
||||
# when named made the actual state change, we don't care what the
|
||||
# value is. Set it to None will verify that the metadata exists, but
|
||||
# without actual checking the value.
|
||||
self.timing["DNSKEYChange"] = None
|
||||
|
||||
if self.key.is_ksk():
|
||||
self.timing["DSChange"] = None
|
||||
self.timing["KRRSIGChange"] = None
|
||||
|
||||
if self.key.is_zsk():
|
||||
self.timing["ZRRSIGChange"] = None
|
||||
|
||||
|
||||
@total_ordering
|
||||
class Key:
|
||||
"""
|
||||
@ -579,5 +738,47 @@ def check_subdomain(server, zone, ksks, zsks):
|
||||
check_signatures(rrsigs, qtype, fqdn, ksks, zsks)
|
||||
|
||||
|
||||
def keydir_to_keylist(
|
||||
zone: Optional[str], keydir: Optional[str] = None, in_use: bool = False
|
||||
) -> List[Key]:
|
||||
"""
|
||||
Retrieve all keys from the key files in a directory. If 'zone' is None,
|
||||
retrieve all keys in the directory, otherwise only those matching the
|
||||
zone name. If 'keydir' is None, search the current directory.
|
||||
"""
|
||||
if zone is None:
|
||||
zone = ""
|
||||
|
||||
all_keys = []
|
||||
if keydir is None:
|
||||
regex = rf"(K{zone}\.\+.*\+.*)\.key"
|
||||
for filename in glob.glob(f"K{zone}.+*+*.key"):
|
||||
match = re.match(regex, filename)
|
||||
if match is not None:
|
||||
all_keys.append(Key(match.group(1)))
|
||||
else:
|
||||
regex = rf"{keydir}/(K{zone}\.\+.*\+.*)\.key"
|
||||
for filename in glob.glob(f"{keydir}/K{zone}.+*+*.key"):
|
||||
match = re.match(regex, filename)
|
||||
if match is not None:
|
||||
all_keys.append(Key(match.group(1), keydir))
|
||||
|
||||
states = ["GoalState", "DNSKEYState", "KRRSIGState", "ZRRSIGState", "DSState"]
|
||||
|
||||
def used(kk):
|
||||
if not in_use:
|
||||
return True
|
||||
|
||||
for state in states:
|
||||
val = kk.get_metadata(state, must_exist=False)
|
||||
if val not in ["undefined", "hidden"]:
|
||||
isctest.log.debug(f"key {kk} in use")
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
return [k for k in all_keys if used(k)]
|
||||
|
||||
|
||||
def keystr_to_keylist(keystr: str, keydir: Optional[str] = None) -> List[Key]:
|
||||
return [Key(name, keydir) for name in keystr.split()]
|
||||
|
Loading…
x
Reference in New Issue
Block a user