mirror of
https://github.com/quay/quay.git
synced 2025-04-18 10:44:06 +03:00
* database: adding subject_backfilled index to manifest table (PROJQUAY-7360) (#2963) adding subject_backfilled index to manifest table * Rebasing with main * updating cypress data * fixing conflicts and rebasing with latest code * adding tests * Forcing an empty commit. * Forcing an empty commit. * skip_locked test fix * adding tests * minor fixes --------- Co-authored-by: Brandon Caton <bcaton@redhat.com>
484 lines
16 KiB
Python
484 lines
16 KiB
Python
import logging
|
|
import re
|
|
import time
|
|
from datetime import datetime
|
|
|
|
from notifications import build_repository_event_data
|
|
from util.jinjautil import get_template_env
|
|
from util.secscan import PRIORITY_LEVELS, get_priority_for_index
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
TEMPLATE_ENV = get_template_env("events")
|
|
|
|
|
|
class InvalidNotificationEventException(Exception):
|
|
pass
|
|
|
|
|
|
class NotificationEvent(object):
|
|
def __init__(self):
|
|
pass
|
|
|
|
def get_level(self, event_data, notification_data):
|
|
"""
|
|
Returns a 'level' representing the severity of the event.
|
|
|
|
Valid values are: 'info', 'warning', 'error', 'primary', 'success'
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
def get_summary(self, event_data, notification_data):
|
|
"""
|
|
Returns a human readable one-line summary for the given notification data.
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
def get_message(self, event_data, notification_data):
|
|
"""
|
|
Returns a human readable HTML message for the given notification data.
|
|
"""
|
|
return TEMPLATE_ENV.get_template(self.event_name() + ".html").render(
|
|
{"event_data": event_data, "notification_data": notification_data}
|
|
)
|
|
|
|
def get_sample_data(self, namespace_name, repo_name, event_config):
|
|
"""
|
|
Returns sample data for testing the raising of this notification, with an example
|
|
notification.
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
def should_perform(self, event_data, notification_data):
|
|
"""
|
|
Whether a notification for this event should be performed.
|
|
|
|
By default returns True.
|
|
"""
|
|
return True
|
|
|
|
@classmethod
|
|
def event_name(cls):
|
|
"""
|
|
Particular event implemented by subclasses.
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
@classmethod
|
|
def get_event(cls, eventname):
|
|
found = NotificationEvent._get_event(cls, eventname)
|
|
if found is not None:
|
|
return found
|
|
|
|
raise InvalidNotificationEventException("Unable to find event: %s" % eventname)
|
|
|
|
@classmethod
|
|
def event_names(cls):
|
|
for subc in cls.__subclasses__():
|
|
if subc.event_name() is None:
|
|
for subsubc in subc.__subclasses__():
|
|
yield subsubc.event_name()
|
|
else:
|
|
yield subc.event_name()
|
|
|
|
@staticmethod
|
|
def _get_event(cls, eventname):
|
|
for subc in cls.__subclasses__():
|
|
if subc.event_name() is None:
|
|
found = NotificationEvent._get_event(subc, eventname)
|
|
if found is not None:
|
|
return found
|
|
elif subc.event_name() == eventname:
|
|
return subc()
|
|
|
|
|
|
class RepoPushEvent(NotificationEvent):
|
|
@classmethod
|
|
def event_name(cls):
|
|
return "repo_push"
|
|
|
|
def get_level(self, event_data, notification_data):
|
|
return "primary"
|
|
|
|
def get_summary(self, event_data, notification_data):
|
|
return "Repository %s updated" % (event_data["repository"])
|
|
|
|
def get_sample_data(self, namespace_name, repo_name, event_config):
|
|
return build_repository_event_data(
|
|
namespace_name, repo_name, {"updated_tags": ["latest", "foo"], "pruned_image_count": 3}
|
|
)
|
|
|
|
|
|
class RepoMirrorSyncStartedEvent(NotificationEvent):
|
|
@classmethod
|
|
def event_name(cls):
|
|
return "repo_mirror_sync_started"
|
|
|
|
def get_level(self, event_data, notification_data):
|
|
return "info"
|
|
|
|
def get_summary(self, event_data, notification_data):
|
|
return "Repository Mirror started for %s" % (event_data["message"])
|
|
|
|
def get_sample_data(self, namespace_name, repo_name, event_config):
|
|
return build_repository_event_data(
|
|
namespace_name, repo_name, {"message": "TEST NOTIFICATION"}
|
|
)
|
|
|
|
|
|
class RepoMirrorSyncSuccessEvent(NotificationEvent):
|
|
@classmethod
|
|
def event_name(cls):
|
|
return "repo_mirror_sync_success"
|
|
|
|
def get_level(self, event_data, notification_data):
|
|
return "success"
|
|
|
|
def get_summary(self, event_data, notification_data):
|
|
return "Repository Mirror success for %s" % (event_data["message"])
|
|
|
|
def get_sample_data(self, namespace_name, repo_name, event_config):
|
|
return build_repository_event_data(
|
|
namespace_name, repo_name, {"message": "TEST NOTIFICATION"}
|
|
)
|
|
|
|
|
|
class RepoMirrorSyncFailedEvent(NotificationEvent):
|
|
@classmethod
|
|
def event_name(cls):
|
|
return "repo_mirror_sync_failed"
|
|
|
|
def get_level(self, event_data, notification_data):
|
|
return "error"
|
|
|
|
def get_summary(self, event_data, notification_data):
|
|
return "Repository Mirror failed for %s" % (event_data["message"])
|
|
|
|
def get_sample_data(self, namespace_name, repo_name, event_config):
|
|
return build_repository_event_data(
|
|
namespace_name, repo_name, {"message": "TEST NOTIFICATION"}
|
|
)
|
|
|
|
|
|
def _build_summary(event_data):
|
|
"""
|
|
Returns a summary string for the build data found in the event data block.
|
|
"""
|
|
summary = "for repository %s [%s]" % (event_data["repository"], event_data["build_id"][0:7])
|
|
return summary
|
|
|
|
|
|
class VulnerabilityFoundEvent(NotificationEvent):
|
|
CONFIG_LEVEL = "level"
|
|
PRIORITY_KEY = "priority"
|
|
VULNERABILITY_KEY = "vulnerability"
|
|
MULTIPLE_VULNERABILITY_KEY = "vulnerabilities"
|
|
VULNERABLE_INDEX_REPORT_CREATED = "vulnerable_index_report_created"
|
|
|
|
@classmethod
|
|
def event_name(cls):
|
|
return "vulnerability_found"
|
|
|
|
def get_level(self, event_data, notification_data):
|
|
vuln_data = event_data[VulnerabilityFoundEvent.VULNERABILITY_KEY]
|
|
priority = vuln_data[VulnerabilityFoundEvent.PRIORITY_KEY]
|
|
if priority == "Critical":
|
|
return "error"
|
|
|
|
if priority == "Medium" or priority == "High":
|
|
return "warning"
|
|
|
|
return "info"
|
|
|
|
def get_sample_data(self, namespace_name, repo_name, event_config):
|
|
level = event_config.get(VulnerabilityFoundEvent.CONFIG_LEVEL, "Critical")
|
|
return build_repository_event_data(
|
|
namespace_name,
|
|
repo_name,
|
|
{
|
|
"tags": ["latest", "prod", "foo", "bar", "baz"],
|
|
"image": "some-image-id",
|
|
"vulnerability": {
|
|
"id": "CVE-FAKE-CVE",
|
|
"description": "A futurist vulnerability",
|
|
"link": "https://security-tracker.debian.org/tracker/CVE-FAKE-CVE",
|
|
"priority": get_priority_for_index(level),
|
|
},
|
|
},
|
|
)
|
|
|
|
def should_perform(self, event_data, notification_data):
|
|
event_config = notification_data.event_config_dict
|
|
if VulnerabilityFoundEvent.CONFIG_LEVEL not in event_config:
|
|
return True
|
|
|
|
if VulnerabilityFoundEvent.VULNERABLE_INDEX_REPORT_CREATED in event_data:
|
|
return True
|
|
|
|
if VulnerabilityFoundEvent.VULNERABILITY_KEY not in event_data:
|
|
return False
|
|
|
|
vuln_info = event_data.get(VulnerabilityFoundEvent.VULNERABILITY_KEY, {})
|
|
event_severity = PRIORITY_LEVELS.get(vuln_info.get("priority", "Unknown"))
|
|
if event_severity is None:
|
|
return False
|
|
|
|
actual_level_index = int(event_severity["index"])
|
|
filter_level_index = int(event_config[VulnerabilityFoundEvent.CONFIG_LEVEL])
|
|
return actual_level_index <= filter_level_index
|
|
|
|
def get_summary(self, event_data, notification_data):
|
|
vuln_key = VulnerabilityFoundEvent.VULNERABILITY_KEY
|
|
priority_key = VulnerabilityFoundEvent.PRIORITY_KEY
|
|
|
|
multiple_vulns = event_data.get(VulnerabilityFoundEvent.MULTIPLE_VULNERABILITY_KEY)
|
|
if multiple_vulns is not None:
|
|
top_priority = multiple_vulns[0].get(priority_key, "Unknown")
|
|
matching = [v for v in multiple_vulns if v.get(priority_key, "Unknown") == top_priority]
|
|
|
|
msg = "%s %s" % (len(matching), top_priority)
|
|
if len(matching) < len(multiple_vulns):
|
|
msg += " and %s more" % (len(multiple_vulns) - len(matching))
|
|
|
|
msg += " vulnerabilities were detected in repository %s in %s tags"
|
|
return msg % (event_data["repository"], len(event_data["tags"]))
|
|
else:
|
|
msg = "%s vulnerability detected in repository %s in %s tags"
|
|
return msg % (
|
|
event_data[vuln_key][priority_key],
|
|
event_data["repository"],
|
|
len(event_data["tags"]),
|
|
)
|
|
|
|
|
|
class BaseBuildEvent(NotificationEvent):
|
|
@classmethod
|
|
def event_name(cls):
|
|
return None
|
|
|
|
def should_perform(self, event_data, notification_data):
|
|
if not notification_data.event_config_dict:
|
|
return True
|
|
|
|
event_config = notification_data.event_config_dict
|
|
ref_regex = event_config.get("ref-regex") or None
|
|
if ref_regex is None:
|
|
return True
|
|
|
|
# Lookup the ref. If none, this is a non-git build and we should not fire the event.
|
|
ref = event_data.get("trigger_metadata", {}).get("ref", None)
|
|
if ref is None:
|
|
return False
|
|
|
|
# Try parsing the regex string as a regular expression. If we fail, we fail to fire
|
|
# the event.
|
|
try:
|
|
return bool(re.compile(str(ref_regex)).match(ref))
|
|
except Exception:
|
|
logger.warning("Regular expression error for build event filter: %s", ref_regex)
|
|
return False
|
|
|
|
|
|
class BuildQueueEvent(BaseBuildEvent):
|
|
@classmethod
|
|
def event_name(cls):
|
|
return "build_queued"
|
|
|
|
def get_level(self, event_data, notification_data):
|
|
return "info"
|
|
|
|
def get_sample_data(self, namespace_name, repo_name, event_config):
|
|
build_uuid = "fake-build-id"
|
|
return build_repository_event_data(
|
|
namespace_name,
|
|
repo_name,
|
|
{
|
|
"is_manual": False,
|
|
"build_id": build_uuid,
|
|
"build_name": "some-fake-build",
|
|
"docker_tags": ["latest", "foo", "bar"],
|
|
"trigger_id": "1245634",
|
|
"trigger_kind": "GitHub",
|
|
"trigger_metadata": {
|
|
"default_branch": "master",
|
|
"ref": "refs/heads/somebranch",
|
|
"commit": "42d4a62c53350993ea41069e9f2cfdefb0df097d",
|
|
"commit_info": {
|
|
"url": "http://path/to/the/commit",
|
|
"message": "Some commit message",
|
|
"date": time.mktime(datetime.now().timetuple()),
|
|
"author": {
|
|
"username": "fakeauthor",
|
|
"url": "http://path/to/fake/author/in/scm",
|
|
"avatar_url": "http://www.gravatar.com/avatar/fakehash",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
subpage="/build/%s" % build_uuid,
|
|
)
|
|
|
|
def get_summary(self, event_data, notification_data):
|
|
return "Build queued " + _build_summary(event_data)
|
|
|
|
|
|
class BuildStartEvent(BaseBuildEvent):
|
|
@classmethod
|
|
def event_name(cls):
|
|
return "build_start"
|
|
|
|
def get_level(self, event_data, notification_data):
|
|
return "info"
|
|
|
|
def get_sample_data(self, namespace_name, repo_name, event_config):
|
|
build_uuid = "fake-build-id"
|
|
return build_repository_event_data(
|
|
namespace_name,
|
|
repo_name,
|
|
{
|
|
"build_id": build_uuid,
|
|
"build_name": "some-fake-build",
|
|
"docker_tags": ["latest", "foo", "bar"],
|
|
"trigger_id": "1245634",
|
|
"trigger_kind": "GitHub",
|
|
"trigger_metadata": {
|
|
"default_branch": "master",
|
|
"ref": "refs/heads/somebranch",
|
|
"commit": "42d4a62c53350993ea41069e9f2cfdefb0df097d",
|
|
},
|
|
},
|
|
subpage="/build/%s" % build_uuid,
|
|
)
|
|
|
|
def get_summary(self, event_data, notification_data):
|
|
return "Build started " + _build_summary(event_data)
|
|
|
|
|
|
class BuildSuccessEvent(BaseBuildEvent):
|
|
@classmethod
|
|
def event_name(cls):
|
|
return "build_success"
|
|
|
|
def get_level(self, event_data, notification_data):
|
|
return "success"
|
|
|
|
def get_sample_data(self, namespace_name, repo_name, event_config):
|
|
build_uuid = "fake-build-id"
|
|
return build_repository_event_data(
|
|
namespace_name,
|
|
repo_name,
|
|
{
|
|
"build_id": build_uuid,
|
|
"build_name": "some-fake-build",
|
|
"docker_tags": ["latest", "foo", "bar"],
|
|
"trigger_id": "1245634",
|
|
"trigger_kind": "GitHub",
|
|
"trigger_metadata": {
|
|
"default_branch": "master",
|
|
"ref": "refs/heads/somebranch",
|
|
"commit": "42d4a62c53350993ea41069e9f2cfdefb0df097d",
|
|
},
|
|
"image_id": "1245657346",
|
|
},
|
|
subpage="/build/%s" % build_uuid,
|
|
)
|
|
|
|
def get_summary(self, event_data, notification_data):
|
|
return "Build succeeded " + _build_summary(event_data)
|
|
|
|
|
|
class BuildFailureEvent(BaseBuildEvent):
|
|
@classmethod
|
|
def event_name(cls):
|
|
return "build_failure"
|
|
|
|
def get_level(self, event_data, notification_data):
|
|
return "error"
|
|
|
|
def get_sample_data(self, namespace_name, repo_name, event_config):
|
|
build_uuid = "fake-build-id"
|
|
return build_repository_event_data(
|
|
namespace_name,
|
|
repo_name,
|
|
{
|
|
"build_id": build_uuid,
|
|
"build_name": "some-fake-build",
|
|
"docker_tags": ["latest", "foo", "bar"],
|
|
"trigger_kind": "GitHub",
|
|
"error_message": "This is a fake error message",
|
|
"trigger_id": "1245634",
|
|
"trigger_kind": "GitHub",
|
|
"trigger_metadata": {
|
|
"default_branch": "master",
|
|
"ref": "refs/heads/somebranch",
|
|
"commit": "42d4a62c53350993ea41069e9f2cfdefb0df097d",
|
|
"commit_info": {
|
|
"url": "http://path/to/the/commit",
|
|
"message": "Some commit message",
|
|
"date": time.mktime(datetime.now().timetuple()),
|
|
"author": {
|
|
"username": "fakeauthor",
|
|
"url": "http://path/to/fake/author/in/scm",
|
|
"avatar_url": "http://www.gravatar.com/avatar/fakehash",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
subpage="/build/%s" % build_uuid,
|
|
)
|
|
|
|
def get_summary(self, event_data, notification_data):
|
|
return "Build failure " + _build_summary(event_data)
|
|
|
|
|
|
class BuildCancelledEvent(BaseBuildEvent):
|
|
@classmethod
|
|
def event_name(cls):
|
|
return "build_cancelled"
|
|
|
|
def get_level(self, event_data, notification_data):
|
|
return "info"
|
|
|
|
def get_sample_data(self, namespace_name, repo_name, event_config):
|
|
build_uuid = "fake-build-id"
|
|
return build_repository_event_data(
|
|
namespace_name,
|
|
repo_name,
|
|
{
|
|
"build_id": build_uuid,
|
|
"build_name": "some-fake-build",
|
|
"docker_tags": ["latest", "foo", "bar"],
|
|
"trigger_id": "1245634",
|
|
"trigger_kind": "GitHub",
|
|
"trigger_metadata": {
|
|
"default_branch": "master",
|
|
"ref": "refs/heads/somebranch",
|
|
"commit": "42d4a62c53350993ea41069e9f2cfdefb0df097d",
|
|
},
|
|
"image_id": "1245657346",
|
|
},
|
|
subpage="/build/%s" % build_uuid,
|
|
)
|
|
|
|
def get_summary(self, event_data, notification_data):
|
|
return "Build cancelled " + _build_summary(event_data)
|
|
|
|
|
|
class RepoImageExpiryEvent(NotificationEvent):
|
|
@classmethod
|
|
def event_name(cls):
|
|
return "repo_image_expiry"
|
|
|
|
def get_level(self, event_data, notification_data):
|
|
return "info"
|
|
|
|
def get_summary(self, event_data, notification_data):
|
|
return f"Repository {event_data['repository']} image(s) expiring"
|
|
|
|
def get_sample_data(self, namespace_name, repo_name, event_config):
|
|
return build_repository_event_data(
|
|
namespace_name,
|
|
repo_name,
|
|
{"tags": ["latest", "v1"], "expiring_in": f"{event_config.get('days', None)} days"},
|
|
)
|