diff --git a/data/model/notification.py b/data/model/notification.py index 89405bfdd..69c5490b0 100644 --- a/data/model/notification.py +++ b/data/model/notification.py @@ -1,4 +1,5 @@ import json +import logging from peewee import SQL @@ -10,14 +11,16 @@ from data.database import ( NotificationKind, Repository, RepositoryNotification, + TagNotificationSuccess, Team, TeamMember, TeamRole, User, - db_for_update, ) from data.model import InvalidNotificationException, db_transaction +logger = logging.getLogger(__name__) + def create_notification(kind_name, target, metadata={}, lookup_path=None): kind_ref = NotificationKind.get(name=kind_name) @@ -132,7 +135,7 @@ def delete_matching_notifications(target, kind_name, **kwargs): except: continue - for (key, value) in kwargs.items(): + for key, value in kwargs.items(): if not key in metadata or metadata[key] != value: matches = False break @@ -240,6 +243,7 @@ def delete_repo_notification(namespace_name, repository_name, uuid): or found.repository.name != repository_name ): raise InvalidNotificationException("No repository notifiation found with uuid: %s" % uuid) + delete_tag_notifications_for_notification(found.id) found.delete_instance() return found @@ -260,3 +264,21 @@ def list_repo_notifications(namespace_name, repository_name, event_name=None): ) return query + + +def delete_tag_notifications_for_notification(notification_id): + resp = ( + TagNotificationSuccess.delete() + .where(TagNotificationSuccess.notification == notification_id) + .execute() + ) + logger.debug( + f"Deleted {resp} entries from TagNotificationSuccess for RepositoryNotificationId: {notification_id}" + ) + return + + +def delete_tag_notifications_for_tag(tag): + resp = TagNotificationSuccess.delete().where(TagNotificationSuccess.tag == tag.id).execute() + logger.debug(f"Deleted {resp} entries from TagNotificationSuccess for tag: {tag.name}") + return diff --git a/data/model/oci/tag.py b/data/model/oci/tag.py index 5c09dd728..bb8a6e4e2 100644 --- a/data/model/oci/tag.py +++ b/data/model/oci/tag.py @@ -1,3 +1,4 @@ +import datetime import logging import uuid from calendar import timegm @@ -18,6 +19,7 @@ from data.database import ( get_epoch_timestamp_ms, ) from data.model import config, user +from data.model.notification import delete_tag_notifications_for_tag from image.docker.schema1 import ( DOCKER_SCHEMA1_CONTENT_TYPES, DockerSchema1Manifest, @@ -490,6 +492,9 @@ def _delete_tag(tag, now_ms): Deletes the given tag by marking it as expired. """ with db_transaction(): + # clean notifications for tag expiry + delete_tag_notifications_for_tag(tag) + updated = ( Tag.update(lifetime_end_ms=now_ms) .where(Tag.id == tag.id, Tag.lifetime_end_ms == tag.lifetime_end_ms) @@ -626,6 +631,9 @@ def set_tag_end_ms(tag, end_ms): """ with db_transaction(): + # clean notifications for tag expiry + delete_tag_notifications_for_tag(tag) + updated = ( Tag.update(lifetime_end_ms=end_ms) .where(Tag.id == tag) @@ -818,3 +826,31 @@ def fetch_paginated_autoprune_repo_tags_older_than_ms(repo_id, tag_lifetime_ms: raise Exception( f"Error fetching repository tags by creation date for repository id: {repo_id} with error as: {str(err)}" ) + + +def fetch_repo_tags_for_image_expiry_expiry_event(repo_id, days, notified_tags): + """ + notified_tags refer to the tags that were already notified for the event + Return query to fetch repository's distinct active tags that are expiring in x number days + """ + # TODO: check tags expiring due to org-level/repo-level auto-prune policies + try: + future_ms = (datetime.datetime.now() + datetime.timedelta(days=days)).timestamp() * 1000 + now_ms = get_epoch_timestamp_ms() + query = ( + Tag.select(Tag.id, Tag.name) + .where( + Tag.repository_id == repo_id, + (~(Tag.lifetime_end_ms >> None)), # filter for tags where expiry is set + Tag.lifetime_end_ms > now_ms, # filter expired tags + Tag.lifetime_end_ms <= future_ms, + Tag.hidden == False, + Tag.id.not_in(notified_tags), + ) + .distinct() + ) + return list(query) + except Exception as err: + raise Exception( + f"Error fetching repository tags repository id: {repo_id} with error as: {str(err)}" + ) diff --git a/data/model/oci/test/test_oci_tag.py b/data/model/oci/test/test_oci_tag.py index f70839161..ed08b278d 100644 --- a/data/model/oci/test/test_oci_tag.py +++ b/data/model/oci/test/test_oci_tag.py @@ -349,7 +349,7 @@ def test_delete_tag(initialized_db): assert get_tag(repo, tag.name) == tag assert tag.lifetime_end_ms is None - with assert_query_count(3): + with assert_query_count(4): assert delete_tag(repo, tag.name) == tag assert get_tag(repo, tag.name) is None @@ -371,7 +371,7 @@ def test_delete_tag_manifest_list(initialized_db): assert child_tag.name.startswith("$temp-") assert child_tag.lifetime_end_ms > get_epoch_timestamp_ms() - with assert_query_count(8): + with assert_query_count(9): assert delete_tag(repository.id, tag.name) == tag # Assert temporary tags pointing to child manifest are now expired @@ -389,7 +389,7 @@ def test_delete_tags_for_manifest(initialized_db): repo = tag.repository assert get_tag(repo, tag.name) == tag - with assert_query_count(5): + with assert_query_count(6): assert delete_tags_for_manifest(tag.manifest) == [tag] assert get_tag(repo, tag.name) is None diff --git a/endpoints/api/test/test_repositorynotification.py b/endpoints/api/test/test_repositorynotification.py index 868130b64..7de281d6e 100644 --- a/endpoints/api/test/test_repositorynotification.py +++ b/endpoints/api/test/test_repositorynotification.py @@ -1,5 +1,3 @@ -from test.fixtures import * - import pytest from mock import MagicMock, Mock @@ -11,6 +9,7 @@ from endpoints.api.repositorynotification import ( ) from endpoints.api.test.shared import conduct_api_call from endpoints.test.shared import client_with_identity +from test.fixtures import * @pytest.fixture() diff --git a/events/repo_image_expiry.html b/events/repo_image_expiry.html new file mode 100644 index 000000000..639fe9301 --- /dev/null +++ b/events/repo_image_expiry.html @@ -0,0 +1,9 @@ +The following tags of repository {{ event_data.repository | repository_reference }} are due to expire +{% if event_data.expiring_in %} + in {{ event_data.expiring_in }} +{% endif %} +:
+ +{% if event_data.tags %} + {% for tag in event_data.tags %}{%if loop.index > 1 %}, {% endif %}{{ (event_data.repository, tag) | repository_tag_reference }}{% endfor %} +{% endif %} diff --git a/initdb.py b/initdb.py index 06514da10..ece1955ec 100644 --- a/initdb.py +++ b/initdb.py @@ -499,6 +499,7 @@ def initialize_database(): ExternalNotificationEvent.create(name="repo_mirror_sync_started") ExternalNotificationEvent.create(name="repo_mirror_sync_success") ExternalNotificationEvent.create(name="repo_mirror_sync_failed") + ExternalNotificationEvent.create(name="repo_image_expiry") ExternalNotificationMethod.create(name="quay_notification") ExternalNotificationMethod.create(name="email") diff --git a/notifications/notificationevent.py b/notifications/notificationevent.py index 9be022abe..e2ec47f01 100644 --- a/notifications/notificationevent.py +++ b/notifications/notificationevent.py @@ -462,3 +462,22 @@ class BuildCancelledEvent(BaseBuildEvent): def get_summary(self, event_data, notification_data): return "Build cancelled " + _build_summary(event_data) + + +class RepoImageExpiryEvent(NotificationEvent): + @classmethod + def event_name(cls): + return "repo_image_expiry" + + def get_level(self, event_data, notification_data): + return "info" + + def get_summary(self, event_data, notification_data): + return f"Repository {event_data['repository']} image(s) expiring" + + def get_sample_data(self, namespace_name, repo_name, event_config): + return build_repository_event_data( + namespace_name, + repo_name, + {"tags": ["latest", "v1"], "expiring_in": f"{event_config.get('days', None)} days"}, + ) diff --git a/notifications/test/test_tag_expiry_notification.py b/notifications/test/test_tag_expiry_notification.py new file mode 100644 index 000000000..409b0a914 --- /dev/null +++ b/notifications/test/test_tag_expiry_notification.py @@ -0,0 +1,254 @@ +import datetime +import json +import os +import time + +import pytest + +import util +from app import notification_queue +from data.database import ( + ExternalNotificationMethod, + TagNotificationSuccess, + get_epoch_timestamp_ms, +) +from data.model import notification, repository +from data.model.oci.tag import ( + delete_tag, + fetch_repo_tags_for_image_expiry_expiry_event, + list_alive_tags, + set_tag_end_ms, +) +from endpoints.api.repositorynotification_models_pre_oci import pre_oci_model +from test.fixtures import * +from util.notification import * + +namespace = "buynlarge" +repo = "orgrepo" + + +@pytest.fixture +def initial_set(initialized_db): + slack = ExternalNotificationMethod.get(ExternalNotificationMethod.name == "slack") + image_expiry_event = ExternalNotificationEvent.get( + ExternalNotificationEvent.name == "repo_image_expiry" + ) + repo_ref = repository.get_repository(namespace, repo) + + n_1 = pre_oci_model.create_repo_notification( + namespace_name=namespace, + repository_name=repo, + event_name=image_expiry_event.name, + method_name=slack.name, + method_config={"url": "http://example.com"}, + event_config={"days": 5}, + title="Image(s) will expire in 5 days", + ) + n_1 = notification.get_repo_notification(n_1.uuid) + n_2 = notification.create_repo_notification( + repo_ref, + image_expiry_event.name, + slack.name, + {"url": "http://example.com"}, + {"days": 10}, + title="Image(s) will expire in 10 days", + ) + n_2 = notification.get_repo_notification(n_2.uuid) + + tags = list_alive_tags(repo_ref) + for tag in tags: + TagNotificationSuccess.create(notification=n_1.id, tag=tag.id, method=slack.id) + TagNotificationSuccess.create(notification=n_2.id, tag=tag.id, method=slack.id) + return { + "repo_ref": repo_ref, + "image_expiry_event": image_expiry_event, + "slack": slack, + "n_1": n_1, + "n_2": n_2, + "tags": tags, + } + + +@pytest.fixture +def new_notification(initial_set): + return notification.create_repo_notification( + initial_set["repo_ref"], + initial_set["image_expiry_event"].name, + initial_set["slack"].name, + {"url": "http://example.com"}, + {"days": 7}, + title="Image(s) will expire in 7 days", + ) + + +def test_tag_notifications_for_delete_repo_notification(initial_set): + tag_event_count = ( + TagNotificationSuccess.select() + .where(TagNotificationSuccess.notification == initial_set["n_1"].id) + .count() + ) + assert tag_event_count == len(initial_set["tags"]) + + notification.delete_repo_notification(namespace, repo, initial_set["n_1"].uuid) + tag_event_count = ( + TagNotificationSuccess.select() + .where(TagNotificationSuccess.notification == initial_set["n_1"].id) + .count() + ) + assert tag_event_count == 0 + + +def test_delete_tag_notifications_for_notification(initial_set): + tag_event_count = ( + TagNotificationSuccess.select() + .where(TagNotificationSuccess.notification == initial_set["n_1"].id) + .count() + ) + assert tag_event_count == len(initial_set["tags"]) + + notification.delete_tag_notifications_for_notification(initial_set["n_1"].id) + tag_event_count = ( + TagNotificationSuccess.select() + .where(TagNotificationSuccess.notification == initial_set["n_1"].id) + .count() + ) + assert tag_event_count == 0 + + +def test_delete_tag_notifications_for_tag(initial_set): + tag_event_count = ( + TagNotificationSuccess.select() + .where(TagNotificationSuccess.tag == initial_set["tags"][0].id) + .count() + ) + assert tag_event_count == 2 + + notification.delete_tag_notifications_for_tag(initial_set["tags"][0]) + tag_event_count = ( + TagNotificationSuccess.select() + .where(TagNotificationSuccess.tag == initial_set["tags"][0].id) + .count() + ) + assert tag_event_count == 0 + + +def test_fetch_tags_to_notify(initial_set, new_notification): + tag_event_count = ( + TagNotificationSuccess.select() + .where(TagNotificationSuccess.notification == new_notification.id) + .count() + ) + assert tag_event_count == 0 + + track_tags_to_notify(initial_set["tags"], new_notification) + + tag_event_count = ( + TagNotificationSuccess.select() + .where(TagNotificationSuccess.notification == new_notification.id) + .count() + ) + assert tag_event_count == len(initial_set["tags"]) + + +def test_fetch_notified_tag_ids_for_event(initial_set): + tag_ids = fetch_notified_tag_ids_for_event(initial_set["n_2"]) + for tag in initial_set["tags"]: + assert tag.id in tag_ids + + +def test_fetch_active_notification(initial_set, new_notification): + if "mysql+pymysql" in os.environ.get("TEST_DATABASE_URI", ""): + util.notification.SKIP_LOCKED = False + + # causing the event to failure + for i in range(3): + notification.increment_notification_failure_count(new_notification.uuid) + + # creating a `repo_push` event type + notification.create_repo_notification( + initial_set["repo_ref"], + "repo_push", + initial_set["slack"].name, + {"url": "http://example.com"}, + {"days": 7}, + title="Image(s) will expire in 7 days", + ) + + time_now = get_epoch_timestamp_ms() + event = fetch_active_notification(initial_set["image_expiry_event"]) + assert event.id == initial_set["n_1"].id + event = ( + RepositoryNotification.select() + .where(RepositoryNotification.id == initial_set["n_1"].id) + .get() + ) + assert event.last_ran_ms >= time_now + + +def test_scan_for_image_expiry_notifications(initial_set): + if "mysql+pymysql" in os.environ.get("TEST_DATABASE_URI", ""): + util.notification.SKIP_LOCKED = False + future_ms = (datetime.datetime.now() + datetime.timedelta(days=1)).timestamp() * 1000 + for tag in initial_set["tags"]: + set_tag_end_ms(tag, future_ms) + scan_for_image_expiry_notifications(initial_set["image_expiry_event"].name) + + time.sleep(2) + job1 = notification_queue.get() + assert job1 is not None + job1 = json.loads(job1["body"]) + for tag in initial_set["tags"]: + assert tag.name in job1["event_data"]["tags"] + + job2 = notification_queue.get() + assert job2 is not None + job2 = json.loads(job2["body"]) + for tag in initial_set["tags"]: + assert tag.name in job2["event_data"]["tags"] + + +def test_fetch_repo_tags_for_image_expiry_expiry_event(initial_set): + future_ms = (datetime.datetime.now() + datetime.timedelta(days=1)).timestamp() * 1000 + expected_tags = [] + for tag in initial_set["tags"]: + if tag.name == "prod": + continue + set_tag_end_ms(tag, future_ms) + expected_tags.append(tag.id) + tags = fetch_repo_tags_for_image_expiry_expiry_event( + initial_set["repo_ref"].id, days=2, notified_tags=[] + ) + assert len(tags) == len(expected_tags) + for tag in tags: + assert tag.id in expected_tags + + +def test_notifications_on_tag_expiry_update(initial_set): + tag_event_count = ( + TagNotificationSuccess.select() + .where(TagNotificationSuccess.notification == initial_set["n_1"].id) + .count() + ) + assert tag_event_count == len(initial_set["tags"]) + + for tag in initial_set["tags"]: + set_tag_end_ms(tag, get_epoch_timestamp_ms()) + + tag_event_count = ( + TagNotificationSuccess.select() + .where(TagNotificationSuccess.notification == initial_set["n_1"].id) + .count() + ) + assert tag_event_count == 0 + + +def test_notifications_on_tag_delete(initial_set): + for tag in initial_set["tags"]: + delete_tag(initial_set["repo_ref"].id, tag.name) + + tag_event_count = ( + TagNotificationSuccess.select() + .where(TagNotificationSuccess.notification == initial_set["n_1"].id) + .count() + ) + assert tag_event_count == 0 diff --git a/util/notification.py b/util/notification.py new file mode 100644 index 000000000..307462cf0 --- /dev/null +++ b/util/notification.py @@ -0,0 +1,121 @@ +import json +import logging + +from data.database import ( + ExternalNotificationEvent, + RepositoryNotification, + TagNotificationSuccess, + db_for_update, + get_epoch_timestamp_ms, +) +from data.model import db_transaction, oci +from data.model.user import get_namespace_by_user_id +from data.registry_model import registry_model +from notifications import spawn_notification + +logger = logging.getLogger(__name__) +BATCH_SIZE = 10 + +# Define a constant for the SKIP_LOCKED flag for testing purposes, +# since we test with mysql 5.7 which does not support this flag. +SKIP_LOCKED = True + + +def fetch_active_notification(event, task_run_interval_ms=5 * 60 * 60 * 1000): + """ + task_run_interval_ms specifies how long a task must wait before being ran again. + """ + with db_transaction(): + try: + # Fetch active notifications that match the event_name + query = ( + RepositoryNotification.select( + RepositoryNotification.id, + RepositoryNotification.method, + RepositoryNotification.repository, + RepositoryNotification.event_config_json, + ) + .where( + RepositoryNotification.event == event.id, + RepositoryNotification.number_of_failures < 3, + ( + RepositoryNotification.last_ran_ms + < get_epoch_timestamp_ms() - task_run_interval_ms + ) + | (RepositoryNotification.last_ran_ms.is_null(True)), + ) + .order_by(RepositoryNotification.last_ran_ms.asc(nulls="first")) + ) + notification = db_for_update(query, skip_locked=SKIP_LOCKED).get() + + RepositoryNotification.update(last_ran_ms=get_epoch_timestamp_ms()).where( + RepositoryNotification.id == notification.id + ).execute() + return notification + + except RepositoryNotification.DoesNotExist: + return None + + +def track_tags_to_notify(tags, notification): + for tag in tags: + TagNotificationSuccess.create( + notification=notification.id, tag=tag.id, method=notification.method + ) + + +def fetch_notified_tag_ids_for_event(notification): + response = ( + TagNotificationSuccess.select(TagNotificationSuccess.tag) + .where( + TagNotificationSuccess.notification == notification.id, + TagNotificationSuccess.method == notification.method, + ) + .distinct() + ) + return [r.tag.id for r in response] + + +def scan_for_image_expiry_notifications(event_name, batch_size=BATCH_SIZE): + """ + Get the repository notification prioritized by last_ran_ms = None followed by asc order of last_ran_ms. + """ + event = ExternalNotificationEvent.get(ExternalNotificationEvent.name == event_name) + for _ in range(batch_size): + notification = fetch_active_notification(event) + if not notification: + return + + repository = notification.repository + repo_id = repository.id + config = json.loads(notification.event_config_json) + + if not config.get("days", None): + logger.error( + f"Missing key days in config for notification_id:{notification.id} created for repository_id:{repo_id}" + ) + continue + + # Fetch tags that were already notified + notified_tags = fetch_notified_tag_ids_for_event(notification) + + # Fetch tags matching notification's config + tags = oci.tag.fetch_repo_tags_for_image_expiry_expiry_event( + repo_id, config["days"], notified_tags + ) + if not len(tags): + continue + + track_tags_to_notify(tags, notification) + + namespace_name = get_namespace_by_user_id(repository.namespace_user) + repository_ref = registry_model.lookup_repository(namespace_name, repository.name) + + # Push tags into queue notification worker queue + spawn_notification( + repository_ref, + event_name, + {"tags": [tag.name for tag in tags], "expiring_in": f"{config['days']} days"}, + ) + + return diff --git a/workers/gc/gcworker.py b/workers/gc/gcworker.py index 65907d1cd..43c37f2f3 100644 --- a/workers/gc/gcworker.py +++ b/workers/gc/gcworker.py @@ -8,8 +8,10 @@ from data.database import Repository, RepositoryState, UseThenDisconnect from data.model.gc import garbage_collect_repo from data.model.repository import get_random_gc_policy from data.registry_model import registry_model +from notifications.notificationevent import RepoImageExpiryEvent from util.locking import GlobalLock, LockNotAcquiredException from util.metrics.prometheus import gc_iterations +from util.notification import scan_for_image_expiry_notifications from workers.gunicorn_worker import GunicornWorker from workers.worker import Worker @@ -30,6 +32,13 @@ class GarbageCollectionWorker(Worker): self.add_operation( self._garbage_collection_repos, app.config.get("GARBAGE_COLLECTION_FREQUENCY", 30) ) + self.add_operation( + self._scan_notifications, app.config.get("GARBAGE_COLLECTION_FREQUENCY", 30) + ) + + def _scan_notifications(self): + # scan for tags that are expiring based on configured RepositoryNotifications + scan_for_image_expiry_notifications(event_name=RepoImageExpiryEvent.event_name()) def _garbage_collection_repos(self, skip_lock_for_testing=False): """ @@ -49,10 +58,14 @@ class GarbageCollectionWorker(Worker): assert features.GARBAGE_COLLECTION try: - with GlobalLock( - "REPO_GARBAGE_COLLECTION_%s" % repo_ref.id, - lock_ttl=REPOSITORY_GC_TIMEOUT + LOCK_TIMEOUT_PADDING, - ) if not skip_lock_for_testing else empty_context(): + with ( + GlobalLock( + "REPO_GARBAGE_COLLECTION_%s" % repo_ref.id, + lock_ttl=REPOSITORY_GC_TIMEOUT + LOCK_TIMEOUT_PADDING, + ) + if not skip_lock_for_testing + else empty_context() + ): try: repository = Repository.get(id=repo_ref.id) except Repository.DoesNotExist: