1
0
mirror of https://github.com/quay/quay.git synced 2025-04-18 10:44:06 +03:00
quay/notifications/notificationevent.py
Sunandadadi 6688bcca09
backend: implement basic functionality (PROJQUAY-7076) (#2984)
* database: adding subject_backfilled index to manifest table (PROJQUAY-7360) (#2963)

adding subject_backfilled index to manifest table

* Rebasing with main

* updating cypress data

* fixing conflicts and rebasing with latest code

* adding tests

* Forcing an empty commit.

* Forcing an empty commit.

* skip_locked test fix

* adding tests

* minor fixes

---------

Co-authored-by: Brandon Caton <bcaton@redhat.com>
2024-06-27 16:48:39 -04:00

484 lines
16 KiB
Python

import logging
import re
import time
from datetime import datetime
from notifications import build_repository_event_data
from util.jinjautil import get_template_env
from util.secscan import PRIORITY_LEVELS, get_priority_for_index
logger = logging.getLogger(__name__)
TEMPLATE_ENV = get_template_env("events")
class InvalidNotificationEventException(Exception):
pass
class NotificationEvent(object):
def __init__(self):
pass
def get_level(self, event_data, notification_data):
"""
Returns a 'level' representing the severity of the event.
Valid values are: 'info', 'warning', 'error', 'primary', 'success'
"""
raise NotImplementedError
def get_summary(self, event_data, notification_data):
"""
Returns a human readable one-line summary for the given notification data.
"""
raise NotImplementedError
def get_message(self, event_data, notification_data):
"""
Returns a human readable HTML message for the given notification data.
"""
return TEMPLATE_ENV.get_template(self.event_name() + ".html").render(
{"event_data": event_data, "notification_data": notification_data}
)
def get_sample_data(self, namespace_name, repo_name, event_config):
"""
Returns sample data for testing the raising of this notification, with an example
notification.
"""
raise NotImplementedError
def should_perform(self, event_data, notification_data):
"""
Whether a notification for this event should be performed.
By default returns True.
"""
return True
@classmethod
def event_name(cls):
"""
Particular event implemented by subclasses.
"""
raise NotImplementedError
@classmethod
def get_event(cls, eventname):
found = NotificationEvent._get_event(cls, eventname)
if found is not None:
return found
raise InvalidNotificationEventException("Unable to find event: %s" % eventname)
@classmethod
def event_names(cls):
for subc in cls.__subclasses__():
if subc.event_name() is None:
for subsubc in subc.__subclasses__():
yield subsubc.event_name()
else:
yield subc.event_name()
@staticmethod
def _get_event(cls, eventname):
for subc in cls.__subclasses__():
if subc.event_name() is None:
found = NotificationEvent._get_event(subc, eventname)
if found is not None:
return found
elif subc.event_name() == eventname:
return subc()
class RepoPushEvent(NotificationEvent):
@classmethod
def event_name(cls):
return "repo_push"
def get_level(self, event_data, notification_data):
return "primary"
def get_summary(self, event_data, notification_data):
return "Repository %s updated" % (event_data["repository"])
def get_sample_data(self, namespace_name, repo_name, event_config):
return build_repository_event_data(
namespace_name, repo_name, {"updated_tags": ["latest", "foo"], "pruned_image_count": 3}
)
class RepoMirrorSyncStartedEvent(NotificationEvent):
@classmethod
def event_name(cls):
return "repo_mirror_sync_started"
def get_level(self, event_data, notification_data):
return "info"
def get_summary(self, event_data, notification_data):
return "Repository Mirror started for %s" % (event_data["message"])
def get_sample_data(self, namespace_name, repo_name, event_config):
return build_repository_event_data(
namespace_name, repo_name, {"message": "TEST NOTIFICATION"}
)
class RepoMirrorSyncSuccessEvent(NotificationEvent):
@classmethod
def event_name(cls):
return "repo_mirror_sync_success"
def get_level(self, event_data, notification_data):
return "success"
def get_summary(self, event_data, notification_data):
return "Repository Mirror success for %s" % (event_data["message"])
def get_sample_data(self, namespace_name, repo_name, event_config):
return build_repository_event_data(
namespace_name, repo_name, {"message": "TEST NOTIFICATION"}
)
class RepoMirrorSyncFailedEvent(NotificationEvent):
@classmethod
def event_name(cls):
return "repo_mirror_sync_failed"
def get_level(self, event_data, notification_data):
return "error"
def get_summary(self, event_data, notification_data):
return "Repository Mirror failed for %s" % (event_data["message"])
def get_sample_data(self, namespace_name, repo_name, event_config):
return build_repository_event_data(
namespace_name, repo_name, {"message": "TEST NOTIFICATION"}
)
def _build_summary(event_data):
"""
Returns a summary string for the build data found in the event data block.
"""
summary = "for repository %s [%s]" % (event_data["repository"], event_data["build_id"][0:7])
return summary
class VulnerabilityFoundEvent(NotificationEvent):
CONFIG_LEVEL = "level"
PRIORITY_KEY = "priority"
VULNERABILITY_KEY = "vulnerability"
MULTIPLE_VULNERABILITY_KEY = "vulnerabilities"
VULNERABLE_INDEX_REPORT_CREATED = "vulnerable_index_report_created"
@classmethod
def event_name(cls):
return "vulnerability_found"
def get_level(self, event_data, notification_data):
vuln_data = event_data[VulnerabilityFoundEvent.VULNERABILITY_KEY]
priority = vuln_data[VulnerabilityFoundEvent.PRIORITY_KEY]
if priority == "Critical":
return "error"
if priority == "Medium" or priority == "High":
return "warning"
return "info"
def get_sample_data(self, namespace_name, repo_name, event_config):
level = event_config.get(VulnerabilityFoundEvent.CONFIG_LEVEL, "Critical")
return build_repository_event_data(
namespace_name,
repo_name,
{
"tags": ["latest", "prod", "foo", "bar", "baz"],
"image": "some-image-id",
"vulnerability": {
"id": "CVE-FAKE-CVE",
"description": "A futurist vulnerability",
"link": "https://security-tracker.debian.org/tracker/CVE-FAKE-CVE",
"priority": get_priority_for_index(level),
},
},
)
def should_perform(self, event_data, notification_data):
event_config = notification_data.event_config_dict
if VulnerabilityFoundEvent.CONFIG_LEVEL not in event_config:
return True
if VulnerabilityFoundEvent.VULNERABLE_INDEX_REPORT_CREATED in event_data:
return True
if VulnerabilityFoundEvent.VULNERABILITY_KEY not in event_data:
return False
vuln_info = event_data.get(VulnerabilityFoundEvent.VULNERABILITY_KEY, {})
event_severity = PRIORITY_LEVELS.get(vuln_info.get("priority", "Unknown"))
if event_severity is None:
return False
actual_level_index = int(event_severity["index"])
filter_level_index = int(event_config[VulnerabilityFoundEvent.CONFIG_LEVEL])
return actual_level_index <= filter_level_index
def get_summary(self, event_data, notification_data):
vuln_key = VulnerabilityFoundEvent.VULNERABILITY_KEY
priority_key = VulnerabilityFoundEvent.PRIORITY_KEY
multiple_vulns = event_data.get(VulnerabilityFoundEvent.MULTIPLE_VULNERABILITY_KEY)
if multiple_vulns is not None:
top_priority = multiple_vulns[0].get(priority_key, "Unknown")
matching = [v for v in multiple_vulns if v.get(priority_key, "Unknown") == top_priority]
msg = "%s %s" % (len(matching), top_priority)
if len(matching) < len(multiple_vulns):
msg += " and %s more" % (len(multiple_vulns) - len(matching))
msg += " vulnerabilities were detected in repository %s in %s tags"
return msg % (event_data["repository"], len(event_data["tags"]))
else:
msg = "%s vulnerability detected in repository %s in %s tags"
return msg % (
event_data[vuln_key][priority_key],
event_data["repository"],
len(event_data["tags"]),
)
class BaseBuildEvent(NotificationEvent):
@classmethod
def event_name(cls):
return None
def should_perform(self, event_data, notification_data):
if not notification_data.event_config_dict:
return True
event_config = notification_data.event_config_dict
ref_regex = event_config.get("ref-regex") or None
if ref_regex is None:
return True
# Lookup the ref. If none, this is a non-git build and we should not fire the event.
ref = event_data.get("trigger_metadata", {}).get("ref", None)
if ref is None:
return False
# Try parsing the regex string as a regular expression. If we fail, we fail to fire
# the event.
try:
return bool(re.compile(str(ref_regex)).match(ref))
except Exception:
logger.warning("Regular expression error for build event filter: %s", ref_regex)
return False
class BuildQueueEvent(BaseBuildEvent):
@classmethod
def event_name(cls):
return "build_queued"
def get_level(self, event_data, notification_data):
return "info"
def get_sample_data(self, namespace_name, repo_name, event_config):
build_uuid = "fake-build-id"
return build_repository_event_data(
namespace_name,
repo_name,
{
"is_manual": False,
"build_id": build_uuid,
"build_name": "some-fake-build",
"docker_tags": ["latest", "foo", "bar"],
"trigger_id": "1245634",
"trigger_kind": "GitHub",
"trigger_metadata": {
"default_branch": "master",
"ref": "refs/heads/somebranch",
"commit": "42d4a62c53350993ea41069e9f2cfdefb0df097d",
"commit_info": {
"url": "http://path/to/the/commit",
"message": "Some commit message",
"date": time.mktime(datetime.now().timetuple()),
"author": {
"username": "fakeauthor",
"url": "http://path/to/fake/author/in/scm",
"avatar_url": "http://www.gravatar.com/avatar/fakehash",
},
},
},
},
subpage="/build/%s" % build_uuid,
)
def get_summary(self, event_data, notification_data):
return "Build queued " + _build_summary(event_data)
class BuildStartEvent(BaseBuildEvent):
@classmethod
def event_name(cls):
return "build_start"
def get_level(self, event_data, notification_data):
return "info"
def get_sample_data(self, namespace_name, repo_name, event_config):
build_uuid = "fake-build-id"
return build_repository_event_data(
namespace_name,
repo_name,
{
"build_id": build_uuid,
"build_name": "some-fake-build",
"docker_tags": ["latest", "foo", "bar"],
"trigger_id": "1245634",
"trigger_kind": "GitHub",
"trigger_metadata": {
"default_branch": "master",
"ref": "refs/heads/somebranch",
"commit": "42d4a62c53350993ea41069e9f2cfdefb0df097d",
},
},
subpage="/build/%s" % build_uuid,
)
def get_summary(self, event_data, notification_data):
return "Build started " + _build_summary(event_data)
class BuildSuccessEvent(BaseBuildEvent):
@classmethod
def event_name(cls):
return "build_success"
def get_level(self, event_data, notification_data):
return "success"
def get_sample_data(self, namespace_name, repo_name, event_config):
build_uuid = "fake-build-id"
return build_repository_event_data(
namespace_name,
repo_name,
{
"build_id": build_uuid,
"build_name": "some-fake-build",
"docker_tags": ["latest", "foo", "bar"],
"trigger_id": "1245634",
"trigger_kind": "GitHub",
"trigger_metadata": {
"default_branch": "master",
"ref": "refs/heads/somebranch",
"commit": "42d4a62c53350993ea41069e9f2cfdefb0df097d",
},
"image_id": "1245657346",
},
subpage="/build/%s" % build_uuid,
)
def get_summary(self, event_data, notification_data):
return "Build succeeded " + _build_summary(event_data)
class BuildFailureEvent(BaseBuildEvent):
@classmethod
def event_name(cls):
return "build_failure"
def get_level(self, event_data, notification_data):
return "error"
def get_sample_data(self, namespace_name, repo_name, event_config):
build_uuid = "fake-build-id"
return build_repository_event_data(
namespace_name,
repo_name,
{
"build_id": build_uuid,
"build_name": "some-fake-build",
"docker_tags": ["latest", "foo", "bar"],
"trigger_kind": "GitHub",
"error_message": "This is a fake error message",
"trigger_id": "1245634",
"trigger_kind": "GitHub",
"trigger_metadata": {
"default_branch": "master",
"ref": "refs/heads/somebranch",
"commit": "42d4a62c53350993ea41069e9f2cfdefb0df097d",
"commit_info": {
"url": "http://path/to/the/commit",
"message": "Some commit message",
"date": time.mktime(datetime.now().timetuple()),
"author": {
"username": "fakeauthor",
"url": "http://path/to/fake/author/in/scm",
"avatar_url": "http://www.gravatar.com/avatar/fakehash",
},
},
},
},
subpage="/build/%s" % build_uuid,
)
def get_summary(self, event_data, notification_data):
return "Build failure " + _build_summary(event_data)
class BuildCancelledEvent(BaseBuildEvent):
@classmethod
def event_name(cls):
return "build_cancelled"
def get_level(self, event_data, notification_data):
return "info"
def get_sample_data(self, namespace_name, repo_name, event_config):
build_uuid = "fake-build-id"
return build_repository_event_data(
namespace_name,
repo_name,
{
"build_id": build_uuid,
"build_name": "some-fake-build",
"docker_tags": ["latest", "foo", "bar"],
"trigger_id": "1245634",
"trigger_kind": "GitHub",
"trigger_metadata": {
"default_branch": "master",
"ref": "refs/heads/somebranch",
"commit": "42d4a62c53350993ea41069e9f2cfdefb0df097d",
},
"image_id": "1245657346",
},
subpage="/build/%s" % build_uuid,
)
def get_summary(self, event_data, notification_data):
return "Build cancelled " + _build_summary(event_data)
class RepoImageExpiryEvent(NotificationEvent):
@classmethod
def event_name(cls):
return "repo_image_expiry"
def get_level(self, event_data, notification_data):
return "info"
def get_summary(self, event_data, notification_data):
return f"Repository {event_data['repository']} image(s) expiring"
def get_sample_data(self, namespace_name, repo_name, event_config):
return build_repository_event_data(
namespace_name,
repo_name,
{"tags": ["latest", "v1"], "expiring_in": f"{event_config.get('days', None)} days"},
)