1
0
mirror of https://github.com/quay/quay.git synced 2025-04-18 10:44:06 +03:00

backend: implement basic functionality (PROJQUAY-7076) (#2984)

* database: adding subject_backfilled index to manifest table (PROJQUAY-7360) (#2963)

adding subject_backfilled index to manifest table

* Rebasing with main

* updating cypress data

* fixing conflicts and rebasing with latest code

* adding tests

* Forcing an empty commit.

* Forcing an empty commit.

* skip_locked test fix

* adding tests

* minor fixes

---------

Co-authored-by: Brandon Caton <bcaton@redhat.com>
This commit is contained in:
Sunandadadi 2024-06-27 16:48:39 -04:00 committed by GitHub
parent 70b03cadc3
commit 6688bcca09
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 485 additions and 11 deletions

View File

@ -1,4 +1,5 @@
import json
import logging
from peewee import SQL
@ -10,14 +11,16 @@ from data.database import (
NotificationKind,
Repository,
RepositoryNotification,
TagNotificationSuccess,
Team,
TeamMember,
TeamRole,
User,
db_for_update,
)
from data.model import InvalidNotificationException, db_transaction
logger = logging.getLogger(__name__)
def create_notification(kind_name, target, metadata={}, lookup_path=None):
kind_ref = NotificationKind.get(name=kind_name)
@ -132,7 +135,7 @@ def delete_matching_notifications(target, kind_name, **kwargs):
except:
continue
for (key, value) in kwargs.items():
for key, value in kwargs.items():
if not key in metadata or metadata[key] != value:
matches = False
break
@ -240,6 +243,7 @@ def delete_repo_notification(namespace_name, repository_name, uuid):
or found.repository.name != repository_name
):
raise InvalidNotificationException("No repository notifiation found with uuid: %s" % uuid)
delete_tag_notifications_for_notification(found.id)
found.delete_instance()
return found
@ -260,3 +264,21 @@ def list_repo_notifications(namespace_name, repository_name, event_name=None):
)
return query
def delete_tag_notifications_for_notification(notification_id):
    """
    Delete every TagNotificationSuccess row recorded for the given
    RepositoryNotification (e.g. when the notification itself is deleted).

    Args:
        notification_id: id of the RepositoryNotification whose rows are purged.

    Returns:
        The number of rows deleted.
    """
    deleted = (
        TagNotificationSuccess.delete()
        .where(TagNotificationSuccess.notification == notification_id)
        .execute()
    )
    # Lazy %-style args: formatting is skipped unless DEBUG logging is enabled.
    logger.debug(
        "Deleted %s entries from TagNotificationSuccess for RepositoryNotificationId: %s",
        deleted,
        notification_id,
    )
    return deleted
def delete_tag_notifications_for_tag(tag):
    """
    Delete every TagNotificationSuccess row recorded for the given tag
    (e.g. when the tag is deleted or its expiry changes).

    Args:
        tag: Tag model row; its `id` selects the rows and `name` is logged.

    Returns:
        The number of rows deleted.
    """
    deleted = TagNotificationSuccess.delete().where(TagNotificationSuccess.tag == tag.id).execute()
    # Lazy %-style args: formatting is skipped unless DEBUG logging is enabled.
    logger.debug("Deleted %s entries from TagNotificationSuccess for tag: %s", deleted, tag.name)
    return deleted

View File

