quay/data/model/pull_statistics.py
"""
Business logic for pull statistics operations.
This module provides functions for managing pull statistics data:
- Bulk upsert operations for tag and manifest statistics
- Helper functions for individual record operations
- Query functions for retrieving statistics
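
Illustrative flush flow (hypothetical caller; the update payloads are deltas
accumulated upstream, e.g. in Redis, since the previous flush):

    rows = bulk_upsert_tag_statistics(tag_updates)
    rows += bulk_upsert_manifest_statistics(manifest_updates)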
"""

import logging
from datetime import datetime
from functools import reduce
from typing import Dict, List, Optional

from peewee import Case, IntegrityError, fn

from data.database import (
    ManifestPullStatistics,
    Repository,
    TagPullStatistics,
    db_transaction,
)
from data.model import DataModelException

logger = logging.getLogger(__name__)


class PullStatisticsException(DataModelException):
"""
Exception raised when pull statistics operations fail.
"""

    pass


def bulk_upsert_tag_statistics(tag_updates: List[Dict]) -> int:
"""
Upsert tag pull statistics in bulk using optimized batch operations.
Args:
tag_updates: List of dictionaries with tag statistics data:
[
{
'repository_id': 123,
'tag_name': 'latest',
'pull_count': 5,
'last_pull_timestamp': '2024-01-01T12:00:00Z',
'manifest_digest': 'sha256:abc...'
}
]
Returns:
Number of rows affected
Raises:
PullStatisticsException: If bulk operation fails
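
    Example (illustrative; assumes a repository with id 123 exists):

        bulk_upsert_tag_statistics([{
            "repository_id": 123,
            "tag_name": "latest",
            "pull_count": 5,
            "last_pull_timestamp": "2024-01-01T12:00:00Z",
            "manifest_digest": "sha256:abc...",
        }])  # -> 1 (one row inserted or updated)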
"""
if not tag_updates:
return 0
try:
with db_transaction():
# Parse and validate all timestamps first
parsed_updates = []
for update in tag_updates:
try:
last_pull_timestamp = update["last_pull_timestamp"]
if isinstance(last_pull_timestamp, datetime):
last_pull = last_pull_timestamp
elif isinstance(last_pull_timestamp, str):
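                        # fromisoformat() on Python < 3.11 rejects a trailing "Z",
                        # so normalize it to the equivalent "+00:00" offset first.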
last_pull = datetime.fromisoformat(
last_pull_timestamp.replace("Z", "+00:00")
)
else:
raise ValueError(f"Invalid timestamp format: {last_pull_timestamp}")
parsed_updates.append(
{
"repository_id": update["repository_id"],
"tag_name": update["tag_name"],
"pull_count": update["pull_count"],
"last_pull_date": last_pull,
"manifest_digest": update["manifest_digest"],
}
)
                except (ValueError, KeyError) as e:
                    logger.error(f"Failed to parse tag update: {e}")
                    raise PullStatisticsException(f"Invalid tag update data: {e}") from e
# Build conditions for existing records lookup
# Create a list of (repository_id, tag_name) tuples for bulk lookup
lookup_keys = [(u["repository_id"], u["tag_name"]) for u in parsed_updates]
# Chunk the lookup to avoid SQLite parser stack overflow with large OR queries
CHUNK_SIZE = 50 # Reasonable limit for OR conditions
existing_records = {}
for i in range(0, len(lookup_keys), CHUNK_SIZE):
chunk = lookup_keys[i : i + CHUNK_SIZE]
if chunk:
conditions = []
for repo_id, tag_name in chunk:
conditions.append(
(TagPullStatistics.repository == repo_id)
& (TagPullStatistics.tag_name == tag_name)
)
# Combine all conditions with OR using bitwise operator
combined_condition = reduce(lambda a, b: a | b, conditions)
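                    # Illustrative shape of the predicate built above:
                    #   (repository = r1 AND tag_name = t1) OR (repository = r2 AND tag_name = t2) OR ...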
existing_query = TagPullStatistics.select().where(combined_condition)
# Add results to our existing_records map
for record in existing_query:
                        # Use the raw FK value to avoid lazily fetching each Repository row
                        key = (record.repository_id, record.tag_name)
existing_records[key] = record
# Separate updates into existing vs new records
updates_for_existing = []
new_records_data = []
for update in parsed_updates:
key = (update["repository_id"], update["tag_name"])
if key in existing_records:
# Record exists - prepare for bulk update
existing_record = existing_records[key]
                    # Note: update["pull_count"] represents new pulls since the last
                    # flush (from Redis), so it is applied below as an increment.
                    increment = update["pull_count"]
updates_for_existing.append(
{
"record": existing_record,
"increment": increment,
"new_date": update[
"last_pull_date"
], # Pass incoming date, make SQL handle the latest between the concurrent requestsmax
"new_digest": update["manifest_digest"],
}
)
else:
# New record - prepare for bulk insert
new_records_data.append(
{
"repository": update["repository_id"],
"tag_name": update["tag_name"],
"tag_pull_count": update["pull_count"],
"last_tag_pull_date": update["last_pull_date"],
"current_manifest_digest": update["manifest_digest"],
}
)
rows_affected = 0
# Bulk update existing records
# Use atomic SQL updates to avoid race conditions with concurrent flush workers
for update_data in updates_for_existing:
record = update_data["record"]
increment = update_data["increment"]
new_date = update_data["new_date"]
# Use atomic SQL update: tag_pull_count = tag_pull_count + increment
# This prevents race conditions where multiple flush workers update simultaneously
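                # Roughly the SQL this emits (names illustrative):
                #   UPDATE tagpullstatistics
                #      SET tag_pull_count = tag_pull_count + %s,
                #          last_tag_pull_date = CASE WHEN last_tag_pull_date < %s THEN %s
                #                                    ELSE last_tag_pull_date END,
                #          current_manifest_digest = %s
                #    WHERE id = %s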
TagPullStatistics.update(
tag_pull_count=TagPullStatistics.tag_pull_count + increment,
last_tag_pull_date=Case(
None,
((TagPullStatistics.last_tag_pull_date < new_date, new_date),),
TagPullStatistics.last_tag_pull_date,
),
current_manifest_digest=update_data["new_digest"],
).where(TagPullStatistics.id == record.id).execute()
rows_affected += 1
# Bulk insert new records
if new_records_data:
TagPullStatistics.insert_many(new_records_data).execute()
rows_affected += len(new_records_data)
return rows_affected
    except PullStatisticsException:
        raise
    except Exception as e:
        logger.error(f"Failed to bulk upsert tag statistics: {e}")
        raise PullStatisticsException(f"Bulk upsert failed: {e}") from e


def bulk_upsert_manifest_statistics(manifest_updates: List[Dict]) -> int:
"""
Upsert manifest pull statistics in bulk using optimized batch operations.
Args:
manifest_updates: List of dictionaries with manifest statistics data:
[
{
'repository_id': 123,
'manifest_digest': 'sha256:abc...',
'pull_count': 5,
'last_pull_timestamp': '2024-01-01T12:00:00Z'
}
]
Returns:
Number of rows affected
Raises:
PullStatisticsException: If bulk operation fails
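
    Example (illustrative; mirrors the tag variant but keyed on manifest digest):

        bulk_upsert_manifest_statistics([{
            "repository_id": 123,
            "manifest_digest": "sha256:abc...",
            "pull_count": 5,
            "last_pull_timestamp": "2024-01-01T12:00:00Z",
        }])  # -> 1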
"""
if not manifest_updates:
return 0
try:
with db_transaction():
# Parse and validate all timestamps first
parsed_updates = []
for update in manifest_updates:
try:
last_pull_timestamp = update["last_pull_timestamp"]
if isinstance(last_pull_timestamp, datetime):
last_pull = last_pull_timestamp
elif isinstance(last_pull_timestamp, str):
last_pull = datetime.fromisoformat(
last_pull_timestamp.replace("Z", "+00:00")
)
else:
raise ValueError(f"Invalid timestamp format: {last_pull_timestamp}")
parsed_updates.append(
{
"repository_id": update["repository_id"],
"manifest_digest": update["manifest_digest"],
"pull_count": update["pull_count"],
"last_pull_date": last_pull,
}
)
                except (ValueError, KeyError) as e:
                    logger.error(f"Failed to parse manifest update: {e}")
                    raise PullStatisticsException(f"Invalid manifest update data: {e}") from e
# Build conditions for existing records lookup
# Create a list of (repository_id, manifest_digest) tuples for bulk lookup
lookup_keys = [(u["repository_id"], u["manifest_digest"]) for u in parsed_updates]
# Chunk the lookup to avoid SQLite parser stack overflow with large OR queries
CHUNK_SIZE = 50 # Reasonable limit for OR conditions
existing_records = {}
for i in range(0, len(lookup_keys), CHUNK_SIZE):
chunk = lookup_keys[i : i + CHUNK_SIZE]
if chunk:
conditions = []
for repo_id, manifest_digest in chunk:
conditions.append(
(ManifestPullStatistics.repository == repo_id)
& (ManifestPullStatistics.manifest_digest == manifest_digest)
)
# Combine all conditions with OR using bitwise operator
combined_condition = reduce(lambda a, b: a | b, conditions)
existing_query = ManifestPullStatistics.select().where(combined_condition)
# Add results to our existing_records map
for record in existing_query:
                        key = (record.repository_id, record.manifest_digest)
existing_records[key] = record
# Separate updates into existing vs new records
updates_for_existing = []
new_records_data = []
for update in parsed_updates:
key = (update["repository_id"], update["manifest_digest"])
if key in existing_records:
# Record exists - prepare for bulk update
existing_record = existing_records[key]
                    # Note: update["pull_count"] represents new pulls since the last
                    # flush (from Redis), so it is applied below as an increment.
                    increment = update["pull_count"]
updates_for_existing.append(
{
"record": existing_record,
"increment": increment,
"new_date": update["last_pull_date"],
}
)
else:
# New record - prepare for bulk insert
new_records_data.append(
{
"repository": update["repository_id"],
"manifest_digest": update["manifest_digest"],
"manifest_pull_count": update["pull_count"],
"last_manifest_pull_date": update["last_pull_date"],
}
)
rows_affected = 0
# Bulk update existing records
for update_data in updates_for_existing:
record = update_data["record"]
increment = update_data["increment"]
new_date = update_data["new_date"]
# Use atomic SQL update: manifest_pull_count = manifest_pull_count + increment
# For timestamp, CASE will atomically keep the latest timestamp
ManifestPullStatistics.update(
manifest_pull_count=ManifestPullStatistics.manifest_pull_count + increment,
last_manifest_pull_date=Case(
None,
((ManifestPullStatistics.last_manifest_pull_date < new_date, new_date),),
ManifestPullStatistics.last_manifest_pull_date,
),
).where(ManifestPullStatistics.id == record.id).execute()
rows_affected += 1
# Bulk insert new records
if new_records_data:
ManifestPullStatistics.insert_many(new_records_data).execute()
rows_affected += len(new_records_data)
return rows_affected
    except PullStatisticsException:
        raise
    except Exception as e:
        logger.error(f"Failed to bulk upsert manifest statistics: {e}")
        raise PullStatisticsException(f"Bulk upsert failed: {e}") from e


def get_tag_pull_statistics(repository_id: int, tag_name: str) -> Optional[Dict]:
"""
Get pull statistics for a specific tag.
Args:
repository_id: Repository ID
tag_name: Tag name
Returns:
Dictionary with tag statistics or None if not found
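
    Example (illustrative; values are hypothetical):

        get_tag_pull_statistics(123, "latest")
        # -> {"repository_id": 123, "tag_name": "latest", "pull_count": 5, ...}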
"""
try:
stats = TagPullStatistics.get(
TagPullStatistics.repository == repository_id, TagPullStatistics.tag_name == tag_name
)
return {
"repository_id": stats.repository.id,
"tag_name": stats.tag_name,
"pull_count": stats.tag_pull_count,
"last_pull_date": stats.last_tag_pull_date,
"current_manifest_digest": stats.current_manifest_digest,
}
except TagPullStatistics.DoesNotExist:
        return None


def get_manifest_pull_statistics(repository_id: int, manifest_digest: str) -> Optional[Dict]:
"""
Get pull statistics for a specific manifest.
Args:
repository_id: Repository ID
manifest_digest: Manifest digest
Returns:
Dictionary with manifest statistics or None if not found
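
    Example (illustrative; a miss returns None rather than raising):

        get_manifest_pull_statistics(123, "sha256:deadbeef...")  # -> None if absent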
"""
try:
stats = ManifestPullStatistics.get(
ManifestPullStatistics.repository == repository_id,
ManifestPullStatistics.manifest_digest == manifest_digest,
)
return {
"repository_id": stats.repository.id,
"manifest_digest": stats.manifest_digest,
"pull_count": stats.manifest_pull_count,
"last_pull_date": stats.last_manifest_pull_date,
}
except ManifestPullStatistics.DoesNotExist:
return None