mirror of
https://github.com/quay/quay.git
synced 2026-01-29 08:42:15 +03:00
627 lines
23 KiB
Python
627 lines
23 KiB
Python
import json
|
|
import os
|
|
import random
|
|
import time
|
|
import uuid
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
from app import storage
|
|
from data import model
|
|
from data.database import AutoPruneTaskStatus, ImageStorageLocation, Tag
|
|
from data.model.oci.manifest import get_or_create_manifest
|
|
from data.model.oci.tag import list_repository_tag_history, retarget_tag
|
|
from data.queue import WorkQueue
|
|
from digest.digest_tools import sha256_digest
|
|
from image.docker.schema2.manifest import DockerSchema2ManifestBuilder
|
|
from test.fixtures import *
|
|
from util.bytes import Bytes
|
|
from util.timedeltastring import convert_to_timedelta
|
|
from workers.autopruneworker import AutoPruneWorker
|
|
|
|
|
|
def _populate_blob(content, namespace, repo):
    """
    Write `content` into the "local_us" storage location as a blob of `repo`.

    The content is first normalised to an encoded string via `Bytes`, its sha256
    digest is computed, and a blob record plus temporary link is registered
    before the payload itself is written to storage.

    Returns a `(blob, digest)` tuple, where `digest` is the string form of the
    content's sha256 digest.
    """
    encoded = Bytes.for_string_or_unicode(content).as_encoded_str()
    blob_digest = str(sha256_digest(encoded))
    storage_location = ImageStorageLocation.get(name="local_us")

    # NOTE(review): the trailing 120 is presumably the temp-link expiration in
    # seconds -- confirm against store_blob_record_and_temp_link.
    blob_record = model.blob.store_blob_record_and_temp_link(
        namespace,
        repo.name,
        blob_digest,
        storage_location,
        len(encoded),
        120,
    )

    layer_path = model.storage.get_layer_path(blob_record)
    storage.put_content(["local_us"], layer_path, encoded)
    return blob_record, blob_digest
|
|
|
|
|
|
def _create_manifest(namespace, repo):
    """
    Build a Docker schema2 manifest for `repo` that carries only a config blob.

    A small JSON config document is stored as a blob under `namespace`/`repo`,
    its digest and UTF-8 byte length are set on the manifest builder, and the
    built manifest is handed to `get_or_create_manifest`, whose result is
    returned unchanged.
    """
    history_entry = {
        "created": "2018-04-03T18:37:09.284840891Z",
        "created_by": "do something",
    }
    config_document = {
        "id": "somelegacyid",
        "config": {
            "Labels": [],
        },
        "rootfs": {"type": "layers", "diff_ids": []},
        "history": [history_entry],
    }
    config_json = json.dumps(config_document)

    # The config has to exist as a blob before the manifest can reference it.
    _, config_digest = _populate_blob(config_json, namespace, repo)

    builder = DockerSchema2ManifestBuilder()
    config_size = len(config_json.encode("utf-8"))
    builder.set_config_digest(config_digest, config_size)

    return get_or_create_manifest(repo, builder.build(), storage)
|
|
|
|
|
|
def _create_tag(repo, manifest, start=None):
    """
    Insert a tag with a random `tag-<uuid4>` name pointing `repo` at `manifest`.

    `start` is stored as the tag's `lifetime_start_ms`; when it is None the
    current time in milliseconds is used. The tag is created with no end of
    lifetime (`lifetime_end_ms=None`). Returns the created `Tag` row.
    """
    tag_name = "tag-%s" % uuid.uuid4()

    if start is None:
        lifetime_start = int(time.time() * 1000)
    else:
        lifetime_start = start

    return Tag.create(
        name=tag_name,
        repository=repo,
        lifetime_start_ms=lifetime_start,
        lifetime_end_ms=None,
        reversion=False,
        manifest=manifest,
        tag_kind=Tag.tag_kind.get_id("tag"),
    )