@ -1,3 +1,4 @@
import datetime
import logging
import uuid
from calendar import timegm
@ -18,6 +19,7 @@ from data.database import (
get_epoch_timestamp_ms,
)
from data.model import config, user
from data.model.notification import delete_tag_notifications_for_tag
from image.docker.schema1 import (
DOCKER_SCHEMA1_CONTENT_TYPES,
DockerSchema1Manifest,
@ -490,6 +492,9 @@ def _delete_tag(tag, now_ms):
Deletes the given tag by marking it as expired.
"""
with db_transaction():
# clean notifications for tag expiry
delete_tag_notifications_for_tag(tag)
updated = (
Tag.update(lifetime_end_ms=now_ms)
.where(Tag.id == tag.id, Tag.lifetime_end_ms == tag.lifetime_end_ms)
@ -626,6 +631,9 @@ def set_tag_end_ms(tag, end_ms):
"""
with db_transaction():
# clean notifications for tag expiry
delete_tag_notifications_for_tag(tag)
updated = (
Tag.update(lifetime_end_ms=end_ms)
.where(Tag.id == tag)
@ -818,3 +826,31 @@ def fetch_paginated_autoprune_repo_tags_older_than_ms(repo_id, tag_lifetime_ms:
raise Exception(
f"Error fetching repository tags by creation date for repository id: {repo_id} with error as: {str(err)}"
)
def fetch_repo_tags_for_image_expiry_expiry_event(repo_id, days, notified_tags):
    """
    Return the repository's distinct active tags that will expire within the
    next `days` days and were not already notified for this event.

    Args:
        repo_id: id of the repository to scan.
        days: look-ahead window, in days, for upcoming expirations.
        notified_tags: iterable of tag ids that were already notified and
            must be excluded from the result.

    Returns:
        list of Tag rows (only `id` and `name` are selected).

    Raises:
        Exception: wraps any underlying database error, chained to the cause.
    """
    # TODO: check tags expiring due to org-level/repo-level auto-prune policies
    try:
        future_ms = (datetime.datetime.now() + datetime.timedelta(days=days)).timestamp() * 1000
        now_ms = get_epoch_timestamp_ms()
        query = (
            Tag.select(Tag.id, Tag.name)
            .where(
                Tag.repository_id == repo_id,
                (~(Tag.lifetime_end_ms >> None)),  # filter for tags where expiry is set
                Tag.lifetime_end_ms > now_ms,  # filter expired tags
                Tag.lifetime_end_ms <= future_ms,
                Tag.hidden == False,  # peewee expression: `is False` would not build SQL
                Tag.id.not_in(notified_tags),
            )
            .distinct()
        )
        return list(query)
    except Exception as err:
        # Chain the cause so the original database error is not lost.
        raise Exception(
            f"Error fetching repository tags repository id: {repo_id} with error as: {str(err)}"
        ) from err

View File

@ -349,7 +349,7 @@ def test_delete_tag(initialized_db):
assert get_tag(repo, tag.name) == tag
assert tag.lifetime_end_ms is None
with assert_query_count(3):
with assert_query_count(4):
assert delete_tag(repo, tag.name) == tag
assert get_tag(repo, tag.name) is None
@ -371,7 +371,7 @@ def test_delete_tag_manifest_list(initialized_db):
assert child_tag.name.startswith("$temp-")
assert child_tag.lifetime_end_ms > get_epoch_timestamp_ms()
with assert_query_count(8):
with assert_query_count(9):
assert delete_tag(repository.id, tag.name) == tag
# Assert temporary tags pointing to child manifest are now expired
@ -389,7 +389,7 @@ def test_delete_tags_for_manifest(initialized_db):
repo = tag.repository
assert get_tag(repo, tag.name) == tag
with assert_query_count(5):
with assert_query_count(6):
assert delete_tags_for_manifest(tag.manifest) == [tag]
assert get_tag(repo, tag.name) is None

View File

@ -1,5 +1,3 @@
from test.fixtures import *
import pytest
from mock import MagicMock, Mock
@ -11,6 +9,7 @@ from endpoints.api.repositorynotification import (
)
from endpoints.api.test.shared import conduct_api_call
from endpoints.test.shared import client_with_identity
from test.fixtures import *
@pytest.fixture()

View File

@ -0,0 +1,9 @@
{#- Body for the repo_image_expiry notification event: lists the repository's
    tags that are due to expire, optionally with the time window.
    Whitespace-stripping comment markers keep the rendered output unchanged. -#}
The following tags of repository {{ event_data.repository | repository_reference }} are due to expire
{% if event_data.expiring_in %}
in {{ event_data.expiring_in }}
{% endif %}
: <br>
{% if event_data.tags %}
{% for tag in event_data.tags %}{%if loop.index > 1 %}, {% endif %}{{ (event_data.repository, tag) | repository_tag_reference }}{% endfor %}
{% endif %}

View File

@ -499,6 +499,7 @@ def initialize_database():
ExternalNotificationEvent.create(name="repo_mirror_sync_started")
ExternalNotificationEvent.create(name="repo_mirror_sync_success")
ExternalNotificationEvent.create(name="repo_mirror_sync_failed")
ExternalNotificationEvent.create(name="repo_image_expiry")
ExternalNotificationMethod.create(name="quay_notification")
ExternalNotificationMethod.create(name="email")

View File

@ -462,3 +462,22 @@ class BuildCancelledEvent(BaseBuildEvent):
def get_summary(self, event_data, notification_data):
return "Build cancelled " + _build_summary(event_data)
class RepoImageExpiryEvent(NotificationEvent):
    """Notification event fired when image tags in a repository are about to expire."""

    @classmethod
    def event_name(cls):
        # Must match the ExternalNotificationEvent row name in the database.
        return "repo_image_expiry"

    def get_level(self, event_data, notification_data):
        return "info"

    def get_summary(self, event_data, notification_data):
        repo_name = event_data["repository"]
        return "Repository {} image(s) expiring".format(repo_name)

    def get_sample_data(self, namespace_name, repo_name, event_config):
        # Sample payload used when previewing/testing the notification.
        sample_payload = {
            "tags": ["latest", "v1"],
            "expiring_in": "{} days".format(event_config.get("days", None)),
        }
        return build_repository_event_data(namespace_name, repo_name, sample_payload)

View File

@ -0,0 +1,254 @@
import datetime
import json
import os
import time
import pytest
import util
from app import notification_queue
from data.database import (
ExternalNotificationMethod,
TagNotificationSuccess,
get_epoch_timestamp_ms,
)
from data.model import notification, repository
from data.model.oci.tag import (
delete_tag,
fetch_repo_tags_for_image_expiry_expiry_event,
list_alive_tags,
set_tag_end_ms,
)
from endpoints.api.repositorynotification_models_pre_oci import pre_oci_model
from test.fixtures import *
from util.notification import *
namespace = "buynlarge"
repo = "orgrepo"
@pytest.fixture
def initial_set(initialized_db):
    """
    Seed the test database with two image-expiry notifications (5-day and
    10-day windows) on buynlarge/orgrepo, and mark every alive tag as
    already notified for both of them.
    """
    slack = ExternalNotificationMethod.get(ExternalNotificationMethod.name == "slack")
    image_expiry_event = ExternalNotificationEvent.get(
        ExternalNotificationEvent.name == "repo_image_expiry"
    )
    repo_ref = repository.get_repository(namespace, repo)

    # First notification: created through the API-facing pre-OCI model.
    n_1 = pre_oci_model.create_repo_notification(
        namespace_name=namespace,
        repository_name=repo,
        event_name=image_expiry_event.name,
        method_name=slack.name,
        method_config={"url": "http://example.com"},
        event_config={"days": 5},
        title="Image(s) will expire in 5 days",
    )
    n_1 = notification.get_repo_notification(n_1.uuid)

    # Second notification: created directly through the data model layer.
    n_2 = notification.create_repo_notification(
        repo_ref,
        image_expiry_event.name,
        slack.name,
        {"url": "http://example.com"},
        {"days": 10},
        title="Image(s) will expire in 10 days",
    )
    n_2 = notification.get_repo_notification(n_2.uuid)

    # Record every alive tag as already notified for both notifications.
    tags = list_alive_tags(repo_ref)
    for tag in tags:
        TagNotificationSuccess.create(notification=n_1.id, tag=tag.id, method=slack.id)
        TagNotificationSuccess.create(notification=n_2.id, tag=tag.id, method=slack.id)
    return {
        "repo_ref": repo_ref,
        "image_expiry_event": image_expiry_event,
        "slack": slack,
        "n_1": n_1,
        "n_2": n_2,
        "tags": tags,
    }
@pytest.fixture
def new_notification(initial_set):
    """A third image-expiry notification (7-day window) on the same repository."""
    method_config = {"url": "http://example.com"}
    event_config = {"days": 7}
    return notification.create_repo_notification(
        initial_set["repo_ref"],
        initial_set["image_expiry_event"].name,
        initial_set["slack"].name,
        method_config,
        event_config,
        title="Image(s) will expire in 7 days",
    )
def test_tag_notifications_for_delete_repo_notification(initial_set):
    """Deleting a repo notification must also purge its TagNotificationSuccess rows."""

    def success_rows(notification_id):
        return (
            TagNotificationSuccess.select()
            .where(TagNotificationSuccess.notification == notification_id)
            .count()
        )

    # One row per seeded tag exists before the delete.
    assert success_rows(initial_set["n_1"].id) == len(initial_set["tags"])
    notification.delete_repo_notification(namespace, repo, initial_set["n_1"].uuid)
    assert success_rows(initial_set["n_1"].id) == 0
def test_delete_tag_notifications_for_notification(initial_set):
    """delete_tag_notifications_for_notification removes all rows for the notification."""

    def success_rows(notification_id):
        return (
            TagNotificationSuccess.select()
            .where(TagNotificationSuccess.notification == notification_id)
            .count()
        )

    target_id = initial_set["n_1"].id
    assert success_rows(target_id) == len(initial_set["tags"])
    notification.delete_tag_notifications_for_notification(target_id)
    assert success_rows(target_id) == 0
def test_delete_tag_notifications_for_tag(initial_set):
    """delete_tag_notifications_for_tag removes the rows for exactly that tag."""
    target_tag = initial_set["tags"][0]

    def rows_for_tag():
        return (
            TagNotificationSuccess.select()
            .where(TagNotificationSuccess.tag == target_tag.id)
            .count()
        )

    # The fixture seeded one row per notification (n_1 and n_2) for every tag.
    assert rows_for_tag() == 2
    notification.delete_tag_notifications_for_tag(target_tag)
    assert rows_for_tag() == 0
def test_fetch_tags_to_notify(initial_set, new_notification):
    """track_tags_to_notify records one TagNotificationSuccess row per tracked tag."""

    def tracked_count():
        return (
            TagNotificationSuccess.select()
            .where(TagNotificationSuccess.notification == new_notification.id)
            .count()
        )

    assert tracked_count() == 0
    track_tags_to_notify(initial_set["tags"], new_notification)
    assert tracked_count() == len(initial_set["tags"])
def test_fetch_notified_tag_ids_for_event(initial_set):
    """Every seeded tag id must be reported as already notified for n_2."""
    notified_ids = fetch_notified_tag_ids_for_event(initial_set["n_2"])
    assert all(tag.id in notified_ids for tag in initial_set["tags"])
def test_fetch_active_notification(initial_set, new_notification):
    """
    fetch_active_notification must skip notifications with 3+ failures and
    notifications of other event kinds, return a runnable one (n_1), and
    stamp its last_ran_ms.
    """
    # MySQL 5.7 does not support SELECT ... FOR UPDATE SKIP LOCKED; disable it there.
    if "mysql+pymysql" in os.environ.get("TEST_DATABASE_URI", ""):
        util.notification.SKIP_LOCKED = False

    # Push new_notification over the failure threshold so it is skipped.
    for i in range(3):
        notification.increment_notification_failure_count(new_notification.uuid)

    # Create a `repo_push` notification; wrong event kind, so it must be ignored.
    notification.create_repo_notification(
        initial_set["repo_ref"],
        "repo_push",
        initial_set["slack"].name,
        {"url": "http://example.com"},
        {"days": 7},
        title="Image(s) will expire in 7 days",
    )

    time_now = get_epoch_timestamp_ms()
    event = fetch_active_notification(initial_set["image_expiry_event"])
    assert event.id == initial_set["n_1"].id

    # Re-read the row: the claim must have updated last_ran_ms.
    event = (
        RepositoryNotification.select()
        .where(RepositoryNotification.id == initial_set["n_1"].id)
        .get()
    )
    assert event.last_ran_ms >= time_now
def test_scan_for_image_expiry_notifications(initial_set):
    """
    After giving every tag a near-future expiry, one scan should queue a
    notification job for each configured repo_image_expiry notification
    (n_1 and n_2), each listing all of the tags.
    """
    # MySQL 5.7 does not support SELECT ... FOR UPDATE SKIP LOCKED; disable it there.
    if "mysql+pymysql" in os.environ.get("TEST_DATABASE_URI", ""):
        util.notification.SKIP_LOCKED = False

    # Expire all tags within 1 day so both the 5- and 10-day windows match.
    future_ms = (datetime.datetime.now() + datetime.timedelta(days=1)).timestamp() * 1000
    for tag in initial_set["tags"]:
        set_tag_end_ms(tag, future_ms)
    scan_for_image_expiry_notifications(initial_set["image_expiry_event"].name)

    # NOTE(review): sleep presumably waits for queue items to become
    # available for get() — confirm against notification_queue semantics.
    time.sleep(2)
    job1 = notification_queue.get()
    assert job1 is not None
    job1 = json.loads(job1["body"])
    for tag in initial_set["tags"]:
        assert tag.name in job1["event_data"]["tags"]

    job2 = notification_queue.get()
    assert job2 is not None
    job2 = json.loads(job2["body"])
    for tag in initial_set["tags"]:
        assert tag.name in job2["event_data"]["tags"]
def test_fetch_repo_tags_for_image_expiry_expiry_event(initial_set):
    """Only tags whose expiry falls inside the window are returned; 'prod' is left unset."""
    expiry_ms = (datetime.datetime.now() + datetime.timedelta(days=1)).timestamp() * 1000
    expected_ids = []
    for tag in initial_set["tags"]:
        if tag.name == "prod":
            continue
        set_tag_end_ms(tag, expiry_ms)
        expected_ids.append(tag.id)

    result = fetch_repo_tags_for_image_expiry_expiry_event(
        initial_set["repo_ref"].id, days=2, notified_tags=[]
    )
    assert len(result) == len(expected_ids)
    assert all(returned.id in expected_ids for returned in result)
def test_notifications_on_tag_expiry_update(initial_set):
    """Updating a tag's expiry must clear its TagNotificationSuccess entries."""

    def rows_for_n1():
        return (
            TagNotificationSuccess.select()
            .where(TagNotificationSuccess.notification == initial_set["n_1"].id)
            .count()
        )

    assert rows_for_n1() == len(initial_set["tags"])
    for tag in initial_set["tags"]:
        set_tag_end_ms(tag, get_epoch_timestamp_ms())
    assert rows_for_n1() == 0
def test_notifications_on_tag_delete(initial_set):
    """Deleting tags must clear their TagNotificationSuccess entries."""
    for tag in initial_set["tags"]:
        delete_tag(initial_set["repo_ref"].id, tag.name)

    remaining = (
        TagNotificationSuccess.select()
        .where(TagNotificationSuccess.notification == initial_set["n_1"].id)
        .count()
    )
    assert remaining == 0

121
util/notification.py Normal file
View File

@ -0,0 +1,121 @@
import json
import logging
from data.database import (
ExternalNotificationEvent,
RepositoryNotification,
TagNotificationSuccess,
db_for_update,
get_epoch_timestamp_ms,
)
from data.model import db_transaction, oci
from data.model.user import get_namespace_by_user_id
from data.registry_model import registry_model
from notifications import spawn_notification
logger = logging.getLogger(__name__)
BATCH_SIZE = 10
# Define a constant for the SKIP_LOCKED flag for testing purposes,
# since we test with mysql 5.7 which does not support this flag.
SKIP_LOCKED = True
def fetch_active_notification(event, task_run_interval_ms=5 * 60 * 60 * 1000):
    """
    Atomically claim a single runnable RepositoryNotification for the given
    event kind and stamp its last_ran_ms, so concurrent workers do not pick
    the same row.

    task_run_interval_ms specifies how long a task must wait before being ran again.

    Returns the claimed notification row (id, method, repository,
    event_config_json only), or None when nothing is runnable.
    """
    with db_transaction():
        try:
            # Fetch active notifications that match the event_name: fewer than
            # 3 failures, and either never ran or last ran longer than
            # task_run_interval_ms ago; never-ran rows (NULL last_ran_ms) win.
            query = (
                RepositoryNotification.select(
                    RepositoryNotification.id,
                    RepositoryNotification.method,
                    RepositoryNotification.repository,
                    RepositoryNotification.event_config_json,
                )
                .where(
                    RepositoryNotification.event == event.id,
                    RepositoryNotification.number_of_failures < 3,
                    (
                        RepositoryNotification.last_ran_ms
                        < get_epoch_timestamp_ms() - task_run_interval_ms
                    )
                    | (RepositoryNotification.last_ran_ms.is_null(True)),
                )
                .order_by(RepositoryNotification.last_ran_ms.asc(nulls="first"))
            )
            # SELECT ... FOR UPDATE (SKIP LOCKED when supported) to claim the row.
            notification = db_for_update(query, skip_locked=SKIP_LOCKED).get()
            # Stamp last_ran_ms inside the same transaction so other workers
            # will not re-select this row within the interval.
            RepositoryNotification.update(last_ran_ms=get_epoch_timestamp_ms()).where(
                RepositoryNotification.id == notification.id
            ).execute()
            return notification
        except RepositoryNotification.DoesNotExist:
            return None
def track_tags_to_notify(tags, notification):
    """Record a TagNotificationSuccess row for each tag handled by `notification`."""
    notification_id = notification.id
    method = notification.method
    for tag in tags:
        TagNotificationSuccess.create(notification=notification_id, tag=tag.id, method=method)
def fetch_notified_tag_ids_for_event(notification):
    """Return the distinct ids of tags already notified for this notification/method pair."""
    rows = (
        TagNotificationSuccess.select(TagNotificationSuccess.tag)
        .where(
            TagNotificationSuccess.method == notification.method,
            TagNotificationSuccess.notification == notification.id,
        )
        .distinct()
    )
    return [row.tag.id for row in rows]
def scan_for_image_expiry_notifications(event_name, batch_size=BATCH_SIZE):
    """
    Process up to batch_size repository notifications for the given event,
    prioritized by last_ran_ms = None followed by asc order of last_ran_ms.

    For each claimed notification: find active tags expiring within the
    configured window that were not yet notified, mark them as notified,
    and queue a notification job for them.

    Args:
        event_name: name of the ExternalNotificationEvent to scan for.
        batch_size: maximum number of notifications to process in one call.
    """
    event = ExternalNotificationEvent.get(ExternalNotificationEvent.name == event_name)
    for _ in range(batch_size):
        notification = fetch_active_notification(event)
        if not notification:
            # Nothing runnable left; stop early.
            return

        repository = notification.repository
        repo_id = repository.id
        config = json.loads(notification.event_config_json)
        if not config.get("days"):
            # Misconfigured notification: log (lazily) and move to the next one.
            logger.error(
                "Missing key days in config for notification_id:%s created for repository_id:%s",
                notification.id,
                repo_id,
            )
            continue

        # Fetch tags that were already notified
        notified_tags = fetch_notified_tag_ids_for_event(notification)

        # Fetch tags matching notification's config
        tags = oci.tag.fetch_repo_tags_for_image_expiry_expiry_event(
            repo_id, config["days"], notified_tags
        )
        if not tags:
            continue

        # Tags are marked notified before the job is queued; a crash in
        # between would skip (not duplicate) their notification.
        track_tags_to_notify(tags, notification)

        namespace_name = get_namespace_by_user_id(repository.namespace_user)
        repository_ref = registry_model.lookup_repository(namespace_name, repository.name)

        # Push tags into queue notification worker queue
        spawn_notification(
            repository_ref,
            event_name,
            {"tags": [tag.name for tag in tags], "expiring_in": f"{config['days']} days"},
        )

View File

@ -8,8 +8,10 @@ from data.database import Repository, RepositoryState, UseThenDisconnect
from data.model.gc import garbage_collect_repo
from data.model.repository import get_random_gc_policy
from data.registry_model import registry_model
from notifications.notificationevent import RepoImageExpiryEvent
from util.locking import GlobalLock, LockNotAcquiredException
from util.metrics.prometheus import gc_iterations
from util.notification import scan_for_image_expiry_notifications
from workers.gunicorn_worker import GunicornWorker
from workers.worker import Worker
@ -30,6 +32,13 @@ class GarbageCollectionWorker(Worker):
self.add_operation(
self._garbage_collection_repos, app.config.get("GARBAGE_COLLECTION_FREQUENCY", 30)
)
self.add_operation(
self._scan_notifications, app.config.get("GARBAGE_COLLECTION_FREQUENCY", 30)
)
def _scan_notifications(self):
    """Scan for tags expiring soon, based on configured RepositoryNotifications."""
    expiry_event_name = RepoImageExpiryEvent.event_name()
    scan_for_image_expiry_notifications(event_name=expiry_event_name)
def _garbage_collection_repos(self, skip_lock_for_testing=False):
"""
@ -49,10 +58,14 @@ class GarbageCollectionWorker(Worker):
assert features.GARBAGE_COLLECTION
try:
with GlobalLock(
with (
GlobalLock(
"REPO_GARBAGE_COLLECTION_%s" % repo_ref.id,
lock_ttl=REPOSITORY_GC_TIMEOUT + LOCK_TIMEOUT_PADDING,
) if not skip_lock_for_testing else empty_context():
)
if not skip_lock_for_testing
else empty_context()
):
try:
repository = Repository.get(id=repo_ref.id)
except Repository.DoesNotExist: