"""
Redis Pull Metrics Flush Worker
Background worker that periodically flushes pull metrics from Redis to the database.
Processes Redis keys matching patterns:
- pull_events:repo:*:tag:*:*
- pull_events:repo:*:digest:*
Converts Redis pull events into persistent TagPullStatistics and ManifestPullStatistics records.
"""
import logging.config
import time
from datetime import datetime
from typing import Dict, List, Set, Tuple
import redis
import features
from app import app
from data.model.pull_statistics import (
PullStatisticsException,
bulk_upsert_manifest_statistics,
bulk_upsert_tag_statistics,
)
from digest.digest_tools import Digest, InvalidDigestException
from util.log import logfile_path
from workers.gunicorn_worker import GunicornWorker
from workers.worker import Worker
logger = logging.getLogger(__name__)

# Configuration constants
POLL_PERIOD = app.config.get("REDIS_FLUSH_INTERVAL_SECONDS", 300) # 5 minutes
BATCH_SIZE = app.config.get("REDIS_FLUSH_WORKER_BATCH_SIZE", 1000)
REDIS_SCAN_COUNT = app.config.get("REDIS_FLUSH_WORKER_SCAN_COUNT", 100)
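
# A minimal configuration sketch covering the settings read above (key names
# come from the config lookups in this file, values shown are the defaults;
# the FEATURE_IMAGE_PULL_STATS flag name is an assumption based on the
# features.IMAGE_PULL_STATS gate used below):
#
#   FEATURE_IMAGE_PULL_STATS: true
#   PULL_METRICS_REDIS:
#     host: localhost
#     port: 6379
#     db: 1
#     password: <optional>
#   REDIS_CONNECTION_TIMEOUT: 5
#   REDIS_FLUSH_INTERVAL_SECONDS: 300
#   REDIS_FLUSH_WORKER_BATCH_SIZE: 1000
#   REDIS_FLUSH_WORKER_SCAN_COUNT: 100
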
class RedisFlushWorker(Worker):
"""
Worker that flushes pull metrics from Redis to database tables.
This worker:
1. Scans Redis for pull_events:* keys
2. Processes pull events and aggregates data
3. Performs bulk upserts to TagPullStatistics and ManifestPullStatistics tables
4. Cleans up Redis keys after successful database writes
"""
def __init__(self):
super(RedisFlushWorker, self).__init__()
self.redis_client = None
self._initialize_redis_client()
        self.add_operation(self._flush_pull_metrics, POLL_PERIOD)

    def _initialize_redis_client(self):
"""Initialize Redis client for pull metrics."""
try:
redis_config = app.config.get("PULL_METRICS_REDIS", {})
redis_host = redis_config.get("host", "localhost")
redis_port = redis_config.get("port", 6379)
redis_db = redis_config.get("db", 1)
redis_password = redis_config.get("password")
redis_connection_timeout = app.config.get("REDIS_CONNECTION_TIMEOUT", 5)
# Create Redis client
self.redis_client = redis.StrictRedis(
host=redis_host,
port=redis_port,
db=redis_db,
password=redis_password,
decode_responses=True,
socket_connect_timeout=redis_connection_timeout,
socket_timeout=redis_connection_timeout,
)
# Test connection
self.redis_client.ping()
logger.info("RedisFlushWorker: Initialized Redis client for pull metrics")
except redis.ConnectionError:
logger.warning("RedisFlushWorker: Redis connection failed (will retry)")
self.redis_client = None
except redis.RedisError as re:
logger.error(f"RedisFlushWorker: Redis initialization error: {re}")
self.redis_client = None
except Exception as e:
logger.error(f"RedisFlushWorker: Failed to initialize Redis client: {e}")
            self.redis_client = None

    def _flush_pull_metrics(self):
"""Main method to flush pull metrics from Redis to database."""
if not self.redis_client:
logger.warning("RedisFlushWorker: Redis client not initialized, skipping flush")
return
try:
logger.debug("RedisFlushWorker: Starting pull metrics flush")
start_time = time.time()
# Scan for pull event keys
pull_event_keys = self._scan_redis_keys("pull_events:*", BATCH_SIZE)
if not pull_event_keys:
logger.debug("RedisFlushWorker: No pull event keys found")
return
logger.info(f"RedisFlushWorker: Processing {len(pull_event_keys)} Redis keys")
# Process keys and aggregate data
(
tag_updates,
manifest_updates,
cleanable_keys,
database_dependent_keys,
) = self._process_redis_events(pull_event_keys)
# Always clean up empty/invalid keys first
if cleanable_keys:
self._cleanup_redis_keys(cleanable_keys)
logger.debug(
f"RedisFlushWorker: Cleaned up {len(cleanable_keys)} empty/invalid keys"
)
# Perform bulk database operations
success = self._flush_to_database(tag_updates, manifest_updates)
if success:
# Clean up Redis keys after successful database writes
self._cleanup_redis_keys(database_dependent_keys)
elapsed_time = time.time() - start_time
total_processed = len(cleanable_keys) + len(database_dependent_keys)
logger.info(
f"RedisFlushWorker: Successfully processed {total_processed} keys "
f"({len(tag_updates)} tag updates, {len(manifest_updates)} manifest updates) "
f"in {elapsed_time:.2f}s"
)
else:
logger.warning(
f"RedisFlushWorker: Database flush failed, keeping {len(database_dependent_keys)} keys for retry"
)
except redis.RedisError as re:
logger.error(f"RedisFlushWorker: Redis error during pull metrics flush: {re}")
except Exception as e:
logger.error(f"RedisFlushWorker: Error during pull metrics flush: {e}")
def _scan_redis_keys(self, pattern: str, limit: int) -> List[str]:
"""
Scan Redis for keys matching the pattern.
Args:
pattern: Redis key pattern to match
limit: Maximum number of keys to return
Returns:
List of matching Redis keys
"""
try:
keys_set: Set[str] = set()
cursor = 0
while len(keys_set) < limit:
if self.redis_client is None:
break
cursor, batch_keys = self.redis_client.scan(
cursor=cursor, match=pattern, count=REDIS_SCAN_COUNT
)
if batch_keys:
# Add keys to set to automatically deduplicate
keys_set.update(batch_keys)
# Break if we've scanned through all keys
if cursor == 0:
break
# Convert set back to list and limit results
keys_list = list(keys_set)
return keys_list[:limit] # Ensure we don't exceed the limit
except redis.RedisError as re:
logger.error(f"RedisFlushWorker: Redis error during key scan: {re}")
return []
except Exception as e:
logger.error(f"RedisFlushWorker: Error scanning Redis keys: {e}")
return []
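    # Roughly equivalent sketch of _scan_redis_keys above using redis-py's
    # scan_iter helper (not used here because the explicit cursor loop makes
    # the key-count bound easier to enforce):
    #
    #   for key in self.redis_client.scan_iter(match=pattern, count=REDIS_SCAN_COUNT):
    #       keys_set.add(key)
    #       if len(keys_set) >= limit:
    #           break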
def _process_redis_events(
self, keys: List[str]
) -> Tuple[List[Dict], List[Dict], Set[str], Set[str]]:
"""
Process Redis events and aggregate data for database updates.
Args:
keys: List of Redis keys to process
Returns:
Tuple of (tag_updates, manifest_updates, cleanable_keys, database_dependent_keys)
"""
        # Use dictionaries to aggregate updates by unique key
        tag_updates_dict: Dict[Tuple[int, str], Dict] = {}  # key: (repository_id, tag_name)
        manifest_updates_dict: Dict[Tuple[int, str], Dict] = {}  # key: (repository_id, manifest_digest)
        # Keys that can always be cleaned up (empty or invalid)
        cleanable_keys: Set[str] = set()
        # Keys that may only be cleaned up after a successful database write
        database_dependent_keys: Set[str] = set()
for key in keys:
try:
# Basic validation - ensure it's a pull_events key
if not key.startswith("pull_events:"):
continue
# Get the data from Redis
if self.redis_client is None:
continue
metrics_data = self.redis_client.hgetall(key)
if not metrics_data:
cleanable_keys.add(key) # Empty key, can always be cleaned up
continue
# Validate data before processing
if not self._validate_redis_key_data(key, metrics_data):
cleanable_keys.add(key) # Invalid key, can be cleaned up
continue
# Extract data from Redis hash
repository_id = int(metrics_data.get("repository_id", 0))
tag_name = metrics_data.get("tag_name", "")
manifest_digest = metrics_data.get("manifest_digest", "")
pull_count = int(metrics_data.get("pull_count", 0))
last_pull_timestamp = int(metrics_data.get("last_pull_timestamp", 0))
pull_method = metrics_data.get("pull_method", "")
if pull_count <= 0:
cleanable_keys.add(key) # No pulls, can always be cleaned up
continue
# Convert timestamp
pull_timestamp = (
datetime.fromtimestamp(last_pull_timestamp) if last_pull_timestamp > 0 else None
)
# Aggregate manifest stats (both tag and digest pulls)
manifest_key = (repository_id, manifest_digest)
if manifest_key in manifest_updates_dict:
# Aggregate with existing entry
existing = manifest_updates_dict[manifest_key]
existing["pull_count"] += pull_count
# Keep the latest timestamp
if pull_timestamp and existing["last_pull_timestamp"]:
if pull_timestamp > existing["last_pull_timestamp"]:
existing["last_pull_timestamp"] = pull_timestamp
elif pull_timestamp:
existing["last_pull_timestamp"] = pull_timestamp
else:
# New entry
manifest_updates_dict[manifest_key] = {
"repository_id": repository_id,
"manifest_digest": manifest_digest,
"pull_count": pull_count,
"last_pull_timestamp": pull_timestamp,
}
# Additionally aggregate tag stats for tag pulls
if pull_method == "tag" and tag_name:
tag_key = (repository_id, tag_name)
if tag_key in tag_updates_dict:
# Aggregate with existing entry
existing = tag_updates_dict[tag_key]
existing["pull_count"] += pull_count
# Keep the latest timestamp and manifest
if pull_timestamp and existing["last_pull_timestamp"]:
if pull_timestamp > existing["last_pull_timestamp"]:
existing["last_pull_timestamp"] = pull_timestamp
existing["manifest_digest"] = manifest_digest
elif pull_timestamp:
existing["last_pull_timestamp"] = pull_timestamp
existing["manifest_digest"] = manifest_digest
else:
# New entry
tag_updates_dict[tag_key] = {
"repository_id": repository_id,
"tag_name": tag_name,
"manifest_digest": manifest_digest,
"pull_count": pull_count,
"last_pull_timestamp": pull_timestamp,
}
                # Mark this key for cleanup only after a successful database write
                # TODO: add exception handling after the database write implementation
                database_dependent_keys.add(key)
except redis.RedisError as re:
logger.error(f"RedisFlushWorker: Redis error processing key {key}: {re}")
continue
except Exception as e:
logger.error(f"RedisFlushWorker: Error processing key {key}: {e}")
continue
# Convert aggregated dictionaries to lists
tag_updates = list(tag_updates_dict.values())
manifest_updates = list(manifest_updates_dict.values())
return tag_updates, manifest_updates, cleanable_keys, database_dependent_keys
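    # Aggregation example for _process_redis_events above (hypothetical
    # values): two Redis keys for the same (repository_id, tag_name) pair,
    # one with pull_count 3 and one with pull_count 2, collapse into a single
    # tag update with pull_count 5, the newer of the two timestamps, and the
    # manifest_digest seen at that newer pull, so the database receives one
    # upsert per tag rather than one per key.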
def _flush_to_database(self, tag_updates: List[Dict], manifest_updates: List[Dict]) -> bool:
"""
Flush aggregated updates to the database.
Args:
tag_updates: List of tag update dictionaries
manifest_updates: List of manifest update dictionaries
Returns:
True if successful, False otherwise
"""
try:
tag_count = 0
manifest_count = 0
has_updates = bool(tag_updates or manifest_updates)
# Process tag updates
if tag_updates:
tag_count = bulk_upsert_tag_statistics(tag_updates)
logger.info(
f"RedisFlushWorker: Successfully updated {tag_count}/{len(tag_updates)} tag statistics"
)
# Process manifest updates
if manifest_updates:
manifest_count = bulk_upsert_manifest_statistics(manifest_updates)
logger.info(
f"RedisFlushWorker: Successfully updated {manifest_count}/{len(manifest_updates)} manifest statistics"
)
# Consider it successful if:
# 1. We processed some records, OR
# 2. There were no updates to process (empty batch)
return (tag_count > 0 or manifest_count > 0) or not has_updates
except PullStatisticsException as e:
logger.error(f"RedisFlushWorker: Pull statistics error during database flush: {e}")
return False
except Exception as e:
logger.error(f"RedisFlushWorker: Error flushing to database: {e}")
return False
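    # Shape of the rows handed to the bulk upsert helpers above, as built in
    # _process_redis_events (values illustrative):
    #
    #   tag update:      {"repository_id": 1, "tag_name": "latest",
    #                     "manifest_digest": "sha256:...", "pull_count": 5,
    #                     "last_pull_timestamp": datetime(...)}
    #   manifest update: {"repository_id": 1, "manifest_digest": "sha256:...",
    #                     "pull_count": 5, "last_pull_timestamp": datetime(...)}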
def _cleanup_redis_keys(self, keys: Set[str]):
"""
Clean up Redis keys after successful database write.
Args:
keys: Set of Redis keys to delete
"""
if not keys:
return
try:
# Delete keys in batches to avoid blocking Redis
key_list = list(keys)
batch_size = 100
failed_deletions = []
for i in range(0, len(key_list), batch_size):
batch = key_list[i : i + batch_size]
if self.redis_client is not None:
try:
deleted_count = self.redis_client.delete(*batch)
if deleted_count != len(batch):
# Some keys may have been deleted by another process or expired
logger.debug(
f"RedisFlushWorker: Expected to delete {len(batch)} keys, actually deleted {deleted_count}"
)
else:
logger.debug(
f"RedisFlushWorker: Successfully deleted {deleted_count} Redis keys"
)
except Exception as batch_e:
logger.warning(
f"RedisFlushWorker: Failed to delete batch of keys: {batch_e}"
)
failed_deletions.extend(batch)
if failed_deletions:
logger.warning(
f"RedisFlushWorker: Failed to delete {len(failed_deletions)} Redis keys - they may be retried later"
)
except redis.RedisError as re:
logger.error(f"RedisFlushWorker: Redis error during key cleanup: {re}")
except Exception as e:
logger.error(f"RedisFlushWorker: Error cleaning up Redis keys: {e}")
def _validate_redis_key_data(self, key: str, metrics_data: Dict) -> bool:
"""
Validate Redis key data for consistency and completeness.
Args:
key: Redis key being processed
metrics_data: Data retrieved from Redis
Returns:
True if data is valid, False otherwise
"""
try:
# Check required fields
required_fields = [
"repository_id",
"manifest_digest",
"pull_count",
"last_pull_timestamp",
]
for field in required_fields:
if field not in metrics_data:
logger.debug(f"RedisFlushWorker: Key {key} missing required field: {field}")
return False
# Validate data types and ranges
repository_id = int(metrics_data.get("repository_id", 0))
pull_count = int(metrics_data.get("pull_count", 0))
last_pull_timestamp = int(metrics_data.get("last_pull_timestamp", 0))
if repository_id <= 0:
logger.debug(
f"RedisFlushWorker: Key {key} has invalid repository_id: {repository_id}"
)
return False
if pull_count < 0: # 0 is valid for cleanup
logger.debug(f"RedisFlushWorker: Key {key} has invalid pull_count: {pull_count}")
return False
if last_pull_timestamp < 0:
logger.debug(
f"RedisFlushWorker: Key {key} has invalid timestamp: {last_pull_timestamp}"
)
return False
manifest_digest = metrics_data.get("manifest_digest", "")
if not manifest_digest:
logger.debug(f"RedisFlushWorker: Key {key} missing manifest_digest")
return False
try:
Digest.parse_digest(manifest_digest)
except InvalidDigestException:
logger.debug(
f"RedisFlushWorker: Key {key} has invalid manifest_digest: {manifest_digest}"
)
return False
return True
except (ValueError, TypeError) as e:
logger.debug(f"RedisFlushWorker: Key {key} has invalid data format: {e}")
return False
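    # Example for _validate_redis_key_data above (hypothetical values): this
    # hash passes validation,
    #
    #   {"repository_id": "42", "manifest_digest": "sha256:<64 hex chars>",
    #    "pull_count": "0", "last_pull_timestamp": "1730000000"}
    #
    # while a hash missing manifest_digest, with repository_id <= 0, or with a
    # digest rejected by Digest.parse_digest is marked invalid and queued for
    # cleanup by the caller.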
def create_gunicorn_worker():
"""Create the Gunicorn worker instance."""
worker = GunicornWorker(__name__, app, RedisFlushWorker(), features.IMAGE_PULL_STATS)
return worker
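
# Quay's worker processes are typically launched by pointing gunicorn at this
# factory; the exact config path below is an assumption:
#
#   gunicorn -c conf/gunicorn_worker.py \
#       "workers.pullstatsredisflushworker:create_gunicorn_worker()"
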
if __name__ == "__main__":
if app.config.get("ACCOUNT_RECOVERY_MODE", False):
logger.debug("Quay running in account recovery mode")
while True:
time.sleep(100000)
# Check if Redis pull metrics feature is enabled
if not features.IMAGE_PULL_STATS:
logger.debug("Redis pull metrics disabled; skipping redisflushworker")
while True:
time.sleep(100000)
# Check if Redis is configured
if not app.config.get("PULL_METRICS_REDIS"):
logger.debug("PULL_METRICS_REDIS not configured; skipping redis flush worker")
while True:
time.sleep(100000)
if app.config.get("TESTING", False):
logger.debug("Skipping redis flush worker during tests")
while True:
time.sleep(100000)
logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)
worker = RedisFlushWorker()
worker.start()