|
|
|
|
|
|
def _create_tags(repo, manifest, count, start_time_before=None):
    """
    Create `count` tags in `repo`, all pointing at `manifest`.

    When `start_time_before` (a time-delta string such as "7d") is given, each
    tag's start time is that far in the past minus a random 0..1,000,000 ms, so
    every tag starts before the cutoff. Otherwise `None` is passed through and
    `_create_tag` picks the start time.
    """
    for _ in range(count):
        if start_time_before is None:
            tag_start = None
        else:
            cutoff_ms = _past_timestamp_ms(start_time_before)
            tag_start = cutoff_ms - random.randint(0, 1000000)

        _create_tag(repo, manifest, tag_start)
|
|
|
|
|
|
def _assert_repo_tag_count(repo, count, assert_start_after=None):
    """
    Assert that `repo` has exactly `count` active tags.

    If `assert_start_after` (a time-delta string such as "7d") is given, also
    assert that every returned tag started more recently than that long ago.
    """
    # NOTE(review): 1 and 100 are presumably the page number and page size, in
    # which case at most 100 tags are counted -- confirm against
    # list_repository_tag_history.
    active_tags, _ = list_repository_tag_history(repo, 1, 100, active_tags_only=True)
    assert len(active_tags) == count

    if assert_start_after is None:
        return

    for active_tag in active_tags:
        started_ms = active_tag.lifetime_start_ms
        assert started_ms > _past_timestamp_ms(assert_start_after)
|
|
|
|
|
|
def _past_timestamp_ms(time_delta):
    """
    Return the epoch timestamp, in whole milliseconds, `time_delta` before now.

    `time_delta` is a time-delta string understood by `convert_to_timedelta`;
    this file passes values such as "24h", "7d" and "5w".

    The result is always an `int`. `timedelta.total_seconds()` returns a float,
    so the offset is truncated before subtracting; otherwise a float would be
    handed to the millisecond columns (`lifetime_start_ms`, `last_ran_ms`)
    that the helpers in this file populate.
    """
    now_ms = int(time.time() * 1000)
    offset_ms = int(convert_to_timedelta(time_delta).total_seconds() * 1000)
    return now_ms - offset_ms
|
|
|
|
|
|
# Tests for namespace autoprune policy
|
|
def test_namespace_prune_multiple_repos_by_tag_count(initialized_db):
    """
    A namespace `number_of_tags` policy of 5 is applied to three repositories.

    The repos start with 10, 3 and 5 tags; after one prune run they are
    expected to hold 5, 3 and 5 tags, and the namespace task status must be
    "success".
    """
    db_uri = os.environ.get("TEST_DATABASE_URI", "")
    if "mysql+pymysql" in db_uri:
        # NOTE(review): presumably the MySQL test database cannot use
        # SKIP LOCKED -- confirm.
        model.autoprune.SKIP_LOCKED = False

    namespace = "sellnsmall"
    policy = model.autoprune.create_namespace_autoprune_policy(
        namespace, {"method": "number_of_tags", "value": 5}, create_task=True
    )

    # (repository name, tags created, tags expected after pruning)
    scenarios = [("repo1", 10, 5), ("repo2", 3, 3), ("repo3", 5, 5)]

    repos = [
        model.repository.create_repository(
            namespace, repo_name, None, repo_kind="image", visibility="public"
        )
        for repo_name, _, _ in scenarios
    ]
    created_manifests = [_create_manifest(namespace, repo) for repo in repos]

    for repo, created, (_, initial, _) in zip(repos, created_manifests, scenarios):
        _create_tags(repo, created.manifest, initial)

    for repo, (_, initial, _) in zip(repos, scenarios):
        _assert_repo_tag_count(repo, initial)

    AutoPruneWorker().prune()

    for repo, (_, _, remaining) in zip(repos, scenarios):
        _assert_repo_tag_count(repo, remaining)

    task = model.autoprune.fetch_autoprune_task_by_namespace_id(policy.namespace_id)
    assert task.status == "success"
