mirror of
https://github.com/quay/quay.git
synced 2026-01-26 06:21:37 +03:00
* chore: pass config to isort as it doesn't always detect it
* chore: mark package "test" as local, not stdlib
* chore: remove "isort: skip_file"
* chore: fix app in test_load_security_information
* chore: fix app in test_notification
* chore: fix app in test_index_report
73 lines
2.6 KiB
Python
73 lines
2.6 KiB
Python
import logging
|
|
|
|
from data.secscan_model.datatypes import (
|
|
ScanLookupStatus,
|
|
SecurityInformationLookupResult,
|
|
)
|
|
from data.secscan_model.interface import (
|
|
InvalidConfigurationException,
|
|
SecurityScannerInterface,
|
|
)
|
|
from data.secscan_model.secscan_v4_model import NoopV4SecurityScanner
|
|
from data.secscan_model.secscan_v4_model import ScanToken as V4ScanToken
|
|
from data.secscan_model.secscan_v4_model import V4SecurityScanner
|
|
|
|
# Module-level logger, named after this module via `__name__`.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SecurityScannerModelProxy(SecurityScannerInterface):
    """
    Proxy that forwards security scanner calls to a backing V4 model.

    The backing model is chosen in `configure()`: a `V4SecurityScanner` when one can be
    constructed, otherwise a `NoopV4SecurityScanner`. Every other method delegates to
    that model, and `_model` is only assigned by `configure()`, so it must run first.
    """

    def configure(self, app, instance_keys, storage):
        """
        Select the backing model and return this proxy.

        Falls back to `NoopV4SecurityScanner` when constructing `V4SecurityScanner`
        raises `InvalidConfigurationException`; any other exception propagates.
        """
        try:
            self._model = V4SecurityScanner(app, instance_keys, storage)
        except InvalidConfigurationException:
            self._model = NoopV4SecurityScanner()

        logger.info("===============================")
        logger.info("Using split secscan model: `%s`", [self._model])
        logger.info("===============================")

        return self

    def perform_indexing(self, next_token=None, batch_size=None):
        """
        Delegate an indexing pass to the backing model and return its result.

        When `next_token` is supplied it is asserted to be a V4 `ScanToken` whose
        `min_id` is an int before being handed to the model.
        """
        if next_token is not None:
            assert isinstance(next_token, V4ScanToken)
            assert isinstance(next_token.min_id, int)

        return self._model.perform_indexing(next_token, batch_size)

    def perform_indexing_recent_manifests(self, batch_size=None):
        """
        Delegate indexing of recent manifests to the backing model.

        The model's result is returned so this method behaves as a transparent
        pass-through, like the other delegating methods on this proxy.
        """
        return self._model.perform_indexing_recent_manifests(batch_size)

    def load_security_information(
        self, manifest_or_legacy_image, include_vulnerabilities, model_cache=None
    ):
        """
        Look up security information for the manifest form of the given object.

        The argument is converted with `as_manifest()` and passed to the backing model.
        Any result whose status is not `NOT_YET_INDEXED` is returned unchanged;
        otherwise a fresh `NOT_YET_INDEXED` lookup result is returned.
        """
        manifest = manifest_or_legacy_image.as_manifest()

        info = self._model.load_security_information(manifest, include_vulnerabilities, model_cache)
        if info.status != ScanLookupStatus.NOT_YET_INDEXED:
            return info

        return SecurityInformationLookupResult.with_status(ScanLookupStatus.NOT_YET_INDEXED)

    def register_model_cleanup_callbacks(self, data_model_config):
        """Delegate cleanup-callback registration to the backing model."""
        return self._model.register_model_cleanup_callbacks(data_model_config)

    @property
    def legacy_api_handler(self):
        """Not provided by this proxy; accessing it always raises `NotImplementedError`."""
        raise NotImplementedError

    def lookup_notification_page(self, notification_id, page_index=None):
        """Delegate the notification page lookup to the backing model."""
        return self._model.lookup_notification_page(notification_id, page_index)

    def process_notification_page(self, page_result):
        """Delegate processing of a notification page result to the backing model."""
        return self._model.process_notification_page(page_result)

    def mark_notification_handled(self, notification_id):
        """Delegate marking a notification as handled to the backing model."""
        return self._model.mark_notification_handled(notification_id)

    def garbage_collect_manifest_report(self, manifest_digest):
        """Delegate garbage collection of a manifest's report to the backing model."""
        return self._model.garbage_collect_manifest_report(manifest_digest)
|
|
|
|
|
|
# Module-level proxy instance. Its `_model` attribute is only assigned inside
# `configure()`, so that method has to run before the delegating methods are called.
secscan_model = SecurityScannerModelProxy()
|