mirror of
https://github.com/quay/quay.git
synced 2026-01-26 06:21:37 +03:00
98 lines
3.5 KiB
Python
98 lines
3.5 KiB
Python
import json
|
|
import os
|
|
from urllib.parse import urlparse
|
|
|
|
import pytest
|
|
|
|
from app import instance_keys, notification_queue, secscan_notification_queue, storage
|
|
from data import database, model
|
|
from data.registry_model import registry_model
|
|
from data.secscan_model import secscan_model
|
|
from test.fixtures import *
|
|
from util.secscan.v4.fake import fake_security_scanner
|
|
from workers.securityscanningnotificationworker import (
|
|
SecurityScanningNotificationWorker,
|
|
)
|
|
|
|
from app import app as flask_app # isort: skip
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
os.environ.get("TEST_DATABASE_URI", "").find("mysql") >= 0, reason="Flaky on MySQL"
|
|
)
|
|
@pytest.mark.parametrize(
|
|
"issue", [None, "wrong_id", "no_event_registered", "severity_too_low", "no_matching_manifest"]
|
|
)
|
|
def test_notification(issue, initialized_db):
|
|
worker = SecurityScanningNotificationWorker(secscan_notification_queue)
|
|
secscan_model.configure(flask_app, instance_keys, storage)
|
|
worker._secscan_model = secscan_model
|
|
|
|
hostname = urlparse(flask_app.config["SECURITY_SCANNER_V4_ENDPOINT"]).netloc
|
|
with fake_security_scanner(hostname=hostname) as fake:
|
|
repository_ref = registry_model.lookup_repository("devtable", "simple")
|
|
|
|
# Add a security notification event to the repository.
|
|
if issue != "no_event_registered":
|
|
model.notification.create_repo_notification(
|
|
repository_ref.id,
|
|
"vulnerability_found",
|
|
"webhook",
|
|
{},
|
|
{
|
|
"vulnerability": {
|
|
"priority": "Low" if issue != "severity_too_low" else "Critical",
|
|
},
|
|
},
|
|
)
|
|
|
|
tag = registry_model.get_repo_tag(repository_ref, "latest")
|
|
manifest = registry_model.get_manifest_for_tag(tag)
|
|
|
|
# Add a notification to the scanner, matching the manifest.
|
|
notification_id = "somenotificationid"
|
|
fake.add_notification(
|
|
notification_id if issue != "wrong_id" else "wrongid",
|
|
manifest.digest if issue != "no_matching_manifest" else "sha256:incorrect",
|
|
"added",
|
|
{
|
|
"normalized_severity": "High",
|
|
"description": "Some description",
|
|
"package": {
|
|
"id": "42",
|
|
"name": "FooBar",
|
|
"version": "v0.0.1",
|
|
},
|
|
"name": "BarBaz",
|
|
"links": "http://example.com",
|
|
},
|
|
)
|
|
|
|
# Add the notification to the queue.
|
|
name = ["with_id", notification_id]
|
|
secscan_notification_queue.put(
|
|
name,
|
|
json.dumps({"notification_id": notification_id}),
|
|
)
|
|
|
|
# Process the notification via the worker.
|
|
worker.poll_queue()
|
|
|
|
# Ensure the repository notification was enqueued.
|
|
found = notification_queue.get()
|
|
if issue:
|
|
assert found is None
|
|
return
|
|
|
|
assert found is not None
|
|
|
|
body = json.loads(found["body"])
|
|
|
|
assert body["event_data"]["repository"] == "devtable/simple"
|
|
assert body["event_data"]["namespace"] == "devtable"
|
|
assert body["event_data"]["name"] == "simple"
|
|
assert body["event_data"]["tags"] == ["latest"]
|
|
assert body["event_data"]["vulnerability"]["id"] == "BarBaz"
|
|
assert body["event_data"]["vulnerability"]["description"] == "Some description"
|
|
assert body["event_data"]["vulnerability"]["priority"] == "High"
|