|
|
|
|
|
|
def test_namespace_prune_multiple_repos_by_creation_date(initialized_db):
    """
    A namespace `creation_date` policy of "1w" is applied to three repositories.

    repo1 gets 5 fresh tags and 5 tags older than 7 days, repo2 gets 5 tags
    older than 7 days, and repo3 gets 10 fresh tags. After one prune run the
    repos are expected to hold 5, 0 and 10 tags, all newer than 7 days, and the
    namespace task status must be "success".
    """
    uses_mysql = "mysql+pymysql" in os.environ.get("TEST_DATABASE_URI", "")
    if uses_mysql:
        # NOTE(review): presumably the MySQL test database cannot use
        # SKIP LOCKED -- confirm.
        model.autoprune.SKIP_LOCKED = False

    org_name = "sellnsmall"
    policy = model.autoprune.create_namespace_autoprune_policy(
        org_name, {"method": "creation_date", "value": "1w"}, create_task=True
    )

    def make_repo(repo_name):
        return model.repository.create_repository(
            org_name, repo_name, None, repo_kind="image", visibility="public"
        )

    first = make_repo("repo1")
    second = make_repo("repo2")
    third = make_repo("repo3")

    first_created = _create_manifest(org_name, first)
    second_created = _create_manifest(org_name, second)
    third_created = _create_manifest(org_name, third)

    # (repository, created manifest, number of tags, minimum tag age or None)
    tag_batches = [
        (first, first_created, 5, None),
        (first, first_created, 5, "7d"),
        (second, second_created, 5, "7d"),
        (third, third_created, 10, None),
    ]
    for repo, created, how_many, older_than in tag_batches:
        _create_tags(repo, created.manifest, how_many, start_time_before=older_than)

    for repo, initial in ((first, 10), (second, 5), (third, 10)):
        _assert_repo_tag_count(repo, initial)

    AutoPruneWorker().prune()

    for repo, remaining in ((first, 5), (second, 0), (third, 10)):
        _assert_repo_tag_count(repo, remaining, assert_start_after="7d")

    task = model.autoprune.fetch_autoprune_task_by_namespace_id(policy.namespace_id)
    assert task.status == "success"
|
|
|
|
|
|
def test_delete_autoprune_task_if_no_namespace_policy_exists(initialized_db):
    """
    A task is created for "sellnsmall" without any namespace policy.

    After one prune run the namespace must have neither an autoprune task nor
    an autoprune policy.
    """
    db_uri = os.environ.get("TEST_DATABASE_URI", "")
    if "mysql+pymysql" in db_uri:
        # NOTE(review): presumably the MySQL test database cannot use
        # SKIP LOCKED -- confirm.
        model.autoprune.SKIP_LOCKED = False

    organization = model.organization.get_organization("sellnsmall")
    namespace_id = organization.id
    model.autoprune.create_autoprune_task(namespace_id)

    AutoPruneWorker().prune()

    has_task = model.autoprune.namespace_has_autoprune_task(namespace_id)
    assert not has_task
    has_policy = model.autoprune.namespace_has_autoprune_policy(namespace_id)
    assert not has_policy
