mirror of
https://github.com/quay/quay.git
synced 2026-01-26 06:21:37 +03:00
* chore(pre-commit): match black version with requirements-dev * run `make black` against repo * ci: switch to black 24.4.2 * fix: py312 * fix: flake8 errors * fix: flake8 conflicts * chore: add git blame ignore revs file
113 lines
3.5 KiB
Python
113 lines
3.5 KiB
Python
"""
|
|
List and manage repository vulnerabilities and other security information.
|
|
"""
|
|
|
|
import logging
|
|
from enum import Enum, unique
|
|
|
|
import features
|
|
from app import model_cache, storage
|
|
from auth.decorators import process_basic_auth_no_pass
|
|
from data.registry_model import registry_model
|
|
from data.secscan_model import secscan_model
|
|
from data.secscan_model.datatypes import ScanLookupStatus
|
|
from endpoints.api import (
|
|
RepositoryParamResource,
|
|
deprecated,
|
|
disallow_for_app_repositories,
|
|
nickname,
|
|
parse_args,
|
|
path_param,
|
|
query_param,
|
|
require_repo_read,
|
|
resource,
|
|
show_if,
|
|
)
|
|
from endpoints.api.manifest import MANIFEST_DIGEST_ROUTE
|
|
from endpoints.decorators import anon_allowed
|
|
from endpoints.exception import DownstreamIssue, NotFound
|
|
from util.parsing import truthy_bool
|
|
|
|
|
|
@unique
|
|
class SecurityScanStatus(Enum):
|
|
"""
|
|
Security scan status enum.
|
|
"""
|
|
|
|
SCANNED = "scanned"
|
|
FAILED = "failed"
|
|
QUEUED = "queued"
|
|
UNSUPPORTED = "unsupported"
|
|
MANIFEST_LAYER_TOO_LARGE = "manifest_layer_too_large"
|
|
|
|
|
|
MAPPED_STATUSES = {}
|
|
MAPPED_STATUSES[ScanLookupStatus.FAILED_TO_INDEX] = SecurityScanStatus.FAILED
|
|
MAPPED_STATUSES[ScanLookupStatus.SUCCESS] = SecurityScanStatus.SCANNED
|
|
MAPPED_STATUSES[ScanLookupStatus.NOT_YET_INDEXED] = SecurityScanStatus.QUEUED
|
|
MAPPED_STATUSES[ScanLookupStatus.UNSUPPORTED_FOR_INDEXING] = SecurityScanStatus.UNSUPPORTED
|
|
MAPPED_STATUSES[ScanLookupStatus.MANIFEST_LAYER_TOO_LARGE] = (
|
|
SecurityScanStatus.MANIFEST_LAYER_TOO_LARGE
|
|
)
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _security_info(manifest_or_legacy_image, include_vulnerabilities=True):
|
|
"""
|
|
Returns a dict representing the result of a call to the security status API for the given
|
|
manifest or image.
|
|
"""
|
|
result = secscan_model.load_security_information(
|
|
manifest_or_legacy_image,
|
|
include_vulnerabilities=include_vulnerabilities,
|
|
model_cache=model_cache,
|
|
)
|
|
if result.status == ScanLookupStatus.UNKNOWN_MANIFEST_OR_IMAGE:
|
|
raise NotFound()
|
|
|
|
if result.status == ScanLookupStatus.COULD_NOT_LOAD:
|
|
raise DownstreamIssue(result.scanner_request_error)
|
|
|
|
assert result.status in MAPPED_STATUSES
|
|
return {
|
|
"status": MAPPED_STATUSES[result.status].value,
|
|
"data": (
|
|
result.security_information.to_dict()
|
|
if result.security_information is not None
|
|
else None
|
|
),
|
|
}
|
|
|
|
|
|
@resource(MANIFEST_DIGEST_ROUTE + "/security")
|
|
@show_if(features.SECURITY_SCANNER)
|
|
@path_param("repository", "The full path of the repository. e.g. namespace/name")
|
|
@path_param("manifestref", "The digest of the manifest")
|
|
class RepositoryManifestSecurity(RepositoryParamResource):
|
|
"""
|
|
Operations for managing the vulnerabilities in a repository manifest.
|
|
"""
|
|
|
|
@process_basic_auth_no_pass
|
|
@anon_allowed
|
|
@require_repo_read(allow_for_superuser=True, allow_for_global_readonly_superuser=True)
|
|
@nickname("getRepoManifestSecurity")
|
|
@disallow_for_app_repositories
|
|
@parse_args()
|
|
@query_param(
|
|
"vulnerabilities", "Include vulnerabilities informations", type=truthy_bool, default=False
|
|
)
|
|
def get(self, namespace, repository, manifestref, parsed_args):
|
|
repo_ref = registry_model.lookup_repository(namespace, repository)
|
|
if repo_ref is None:
|
|
raise NotFound()
|
|
|
|
manifest = registry_model.lookup_manifest_by_digest(repo_ref, manifestref, allow_dead=True)
|
|
if manifest is None:
|
|
raise NotFound()
|
|
|
|
return _security_info(manifest, parsed_args.vulnerabilities)
|