1
0
mirror of https://github.com/quay/quay.git synced 2026-01-27 18:42:52 +03:00
Files
quay/data/secscan_model/__init__.py
Oleg Bulatov 84fa795ae7 chore: fix isort config and remove isort: skip_file (#2196)
* chore: pass config to isort as it doesn't always detect it

* chore: mark package "test" as local, not stdlib

* chore: remove "isort: skip_file"

* chore: fix app in test_load_security_information

* chore: fix app in test_notification

* chore: fix app in test_index_report
2023-09-21 11:46:03 -04:00

73 lines
2.6 KiB
Python

import logging
from data.secscan_model.datatypes import (
ScanLookupStatus,
SecurityInformationLookupResult,
)
from data.secscan_model.interface import (
InvalidConfigurationException,
SecurityScannerInterface,
)
from data.secscan_model.secscan_v4_model import NoopV4SecurityScanner
from data.secscan_model.secscan_v4_model import ScanToken as V4ScanToken
from data.secscan_model.secscan_v4_model import V4SecurityScanner
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class SecurityScannerModelProxy(SecurityScannerInterface):
    """
    Security scanner interface implementation that forwards calls to a backing model.

    The backing model is stored on `self._model` by `configure()`; the remaining methods
    delegate to that object.
    """

    def configure(self, app, instance_keys, storage):
        """
        Choose the backing model, log which one was chosen, and return this proxy.

        A `V4SecurityScanner` is built from the given arguments. If building it raises
        `InvalidConfigurationException`, a `NoopV4SecurityScanner` is used instead.
        """
        try:
            chosen = V4SecurityScanner(app, instance_keys, storage)
        except InvalidConfigurationException:
            chosen = NoopV4SecurityScanner()
        self._model = chosen

        separator = "==============================="
        logger.info(separator)
        logger.info("Using split secscan model: `%s`", [self._model])
        logger.info(separator)

        return self

    def perform_indexing(self, next_token=None, batch_size=None):
        """
        Delegate indexing to the backing model and return its result.

        When a token is supplied, it is asserted to be a `V4ScanToken` whose `min_id` is an
        `int` before the call is forwarded.
        """
        assert next_token is None or isinstance(next_token, V4ScanToken)
        assert next_token is None or isinstance(next_token.min_id, int)

        return self._model.perform_indexing(next_token, batch_size)

    def perform_indexing_recent_manifests(self, batch_size=None):
        """
        Delegate indexing of recent manifests to the backing model.

        The backing model's return value is not propagated; this method returns None.
        """
        self._model.perform_indexing_recent_manifests(batch_size)

    def load_security_information(
        self, manifest_or_legacy_image, include_vulnerabilities, model_cache=None
    ):
        """
        Look up security information for the given object through the backing model.

        The argument is first converted with `as_manifest()`. The backing model's result is
        returned as-is unless its status is `NOT_YET_INDEXED`, in which case a new result
        created via `SecurityInformationLookupResult.with_status` is returned instead.
        """
        target = manifest_or_legacy_image.as_manifest()
        lookup = self._model.load_security_information(
            target, include_vulnerabilities, model_cache
        )

        not_indexed = ScanLookupStatus.NOT_YET_INDEXED
        if lookup.status != not_indexed:
            return lookup

        return SecurityInformationLookupResult.with_status(not_indexed)

    def register_model_cleanup_callbacks(self, data_model_config):
        """
        Forward cleanup-callback registration to the backing model and return its result.
        """
        backing = self._model
        return backing.register_model_cleanup_callbacks(data_model_config)

    @property
    def legacy_api_handler(self):
        """
        Not provided by this proxy; accessing the property raises `NotImplementedError`.
        """
        raise NotImplementedError

    def lookup_notification_page(self, notification_id, page_index=None):
        """
        Forward the notification page lookup to the backing model and return its result.
        """
        backing = self._model
        return backing.lookup_notification_page(notification_id, page_index)

    def process_notification_page(self, page_result):
        """
        Forward processing of a notification page to the backing model and return its result.
        """
        backing = self._model
        return backing.process_notification_page(page_result)

    def mark_notification_handled(self, notification_id):
        """
        Forward marking a notification as handled to the backing model and return its result.
        """
        backing = self._model
        return backing.mark_notification_handled(notification_id)

    def garbage_collect_manifest_report(self, manifest_digest):
        """
        Forward garbage collection of a manifest report to the backing model and return its
        result.
        """
        backing = self._model
        return backing.garbage_collect_manifest_report(manifest_digest)
# Module-level proxy instance; its backing model is assigned by `configure()`.
secscan_model = SecurityScannerModelProxy()