|
|
|
|
|
|
def test_fetch_tasks_in_correct_order(initialized_db):
    """
    Check the order in which the worker looks up policies for queued tasks.

    Six queued tasks are created. The policy lookup is expected to be called
    for sellnsmall (never ran), devtable (ran 5w ago), freshuser (2w ago) and
    buynlarge (7d ago), in that order. library and randomuser are marked for
    deletion beforehand, and the lookup must never be called for them.
    """
    db_uri = os.environ.get("TEST_DATABASE_URI", "")
    if "mysql+pymysql" in db_uri:
        # NOTE(review): presumably the MySQL test database cannot use
        # SKIP LOCKED -- confirm.
        model.autoprune.SKIP_LOCKED = False

    # Start with an empty table
    for stale_task in AutoPruneTaskStatus.select():
        model.autoprune.delete_autoprune_task(stale_task)

    sellnsmall = model.organization.get_organization("sellnsmall")
    buynlarge = model.organization.get_organization("buynlarge")
    library = model.organization.get_organization("library")
    devtable = model.user.get_user("devtable")
    freshuser = model.user.get_user("freshuser")
    randomuser = model.user.get_user("randomuser")

    deletion_queue = WorkQueue("testgcnamespace", lambda db: db.transaction())
    for doomed_namespace in (library, randomuser):
        model.user.mark_namespace_for_deletion(doomed_namespace, [], deletion_queue)

    # (namespace, how long ago the task last ran; None means it never ran)
    queued_tasks = [
        (sellnsmall, None),
        (buynlarge, "7d"),
        (devtable, "5w"),
        (freshuser, "2w"),
        (library, "7d"),
        (randomuser, None),
    ]
    for namespace, last_ran_ago in queued_tasks:
        last_ran_ms = None if last_ran_ago is None else _past_timestamp_ms(last_ran_ago)
        AutoPruneTaskStatus.create(namespace=namespace, status="queued", last_ran_ms=last_ran_ms)

    # Namespace ids in the order the lookup is expected to be called; each call
    # consumes the entry at the front.
    pending_ids = [sellnsmall.id, devtable.id, freshuser.id, buynlarge.id]
    forbidden_ids = [library.id, randomuser.id]

    def check_policy_lookup(namespace_id):
        # Returns None, so the patched lookup yields no policies to the worker.
        assert namespace_id not in forbidden_ids
        next_expected_id = pending_ids.pop(0)
        assert namespace_id == next_expected_id

    with patch(
        "workers.autopruneworker.get_namespace_autoprune_policies_by_id",
        MagicMock(side_effect=check_policy_lookup),
    ):
        AutoPruneWorker().prune()

        # Every expected namespace must have been looked up.
        assert not pending_ids
|
|
|
|
|
|
# Tests for repository autoprune policy
|
|
def test_repository_prune_multiple_repos_by_tag_count(initialized_db):
    """
    Repository `number_of_tags` policies in two different namespaces.

    sellnsmall/repo1 (limit 5) starts with 10 tags and is expected to end with
    5. buynlarge/repo2 (limit 4) starts with 3 tags and is expected to keep all
    3. Both namespace tasks must finish with status "success".
    """
    if "mysql+pymysql" in os.environ.get("TEST_DATABASE_URI", ""):
        # NOTE(review): presumably the MySQL test database cannot use
        # SKIP LOCKED -- confirm.
        model.autoprune.SKIP_LOCKED = False

    # (namespace, repository name, tag limit, tags created, tags expected afterwards)
    cases = [
        ("sellnsmall", "repo1", 5, 10, 5),
        ("buynlarge", "repo2", 4, 3, 3),
    ]

    repos = [
        model.repository.create_repository(
            namespace, repo_name, None, repo_kind="image", visibility="public"
        )
        for namespace, repo_name, _, _, _ in cases
    ]
    policies = [
        model.autoprune.create_repository_autoprune_policy(
            namespace, repo_name, {"method": "number_of_tags", "value": limit}, create_task=True
        )
        for namespace, repo_name, limit, _, _ in cases
    ]
    created_manifests = [_create_manifest(case[0], repo) for case, repo in zip(cases, repos)]

    for repo, created, case in zip(repos, created_manifests, cases):
        _create_tags(repo, created.manifest, case[3])

    for repo, case in zip(repos, cases):
        _assert_repo_tag_count(repo, case[3])

    AutoPruneWorker().prune()

    for repo, case in zip(repos, cases):
        _assert_repo_tag_count(repo, case[4])

    for policy in policies:
        task = model.autoprune.fetch_autoprune_task_by_namespace_id(policy.namespace_id)
        assert task.status == "success"
