mirror of
https://github.com/quay/quay.git
synced 2026-01-27 18:42:52 +03:00
When a tag is deleted and re-pushed, pull statistics now start fresh at 0 instead of persisting from the deleted tag. Changes: - Clear TagPullStatistics in _delete_tag() - Clear TagPullStatistics in remove_tag_from_timemachine() - Add tests for tag deletion clearing pull statistics - Add test for re-push scenario starting with fresh stats 🤖 Generated with [Claude Code](https://claude.com/claude-code) Signed-off-by: Brady Pratt <bpratt@redhat.com> Co-authored-by: Claude <noreply@anthropic.com>
866 lines
34 KiB
Python
866 lines
34 KiB
Python
from datetime import datetime, timedelta
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
from peewee import IntegrityError
|
|
|
|
from data import model
|
|
from data.database import ManifestPullStatistics, TagPullStatistics
|
|
from data.model.oci import tag as oci_tag
|
|
from data.model.oci.tag import retarget_tag
|
|
from data.model.oci.test.test_oci_manifest import create_manifest_for_testing
|
|
from data.model.pull_statistics import (
|
|
PullStatisticsException,
|
|
bulk_upsert_manifest_statistics,
|
|
bulk_upsert_tag_statistics,
|
|
get_manifest_pull_statistics,
|
|
get_tag_pull_statistics,
|
|
)
|
|
from data.model.repository import create_repository
|
|
from data.model.user import get_user
|
|
from test.fixtures import *
|
|
|
|
|
|
class TestPullStatistics:
|
|
@pytest.fixture(autouse=True)
|
|
def setup(self, initialized_db):
|
|
self.user = get_user("devtable")
|
|
self.repo = create_repository("devtable", "newrepo", self.user, repo_kind="image")
|
|
assert self.repo is not None, "Failed to create test repository"
|
|
self.repo_id = self.repo.id
|
|
|
|
def _normalize_datetime(self, dt):
|
|
"""
|
|
Normalize datetime objects from different database backends for comparison.
|
|
|
|
- SQLite: Returns string representations
|
|
- MySQL: Returns timezone-naive datetime objects
|
|
- PostgreSQL: Returns timezone-aware datetime objects
|
|
"""
|
|
if isinstance(dt, str):
|
|
# SQLite might return datetime as string
|
|
return datetime.fromisoformat(dt.replace("Z", "+00:00"))
|
|
elif dt.tzinfo is None:
|
|
# MySQL returns timezone-naive datetime - assume UTC
|
|
from datetime import timezone
|
|
|
|
return dt.replace(tzinfo=timezone.utc)
|
|
else:
|
|
# PostgreSQL returns timezone-aware - use as-is
|
|
return dt
|
|
|
|
def test_bulk_upsert_tag_statistics_new_records(self, initialized_db):
|
|
"""Test inserting new tag pull statistics records."""
|
|
tag_updates = [
|
|
{
|
|
"repository_id": self.repo_id,
|
|
"tag_name": "latest",
|
|
"manifest_digest": "sha256:abc123",
|
|
"pull_count": 5,
|
|
"last_pull_timestamp": datetime(2024, 1, 1, 12, 0, 0),
|
|
},
|
|
{
|
|
"repository_id": self.repo_id,
|
|
"tag_name": "v1.0",
|
|
"manifest_digest": "sha256:def456",
|
|
"pull_count": 3,
|
|
"last_pull_timestamp": datetime(2024, 1, 2, 12, 0, 0),
|
|
},
|
|
]
|
|
|
|
rows_affected = bulk_upsert_tag_statistics(tag_updates)
|
|
assert rows_affected == 2
|
|
|
|
# Verify records were created
|
|
latest_stats = TagPullStatistics.get(
|
|
TagPullStatistics.repository == self.repo_id, TagPullStatistics.tag_name == "latest"
|
|
)
|
|
assert latest_stats.tag_pull_count == 5
|
|
assert latest_stats.current_manifest_digest == "sha256:abc123"
|
|
assert latest_stats.last_tag_pull_date == datetime(2024, 1, 1, 12, 0, 0)
|
|
|
|
v1_stats = TagPullStatistics.get(
|
|
TagPullStatistics.repository == self.repo_id, TagPullStatistics.tag_name == "v1.0"
|
|
)
|
|
assert v1_stats.tag_pull_count == 3
|
|
assert v1_stats.current_manifest_digest == "sha256:def456"
|
|
assert v1_stats.last_tag_pull_date == datetime(2024, 1, 2, 12, 0, 0)
|
|
|
|
def test_bulk_upsert_tag_statistics_update_existing(self, initialized_db):
|
|
"""Test updating existing tag pull statistics records."""
|
|
# Create initial record
|
|
TagPullStatistics.create(
|
|
repository=self.repo,
|
|
tag_name="latest",
|
|
tag_pull_count=5,
|
|
last_tag_pull_date=datetime(2024, 1, 1, 12, 0, 0),
|
|
current_manifest_digest="sha256:old123",
|
|
)
|
|
|
|
# Update with new data
|
|
tag_updates = [
|
|
{
|
|
"repository_id": self.repo_id,
|
|
"tag_name": "latest",
|
|
"manifest_digest": "sha256:new456",
|
|
"pull_count": 3, # Additional pulls
|
|
"last_pull_timestamp": datetime(2024, 1, 3, 12, 0, 0),
|
|
}
|
|
]
|
|
|
|
rows_affected = bulk_upsert_tag_statistics(tag_updates)
|
|
assert rows_affected == 1
|
|
|
|
# Verify record was updated (count should be incremented atomically)
|
|
updated_stats = TagPullStatistics.get(
|
|
TagPullStatistics.repository == self.repo_id, TagPullStatistics.tag_name == "latest"
|
|
)
|
|
assert updated_stats.tag_pull_count == 8 # 5 + 3 = 8 (atomic SQL addition, not max)
|
|
assert updated_stats.current_manifest_digest == "sha256:new456"
|
|
assert updated_stats.last_tag_pull_date == datetime(
|
|
2024, 1, 3, 12, 0, 0
|
|
) # SQL CASE keeps latest
|
|
|
|
def test_bulk_upsert_manifest_statistics_new_records(self, initialized_db):
|
|
"""Test inserting new manifest pull statistics records."""
|
|
manifest_updates = [
|
|
{
|
|
"repository_id": self.repo_id,
|
|
"manifest_digest": "sha256:abc123",
|
|
"pull_count": 10,
|
|
"last_pull_timestamp": datetime(2024, 1, 1, 12, 0, 0),
|
|
},
|
|
{
|
|
"repository_id": self.repo_id,
|
|
"manifest_digest": "sha256:def456",
|
|
"pull_count": 7,
|
|
"last_pull_timestamp": datetime(2024, 1, 2, 12, 0, 0),
|
|
},
|
|
]
|
|
|
|
rows_affected = bulk_upsert_manifest_statistics(manifest_updates)
|
|
assert rows_affected == 2
|
|
|
|
# Verify records were created
|
|
abc_stats = ManifestPullStatistics.get(
|
|
ManifestPullStatistics.repository == self.repo_id,
|
|
ManifestPullStatistics.manifest_digest == "sha256:abc123",
|
|
)
|
|
assert abc_stats.manifest_pull_count == 10
|
|
assert abc_stats.last_manifest_pull_date == datetime(2024, 1, 1, 12, 0, 0)
|
|
|
|
def_stats = ManifestPullStatistics.get(
|
|
ManifestPullStatistics.repository == self.repo_id,
|
|
ManifestPullStatistics.manifest_digest == "sha256:def456",
|
|
)
|
|
assert def_stats.manifest_pull_count == 7
|
|
assert def_stats.last_manifest_pull_date == datetime(2024, 1, 2, 12, 0, 0)
|
|
|
|
def test_bulk_upsert_manifest_statistics_update_existing(self, initialized_db):
|
|
"""Test updating existing manifest pull statistics records."""
|
|
# Create initial record
|
|
ManifestPullStatistics.create(
|
|
repository=self.repo,
|
|
manifest_digest="sha256:abc123",
|
|
manifest_pull_count=10,
|
|
last_manifest_pull_date=datetime(2024, 1, 1, 12, 0, 0),
|
|
)
|
|
|
|
# Update with new data
|
|
manifest_updates = [
|
|
{
|
|
"repository_id": self.repo_id,
|
|
"manifest_digest": "sha256:abc123",
|
|
"pull_count": 5, # Additional pulls
|
|
"last_pull_timestamp": datetime(2024, 1, 3, 12, 0, 0),
|
|
}
|
|
]
|
|
|
|
rows_affected = bulk_upsert_manifest_statistics(manifest_updates)
|
|
assert rows_affected == 1
|
|
|
|
# Verify record was updated (count should be incremented atomically)
|
|
updated_stats = ManifestPullStatistics.get(
|
|
ManifestPullStatistics.repository == self.repo_id,
|
|
ManifestPullStatistics.manifest_digest == "sha256:abc123",
|
|
)
|
|
assert updated_stats.manifest_pull_count == 15 # 10 + 5 = 15 (atomic SQL addition, not max)
|
|
assert updated_stats.last_manifest_pull_date == datetime(
|
|
2024, 1, 3, 12, 0, 0
|
|
) # SQL CASE keeps latest
|
|
|
|
def test_get_tag_pull_statistics_existing(self, initialized_db):
|
|
"""Test retrieving existing tag pull statistics."""
|
|
# Create test data
|
|
TagPullStatistics.create(
|
|
repository=self.repo,
|
|
tag_name="v2.0",
|
|
tag_pull_count=25,
|
|
last_tag_pull_date=datetime(2024, 1, 15, 10, 30, 0),
|
|
current_manifest_digest="sha256:test789",
|
|
)
|
|
|
|
stats = get_tag_pull_statistics(self.repo_id, "v2.0")
|
|
|
|
assert stats is not None
|
|
assert stats["repository_id"] == self.repo_id
|
|
assert stats["tag_name"] == "v2.0"
|
|
assert stats["pull_count"] == 25
|
|
assert stats["last_pull_date"] == datetime(2024, 1, 15, 10, 30, 0)
|
|
assert stats["current_manifest_digest"] == "sha256:test789"
|
|
|
|
def test_get_tag_pull_statistics_missing(self, initialized_db):
|
|
"""Test retrieving non-existent tag pull statistics."""
|
|
stats = get_tag_pull_statistics(self.repo_id, "nonexistent")
|
|
assert stats is None
|
|
|
|
def test_get_manifest_pull_statistics_existing(self, initialized_db):
|
|
"""Test retrieving existing manifest pull statistics."""
|
|
# Create test data
|
|
ManifestPullStatistics.create(
|
|
repository=self.repo,
|
|
manifest_digest="sha256:test999",
|
|
manifest_pull_count=42,
|
|
last_manifest_pull_date=datetime(2024, 1, 20, 14, 45, 0),
|
|
)
|
|
|
|
stats = get_manifest_pull_statistics(self.repo_id, "sha256:test999")
|
|
|
|
assert stats is not None
|
|
assert stats["repository_id"] == self.repo_id
|
|
assert stats["manifest_digest"] == "sha256:test999"
|
|
assert stats["pull_count"] == 42
|
|
assert stats["last_pull_date"] == datetime(2024, 1, 20, 14, 45, 0)
|
|
|
|
def test_get_manifest_pull_statistics_missing(self, initialized_db):
|
|
"""Test retrieving non-existent manifest pull statistics."""
|
|
stats = get_manifest_pull_statistics(self.repo_id, "sha256:nonexistent")
|
|
assert stats is None
|
|
|
|
def test_bulk_upsert_tag_statistics_empty_list(self, initialized_db):
|
|
"""Test bulk upsert with empty list."""
|
|
# Empty list should return 0
|
|
rows_affected = bulk_upsert_tag_statistics([])
|
|
assert rows_affected == 0
|
|
|
|
def test_concurrent_updates_tag_statistics(self, initialized_db):
|
|
"""Test sequential updates to same tag statistics - atomic SQL ensures correct accumulation."""
|
|
# Create initial record
|
|
TagPullStatistics.create(
|
|
repository=self.repo,
|
|
tag_name="concurrent",
|
|
tag_pull_count=10,
|
|
last_tag_pull_date=datetime(2024, 1, 1, 12, 0, 0),
|
|
current_manifest_digest="sha256:initial",
|
|
)
|
|
|
|
# Simulate concurrent updates with different timestamps
|
|
updates1 = [
|
|
{
|
|
"repository_id": self.repo_id,
|
|
"tag_name": "concurrent",
|
|
"manifest_digest": "sha256:update1",
|
|
"pull_count": 5,
|
|
"last_pull_timestamp": datetime(2024, 1, 2, 12, 0, 0),
|
|
}
|
|
]
|
|
|
|
updates2 = [
|
|
{
|
|
"repository_id": self.repo_id,
|
|
"tag_name": "concurrent",
|
|
"manifest_digest": "sha256:update2",
|
|
"pull_count": 3,
|
|
"last_pull_timestamp": datetime(2024, 1, 3, 12, 0, 0), # Later timestamp
|
|
}
|
|
]
|
|
|
|
# Apply both updates
|
|
bulk_upsert_tag_statistics(updates1)
|
|
bulk_upsert_tag_statistics(updates2)
|
|
|
|
# Verify final state - atomic SQL updates ensure correct accumulation
|
|
final_stats = TagPullStatistics.get(
|
|
TagPullStatistics.repository == self.repo_id, TagPullStatistics.tag_name == "concurrent"
|
|
)
|
|
assert final_stats.tag_pull_count == 18 # 10 + 5 + 3 = 18 (atomic SQL addition)
|
|
assert final_stats.current_manifest_digest == "sha256:update2" # Latest update
|
|
assert final_stats.last_tag_pull_date == datetime(
|
|
2024, 1, 3, 12, 0, 0
|
|
) # SQL CASE keeps latest timestamp
|
|
|
|
def test_unique_constraints_tag_statistics(self, initialized_db):
|
|
"""Test that unique constraints are enforced for tag statistics."""
|
|
# Create initial tag statistics
|
|
TagPullStatistics.create(
|
|
repository=self.repo,
|
|
tag_name="unique_test",
|
|
tag_pull_count=1,
|
|
last_tag_pull_date=datetime.now(),
|
|
current_manifest_digest="sha256:test",
|
|
)
|
|
|
|
# Try to create duplicate (should fail)
|
|
with pytest.raises(IntegrityError): # Unique constraint violation
|
|
TagPullStatistics.create(
|
|
repository=self.repo,
|
|
tag_name="unique_test", # Same tag name for same repo
|
|
tag_pull_count=2,
|
|
last_tag_pull_date=datetime.now(),
|
|
current_manifest_digest="sha256:test2",
|
|
)
|
|
|
|
def test_unique_constraints_manifest_statistics(self, initialized_db):
|
|
"""Test that unique constraints are enforced for manifest statistics."""
|
|
# Create initial manifest statistics
|
|
ManifestPullStatistics.create(
|
|
repository=self.repo,
|
|
manifest_digest="sha256:unique_test",
|
|
manifest_pull_count=1,
|
|
last_manifest_pull_date=datetime.now(),
|
|
)
|
|
|
|
# Try to create duplicate (should fail)
|
|
with pytest.raises(IntegrityError): # Unique constraint violation
|
|
ManifestPullStatistics.create(
|
|
repository=self.repo,
|
|
manifest_digest="sha256:unique_test", # Same digest for same repo
|
|
manifest_pull_count=2,
|
|
last_manifest_pull_date=datetime.now(),
|
|
)
|
|
|
|
def test_bulk_operations_performance(self, initialized_db):
|
|
"""Test bulk operations with larger datasets."""
|
|
# Generate a larger set of updates
|
|
tag_updates = []
|
|
manifest_updates = []
|
|
|
|
for i in range(100):
|
|
tag_updates.append(
|
|
{
|
|
"repository_id": self.repo_id,
|
|
"tag_name": f"tag_{i}",
|
|
"manifest_digest": f"sha256:digest_{i}",
|
|
"pull_count": i + 1,
|
|
"last_pull_timestamp": datetime(2024, 1, 1, 12, i % 60, 0),
|
|
}
|
|
)
|
|
|
|
manifest_updates.append(
|
|
{
|
|
"repository_id": self.repo_id,
|
|
"manifest_digest": f"sha256:manifest_{i}",
|
|
"pull_count": (i + 1) * 2,
|
|
"last_pull_timestamp": datetime(2024, 1, 1, 12, i % 60, 0),
|
|
}
|
|
)
|
|
|
|
# Test bulk insert performance
|
|
tag_rows = bulk_upsert_tag_statistics(tag_updates)
|
|
manifest_rows = bulk_upsert_manifest_statistics(manifest_updates)
|
|
|
|
assert tag_rows == 100
|
|
assert manifest_rows == 100
|
|
|
|
# Verify a few random records
|
|
assert (
|
|
TagPullStatistics.select().where(TagPullStatistics.repository == self.repo_id).count()
|
|
== 100
|
|
)
|
|
assert (
|
|
ManifestPullStatistics.select()
|
|
.where(ManifestPullStatistics.repository == self.repo_id)
|
|
.count()
|
|
== 100
|
|
)
|
|
|
|
# Test sample record
|
|
sample_tag = TagPullStatistics.get(
|
|
TagPullStatistics.repository == self.repo_id, TagPullStatistics.tag_name == "tag_50"
|
|
)
|
|
assert sample_tag.tag_pull_count == 51
|
|
assert sample_tag.current_manifest_digest == "sha256:digest_50"
|
|
|
|
def test_bulk_upsert_mixed_new_and_existing(self, initialized_db):
|
|
"""Test bulk upsert with both new records and existing record updates."""
|
|
# Create some existing records first
|
|
TagPullStatistics.create(
|
|
repository=self.repo,
|
|
tag_name="existing1",
|
|
tag_pull_count=10,
|
|
last_tag_pull_date=datetime(2024, 1, 1, 12, 0, 0),
|
|
current_manifest_digest="sha256:old1",
|
|
)
|
|
TagPullStatistics.create(
|
|
repository=self.repo,
|
|
tag_name="existing2",
|
|
tag_pull_count=20,
|
|
last_tag_pull_date=datetime(2024, 1, 2, 12, 0, 0),
|
|
current_manifest_digest="sha256:old2",
|
|
)
|
|
|
|
# Mix of updates to existing records and new record creation
|
|
tag_updates = [
|
|
# Update existing record
|
|
{
|
|
"repository_id": self.repo_id,
|
|
"tag_name": "existing1",
|
|
"manifest_digest": "sha256:new1",
|
|
"pull_count": 5,
|
|
"last_pull_timestamp": datetime(2024, 1, 5, 12, 0, 0),
|
|
},
|
|
# Create new record
|
|
{
|
|
"repository_id": self.repo_id,
|
|
"tag_name": "brand_new",
|
|
"manifest_digest": "sha256:new",
|
|
"pull_count": 3,
|
|
"last_pull_timestamp": datetime(2024, 1, 3, 12, 0, 0),
|
|
},
|
|
# Update another existing record
|
|
{
|
|
"repository_id": self.repo_id,
|
|
"tag_name": "existing2",
|
|
"manifest_digest": "sha256:new2",
|
|
"pull_count": 7,
|
|
"last_pull_timestamp": datetime(2024, 1, 4, 12, 0, 0),
|
|
},
|
|
]
|
|
|
|
rows_affected = bulk_upsert_tag_statistics(tag_updates)
|
|
assert rows_affected == 3
|
|
|
|
# Verify updated existing records - atomic SQL updates ensure correct values
|
|
existing1 = TagPullStatistics.get(
|
|
TagPullStatistics.repository == self.repo_id, TagPullStatistics.tag_name == "existing1"
|
|
)
|
|
assert existing1.tag_pull_count == 15 # 10 + 5 = 15 (atomic SQL addition)
|
|
assert existing1.current_manifest_digest == "sha256:new1"
|
|
assert existing1.last_tag_pull_date == datetime(
|
|
2024, 1, 5, 12, 0, 0
|
|
) # SQL CASE keeps latest (Jan 5 > Jan 1)
|
|
|
|
existing2 = TagPullStatistics.get(
|
|
TagPullStatistics.repository == self.repo_id, TagPullStatistics.tag_name == "existing2"
|
|
)
|
|
assert existing2.tag_pull_count == 27 # 20 + 7 = 27 (atomic SQL addition)
|
|
assert existing2.current_manifest_digest == "sha256:new2"
|
|
assert existing2.last_tag_pull_date == datetime(
|
|
2024, 1, 4, 12, 0, 0
|
|
) # SQL CASE keeps latest (Jan 4 > Jan 2)
|
|
|
|
# Verify new record was created
|
|
new_record = TagPullStatistics.get(
|
|
TagPullStatistics.repository == self.repo_id, TagPullStatistics.tag_name == "brand_new"
|
|
)
|
|
assert new_record.tag_pull_count == 3
|
|
assert new_record.current_manifest_digest == "sha256:new"
|
|
assert new_record.last_tag_pull_date == datetime(2024, 1, 3, 12, 0, 0)
|
|
|
|
def test_bulk_upsert_string_timestamps(self, initialized_db):
|
|
"""Test bulk upsert with string timestamp formats (ISO format)."""
|
|
tag_updates = [
|
|
{
|
|
"repository_id": self.repo_id,
|
|
"tag_name": "string_timestamp",
|
|
"manifest_digest": "sha256:string_test",
|
|
"pull_count": 5,
|
|
"last_pull_timestamp": "2024-01-15T14:30:00Z", # ISO string with Z
|
|
},
|
|
{
|
|
"repository_id": self.repo_id,
|
|
"tag_name": "string_timestamp2",
|
|
"manifest_digest": "sha256:string_test2",
|
|
"pull_count": 3,
|
|
"last_pull_timestamp": "2024-01-16T10:15:30.123Z", # ISO string with milliseconds
|
|
},
|
|
]
|
|
|
|
rows_affected = bulk_upsert_tag_statistics(tag_updates)
|
|
assert rows_affected == 2
|
|
|
|
# Verify records were created with correct parsed timestamps
|
|
record1 = TagPullStatistics.get(
|
|
TagPullStatistics.repository == self.repo_id,
|
|
TagPullStatistics.tag_name == "string_timestamp",
|
|
)
|
|
assert record1.tag_pull_count == 5
|
|
|
|
# Handle both datetime objects and string representations (SQLite vs PostgreSQL vs MySQL)
|
|
last_pull_date = self._normalize_datetime(record1.last_tag_pull_date)
|
|
expected_time1 = datetime.fromisoformat("2024-01-15T14:30:00+00:00")
|
|
assert last_pull_date == expected_time1
|
|
|
|
record2 = TagPullStatistics.get(
|
|
TagPullStatistics.repository == self.repo_id,
|
|
TagPullStatistics.tag_name == "string_timestamp2",
|
|
)
|
|
# Test that milliseconds are handled (might be truncated depending on DB)
|
|
assert record2.tag_pull_count == 3
|
|
|
|
# Verify the timestamp was parsed and stored correctly (allowing for millisecond truncation)
|
|
last_pull_date2 = self._normalize_datetime(record2.last_tag_pull_date)
|
|
|
|
# Check that it's at least the correct date and time (milliseconds might be truncated)
|
|
expected_time2_base = datetime.fromisoformat("2024-01-16T10:15:30+00:00")
|
|
assert last_pull_date2 >= expected_time2_base
|
|
|
|
def test_bulk_upsert_invalid_timestamp_format(self, initialized_db):
|
|
"""Test bulk upsert handles invalid timestamp formats gracefully."""
|
|
tag_updates = [
|
|
{
|
|
"repository_id": self.repo_id,
|
|
"tag_name": "invalid_timestamp",
|
|
"manifest_digest": "sha256:invalid",
|
|
"pull_count": 5,
|
|
"last_pull_timestamp": "not-a-valid-timestamp", # Invalid format
|
|
}
|
|
]
|
|
|
|
# Should raise PullStatisticsException due to invalid timestamp
|
|
with pytest.raises(PullStatisticsException) as exc_info:
|
|
bulk_upsert_tag_statistics(tag_updates)
|
|
|
|
assert "Invalid tag update data" in str(exc_info.value)
|
|
|
|
def test_bulk_upsert_maintains_last_pull_date_logic(self, initialized_db):
|
|
"""Test that bulk upsert properly handles last_pull_date using atomic SQL CASE to keep latest timestamp."""
|
|
# Create existing record with later timestamp
|
|
TagPullStatistics.create(
|
|
repository=self.repo,
|
|
tag_name="timestamp_test",
|
|
tag_pull_count=10,
|
|
last_tag_pull_date=datetime(2024, 1, 10, 12, 0, 0), # Later date
|
|
current_manifest_digest="sha256:old",
|
|
)
|
|
|
|
# Try to update with earlier timestamp
|
|
tag_updates = [
|
|
{
|
|
"repository_id": self.repo_id,
|
|
"tag_name": "timestamp_test",
|
|
"manifest_digest": "sha256:new",
|
|
"pull_count": 5,
|
|
"last_pull_timestamp": datetime(2024, 1, 5, 12, 0, 0), # Earlier date
|
|
}
|
|
]
|
|
|
|
bulk_upsert_tag_statistics(tag_updates)
|
|
|
|
# Verify the later timestamp is preserved using atomic SQL CASE (max for timestamp, addition for count)
|
|
updated = TagPullStatistics.get(
|
|
TagPullStatistics.repository == self.repo_id,
|
|
TagPullStatistics.tag_name == "timestamp_test",
|
|
)
|
|
assert updated.tag_pull_count == 15 # 10 + 5 = 15 (atomic SQL addition for count)
|
|
assert updated.last_tag_pull_date == datetime(
|
|
2024, 1, 10, 12, 0, 0
|
|
) # SQL CASE keeps later date atomically (max for timestamp)
|
|
assert updated.current_manifest_digest == "sha256:new" # But update manifest
|
|
|
|
def test_delete_tag_clears_pull_statistics(self, initialized_db):
|
|
"""Test that deleting a tag clears its pull statistics."""
|
|
with patch("data.model.oci.tag.features") as mock_features:
|
|
mock_features.IMAGE_PULL_STATS = True
|
|
|
|
# Create a manifest
|
|
manifest, _ = create_manifest_for_testing(self.repo, "test1")
|
|
|
|
# Create a tag
|
|
_ = retarget_tag("test-tag", manifest)
|
|
|
|
# Create pull statistics for the tag
|
|
TagPullStatistics.create(
|
|
repository=self.repo,
|
|
tag_name="test-tag",
|
|
tag_pull_count=42,
|
|
last_tag_pull_date=datetime(2024, 1, 1, 12, 0, 0),
|
|
current_manifest_digest=manifest.digest,
|
|
)
|
|
|
|
# Verify statistics exist
|
|
stats = TagPullStatistics.get(
|
|
TagPullStatistics.repository == self.repo_id,
|
|
TagPullStatistics.tag_name == "test-tag",
|
|
)
|
|
assert stats.tag_pull_count == 42
|
|
|
|
# Delete the tag
|
|
deleted_tag = oci_tag.delete_tag(self.repo.id, "test-tag")
|
|
assert deleted_tag is not None
|
|
|
|
# Verify pull statistics were deleted
|
|
stats_query = TagPullStatistics.select().where(
|
|
TagPullStatistics.repository == self.repo_id,
|
|
TagPullStatistics.tag_name == "test-tag",
|
|
)
|
|
assert stats_query.count() == 0
|
|
|
|
def test_delete_tag_with_feature_disabled(self, initialized_db):
|
|
"""Test that deleting a tag with IMAGE_PULL_STATS disabled preserves pull statistics."""
|
|
with patch("data.model.oci.tag.features") as mock_features:
|
|
mock_features.IMAGE_PULL_STATS = False
|
|
|
|
# Create a manifest
|
|
manifest, _ = create_manifest_for_testing(self.repo, "test2")
|
|
|
|
# Create a tag
|
|
_ = retarget_tag("test-tag-disabled", manifest)
|
|
|
|
# Create pull statistics for the tag
|
|
TagPullStatistics.create(
|
|
repository=self.repo,
|
|
tag_name="test-tag-disabled",
|
|
tag_pull_count=99,
|
|
last_tag_pull_date=datetime(2024, 1, 15, 12, 0, 0),
|
|
current_manifest_digest=manifest.digest,
|
|
)
|
|
|
|
# Verify statistics exist
|
|
stats = TagPullStatistics.get(
|
|
TagPullStatistics.repository == self.repo_id,
|
|
TagPullStatistics.tag_name == "test-tag-disabled",
|
|
)
|
|
assert stats.tag_pull_count == 99
|
|
|
|
# Delete the tag
|
|
deleted_tag = oci_tag.delete_tag(self.repo.id, "test-tag-disabled")
|
|
assert deleted_tag is not None
|
|
|
|
# Verify pull statistics still exist (not deleted when feature is disabled)
|
|
stats_query = TagPullStatistics.select().where(
|
|
TagPullStatistics.repository == self.repo_id,
|
|
TagPullStatistics.tag_name == "test-tag-disabled",
|
|
)
|
|
assert stats_query.count() == 1
|
|
stats = stats_query.get()
|
|
assert stats.tag_pull_count == 99
|
|
|
|
def test_repush_tag_after_deletion_starts_fresh(self, initialized_db):
|
|
"""Test that re-pushing a tag after deletion starts with fresh pull statistics."""
|
|
with patch("data.model.oci.tag.features") as mock_features:
|
|
mock_features.IMAGE_PULL_STATS = True
|
|
|
|
# Create first manifest
|
|
manifest1, _ = create_manifest_for_testing(self.repo, "1")
|
|
|
|
# Create tag
|
|
_ = retarget_tag("redis", manifest1)
|
|
|
|
# Simulate pulls by creating statistics
|
|
TagPullStatistics.create(
|
|
repository=self.repo,
|
|
tag_name="redis",
|
|
tag_pull_count=100,
|
|
last_tag_pull_date=datetime(2024, 1, 1, 12, 0, 0),
|
|
current_manifest_digest=manifest1.digest,
|
|
)
|
|
|
|
# Delete the tag
|
|
oci_tag.delete_tag(self.repo.id, "redis")
|
|
|
|
# Verify statistics were cleared
|
|
assert (
|
|
TagPullStatistics.select()
|
|
.where(
|
|
TagPullStatistics.repository == self.repo_id,
|
|
TagPullStatistics.tag_name == "redis",
|
|
)
|
|
.count()
|
|
== 0
|
|
)
|
|
|
|
# Re-push the same tag name (simulating user scenario from bug report)
|
|
manifest2, _ = create_manifest_for_testing(self.repo, "2")
|
|
|
|
_ = retarget_tag("redis", manifest2)
|
|
|
|
# Simulate new pulls
|
|
tag_updates = [
|
|
{
|
|
"repository_id": self.repo_id,
|
|
"tag_name": "redis",
|
|
"manifest_digest": manifest2.digest,
|
|
"pull_count": 5,
|
|
"last_pull_timestamp": datetime(2024, 2, 1, 12, 0, 0),
|
|
}
|
|
]
|
|
bulk_upsert_tag_statistics(tag_updates)
|
|
|
|
# Verify statistics start fresh (not 105, but 5)
|
|
stats = TagPullStatistics.get(
|
|
TagPullStatistics.repository == self.repo_id, TagPullStatistics.tag_name == "redis"
|
|
)
|
|
assert stats.tag_pull_count == 5 # Fresh start, not 100 + 5
|
|
assert stats.last_tag_pull_date == datetime(2024, 2, 1, 12, 0, 0)
|
|
assert stats.current_manifest_digest == manifest2.digest
|
|
|
|
def test_remove_tag_from_timemachine_clears_pull_statistics(self, initialized_db):
|
|
"""Test that permanently deleting a tag clears its pull statistics."""
|
|
with patch("data.model.oci.tag.features") as mock_features:
|
|
mock_features.IMAGE_PULL_STATS = True
|
|
|
|
# Create a manifest
|
|
manifest, _ = create_manifest_for_testing(self.repo, "perm")
|
|
|
|
# Create tag
|
|
_ = retarget_tag("permanent-test", manifest)
|
|
|
|
# Create pull statistics
|
|
TagPullStatistics.create(
|
|
repository=self.repo,
|
|
tag_name="permanent-test",
|
|
tag_pull_count=75,
|
|
last_tag_pull_date=datetime(2024, 3, 1, 10, 0, 0),
|
|
current_manifest_digest=manifest.digest,
|
|
)
|
|
|
|
# Verify statistics exist
|
|
assert (
|
|
TagPullStatistics.select()
|
|
.where(
|
|
TagPullStatistics.repository == self.repo_id,
|
|
TagPullStatistics.tag_name == "permanent-test",
|
|
)
|
|
.count()
|
|
== 1
|
|
)
|
|
|
|
# Permanently delete the tag (bypass time machine)
|
|
result = oci_tag.remove_tag_from_timemachine(
|
|
self.repo.id,
|
|
"permanent-test",
|
|
manifest.id,
|
|
include_submanifests=False,
|
|
is_alive=True,
|
|
)
|
|
assert result is True
|
|
|
|
# Verify pull statistics were cleared
|
|
assert (
|
|
TagPullStatistics.select()
|
|
.where(
|
|
TagPullStatistics.repository == self.repo_id,
|
|
TagPullStatistics.tag_name == "permanent-test",
|
|
)
|
|
.count()
|
|
== 0
|
|
)
|
|
|
|
def test_remove_tag_from_timemachine_with_feature_disabled_alive(self, initialized_db):
|
|
"""Test that permanently deleting an alive tag with IMAGE_PULL_STATS disabled preserves pull statistics."""
|
|
with patch("data.model.oci.tag.features") as mock_features:
|
|
mock_features.IMAGE_PULL_STATS = False
|
|
|
|
# Create a manifest
|
|
manifest, _ = create_manifest_for_testing(self.repo, "perm-disabled")
|
|
|
|
# Create tag
|
|
_ = retarget_tag("permanent-disabled-alive", manifest)
|
|
|
|
# Create pull statistics
|
|
TagPullStatistics.create(
|
|
repository=self.repo,
|
|
tag_name="permanent-disabled-alive",
|
|
tag_pull_count=85,
|
|
last_tag_pull_date=datetime(2024, 3, 15, 10, 0, 0),
|
|
current_manifest_digest=manifest.digest,
|
|
)
|
|
|
|
# Verify statistics exist
|
|
assert (
|
|
TagPullStatistics.select()
|
|
.where(
|
|
TagPullStatistics.repository == self.repo_id,
|
|
TagPullStatistics.tag_name == "permanent-disabled-alive",
|
|
)
|
|
.count()
|
|
== 1
|
|
)
|
|
|
|
# Permanently delete the tag (bypass time machine)
|
|
result = oci_tag.remove_tag_from_timemachine(
|
|
self.repo.id,
|
|
"permanent-disabled-alive",
|
|
manifest.id,
|
|
include_submanifests=False,
|
|
is_alive=True,
|
|
)
|
|
assert result is True
|
|
|
|
# Verify pull statistics still exist (not deleted when feature is disabled)
|
|
assert (
|
|
TagPullStatistics.select()
|
|
.where(
|
|
TagPullStatistics.repository == self.repo_id,
|
|
TagPullStatistics.tag_name == "permanent-disabled-alive",
|
|
)
|
|
.count()
|
|
== 1
|
|
)
|
|
stats = TagPullStatistics.get(
|
|
TagPullStatistics.repository == self.repo_id,
|
|
TagPullStatistics.tag_name == "permanent-disabled-alive",
|
|
)
|
|
assert stats.tag_pull_count == 85
|
|
|
|
def test_remove_tag_from_timemachine_with_feature_disabled_not_alive(self, initialized_db):
|
|
"""Test that permanently deleting a non-alive tag with IMAGE_PULL_STATS disabled preserves pull statistics."""
|
|
with patch("data.model.oci.tag.features") as mock_features:
|
|
mock_features.IMAGE_PULL_STATS = False
|
|
|
|
# Create a manifest
|
|
manifest, _ = create_manifest_for_testing(self.repo, "perm-not-alive")
|
|
|
|
# Create tag
|
|
tag = retarget_tag("permanent-disabled-not-alive", manifest)
|
|
|
|
# Delete the tag first to make it not alive (but still in time machine)
|
|
oci_tag.delete_tag(self.repo.id, "permanent-disabled-not-alive")
|
|
|
|
# Create pull statistics for the deleted tag
|
|
TagPullStatistics.create(
|
|
repository=self.repo,
|
|
tag_name="permanent-disabled-not-alive",
|
|
tag_pull_count=95,
|
|
last_tag_pull_date=datetime(2024, 3, 20, 10, 0, 0),
|
|
current_manifest_digest=manifest.digest,
|
|
)
|
|
|
|
# Verify statistics exist
|
|
assert (
|
|
TagPullStatistics.select()
|
|
.where(
|
|
TagPullStatistics.repository == self.repo_id,
|
|
TagPullStatistics.tag_name == "permanent-disabled-not-alive",
|
|
)
|
|
.count()
|
|
== 1
|
|
)
|
|
|
|
# Permanently delete the tag from time machine (is_alive=False)
|
|
result = oci_tag.remove_tag_from_timemachine(
|
|
self.repo.id,
|
|
"permanent-disabled-not-alive",
|
|
manifest.id,
|
|
include_submanifests=False,
|
|
is_alive=False,
|
|
)
|
|
assert result is True
|
|
|
|
# Verify pull statistics still exist (not deleted when feature is disabled)
|
|
assert (
|
|
TagPullStatistics.select()
|
|
.where(
|
|
TagPullStatistics.repository == self.repo_id,
|
|
TagPullStatistics.tag_name == "permanent-disabled-not-alive",
|
|
)
|
|
.count()
|
|
== 1
|
|
)
|
|
stats = TagPullStatistics.get(
|
|
TagPullStatistics.repository == self.repo_id,
|
|
TagPullStatistics.tag_name == "permanent-disabled-not-alive",
|
|
)
|
|
assert stats.tag_pull_count == 95
|