1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
Files
quay/workers/test/test_autopruneworker.py
Sunandadadi 296b5f35c3 notifications: fetch autoprune tags with multiple policies for image expiry notification (PROJQUAY-8117) (#3340)
* notifications: fetch autoprune tags with multiple policies for image expiry notification(PROJQUAY-8117)

* don't fetch notifications if tags expiry is greater than notification days + add tests
2024-10-18 16:17:07 -04:00

1160 lines
40 KiB
Python

import json
import os
import random
import time
import uuid
from unittest.mock import MagicMock, patch
import pytest
from app import storage
from data import model
from data.database import AutoPruneTaskStatus, ImageStorageLocation, Tag
from data.model.oci.manifest import get_or_create_manifest
from data.model.oci.tag import (
get_tag,
list_alive_tags,
list_repository_tag_history,
retarget_tag,
)
from data.model.user import get_active_namespaces
from data.queue import WorkQueue
from digest.digest_tools import sha256_digest
from image.docker.schema2.manifest import DockerSchema2ManifestBuilder
from test.fixtures import *
from util.bytes import Bytes
from util.timedeltastring import convert_to_timedelta
from workers.autopruneworker import AutoPruneWorker
def _populate_blob(content, namespace, repo):
    """Store ``content`` as a blob for ``repo`` and return ``(blob, digest)``.

    The content is encoded through ``Bytes``, its SHA-256 digest is computed,
    a blob record with a temporary link is created for ``namespace`` /
    ``repo.name``, and the encoded bytes are written to the ``local_us``
    storage location at the blob's layer path.
    """
    encoded = Bytes.for_string_or_unicode(content).as_encoded_str()
    blob_digest = str(sha256_digest(encoded))
    local_us = ImageStorageLocation.get(name="local_us")

    # NOTE(review): the trailing 120 is presumably the temp-link expiration
    # in seconds -- confirm against store_blob_record_and_temp_link.
    blob_record = model.blob.store_blob_record_and_temp_link(
        namespace, repo.name, blob_digest, local_us, len(encoded), 120
    )

    layer_path = model.storage.get_layer_path(blob_record)
    storage.put_content(["local_us"], layer_path, encoded)
    return blob_record, blob_digest
def create_manifest(namespace, repo):
    """Create (or fetch) a minimal Docker schema2 manifest in ``repo``.

    A small JSON config document is stored as a blob via ``_populate_blob``;
    its digest and UTF-8 byte length are set as the manifest's config. The
    built manifest is passed to ``get_or_create_manifest`` and that call's
    result is returned.
    """
    config_document = {
        "id": "somelegacyid",
        "config": {
            "Labels": [],
        },
        "rootfs": {"type": "layers", "diff_ids": []},
        "history": [
            {
                "created": "2018-04-03T18:37:09.284840891Z",
                "created_by": "do something",
            },
        ],
    }
    layer_json = json.dumps(config_document)

    # The config must exist as a blob before the manifest can reference it.
    _, config_digest = _populate_blob(layer_json, namespace, repo)
    config_size = len(layer_json.encode("utf-8"))

    builder = DockerSchema2ManifestBuilder()
    builder.set_config_digest(config_digest, config_size)
    return get_or_create_manifest(repo, builder.build(), storage)
def create_tag(repo, manifest, start=None, name=None):
    """Insert an alive ``Tag`` row pointing at ``manifest`` and return it.

    Args:
        repo: repository the tag belongs to.
        manifest: manifest the tag points at.
        start: ``lifetime_start_ms`` for the tag; defaults to the current
            time in milliseconds.
        name: tag name; defaults to ``tag-<random uuid4>``.
    """
    if name is None:
        name = "tag-%s" % str(uuid.uuid4())
    if start is None:
        start = int(time.time() * 1000)

    return Tag.create(
        name=name,
        repository=repo,
        lifetime_start_ms=start,
        lifetime_end_ms=None,  # no end time is set on the new tag
        reversion=False,
        manifest=manifest,
        tag_kind=Tag.tag_kind.get_id("tag"),
    )
def _create_tags(repo, manifest, count, start_time_before=None):
    """Create ``count`` randomly named tags in ``repo`` for ``manifest``.

    When ``start_time_before`` (a time-delta string such as ``"7d"``) is
    given, each tag's start time is that far in the past minus a random
    0-1,000,000 ms offset. Otherwise ``create_tag`` picks the start time.
    """
    for _ in range(count):
        if start_time_before is None:
            tag_start = None
        else:
            tag_start = _past_timestamp_ms(start_time_before) - random.randint(0, 1000000)
        create_tag(repo, manifest, tag_start)
def _assert_repo_tag_count(repo, count, assert_start_after=None):
    """Assert that ``repo`` has exactly ``count`` active tags.

    Only the first page (page size 100) of active-only tag history is
    inspected. If ``assert_start_after`` (a time-delta string) is given,
    every returned tag must also have started more recently than that far
    in the past.
    """
    active_tags, _ = list_repository_tag_history(repo, 1, 100, active_tags_only=True)
    assert len(active_tags) == count

    if assert_start_after is None:
        return

    for active_tag in active_tags:
        assert active_tag.lifetime_start_ms > _past_timestamp_ms(assert_start_after)
def _past_timestamp_ms(time_delta):
    """Return the epoch timestamp, in whole milliseconds, ``time_delta`` ago.

    Args:
        time_delta: a time-delta string (for example ``"7d"`` or ``"5w"``)
            understood by ``convert_to_timedelta``.

    Returns:
        int: current time in milliseconds minus the delta in milliseconds.
    """
    now_ms = int(time.time() * 1000)
    # total_seconds() returns a float; coerce so callers get an integer
    # millisecond value like the other timestamps built in this module.
    delta_ms = int(convert_to_timedelta(time_delta).total_seconds() * 1000)
    return now_ms - delta_ms
# Tests for namespace autoprune policy
def test_namespace_prune_multiple_repos_by_tag_count(initialized_db):
    """A namespace ``number_of_tags`` policy of 5 is applied to every repo in it.

    Three ``sellnsmall`` repos start with 10, 3 and 5 tags. After pruning
    they must hold 5, 3 and 5 tags, and the namespace's autoprune task must
    have status ``success``.
    """
    # When the test DB URI names the mysql+pymysql driver, switch
    # SKIP_LOCKED off on the autoprune model.
    db_uri = os.environ.get("TEST_DATABASE_URI", "")
    if "mysql+pymysql" in db_uri:
        model.autoprune.SKIP_LOCKED = False

    policy = model.autoprune.create_namespace_autoprune_policy(
        "sellnsmall", {"method": "number_of_tags", "value": 5}, create_task=True
    )

    repos = [
        model.repository.create_repository(
            "sellnsmall", repo_name, None, repo_kind="image", visibility="public"
        )
        for repo_name in ("repo1", "repo2", "repo3")
    ]
    created_manifests = [create_manifest("sellnsmall", repo) for repo in repos]

    initial_counts = (10, 3, 5)
    for repo, created, tag_count in zip(repos, created_manifests, initial_counts):
        _create_tags(repo, created.manifest, tag_count)
    for repo, tag_count in zip(repos, initial_counts):
        _assert_repo_tag_count(repo, tag_count)

    AutoPruneWorker().prune()

    for repo, tag_count in zip(repos, (5, 3, 5)):
        _assert_repo_tag_count(repo, tag_count)

    task = model.autoprune.fetch_autoprune_task_by_namespace_id(policy.namespace_id)
    assert task.status == "success"
def test_namespace_prune_multiple_repos_by_creation_date(initialized_db):
    """A namespace ``creation_date`` policy of ``1w`` removes tags at least 7 days old.

    ``repo1`` has 5 fresh and 5 old tags, ``repo2`` has 5 old tags and
    ``repo3`` has 10 fresh tags. After pruning they must hold 5, 0 and 10
    tags, all started within the last 7 days, and the namespace's task must
    have status ``success``.
    """
    # When the test DB URI names the mysql+pymysql driver, switch
    # SKIP_LOCKED off on the autoprune model.
    db_uri = os.environ.get("TEST_DATABASE_URI", "")
    if "mysql+pymysql" in db_uri:
        model.autoprune.SKIP_LOCKED = False

    policy = model.autoprune.create_namespace_autoprune_policy(
        "sellnsmall", {"method": "creation_date", "value": "1w"}, create_task=True
    )

    repo1, repo2, repo3 = [
        model.repository.create_repository(
            "sellnsmall", repo_name, None, repo_kind="image", visibility="public"
        )
        for repo_name in ("repo1", "repo2", "repo3")
    ]
    manifest1 = create_manifest("sellnsmall", repo1).manifest
    manifest2 = create_manifest("sellnsmall", repo2).manifest
    manifest3 = create_manifest("sellnsmall", repo3).manifest

    # (repo, manifest, number of tags, minimum age or None for "now")
    tag_batches = [
        (repo1, manifest1, 5, None),
        (repo1, manifest1, 5, "7d"),
        (repo2, manifest2, 5, "7d"),
        (repo3, manifest3, 10, None),
    ]
    for repo, manifest, tag_count, older_than in tag_batches:
        _create_tags(repo, manifest, tag_count, start_time_before=older_than)

    _assert_repo_tag_count(repo1, 10)
    _assert_repo_tag_count(repo2, 5)
    _assert_repo_tag_count(repo3, 10)

    AutoPruneWorker().prune()

    for repo, remaining in ((repo1, 5), (repo2, 0), (repo3, 10)):
        _assert_repo_tag_count(repo, remaining, assert_start_after="7d")

    task = model.autoprune.fetch_autoprune_task_by_namespace_id(policy.namespace_id)
    assert task.status == "success"
def test_delete_autoprune_task_if_no_namespace_policy_exists(initialized_db):
    """A task created for a namespace without any policy is gone after a prune run."""
    # When the test DB URI names the mysql+pymysql driver, switch
    # SKIP_LOCKED off on the autoprune model.
    db_uri = os.environ.get("TEST_DATABASE_URI", "")
    if "mysql+pymysql" in db_uri:
        model.autoprune.SKIP_LOCKED = False

    namespace_id = model.organization.get_organization("sellnsmall").id
    model.autoprune.create_autoprune_task(namespace_id)

    AutoPruneWorker().prune()

    assert not model.autoprune.namespace_has_autoprune_task(namespace_id)
    assert not model.autoprune.namespace_has_autoprune_policy(namespace_id)
def test_fetch_tasks_in_correct_order(initialized_db):
    """The worker visits queued tasks in the expected namespace order.

    Six queued tasks are created. Two of their namespaces (``library`` and
    ``randomuser``) are marked for deletion and must never reach the policy
    lookup. The other four must arrive in the order ``sellnsmall`` (never
    ran), ``devtable`` (5w ago), ``freshuser`` (2w ago), ``buynlarge``
    (7d ago).
    """
    # When the test DB URI names the mysql+pymysql driver, switch
    # SKIP_LOCKED off on the autoprune model.
    db_uri = os.environ.get("TEST_DATABASE_URI", "")
    if "mysql+pymysql" in db_uri:
        model.autoprune.SKIP_LOCKED = False

    # Start with an empty table
    for existing_task in AutoPruneTaskStatus.select():
        model.autoprune.delete_autoprune_task(existing_task)

    sellnsmall = model.organization.get_organization("sellnsmall")
    buynlarge = model.organization.get_organization("buynlarge")
    library = model.organization.get_organization("library")
    devtable = model.user.get_user("devtable")
    freshuser = model.user.get_user("freshuser")
    randomuser = model.user.get_user("randomuser")

    deletion_queue = WorkQueue("testgcnamespace", lambda db: db.transaction())
    for doomed_namespace in (library, randomuser):
        model.user.mark_namespace_for_deletion(doomed_namespace, [], deletion_queue)

    # (namespace, how long ago the task last ran; None means it never ran)
    queued_tasks = [
        (sellnsmall, None),
        (buynlarge, "7d"),
        (devtable, "5w"),
        (freshuser, "2w"),
        (library, "7d"),
        (randomuser, None),
    ]
    for namespace, last_ran_ago in queued_tasks:
        last_ran_ms = None if last_ran_ago is None else _past_timestamp_ms(last_ran_ago)
        AutoPruneTaskStatus.create(namespace=namespace, status="queued", last_ran_ms=last_ran_ms)

    # pop() takes from the end, so this list is the reverse of the call order.
    remaining_calls = [buynlarge.id, freshuser.id, devtable.id, sellnsmall.id]
    forbidden_ids = [library.id, randomuser.id]

    def check_namespace_id(namespace_id):
        assert namespace_id not in forbidden_ids
        assert namespace_id == remaining_calls.pop()

    with patch(
        "workers.autopruneworker.get_namespace_autoprune_policies_by_id", MagicMock()
    ) as mock_get_policies:
        mock_get_policies.side_effect = check_namespace_id
        AutoPruneWorker().prune()
        assert len(remaining_calls) == 0
# Tests for repository autoprune policy
def test_repository_prune_multiple_repos_by_tag_count(initialized_db):
    """Repository ``number_of_tags`` policies prune each repo to its own limit.

    ``sellnsmall/repo1`` (limit 5) starts with 10 tags and must end with 5.
    ``buynlarge/repo2`` (limit 4) starts with 3 tags and must keep all 3.
    Both namespaces' autoprune tasks must have status ``success``.
    """
    # When the test DB URI names the mysql+pymysql driver, switch
    # SKIP_LOCKED off on the autoprune model.
    db_uri = os.environ.get("TEST_DATABASE_URI", "")
    if "mysql+pymysql" in db_uri:
        model.autoprune.SKIP_LOCKED = False

    # (namespace, repository name, tag limit)
    targets = [("sellnsmall", "repo1", 5), ("buynlarge", "repo2", 4)]

    first_repo, second_repo = [
        model.repository.create_repository(
            namespace, repo_name, None, repo_kind="image", visibility="public"
        )
        for namespace, repo_name, _ in targets
    ]
    policies = [
        model.autoprune.create_repository_autoprune_policy(
            namespace, repo_name, {"method": "number_of_tags", "value": limit}, create_task=True
        )
        for namespace, repo_name, limit in targets
    ]

    first_created = create_manifest("sellnsmall", first_repo)
    second_created = create_manifest("buynlarge", second_repo)
    _create_tags(first_repo, first_created.manifest, 10)
    _create_tags(second_repo, second_created.manifest, 3)
    _assert_repo_tag_count(first_repo, 10)
    _assert_repo_tag_count(second_repo, 3)

    AutoPruneWorker().prune()

    _assert_repo_tag_count(first_repo, 5)
    _assert_repo_tag_count(second_repo, 3)

    for policy in policies:
        task = model.autoprune.fetch_autoprune_task_by_namespace_id(policy.namespace_id)
        assert task.status == "success"
def test_repository_prune_multiple_repos_by_creation_date(initialized_db):
    """Repository ``creation_date`` policies prune each repo by its own age limit.

    Policies: ``sellnsmall/repo1`` -> ``1w``, ``buynlarge/repo2`` -> ``5d``,
    ``library/repo3`` -> ``24h``. Each repo gets some tags at least as old
    as its limit plus newer ones. After pruning the repos must hold 5, 3
    and 10 tags, all newer than the respective limit, and each namespace's
    task must have status ``success``.
    """
    # When the test DB URI names the mysql+pymysql driver, switch
    # SKIP_LOCKED off on the autoprune model.
    db_uri = os.environ.get("TEST_DATABASE_URI", "")
    if "mysql+pymysql" in db_uri:
        model.autoprune.SKIP_LOCKED = False

    # (namespace, repository name, creation_date policy value)
    targets = [
        ("sellnsmall", "repo1", "1w"),
        ("buynlarge", "repo2", "5d"),
        ("library", "repo3", "24h"),
    ]

    repo1, repo2, repo3 = [
        model.repository.create_repository(
            namespace, repo_name, None, repo_kind="image", visibility="public"
        )
        for namespace, repo_name, _ in targets
    ]
    policies = [
        model.autoprune.create_repository_autoprune_policy(
            namespace, repo_name, {"method": "creation_date", "value": max_age}, create_task=True
        )
        for namespace, repo_name, max_age in targets
    ]

    manifest1 = create_manifest("sellnsmall", repo1).manifest
    manifest2 = create_manifest("buynlarge", repo2).manifest
    manifest3 = create_manifest("library", repo3).manifest

    # (repo, manifest, number of tags, minimum age or None for "now")
    tag_batches = [
        (repo1, manifest1, 5, None),
        (repo1, manifest1, 5, "7d"),
        (repo2, manifest2, 3, "3d"),
        (repo2, manifest2, 5, "5d"),
        (repo3, manifest3, 10, None),
        (repo3, manifest3, 5, "24h"),
    ]
    for repo, manifest, tag_count, older_than in tag_batches:
        _create_tags(repo, manifest, tag_count, start_time_before=older_than)

    _assert_repo_tag_count(repo1, 10)
    _assert_repo_tag_count(repo2, 8)
    _assert_repo_tag_count(repo3, 15)

    AutoPruneWorker().prune()

    _assert_repo_tag_count(repo1, 5, assert_start_after="7d")
    _assert_repo_tag_count(repo2, 3, assert_start_after="5d")
    _assert_repo_tag_count(repo3, 10, assert_start_after="24h")

    for policy in policies:
        task = model.autoprune.fetch_autoprune_task_by_namespace_id(policy.namespace_id)
        assert task.status == "success"
def test_delete_autoprune_task_if_no_repository_policy_exists(initialized_db):
    """A task for a namespace whose repo has no policy is gone after a prune run."""
    # When the test DB URI names the mysql+pymysql driver, switch
    # SKIP_LOCKED off on the autoprune model.
    db_uri = os.environ.get("TEST_DATABASE_URI", "")
    if "mysql+pymysql" in db_uri:
        model.autoprune.SKIP_LOCKED = False

    namespace_id = model.organization.get_organization("sellnsmall").id
    repo = model.repository.create_repository(
        "sellnsmall", "repo1", None, repo_kind="image", visibility="public"
    )
    model.autoprune.create_autoprune_task(namespace_id)

    AutoPruneWorker().prune()

    assert not model.autoprune.namespace_has_autoprune_task(namespace_id)
    assert not model.autoprune.repository_has_autoprune_policy(repo.id)
def test_nspolicy_tagcount_less_than_repopolicy_tagcount(initialized_db):
    """The smaller namespace tag limit (2) applies over a larger repo limit (4).

    ``repo1`` (repo limit 4) starts with 5 tags and ``repo2`` (no repo
    policy) with 8. Both must end with 2 tags, and the namespace's task
    must have status ``success``.
    """
    # When the test DB URI names the mysql+pymysql driver, switch
    # SKIP_LOCKED off on the autoprune model.
    db_uri = os.environ.get("TEST_DATABASE_URI", "")
    if "mysql+pymysql" in db_uri:
        model.autoprune.SKIP_LOCKED = False

    namespace_policy = model.autoprune.create_namespace_autoprune_policy(
        "sellnsmall", {"method": "number_of_tags", "value": 2}, create_task=True
    )
    repos = [
        model.repository.create_repository(
            "sellnsmall", repo_name, None, repo_kind="image", visibility="public"
        )
        for repo_name in ("repo1", "repo2")
    ]
    repository_policy = model.autoprune.create_repository_autoprune_policy(
        "sellnsmall", "repo1", {"method": "number_of_tags", "value": 4}, create_task=True
    )

    created_manifests = [create_manifest("sellnsmall", repo) for repo in repos]
    initial_counts = (5, 8)
    for repo, created, tag_count in zip(repos, created_manifests, initial_counts):
        _create_tags(repo, created.manifest, tag_count)
    for repo, tag_count in zip(repos, initial_counts):
        _assert_repo_tag_count(repo, tag_count)

    AutoPruneWorker().prune()

    for repo in repos:
        _assert_repo_tag_count(repo, 2)

    for policy in (namespace_policy, repository_policy):
        task = model.autoprune.fetch_autoprune_task_by_namespace_id(policy.namespace_id)
        assert task.status == "success"
def test_repopolicy_tagcount_less_than_nspolicy_tagcount(initialized_db):
    """A smaller repo tag limit (2) applies to its repo; the namespace limit (4) elsewhere.

    ``repo1`` (repo limit 2) starts with 5 tags and must end with 2.
    ``repo2`` (no repo policy) starts with 8 and must end with 4. The
    namespace's task must have status ``success``.
    """
    # When the test DB URI names the mysql+pymysql driver, switch
    # SKIP_LOCKED off on the autoprune model.
    db_uri = os.environ.get("TEST_DATABASE_URI", "")
    if "mysql+pymysql" in db_uri:
        model.autoprune.SKIP_LOCKED = False

    namespace_policy = model.autoprune.create_namespace_autoprune_policy(
        "sellnsmall", {"method": "number_of_tags", "value": 4}, create_task=True
    )
    repos = [
        model.repository.create_repository(
            "sellnsmall", repo_name, None, repo_kind="image", visibility="public"
        )
        for repo_name in ("repo1", "repo2")
    ]
    repository_policy = model.autoprune.create_repository_autoprune_policy(
        "sellnsmall", "repo1", {"method": "number_of_tags", "value": 2}, create_task=True
    )

    created_manifests = [create_manifest("sellnsmall", repo) for repo in repos]
    initial_counts = (5, 8)
    for repo, created, tag_count in zip(repos, created_manifests, initial_counts):
        _create_tags(repo, created.manifest, tag_count)
    for repo, tag_count in zip(repos, initial_counts):
        _assert_repo_tag_count(repo, tag_count)

    AutoPruneWorker().prune()

    for repo, remaining in zip(repos, (2, 4)):
        _assert_repo_tag_count(repo, remaining)

    for policy in (namespace_policy, repository_policy):
        task = model.autoprune.fetch_autoprune_task_by_namespace_id(policy.namespace_id)
        assert task.status == "success"
def test_nspolicy_timespan_older_than_repopolicy_timespan(initialized_db):
    """Namespace ``creation_date`` ``5d`` combined with a stricter repo policy of ``2d``.

    Both repos receive 12 tags: 3 fresh, 2 at least 5d old, 3 at least 2d
    old and 4 at least 1d old. ``repo1`` (with the ``2d`` repo policy) must
    end with 7 tags; ``repo2`` (namespace policy only) must end with 10.
    The namespace's task must have status ``success``.
    """
    # When the test DB URI names the mysql+pymysql driver, switch
    # SKIP_LOCKED off on the autoprune model.
    db_uri = os.environ.get("TEST_DATABASE_URI", "")
    if "mysql+pymysql" in db_uri:
        model.autoprune.SKIP_LOCKED = False

    namespace_policy = model.autoprune.create_namespace_autoprune_policy(
        "sellnsmall", {"method": "creation_date", "value": "5d"}, create_task=True
    )
    repos = [
        model.repository.create_repository(
            "sellnsmall", repo_name, None, repo_kind="image", visibility="public"
        )
        for repo_name in ("repo1", "repo2")
    ]
    repository_policy = model.autoprune.create_repository_autoprune_policy(
        "sellnsmall", "repo1", {"method": "creation_date", "value": "2d"}, create_task=True
    )
    created_manifests = [create_manifest("sellnsmall", repo) for repo in repos]

    # (number of tags, minimum age or None for "now"); identical for both repos.
    age_batches = [(3, None), (2, "5d"), (3, "2d"), (4, "1d")]
    for repo, created in zip(repos, created_manifests):
        for tag_count, older_than in age_batches:
            _create_tags(repo, created.manifest, tag_count, start_time_before=older_than)

    for repo in repos:
        _assert_repo_tag_count(repo, 12)

    AutoPruneWorker().prune()

    for repo, remaining in zip(repos, (7, 10)):
        _assert_repo_tag_count(repo, remaining)

    for policy in (namespace_policy, repository_policy):
        task = model.autoprune.fetch_autoprune_task_by_namespace_id(policy.namespace_id)
        assert task.status == "success"
def test_repopolicy_timespan_older_than_nspolicy_timespan(initialized_db):
    """Namespace ``creation_date`` ``2d`` combined with a looser repo policy of ``5d``.

    Both repos receive 12 tags: 3 fresh, 2 at least 5d old, 3 at least 2d
    old and 4 at least 1d old. Both repos must end with 7 tags, and the
    namespace's task must have status ``success``.
    """
    # When the test DB URI names the mysql+pymysql driver, switch
    # SKIP_LOCKED off on the autoprune model.
    db_uri = os.environ.get("TEST_DATABASE_URI", "")
    if "mysql+pymysql" in db_uri:
        model.autoprune.SKIP_LOCKED = False

    namespace_policy = model.autoprune.create_namespace_autoprune_policy(
        "sellnsmall", {"method": "creation_date", "value": "2d"}, create_task=True
    )
    repos = [
        model.repository.create_repository(
            "sellnsmall", repo_name, None, repo_kind="image", visibility="public"
        )
        for repo_name in ("repo1", "repo2")
    ]
    repository_policy = model.autoprune.create_repository_autoprune_policy(
        "sellnsmall", "repo1", {"method": "creation_date", "value": "5d"}, create_task=True
    )
    created_manifests = [create_manifest("sellnsmall", repo) for repo in repos]

    # (number of tags, minimum age or None for "now"); identical for both repos.
    age_batches = [(3, None), (2, "5d"), (3, "2d"), (4, "1d")]
    for repo, created in zip(repos, created_manifests):
        for tag_count, older_than in age_batches:
            _create_tags(repo, created.manifest, tag_count, start_time_before=older_than)

    for repo in repos:
        _assert_repo_tag_count(repo, 12)

    AutoPruneWorker().prune()

    for repo in repos:
        _assert_repo_tag_count(repo, 7)

    for policy in (namespace_policy, repository_policy):
        task = model.autoprune.fetch_autoprune_task_by_namespace_id(policy.namespace_id)
        assert task.status == "success"
def test_nspolicy_tagcount_repopolicy_creation_date_reconcile(initialized_db):
    """Namespace ``number_of_tags`` 6 together with a repo ``creation_date`` ``3d`` policy.

    ``repo1`` (with the repo policy) has 1 fresh tag, 2 at least 3d old and
    2 at least 1d old, and must end with 3 tags. ``repo2`` has 3 fresh, 2
    at least 3d old and 2 at least 1d old, and must end with 6 tags. The
    namespace's task must have status ``success``.
    """
    # When the test DB URI names the mysql+pymysql driver, switch
    # SKIP_LOCKED off on the autoprune model.
    db_uri = os.environ.get("TEST_DATABASE_URI", "")
    if "mysql+pymysql" in db_uri:
        model.autoprune.SKIP_LOCKED = False

    namespace_policy = model.autoprune.create_namespace_autoprune_policy(
        "sellnsmall", {"method": "number_of_tags", "value": 6}, create_task=True
    )
    first_repo, second_repo = [
        model.repository.create_repository(
            "sellnsmall", repo_name, None, repo_kind="image", visibility="public"
        )
        for repo_name in ("repo1", "repo2")
    ]
    repository_policy = model.autoprune.create_repository_autoprune_policy(
        "sellnsmall", "repo1", {"method": "creation_date", "value": "3d"}, create_task=True
    )
    first_manifest = create_manifest("sellnsmall", first_repo).manifest
    second_manifest = create_manifest("sellnsmall", second_repo).manifest

    # (repo, manifest, number of tags, minimum age or None for "now")
    tag_batches = [
        (first_repo, first_manifest, 1, None),
        (first_repo, first_manifest, 2, "3d"),
        (first_repo, first_manifest, 2, "1d"),
        (second_repo, second_manifest, 3, None),
        (second_repo, second_manifest, 2, "3d"),
        (second_repo, second_manifest, 2, "1d"),
    ]
    for repo, manifest, tag_count, older_than in tag_batches:
        _create_tags(repo, manifest, tag_count, start_time_before=older_than)

    _assert_repo_tag_count(first_repo, 5)
    _assert_repo_tag_count(second_repo, 7)

    AutoPruneWorker().prune()

    _assert_repo_tag_count(first_repo, 3)
    _assert_repo_tag_count(second_repo, 6)

    for policy in (namespace_policy, repository_policy):
        task = model.autoprune.fetch_autoprune_task_by_namespace_id(policy.namespace_id)
        assert task.status == "success"
def test_nspolicy_creation_date_repopolicy_tagcount_reconcile(initialized_db):
    """Namespace ``creation_date`` ``3d`` together with a repo ``number_of_tags`` 6 policy.

    ``repo1`` (with the repo policy) has 2 fresh tags, 2 at least 3d old
    and 3 at least 1d old. ``repo2`` has 3 fresh, 2 at least 3d old and 2
    at least 1d old. Both start with 7 tags and must end with 5, and the
    namespace's task must have status ``success``.
    """
    # When the test DB URI names the mysql+pymysql driver, switch
    # SKIP_LOCKED off on the autoprune model.
    db_uri = os.environ.get("TEST_DATABASE_URI", "")
    if "mysql+pymysql" in db_uri:
        model.autoprune.SKIP_LOCKED = False

    namespace_policy = model.autoprune.create_namespace_autoprune_policy(
        "sellnsmall", {"method": "creation_date", "value": "3d"}, create_task=True
    )
    first_repo, second_repo = [
        model.repository.create_repository(
            "sellnsmall", repo_name, None, repo_kind="image", visibility="public"
        )
        for repo_name in ("repo1", "repo2")
    ]
    repository_policy = model.autoprune.create_repository_autoprune_policy(
        "sellnsmall", "repo1", {"method": "number_of_tags", "value": 6}, create_task=True
    )
    first_manifest = create_manifest("sellnsmall", first_repo).manifest
    second_manifest = create_manifest("sellnsmall", second_repo).manifest

    # (repo, manifest, number of tags, minimum age or None for "now")
    tag_batches = [
        (first_repo, first_manifest, 2, None),
        (first_repo, first_manifest, 2, "3d"),
        (first_repo, first_manifest, 3, "1d"),
        (second_repo, second_manifest, 3, None),
        (second_repo, second_manifest, 2, "3d"),
        (second_repo, second_manifest, 2, "1d"),
    ]
    for repo, manifest, tag_count, older_than in tag_batches:
        _create_tags(repo, manifest, tag_count, start_time_before=older_than)

    _assert_repo_tag_count(first_repo, 7)
    _assert_repo_tag_count(second_repo, 7)

    AutoPruneWorker().prune()

    _assert_repo_tag_count(first_repo, 5)
    _assert_repo_tag_count(second_repo, 5)

    for policy in (namespace_policy, repository_policy):
        task = model.autoprune.fetch_autoprune_task_by_namespace_id(policy.namespace_id)
        assert task.status == "success"
def test_registry_prune(initialized_db):
    """``prune_registry`` runs the configured default policy once for every active namespace.

    ``execute_namespace_policies`` is patched with a checker that verifies
    each call carries exactly the default policy, excludes repository
    policies, and targets a namespace not seen before. Every active
    namespace must have been visited by the end, even though the fetch
    limit is set to 2.
    """
    unvisited = [active_namespace.username for active_namespace in get_active_namespaces()]
    assert len(unvisited) > 5  # 5 is arbitrary, just need more than 2

    default_policy = {"method": "number_of_tags", "value": 10}
    mock_config = {
        "DEFAULT_NAMESPACE_AUTOPRUNE_POLICY": default_policy,
        "DEFAULT_POLICY_FETCH_NAMESPACES_LIMIT": 2,
    }

    # Parameter names are kept as-is in case the worker passes them by keyword.
    def assert_mock_execute_namespace_policies(
        policies, namespace, repo_page_limit, tag_page_limit, include_repo_policies
    ):
        assert len(policies) == 1
        assert policies[0].config == default_policy
        assert include_repo_policies is False
        assert namespace.username in unvisited
        unvisited.remove(namespace.username)

    with patch("workers.autopruneworker.app.config", mock_config), patch(
        "workers.autopruneworker.execute_namespace_policies", MagicMock()
    ) as mock_execute_namespace_policies:
        mock_execute_namespace_policies.side_effect = assert_mock_execute_namespace_policies
        AutoPruneWorker().prune_registry(skip_lock_for_testing=True)

    assert len(unvisited) == 0
def test_registry_prune_no_default_policy(initialized_db):
    """With the unpatched app config, the worker registers exactly one operation."""
    registered_operations = AutoPruneWorker()._operations
    assert len(registered_operations) == 1
def test_registry_prune_invalid_policy(initialized_db):
    """An unknown default policy method raises ``InvalidNamespaceAutoPrunePolicy``.

    The exception may come from either constructing the worker or running
    ``prune_registry``, so both sit inside the ``pytest.raises`` block.
    """
    mock_config = {
        "DEFAULT_NAMESPACE_AUTOPRUNE_POLICY": {"method": "doesnotexist", "value": "doesnotexist"}
    }
    with patch("workers.autopruneworker.app.config", mock_config):
        # pytest.raises fails the test by itself if nothing is raised, which
        # replaces the hand-rolled ``errored`` flag and unused ``ex`` binding.
        with pytest.raises(model.InvalidNamespaceAutoPrunePolicy):
            worker = AutoPruneWorker()
            worker.prune_registry(skip_lock_for_testing=True)
@pytest.mark.parametrize(
    "tags, expected, matches",
    [
        (
            ["test1", "test2", "test3", "test4", "test5"],
            ["test1", "test2", "test3", "test4", "test5"],
            True,
        ),
        (
            ["match1", "match2", "test1", "test2", "test3", "test4"],
            ["match1", "match2", "test1", "test2", "test3", "test4"],
            True,
        ),
        (
            ["match1", "match2", "match3", "test1", "test2", "test3", "test4"],
            ["match1", "match2", "match3", "test1", "test2", "test3", "test4"],
            True,
        ),
        (
            ["match1", "match2", "match3", "match4", "test1", "test2", "test3", "test4"],
            ["match1", "match2", "match3", "test1", "test2", "test3", "test4"],
            True,
        ),
        (
            [
                "match1",
                "match2",
                "test1",
                "test2",
                "test3",
                "test4",
                "match3",
                "match4",
            ],
            ["match1", "match2", "test1", "test2", "test3", "test4", "match3"],
            True,
        ),
        (
            ["match1", "match2", "match3", "test1", "test2", "test3", "test4", "match4", "match5"],
            ["match1", "match2", "match3", "test1", "test2", "test3", "test4"],
            True,
        ),
        (
            [
                "match1",
                "test1",
                "match2",
                "test2",
                "match3",
                "test3",
                "match4",
                "test4",
                "match5",
                "test5",
            ],
            ["match1", "test1", "match2", "test2", "match3", "test3", "test4", "test5"],
            True,
        ),
        (
            [
                "match1",
                "test1",
                "match2",
                "test2",
                "match3",
                "test3",
                "match4",
                "test4",
                "match5",
                "test5",
                "match6",
                "match7",
                "match8",
            ],
            ["match1", "test1", "match2", "test2", "match3", "test3", "test4", "test5"],
            True,
        ),
        (
            ["test1", "test2", "test3", "test4", "test5"],
            ["test1", "test2", "test3"],
            False,
        ),
        (
            ["test1", "match1", "test2", "match2", "test3", "match3", "test4", "test5", "match4"],
            ["test1", "match1", "test2", "match2", "test3", "match3", "match4"],
            False,
        ),
        (
            ["match1", "match2", "match3", "match4", "match5", "test1", "test2", "test3", "test4"],
            ["match1", "match2", "match3", "match4", "match5", "test1", "test2", "test3"],
            False,
        ),
        (
            ["match1", "match2", "test1", "test2", "test3"],
            ["match1", "match2", "test1", "test2", "test3"],
            False,
        ),
        (
            ["match1", "match2", "match3", "test1", "test2"],
            ["match1", "match2", "match3", "test1", "test2"],
            False,
        ),
        (
            ["match1", "match2", "match3", "match4", "test1"],
            ["match1", "match2", "match3", "match4", "test1"],
            False,
        ),
        (
            ["match1", "match2", "match3", "match4", "match5"],
            ["match1", "match2", "match3", "match4", "match5"],
            False,
        ),
        (
            ["test1", "test2", "test3", "test4", "test5"],
            ["test1", "test2", "test3"],
            False,
        ),
        (
            [
                "match1",
                "test1",
                "match2",
                "test2",
                "match3",
                "test3",
                "match4",
                "test4",
                "match5",
                "test5",
            ],
            ["match1", "test1", "match2", "test2", "match3", "test3", "match4", "match5"],
            False,
        ),
    ],
)
def test_prune_by_tag_count_with_tag_filter(tags, expected, matches, initialized_db):
    """A ``number_of_tags`` policy of 3 with tag pattern ``match.*`` prunes only filtered tags.

    Args:
        tags: tag names to create, newest first (each later entry is given
            a start time 1 ms earlier than the previous one).
        expected: tag names that must remain alive after pruning.
        matches: value for ``tag_pattern_matches``. As the cases encode,
            ``True`` limits pruning to tags matching the pattern and
            ``False`` to tags that do not match it.
    """
    if "mysql+pymysql" in os.environ.get("TEST_DATABASE_URI", ""):
        model.autoprune.SKIP_LOCKED = False

    repo1 = model.repository.create_repository(
        "sellnsmall", "repo1", None, repo_kind="image", visibility="public"
    )
    new_repo1_policy = model.autoprune.create_repository_autoprune_policy(
        "sellnsmall",
        "repo1",
        {
            "method": "number_of_tags",
            "value": 3,
            "tag_pattern": "match.*",
            "tag_pattern_matches": matches,
        },
        create_task=True,
    )
    manifest_repo1 = create_manifest("sellnsmall", repo1)

    now_ms = int(time.time() * 1000)
    for i, tag in enumerate(tags):
        create_tag(repo1, manifest_repo1.manifest, start=now_ms - i, name=tag)
    _assert_repo_tag_count(repo1, len(tags))

    worker = AutoPruneWorker()
    worker.prune()

    _assert_repo_tag_count(repo1, len(expected))
    task1 = model.autoprune.fetch_autoprune_task_by_namespace_id(new_repo1_policy.namespace_id)
    assert task1.status == "success"

    # Compare sorted names rather than removing entries from ``expected``:
    # the list belongs to the parametrize table and must not be mutated.
    alive_names = sorted(alive_tag.name for alive_tag in list_alive_tags(repo1))
    assert alive_names == sorted(expected)
@pytest.mark.parametrize(
    "tags, expected, matches",
    [
        (
            ["test1", "test2", "test3", "test4", "test5"],
            ["test1", "test2", "test3", "test4", "test5"],
            True,
        ),
        (["match1", "match2", "test1", "test2", "test3"], ["test1", "test2", "test3"], True),
        (["match1", "match2", "match3", "test1", "test2"], ["test1", "test2"], True),
        (["match1", "match2", "match3", "match4", "test1"], ["match4", "test1"], True),
        (["match1", "match2", "match3", "match4", "match5"], ["match4", "match5"], True),
        (
            ["test1", "test2", "test3", "test4", "test5"],
            ["test4", "test5"],
            False,
        ),
        (["match1", "test1", "test2", "test3", "test4"], ["match1", "test3", "test4"], False),
        (
            ["match1", "match2", "test1", "test2", "test3"],
            ["match1", "match2", "test2", "test3"],
            False,
        ),
        (
            ["match1", "match2", "match3", "test1", "test2"],
            ["match1", "match2", "match3", "test1", "test2"],
            False,
        ),
        (
            ["match1", "match2", "match3", "match4", "test1"],
            ["match1", "match2", "match3", "match4", "test1"],
            False,
        ),
        (
            ["match1", "match2", "match3", "match4", "match5"],
            ["match1", "match2", "match3", "match4", "match5"],
            False,
        ),
    ],
)
def test_prune_by_creation_date_with_tag_filter(tags, expected, matches, initialized_db):
    """A ``creation_date`` policy of ``5d`` with tag pattern ``match.*`` prunes only filtered tags.

    Args:
        tags: tag names to create; the first three are given a start time
            just over 5 days in the past, the rest start now.
        expected: tag names that must remain alive after pruning.
        matches: value for ``tag_pattern_matches``. As the cases encode,
            ``True`` limits pruning to tags matching the pattern and
            ``False`` to tags that do not match it.
    """
    if "mysql+pymysql" in os.environ.get("TEST_DATABASE_URI", ""):
        model.autoprune.SKIP_LOCKED = False

    repo1 = model.repository.create_repository(
        "sellnsmall", "repo1", None, repo_kind="image", visibility="public"
    )
    new_repo1_policy = model.autoprune.create_repository_autoprune_policy(
        "sellnsmall",
        "repo1",
        {
            "method": "creation_date",
            "value": "5d",
            "tag_pattern": "match.*",
            "tag_pattern_matches": matches,
        },
        create_task=True,
    )
    manifest_repo1 = create_manifest("sellnsmall", repo1)

    for i, tag in enumerate(tags):
        # Set the first 3 tags to be old enough to be pruned
        # We do the -1 to ensure that the creation time is less than the current time
        creation_time = _past_timestamp_ms("5d") - 1 if i < 3 else None
        create_tag(repo1, manifest_repo1.manifest, start=creation_time, name=tag)
    # Derived from the input rather than hard-coded, matching the sibling tests.
    _assert_repo_tag_count(repo1, len(tags))

    worker = AutoPruneWorker()
    worker.prune()

    _assert_repo_tag_count(repo1, len(expected))
    task1 = model.autoprune.fetch_autoprune_task_by_namespace_id(new_repo1_policy.namespace_id)
    assert task1.status == "success"

    # Compare sorted names rather than removing entries from ``expected``:
    # the list belongs to the parametrize table and must not be mutated.
    alive_names = sorted(alive_tag.name for alive_tag in list_alive_tags(repo1))
    assert alive_names == sorted(expected)
def test_multiple_policies_for_namespace(initialized_db):
    """Three policies on one namespace are all applied in a single prune run.

    Policies on ``sellnsmall``: ``creation_date`` ``3d`` with pattern
    ``.*``, ``number_of_tags`` 5, and ``creation_date`` ``2d``. Repos
    ``latest`` and ``repo1`` each start with 7 tags of mixed ages and must
    each end with 5. The namespace's task must have status ``success``.
    """
    # When the test DB URI names the mysql+pymysql driver, switch
    # SKIP_LOCKED off on the autoprune model.
    db_uri = os.environ.get("TEST_DATABASE_URI", "")
    if "mysql+pymysql" in db_uri:
        model.autoprune.SKIP_LOCKED = False

    policy_configs = [
        {
            "method": "creation_date",
            "value": "3d",
            "tag_pattern": ".*",
            "tag_pattern_matches": True,
        },
        {"method": "number_of_tags", "value": 5},
        {"method": "creation_date", "value": "2d"},
    ]
    policies = [
        model.autoprune.create_namespace_autoprune_policy("sellnsmall", config, create_task=True)
        for config in policy_configs
    ]

    first_repo, second_repo = [
        model.repository.create_repository(
            "sellnsmall", repo_name, None, repo_kind="image", visibility="public"
        )
        for repo_name in ("latest", "repo1")
    ]
    first_manifest = create_manifest("sellnsmall", first_repo).manifest
    second_manifest = create_manifest("sellnsmall", second_repo).manifest

    # (repo, manifest, number of tags, minimum age or None for "now")
    tag_batches = [
        (first_repo, first_manifest, 2, None),
        (first_repo, first_manifest, 2, "4d"),
        (first_repo, first_manifest, 3, "1d"),
        (second_repo, second_manifest, 3, None),
        (second_repo, second_manifest, 2, "4d"),
        (second_repo, second_manifest, 2, "1d"),
    ]
    for repo, manifest, tag_count, older_than in tag_batches:
        _create_tags(repo, manifest, tag_count, start_time_before=older_than)

    _assert_repo_tag_count(first_repo, 7)
    _assert_repo_tag_count(second_repo, 7)

    AutoPruneWorker().prune()

    _assert_repo_tag_count(first_repo, 5)
    _assert_repo_tag_count(second_repo, 5)

    for policy in policies:
        task = model.autoprune.fetch_autoprune_task_by_namespace_id(policy.namespace_id)
        assert task.status == "success"
def test_multiple_policies_for_repository(initialized_db):
    """Three policies on one repository are all applied in a single prune run.

    Policies on ``sellnsmall/repo1``: ``creation_date`` ``3d`` with pattern
    ``.*``, ``number_of_tags`` 3, and ``creation_date`` ``2d``. The repo
    starts with 7 tags (2 fresh, 2 at least 4d old, 3 at least 1d old) and
    must end with 3. The namespace's task must have status ``success``.
    """
    # When the test DB URI names the mysql+pymysql driver, switch
    # SKIP_LOCKED off on the autoprune model.
    db_uri = os.environ.get("TEST_DATABASE_URI", "")
    if "mysql+pymysql" in db_uri:
        model.autoprune.SKIP_LOCKED = False

    repo = model.repository.create_repository(
        "sellnsmall", "repo1", None, repo_kind="image", visibility="public"
    )

    policy_configs = [
        {
            "method": "creation_date",
            "value": "3d",
            "tag_pattern": ".*",
            "tag_pattern_matches": True,
        },
        {"method": "number_of_tags", "value": 3},
        {"method": "creation_date", "value": "2d"},
    ]
    policies = [
        model.autoprune.create_repository_autoprune_policy(
            "sellnsmall", "repo1", config, create_task=True
        )
        for config in policy_configs
    ]

    manifest = create_manifest("sellnsmall", repo).manifest
    for tag_count, older_than in ((2, None), (2, "4d"), (3, "1d")):
        _create_tags(repo, manifest, tag_count, start_time_before=older_than)
    _assert_repo_tag_count(repo, 7)

    AutoPruneWorker().prune()

    _assert_repo_tag_count(repo, 3)

    for policy in policies:
        task = model.autoprune.fetch_autoprune_task_by_namespace_id(policy.namespace_id)
        assert task.status == "success"
@pytest.mark.parametrize(
    "tags, expected, plcy1, plcy2",
    [
        (
            ["test1", "test2", "check1", "test3", "check2"],
            ["test1", "test2", "test3", "check2"],
            ["creation_date", "5m", "^c", True],
            ["number_of_tags", 4, None, None],
        ),
        (
            ["test1", "test2", "check1", "test3", "check2"],
            ["test1", "test2", "test3", "check2"],
            ["number_of_tags", 4, None, None],
            ["creation_date", "5m", "^c", True],
        ),
        (
            ["test1", "test2", "test3", "test4", "test5"],
            ["test4", "test5"],
            ["creation_date", "5m", None, None],
            ["number_of_tags", 4, None, None],
        ),
        (
            ["test1", "test2", "test3", "test4", "test5"],
            ["test4", "test5"],
            ["number_of_tags", 4, None, None],
            ["creation_date", "5m", None, None],
        ),
        (
            ["test1", "test2", "test3", "test4", "test5"],
            ["test4", "test5"],
            ["number_of_tags", 2, None, None],
            ["number_of_tags", 3, None, None],
        ),
        (
            ["test1", "test2", "test3", "test4", "test5"],
            ["test4", "test5"],
            ["number_of_tags", 3, None, None],
            ["number_of_tags", 2, None, None],
        ),
        (
            ["test1", "test2", "check1", "test3", "check2"],
            ["check1", "test3", "check2"],
            ["number_of_tags", 4, None, None],
            ["creation_date", "5m", "^c", False],
        ),
        (
            ["test1", "test2", "check1", "test3", "check2"],
            ["check1", "test3", "check2"],
            ["creation_date", "5m", "^c", False],
            ["number_of_tags", 4, None, None],
        ),
    ],
)
def test_policy_order(tags, expected, plcy1, plcy2, initialized_db):
    """Two repository policies give the same result whichever is created first.

    Args:
        tags: tag names to create; the first three are given a start time
            just over 5 minutes in the past, the rest start now.
        expected: tag names that must remain alive after pruning.
        plcy1: ``[method, value, tag_pattern, tag_pattern_matches]`` for
            the first policy, which is created with ``create_task=True``.
        plcy2: same layout for the second policy, created without
            ``create_task``.
    """
    if "mysql+pymysql" in os.environ.get("TEST_DATABASE_URI", ""):
        model.autoprune.SKIP_LOCKED = False

    repo1 = model.repository.create_repository(
        "sellnsmall", "repo1", None, repo_kind="image", visibility="public"
    )
    policy1 = model.autoprune.create_repository_autoprune_policy(
        "sellnsmall",
        "repo1",
        {
            "method": plcy1[0],
            "value": plcy1[1],
            "tag_pattern": plcy1[2],
            "tag_pattern_matches": plcy1[3],
        },
        create_task=True,
    )
    model.autoprune.create_repository_autoprune_policy(
        "sellnsmall",
        "repo1",
        {
            "method": plcy2[0],
            "value": plcy2[1],
            "tag_pattern": plcy2[2],
            "tag_pattern_matches": plcy2[3],
        },
    )
    manifest_repo1 = create_manifest("sellnsmall", repo1)

    for i, tag in enumerate(tags):
        # Set the first 3 tags to be old enough to be pruned
        # We do the -1 to ensure that the creation time is less than the current time
        creation_time = _past_timestamp_ms("5m") - 1 if i < 3 else None
        create_tag(repo1, manifest_repo1.manifest, start=creation_time, name=tag)
    _assert_repo_tag_count(repo1, len(tags))

    worker = AutoPruneWorker()
    worker.prune()

    _assert_repo_tag_count(repo1, len(expected))
    task = model.autoprune.fetch_autoprune_task_by_namespace_id(policy1.namespace_id)
    assert task.status == "success"

    # Compare sorted names rather than removing entries from ``expected``:
    # the list belongs to the parametrize table and must not be mutated.
    alive_names = sorted(alive_tag.name for alive_tag in list_alive_tags(repo1))
    assert alive_names == sorted(expected)