|
|
|
|
|
|
def test_repository_prune_multiple_repos_by_creation_date(initialized_db):
    """
    Repository `creation_date` policies in three different namespaces.

    sellnsmall/repo1 ("1w"), buynlarge/repo2 ("5d") and library/repo3 ("24h")
    start with 10, 8 and 15 tags. After one prune run they are expected to hold
    5, 3 and 10 tags, each newer than 7d, 5d and 24h respectively, and all
    three namespace tasks must finish with status "success".
    """
    db_uri = os.environ.get("TEST_DATABASE_URI", "")
    if "mysql+pymysql" in db_uri:
        # NOTE(review): presumably the MySQL test database cannot use
        # SKIP LOCKED -- confirm.
        model.autoprune.SKIP_LOCKED = False

    # (namespace, repository name, policy value)
    setups = [
        ("sellnsmall", "repo1", "1w"),
        ("buynlarge", "repo2", "5d"),
        ("library", "repo3", "24h"),
    ]

    repos = [
        model.repository.create_repository(
            namespace, repo_name, None, repo_kind="image", visibility="public"
        )
        for namespace, repo_name, _ in setups
    ]
    policies = [
        model.autoprune.create_repository_autoprune_policy(
            namespace, repo_name, {"method": "creation_date", "value": max_age}, create_task=True
        )
        for namespace, repo_name, max_age in setups
    ]
    created_manifests = [
        _create_manifest(namespace, repo) for (namespace, _, _), repo in zip(setups, repos)
    ]

    # (index into repos, number of tags, minimum tag age or None for fresh tags)
    tag_batches = [
        (0, 5, None),
        (0, 5, "7d"),
        (1, 3, "3d"),
        (1, 5, "5d"),
        (2, 10, None),
        (2, 5, "24h"),
    ]
    for index, how_many, older_than in tag_batches:
        _create_tags(
            repos[index],
            created_manifests[index].manifest,
            how_many,
            start_time_before=older_than,
        )

    for repo, initial in zip(repos, (10, 8, 15)):
        _assert_repo_tag_count(repo, initial)

    AutoPruneWorker().prune()

    for repo, remaining, newer_than in zip(repos, (5, 3, 10), ("7d", "5d", "24h")):
        _assert_repo_tag_count(repo, remaining, assert_start_after=newer_than)

    for policy in policies:
        task = model.autoprune.fetch_autoprune_task_by_namespace_id(policy.namespace_id)
        assert task.status == "success"
|
|
|
|
|
|
def test_delete_autoprune_task_if_no_repository_policy_exists(initialized_db):
    """
    A task is created for "sellnsmall", which has a repository but no policy.

    After one prune run the namespace must have no autoprune task, and the
    repository must have no autoprune policy.
    """
    db_uri = os.environ.get("TEST_DATABASE_URI", "")
    if "mysql+pymysql" in db_uri:
        # NOTE(review): presumably the MySQL test database cannot use
        # SKIP LOCKED -- confirm.
        model.autoprune.SKIP_LOCKED = False

    organization = model.organization.get_organization("sellnsmall")
    repository = model.repository.create_repository(
        "sellnsmall", "repo1", None, repo_kind="image", visibility="public"
    )
    model.autoprune.create_autoprune_task(organization.id)

    AutoPruneWorker().prune()

    task_exists = model.autoprune.namespace_has_autoprune_task(organization.id)
    assert not task_exists
    policy_exists = model.autoprune.repository_has_autoprune_policy(repository.id)
    assert not policy_exists
|
|
|
|
|
|
def test_nspolicy_tagcount_less_than_repopolicy_tagcount(initialized_db):
    """
    Namespace tag limit (2) is lower than repo1's own tag limit (4).

    repo1 starts with 5 tags and repo2 (no repository policy) with 8. After one
    prune run both are expected to hold 2 tags, and the task looked up through
    either policy's namespace id must have status "success".
    """
    db_uri = os.environ.get("TEST_DATABASE_URI", "")
    if "mysql+pymysql" in db_uri:
        # NOTE(review): presumably the MySQL test database cannot use
        # SKIP LOCKED -- confirm.
        model.autoprune.SKIP_LOCKED = False

    namespace = "sellnsmall"

    namespace_policy = model.autoprune.create_namespace_autoprune_policy(
        namespace, {"method": "number_of_tags", "value": 2}, create_task=True
    )

    repos = [
        model.repository.create_repository(
            namespace, repo_name, None, repo_kind="image", visibility="public"
        )
        for repo_name in ("repo1", "repo2")
    ]

    repository_policy = model.autoprune.create_repository_autoprune_policy(
        namespace, "repo1", {"method": "number_of_tags", "value": 4}, create_task=True
    )

    created_manifests = [_create_manifest(namespace, repo) for repo in repos]

    initial_counts = (5, 8)
    for repo, created, initial in zip(repos, created_manifests, initial_counts):
        _create_tags(repo, created.manifest, initial)

    for repo, initial in zip(repos, initial_counts):
        _assert_repo_tag_count(repo, initial)

    AutoPruneWorker().prune()

    for repo in repos:
        _assert_repo_tag_count(repo, 2)

    for policy in (namespace_policy, repository_policy):
        task = model.autoprune.fetch_autoprune_task_by_namespace_id(policy.namespace_id)
        assert task.status == "success"
