From e245c0c734f034c281c9c0d5d58fe94df711db2c Mon Sep 17 00:00:00 2001
From: Pengyu Lv
Date: Fri, 28 Apr 2023 10:46:18 +0800
Subject: [PATCH 1/9] cert_audit: Support parsing file with multiple PEMs

Previously, if a file had multiple PEM objects, only the first one
would be parsed. This commit adds support for parsing all the PEM
objects in the file.

Signed-off-by: Pengyu Lv
---
 tests/scripts/audit-validity-dates.py | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/tests/scripts/audit-validity-dates.py b/tests/scripts/audit-validity-dates.py
index 1ccfc2188f..d6e73fffb8 100755
--- a/tests/scripts/audit-validity-dates.py
+++ b/tests/scripts/audit-validity-dates.py
@@ -90,7 +90,7 @@ class AuditData:
 
 class X509Parser:
     """A parser class to parse crt/crl/csr file or data in PEM/DER format."""
-    PEM_REGEX = br'-{5}BEGIN (?P<type>.*?)-{5}\n(?P<data>.*?)-{5}END (?P=type)-{5}\n'
+    PEM_REGEX = br'-{5}BEGIN (?P<type>.*?)-{5}(?P<data>.*?)-{5}END (?P=type)-{5}'
     PEM_TAG_REGEX = br'-{5}BEGIN (?P<type>.*?)-{5}\n'
     PEM_TAGS = {
         DataType.CRT: 'CERTIFICATE',
@@ -277,12 +277,15 @@ class TestDataAuditor(Auditor):
         """
         with open(filename, 'rb') as f:
             data = f.read()
-        result = self.parse_bytes(data)
-        if result is not None:
-            result.location = filename
-            return [result]
-        else:
-            return []
+
+        results = []
+        for idx, m in enumerate(re.finditer(X509Parser.PEM_REGEX, data, flags=re.S), 1):
+            result = self.parse_bytes(data[m.start():m.end()])
+            if result is not None:
+                result.location = "{}#{}".format(filename, idx)
+                results.append(result)
+
+        return results
 
 
 def parse_suite_data(data_f):
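
As an aside on the regex change above: dropping the trailing `\n`
requirements means a PEM object sitting at the very end of a file
without a final newline still matches, and re.finditer walks every
object instead of stopping at the first. A minimal standalone sketch
(the two PEM bodies below are dummy placeholders, not real objects):

    import re

    PEM_REGEX = br'-{5}BEGIN (?P<type>.*?)-{5}(?P<data>.*?)-{5}END (?P=type)-{5}'

    data = (b'-----BEGIN CERTIFICATE-----\nAAAA\n-----END CERTIFICATE-----\n'
            b'-----BEGIN X509 CRL-----\nBBBB\n-----END X509 CRL-----')

    # re.S lets '.' match newlines inside each object's body;
    # enumerate(..., 1) numbers the matches, which is how the
    # "<file>#<n>" location strings are built.
    for idx, m in enumerate(re.finditer(PEM_REGEX, data, flags=re.S), 1):
        print(idx, m.group('type').decode(), m.start(), m.end())
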
From fe13bd3d0e1f4deac7462c62970f6591f0988174 Mon Sep 17 00:00:00 2001
From: Pengyu Lv
Date: Fri, 28 Apr 2023 10:58:38 +0800
Subject: [PATCH 2/9] cert_audit: Merge audit_data for identical X.509 objects

Signed-off-by: Pengyu Lv
---
 tests/scripts/audit-validity-dates.py | 41 ++++++++++++++++++++++-----
 1 file changed, 34 insertions(+), 7 deletions(-)

diff --git a/tests/scripts/audit-validity-dates.py b/tests/scripts/audit-validity-dates.py
index d6e73fffb8..5729ee9886 100755
--- a/tests/scripts/audit-validity-dates.py
+++ b/tests/scripts/audit-validity-dates.py
@@ -65,8 +65,13 @@ class AuditData:
     #pylint: disable=too-few-public-methods
     def __init__(self, data_type: DataType, x509_obj):
         self.data_type = data_type
-        self.location = ""
+        # the locations where the x509 object could be found
+        self.locations = [] # type: typing.List[str]
         self.fill_validity_duration(x509_obj)
+        self._obj = x509_obj
+
+    def __eq__(self, __value) -> bool:
+        return self._obj == __value._obj
 
     def fill_validity_duration(self, x509_obj):
         """Read validity period from an X.509 object."""
@@ -282,7 +287,7 @@ class TestDataAuditor(Auditor):
         for idx, m in enumerate(re.finditer(X509Parser.PEM_REGEX, data, flags=re.S), 1):
             result = self.parse_bytes(data[m.start():m.end()])
             if result is not None:
-                result.location = "{}#{}".format(filename, idx)
+                result.locations.append("{}#{}".format(filename, idx))
                 results.append(result)
 
         return results
@@ -342,20 +347,38 @@ class SuiteDataAuditor(Auditor):
             audit_data = self.parse_bytes(bytes.fromhex(match.group('data')))
             if audit_data is None:
                 continue
-            audit_data.location = "{}:{}:#{}".format(filename,
-                                                     data_f.line_no,
-                                                     idx + 1)
+            audit_data.locations.append("{}:{}:#{}".format(filename,
+                                                           data_f.line_no,
+                                                           idx + 1))
             audit_data_list.append(audit_data)
 
     return audit_data_list
 
 
+def merge_auditdata(original: typing.List[AuditData]) \
+        -> typing.List[AuditData]:
+    """
+    Multiple AuditData might be extracted from different locations for
+    an identical X.509 object. Merge them into one entry in the list.
+    """
+    results = []
+    for x in original:
+        if x not in results:
+            results.append(x)
+        else:
+            idx = results.index(x)
+            results[idx].locations.extend(x.locations)
+    return results
+
+
 def list_all(audit_data: AuditData):
-    print("{}\t{}\t{}\t{}".format(
+    print("{:20}\t{:20}\t{:3}\t{}".format(
         audit_data.not_valid_before.isoformat(timespec='seconds'),
         audit_data.not_valid_after.isoformat(timespec='seconds'),
         audit_data.data_type.name,
-        audit_data.location))
+        audit_data.locations[0]))
+    for loc in audit_data.locations[1:]:
+        print("{:20}\t{:20}\t{:3}\t{}".format('', '', '', loc))
 
 
 def configure_logger(logger: logging.Logger) -> None:
@@ -455,6 +478,10 @@ def main():
     sd_auditor.walk_all(suite_data_files)
 
     audit_results = td_auditor.audit_data + sd_auditor.audit_data
+    audit_results = merge_auditdata(audit_results)
+
+    logger.info("Total: {} objects found!".format(len(audit_results)))
+
     # we filter out the files whose validity duration covers the provided
     # duration.
     filter_func = lambda d: (start_date < d.not_valid_before) or \

From 0b4832bbf5ece7c6d099c829887dab0dd6cd19a5 Mon Sep 17 00:00:00 2001
From: Pengyu Lv
Date: Fri, 28 Apr 2023 11:14:28 +0800
Subject: [PATCH 3/9] cert_audit: Sort the outputs by not_valid_after date

Signed-off-by: Pengyu Lv
---
 tests/scripts/audit-validity-dates.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/tests/scripts/audit-validity-dates.py b/tests/scripts/audit-validity-dates.py
index 5729ee9886..81c69d3701 100755
--- a/tests/scripts/audit-validity-dates.py
+++ b/tests/scripts/audit-validity-dates.py
@@ -487,11 +487,13 @@ def main():
     filter_func = lambda d: (start_date < d.not_valid_before) or \
                             (d.not_valid_after < end_date)
 
+    sortby_end = lambda d: d.not_valid_after
+
     if args.all:
         filter_func = None
 
     # filter and output the results
-    for d in filter(filter_func, audit_results):
+    for d in sorted(filter(filter_func, audit_results), key=sortby_end):
         list_all(d)
 
     logger.debug("Done!")

From fd72d9f556b32c48d6e72afedd626fece0ba8a76 Mon Sep 17 00:00:00 2001
From: Pengyu Lv
Date: Fri, 28 Apr 2023 11:17:24 +0800
Subject: [PATCH 4/9] cert_audit: Fix bug in check_cryptography_version

check_cryptography_version didn't provide a helpful message with
Python < 3.6, because the re.Match object is not subscriptable on
those versions.

Signed-off-by: Pengyu Lv
---
 tests/scripts/audit-validity-dates.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/scripts/audit-validity-dates.py b/tests/scripts/audit-validity-dates.py
index 81c69d3701..35ea93c0d9 100755
--- a/tests/scripts/audit-validity-dates.py
+++ b/tests/scripts/audit-validity-dates.py
@@ -45,7 +45,7 @@ from mbedtls_dev import build_tree
 
 def check_cryptography_version():
     match = re.match(r'^[0-9]+', cryptography.__version__)
-    if match is None or int(match[0]) < 35:
+    if match is None or int(match.group(0)) < 35:
         raise Exception("audit-validity-dates requires cryptography >= 35.0.0"
                         + "({} is too old)".format(cryptography.__version__))
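
The one-line fix in patch 4 hinges on a version detail: re.Match
objects only support subscripting (match[0]) from Python 3.6 onward,
while match.group(0) works on every version, so on older interpreters
the version check crashed with a TypeError before it could print its
diagnostic. A standalone sketch of the difference (the version string
is made up):

    import re

    version = '34.0.1'  # stand-in for cryptography.__version__
    match = re.match(r'^[0-9]+', version)

    # match[0] raises TypeError on Python < 3.6
    # ("'_sre.SRE_Match' object is not subscriptable");
    # match.group(0) is the portable spelling of the same access.
    if match is None or int(match.group(0)) < 35:
        print("cryptography {} is too old".format(version))
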
From 13f2ef4949c3f7decb158aa4478d880242c5e05d Mon Sep 17 00:00:00 2001
From: Pengyu Lv
Date: Fri, 5 May 2023 16:53:37 +0800
Subject: [PATCH 5/9] cert_audit: Calculate identifier for X.509 objects

The identifier is the SHA-1 hex digest computed from the DER encoding
of each X.509 object. It's useful for finding identical X.509 objects.

Signed-off-by: Pengyu Lv
---
 tests/scripts/audit-validity-dates.py | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/tests/scripts/audit-validity-dates.py b/tests/scripts/audit-validity-dates.py
index 35ea93c0d9..73509e1543 100755
--- a/tests/scripts/audit-validity-dates.py
+++ b/tests/scripts/audit-validity-dates.py
@@ -31,6 +31,7 @@ import argparse
 import datetime
 import glob
 import logging
+import hashlib
 from enum import Enum
 
 # The script requires cryptography >= 35.0.0 which is only available
@@ -69,10 +70,20 @@ class AuditData:
         self.locations = [] # type: typing.List[str]
         self.fill_validity_duration(x509_obj)
         self._obj = x509_obj
+        encoding = cryptography.hazmat.primitives.serialization.Encoding.DER
+        self._identifier = hashlib.sha1(self._obj.public_bytes(encoding)).hexdigest()
 
     def __eq__(self, __value) -> bool:
         return self._obj == __value._obj
 
+    @property
+    def identifier(self):
+        """
+        Identifier of the underlying X.509 object, which is consistent across
+        different runs.
+        """
+        return self._identifier
+
     def fill_validity_duration(self, x509_obj):
         """Read validity period from an X.509 object."""
         # Certificate expires after "not_valid_after"

From 31e3d12be957b75eae94730f1f1967873e52b016 Mon Sep 17 00:00:00 2001
From: Pengyu Lv
Date: Fri, 5 May 2023 17:01:49 +0800
Subject: [PATCH 6/9] cert_audit: Output format improvement

We should print all the information for each object found on every
line. This makes it easier to analyze the output.

Signed-off-by: Pengyu Lv
---
 tests/scripts/audit-validity-dates.py | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/tests/scripts/audit-validity-dates.py b/tests/scripts/audit-validity-dates.py
index 73509e1543..6c8a4e81fb 100755
--- a/tests/scripts/audit-validity-dates.py
+++ b/tests/scripts/audit-validity-dates.py
@@ -383,13 +383,13 @@ def merge_auditdata(original: typing.List[AuditData]) \
 
 
 def list_all(audit_data: AuditData):
-    print("{:20}\t{:20}\t{:3}\t{}".format(
-        audit_data.not_valid_before.isoformat(timespec='seconds'),
-        audit_data.not_valid_after.isoformat(timespec='seconds'),
-        audit_data.data_type.name,
-        audit_data.locations[0]))
-    for loc in audit_data.locations[1:]:
-        print("{:20}\t{:20}\t{:3}\t{}".format('', '', '', loc))
+    for loc in audit_data.locations:
+        print("{}\t{:20}\t{:20}\t{:3}\t{}".format(
+            audit_data.identifier,
+            audit_data.not_valid_before.isoformat(timespec='seconds'),
+            audit_data.not_valid_after.isoformat(timespec='seconds'),
+            audit_data.data_type.name,
+            loc))
 
 
 def configure_logger(logger: logging.Logger) -> None:
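
To make the identifier introduced in patch 5 concrete: it hashes the
DER serialization, so byte-identical objects collapse to the same hex
string no matter which file or test suite they were found in. A
sketch of the same computation outside the class (assumes the
cryptography package is installed and pem_bytes holds one
certificate; identifier_of is a hypothetical helper):

    import hashlib
    from cryptography import x509
    from cryptography.hazmat.primitives.serialization import Encoding

    def identifier_of(pem_bytes: bytes) -> str:
        """SHA-1 hex digest of a certificate's DER encoding."""
        cert = x509.load_pem_x509_certificate(pem_bytes)
        # Re-serializing to DER yields canonical bytes, so the same
        # certificate re-wrapped in PEM elsewhere hashes identically.
        return hashlib.sha1(cert.public_bytes(Encoding.DER)).hexdigest()
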
""" def __init__(self, logger): self.logger = logger self.default_files = self.collect_default_files() - # A list to store the parsed audit_data. - self.audit_data = [] # type: typing.List[AuditData] self.parser = X509Parser({ DataType.CRT: { DataFormat.PEM: x509.load_pem_x509_certificate, @@ -257,15 +255,27 @@ class Auditor: return audit_data return None - def walk_all(self, file_list: typing.Optional[typing.List[str]] = None): + def walk_all(self, + results: typing.Dict[str, AuditData], + file_list: typing.Optional[typing.List[str]] = None) \ + -> None: """ - Iterate over all the files in the list and get audit data. + Iterate over all the files in the list and get audit data. The + results will be written to `results` passed to this function. + + :param results: The dictionary used to store the parsed + AuditData. The keys of this dictionary should + be the identifier of the AuditData. """ if file_list is None: file_list = self.default_files for filename in file_list: data_list = self.parse_file(filename) - self.audit_data.extend(data_list) + for d in data_list: + if d.identifier in results: + results[d.identifier].locations.extend(d.locations) + else: + results[d.identifier] = d @staticmethod def find_test_dir(): @@ -485,11 +495,9 @@ def main(): end_date = start_date # go through all the files - td_auditor.walk_all(data_files) - sd_auditor.walk_all(suite_data_files) - audit_results = td_auditor.audit_data + sd_auditor.audit_data - - audit_results = merge_auditdata(audit_results) + audit_results = {} + td_auditor.walk_all(audit_results, data_files) + sd_auditor.walk_all(audit_results, suite_data_files) logger.info("Total: {} objects found!".format(len(audit_results))) @@ -504,7 +512,7 @@ def main(): filter_func = None # filter and output the results - for d in sorted(filter(filter_func, audit_results), key=sortby_end): + for d in sorted(filter(filter_func, audit_results.values()), key=sortby_end): list_all(d) logger.debug("Done!") From ee870a6e831687c7ee660679214d37e404b4da4e Mon Sep 17 00:00:00 2001 From: Pengyu Lv Date: Sat, 6 May 2023 10:06:19 +0800 Subject: [PATCH 8/9] cert_audit: Remove merge_auditdata We maintain a dict with unique AudiData objects (AuditData with unique underlying X.509 objects). We don't need merge_auditdata anymore. Signed-off-by: Pengyu Lv --- tests/scripts/audit-validity-dates.py | 21 +-------------------- 1 file changed, 1 insertion(+), 20 deletions(-) diff --git a/tests/scripts/audit-validity-dates.py b/tests/scripts/audit-validity-dates.py index 127c0a0fe0..ecde428450 100755 --- a/tests/scripts/audit-validity-dates.py +++ b/tests/scripts/audit-validity-dates.py @@ -73,9 +73,6 @@ class AuditData: encoding = cryptography.hazmat.primitives.serialization.Encoding.DER self._identifier = hashlib.sha1(self._obj.public_bytes(encoding)).hexdigest() - def __eq__(self, __value) -> bool: - return self._obj == __value._obj - @property def identifier(self): """ @@ -263,7 +260,7 @@ class Auditor: Iterate over all the files in the list and get audit data. The results will be written to `results` passed to this function. - :param results: The dictionary used to store the parsed + :param results: The dictionary used to store the parsed AuditData. The keys of this dictionary should be the identifier of the AuditData. 
""" @@ -376,22 +373,6 @@ class SuiteDataAuditor(Auditor): return audit_data_list -def merge_auditdata(original: typing.List[AuditData]) \ - -> typing.List[AuditData]: - """ - Multiple AuditData might be extracted from different locations for - an identical X.509 object. Merge them into one entry in the list. - """ - results = [] - for x in original: - if x not in results: - results.append(x) - else: - idx = results.index(x) - results[idx].locations.extend(x.locations) - return results - - def list_all(audit_data: AuditData): for loc in audit_data.locations: print("{}\t{:20}\t{:20}\t{:3}\t{}".format( From a57f67747453a23c97f13cedcf6b42ce71b45318 Mon Sep 17 00:00:00 2001 From: Pengyu Lv Date: Mon, 8 May 2023 18:07:28 +0800 Subject: [PATCH 9/9] cert_audit: Fix DER files missed from parsing Signed-off-by: Pengyu Lv --- tests/scripts/audit-validity-dates.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/scripts/audit-validity-dates.py b/tests/scripts/audit-validity-dates.py index ecde428450..5506e40e7f 100755 --- a/tests/scripts/audit-validity-dates.py +++ b/tests/scripts/audit-validity-dates.py @@ -302,12 +302,22 @@ class TestDataAuditor(Auditor): data = f.read() results = [] + # Try to parse all PEM blocks. + is_pem = False for idx, m in enumerate(re.finditer(X509Parser.PEM_REGEX, data, flags=re.S), 1): + is_pem = True result = self.parse_bytes(data[m.start():m.end()]) if result is not None: result.locations.append("{}#{}".format(filename, idx)) results.append(result) + # Might be DER format. + if not is_pem: + result = self.parse_bytes(data) + if result is not None: + result.locations.append("{}".format(filename)) + results.append(result) + return results