1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
Files
quay/workers/test/test_securityscanningnotificationworker.py
Brandon Caton 5c1c21bb1b ci: fixing app type import (PROJQUAY-8991) (#4021)
ci: fixing app type import
2025-06-10 09:14:57 -04:00

98 lines
3.5 KiB
Python

import json
import os
from urllib.parse import urlparse
import pytest
from app import instance_keys, notification_queue, secscan_notification_queue, storage
from data import database, model
from data.registry_model import registry_model
from data.secscan_model import secscan_model
from test.fixtures import *
from util.secscan.v4.fake import fake_security_scanner
from workers.securityscanningnotificationworker import (
SecurityScanningNotificationWorker,
)
from app import app as flask_app # isort: skip
@pytest.mark.skipif(
    "mysql" in os.environ.get("TEST_DATABASE_URI", ""),
    reason="Flaky on MySQL",
)
@pytest.mark.parametrize(
    "issue",
    [
        None,
        "wrong_id",
        "no_event_registered",
        "severity_too_low",
        "no_matching_manifest",
    ],
)
def test_notification(issue, initialized_db):
    """
    Run the security scanning notification worker against a fake scanner.

    ``issue`` is ``None`` for the happy path, in which a repository notification must end up
    on the notification queue and its payload is checked field by field. Every other value
    deliberately breaks one part of the setup, and the test then expects the notification
    queue to be empty.
    """
    worker = SecurityScanningNotificationWorker(secscan_notification_queue)
    secscan_model.configure(flask_app, instance_keys, storage)
    worker._secscan_model = secscan_model

    scanner_endpoint = urlparse(flask_app.config["SECURITY_SCANNER_V4_ENDPOINT"])
    with fake_security_scanner(hostname=scanner_endpoint.netloc) as scanner:
        repo = registry_model.lookup_repository("devtable", "simple")

        # Register a vulnerability event on the repository, unless this scenario is the one
        # that exercises a repository without any event.
        if issue != "no_event_registered":
            # The scanner notification added below reports "High"; the
            # "severity_too_low" scenario configures the event with "Critical" instead.
            if issue == "severity_too_low":
                priority = "Critical"
            else:
                priority = "Low"

            event_config = {
                "vulnerability": {
                    "priority": priority,
                },
            }
            model.notification.create_repo_notification(
                repo.id,
                "vulnerability_found",
                "webhook",
                {},
                event_config,
            )

        latest_tag = registry_model.get_repo_tag(repo, "latest")
        manifest = registry_model.get_manifest_for_tag(latest_tag)

        # Hand the fake scanner a notification; the id and digest are swapped for
        # mismatching values in the corresponding failure scenarios.
        notification_id = "somenotificationid"

        if issue == "wrong_id":
            scanner_side_id = "wrongid"
        else:
            scanner_side_id = notification_id

        if issue == "no_matching_manifest":
            reported_digest = "sha256:incorrect"
        else:
            reported_digest = manifest.digest

        vulnerability = {
            "normalized_severity": "High",
            "description": "Some description",
            "package": {
                "id": "42",
                "name": "FooBar",
                "version": "v0.0.1",
            },
            "name": "BarBaz",
            "links": "http://example.com",
        }
        scanner.add_notification(scanner_side_id, reported_digest, "added", vulnerability)

        # Queue the notification id for the worker to pick up.
        queue_name = ["with_id", notification_id]
        payload = json.dumps({"notification_id": notification_id})
        secscan_notification_queue.put(queue_name, payload)

        # Let the worker process whatever is on its queue.
        worker.poll_queue()

        # Check what, if anything, reached the repository notification queue.
        queued = notification_queue.get()
        if issue:
            assert queued is None
            return

        assert queued is not None

        event_data = json.loads(queued["body"])["event_data"]
        assert event_data["repository"] == "devtable/simple"
        assert event_data["namespace"] == "devtable"
        assert event_data["name"] == "simple"
        assert event_data["tags"] == ["latest"]

        reported = event_data["vulnerability"]
        assert reported["id"] == "BarBaz"
        assert reported["description"] == "Some description"
        assert reported["priority"] == "High"