|
|
|
|
|
|
def test_repopolicy_tagcount_less_than_nspolicy_tagcount(initialized_db):
    """
    repo1's own tag limit (2) is lower than the namespace tag limit (4).

    repo1 starts with 5 tags and repo2 (no repository policy) with 8. After one
    prune run repo1 is expected to hold 2 tags and repo2 4, and the task looked
    up through either policy's namespace id must have status "success".
    """
    uses_mysql = "mysql+pymysql" in os.environ.get("TEST_DATABASE_URI", "")
    if uses_mysql:
        # NOTE(review): presumably the MySQL test database cannot use
        # SKIP LOCKED -- confirm.
        model.autoprune.SKIP_LOCKED = False

    org_name = "sellnsmall"

    def tag_limit(limit):
        return {"method": "number_of_tags", "value": limit}

    def make_repo(repo_name):
        return model.repository.create_repository(
            org_name, repo_name, None, repo_kind="image", visibility="public"
        )

    org_policy = model.autoprune.create_namespace_autoprune_policy(
        org_name, tag_limit(4), create_task=True
    )

    limited_repo = make_repo("repo1")
    plain_repo = make_repo("repo2")

    limited_repo_policy = model.autoprune.create_repository_autoprune_policy(
        org_name, "repo1", tag_limit(2), create_task=True
    )

    limited_created = _create_manifest(org_name, limited_repo)
    plain_created = _create_manifest(org_name, plain_repo)

    _create_tags(limited_repo, limited_created.manifest, 5)
    _create_tags(plain_repo, plain_created.manifest, 8)

    _assert_repo_tag_count(limited_repo, 5)
    _assert_repo_tag_count(plain_repo, 8)

    AutoPruneWorker().prune()

    _assert_repo_tag_count(limited_repo, 2)
    _assert_repo_tag_count(plain_repo, 4)

    for policy in (org_policy, limited_repo_policy):
        task = model.autoprune.fetch_autoprune_task_by_namespace_id(policy.namespace_id)
        assert task.status == "success"
|
|
|
|
|
|
def test_nspolicy_timespan_older_than_repopolicy_timespan(initialized_db):
    """
    Namespace `creation_date` policy ("5d") is longer than repo1's own ("2d").

    Both repos receive 3 fresh tags plus 2, 3 and 4 tags older than 5d, 2d and
    1d respectively (12 in total). After one prune run repo1 is expected to
    hold 7 tags and repo2 (no repository policy) 10, and the task looked up
    through either policy's namespace id must have status "success".
    """
    db_uri = os.environ.get("TEST_DATABASE_URI", "")
    if "mysql+pymysql" in db_uri:
        # NOTE(review): presumably the MySQL test database cannot use
        # SKIP LOCKED -- confirm.
        model.autoprune.SKIP_LOCKED = False

    namespace = "sellnsmall"

    namespace_policy = model.autoprune.create_namespace_autoprune_policy(
        namespace, {"method": "creation_date", "value": "5d"}, create_task=True
    )

    repos = [
        model.repository.create_repository(
            namespace, repo_name, None, repo_kind="image", visibility="public"
        )
        for repo_name in ("repo1", "repo2")
    ]

    repository_policy = model.autoprune.create_repository_autoprune_policy(
        namespace, "repo1", {"method": "creation_date", "value": "2d"}, create_task=True
    )

    created_manifests = [_create_manifest(namespace, repo) for repo in repos]

    # (number of tags, minimum tag age or None for fresh tags), applied to each repo
    age_profile = [(3, None), (2, "5d"), (3, "2d"), (4, "1d")]
    for repo, created in zip(repos, created_manifests):
        for how_many, older_than in age_profile:
            _create_tags(repo, created.manifest, how_many, start_time_before=older_than)

    for repo in repos:
        _assert_repo_tag_count(repo, 12)

    AutoPruneWorker().prune()

    for repo, remaining in zip(repos, (7, 10)):
        _assert_repo_tag_count(repo, remaining)

    for policy in (namespace_policy, repository_policy):
        task = model.autoprune.fetch_autoprune_task_by_namespace_id(policy.namespace_id)
        assert task.status == "success"
|
|
|
|
|
|
def test_repopolicy_timespan_older_than_nspolicy_timespan(initialized_db):
    """
    repo1's own `creation_date` policy ("5d") is longer than the namespace's ("2d").

    Both repos receive 3 fresh tags plus 2, 3 and 4 tags older than 5d, 2d and
    1d respectively (12 in total). After one prune run both repos are expected
    to hold 7 tags, and the task looked up through either policy's namespace id
    must have status "success".
    """
    uses_mysql = "mysql+pymysql" in os.environ.get("TEST_DATABASE_URI", "")
    if uses_mysql:
        # NOTE(review): presumably the MySQL test database cannot use
        # SKIP LOCKED -- confirm.
        model.autoprune.SKIP_LOCKED = False

    org_name = "sellnsmall"

    def max_age(value):
        return {"method": "creation_date", "value": value}

    def make_repo(repo_name):
        return model.repository.create_repository(
            org_name, repo_name, None, repo_kind="image", visibility="public"
        )

    org_policy = model.autoprune.create_namespace_autoprune_policy(
        org_name, max_age("2d"), create_task=True
    )

    first_repo = make_repo("repo1")
    second_repo = make_repo("repo2")

    first_repo_policy = model.autoprune.create_repository_autoprune_policy(
        org_name, "repo1", max_age("5d"), create_task=True
    )

    first_created = _create_manifest(org_name, first_repo)
    second_created = _create_manifest(org_name, second_repo)

    for repo, created in ((first_repo, first_created), (second_repo, second_created)):
        _create_tags(repo, created.manifest, 3)
        for how_many, older_than in ((2, "5d"), (3, "2d"), (4, "1d")):
            _create_tags(repo, created.manifest, how_many, start_time_before=older_than)

    _assert_repo_tag_count(first_repo, 12)
    _assert_repo_tag_count(second_repo, 12)

    AutoPruneWorker().prune()

    _assert_repo_tag_count(first_repo, 7)
    _assert_repo_tag_count(second_repo, 7)

    for policy in (org_policy, first_repo_policy):
        task = model.autoprune.fetch_autoprune_task_by_namespace_id(policy.namespace_id)
        assert task.status == "success"
|
|
|
|
|
|
def test_nspolicy_tagcount_repopolicy_creation_date_reconcile(initialized_db):
    """
    Namespace `number_of_tags` policy (6) combined with a repo1 `creation_date` policy ("3d").

    repo1 gets 1 fresh tag, 2 older than 3d and 2 older than 1d (5 in total).
    repo2 gets 3 fresh tags, 2 older than 3d and 2 older than 1d (7 in total).
    After one prune run repo1 is expected to hold 3 tags and repo2 6, and the
    task looked up through either policy's namespace id must have status
    "success".
    """
    db_uri = os.environ.get("TEST_DATABASE_URI", "")
    if "mysql+pymysql" in db_uri:
        # NOTE(review): presumably the MySQL test database cannot use
        # SKIP LOCKED -- confirm.
        model.autoprune.SKIP_LOCKED = False

    namespace = "sellnsmall"

    namespace_policy = model.autoprune.create_namespace_autoprune_policy(
        namespace, {"method": "number_of_tags", "value": 6}, create_task=True
    )

    repos = [
        model.repository.create_repository(
            namespace, repo_name, None, repo_kind="image", visibility="public"
        )
        for repo_name in ("repo1", "repo2")
    ]

    repository_policy = model.autoprune.create_repository_autoprune_policy(
        namespace, "repo1", {"method": "creation_date", "value": "3d"}, create_task=True
    )

    created_manifests = [_create_manifest(namespace, repo) for repo in repos]

    # Per repository: (number of tags, minimum tag age or None for fresh tags)
    tag_plans = [
        [(1, None), (2, "3d"), (2, "1d")],
        [(3, None), (2, "3d"), (2, "1d")],
    ]
    for repo, created, plan in zip(repos, created_manifests, tag_plans):
        for how_many, older_than in plan:
            _create_tags(repo, created.manifest, how_many, start_time_before=older_than)

    for repo, initial in zip(repos, (5, 7)):
        _assert_repo_tag_count(repo, initial)

    AutoPruneWorker().prune()

    for repo, remaining in zip(repos, (3, 6)):
        _assert_repo_tag_count(repo, remaining)

    for policy in (namespace_policy, repository_policy):
        task = model.autoprune.fetch_autoprune_task_by_namespace_id(policy.namespace_id)
        assert task.status == "success"
|
|
|
|
|
|
def test_nspolicy_creation_date_repopolicy_tagcount_reconcile(initialized_db):
    """
    Namespace `creation_date` policy ("3d") combined with a repo1 `number_of_tags` policy (6).

    repo1 gets 2 fresh tags, 2 older than 3d and 3 older than 1d (7 in total).
    repo2 gets 3 fresh tags, 2 older than 3d and 2 older than 1d (7 in total).
    After one prune run both repos are expected to hold 5 tags, and the task
    looked up through either policy's namespace id must have status "success".
    """
    uses_mysql = "mysql+pymysql" in os.environ.get("TEST_DATABASE_URI", "")
    if uses_mysql:
        # NOTE(review): presumably the MySQL test database cannot use
        # SKIP LOCKED -- confirm.
        model.autoprune.SKIP_LOCKED = False

    org_name = "sellnsmall"

    def make_repo(repo_name):
        return model.repository.create_repository(
            org_name, repo_name, None, repo_kind="image", visibility="public"
        )

    org_policy = model.autoprune.create_namespace_autoprune_policy(
        org_name, {"method": "creation_date", "value": "3d"}, create_task=True
    )

    first_repo = make_repo("repo1")
    second_repo = make_repo("repo2")

    first_repo_policy = model.autoprune.create_repository_autoprune_policy(
        org_name, "repo1", {"method": "number_of_tags", "value": 6}, create_task=True
    )

    first_created = _create_manifest(org_name, first_repo)
    second_created = _create_manifest(org_name, second_repo)

    # (repository, created manifest, number of tags, minimum tag age or None)
    tag_batches = [
        (first_repo, first_created, 2, None),
        (first_repo, first_created, 2, "3d"),
        (first_repo, first_created, 3, "1d"),
        (second_repo, second_created, 3, None),
        (second_repo, second_created, 2, "3d"),
        (second_repo, second_created, 2, "1d"),
    ]
    for repo, created, how_many, older_than in tag_batches:
        _create_tags(repo, created.manifest, how_many, start_time_before=older_than)

    _assert_repo_tag_count(first_repo, 7)
    _assert_repo_tag_count(second_repo, 7)

    AutoPruneWorker().prune()

    _assert_repo_tag_count(first_repo, 5)
    _assert_repo_tag_count(second_repo, 5)

    for policy in (org_policy, first_repo_policy):
        task = model.autoprune.fetch_autoprune_task_by_namespace_id(policy.namespace_id)
        assert task.status == "success"
|