""" Redis Pull Metrics Flush Worker Background worker that periodically flushes pull metrics from Redis to the database. Processes Redis keys matching patterns: - pull_events:repo:*:tag:*:* - pull_events:repo:*:digest:* Converts Redis pull events into persistent TagPullStatistics and ManifestPullStatistics records. """ import logging.config import time import uuid from datetime import datetime from typing import Dict, List, Set, Tuple import redis import features from app import app from data.model.pull_statistics import ( PullStatisticsException, bulk_upsert_manifest_statistics, bulk_upsert_tag_statistics, ) from digest.digest_tools import Digest, InvalidDigestException from util.log import logfile_path from workers.gunicorn_worker import GunicornWorker from workers.worker import Worker logger = logging.getLogger(__name__) # Configuration constants POLL_PERIOD = app.config.get("REDIS_FLUSH_INTERVAL_SECONDS", 300) # 5 minutes BATCH_SIZE = app.config.get("REDIS_FLUSH_WORKER_BATCH_SIZE", 1000) REDIS_SCAN_COUNT = app.config.get("REDIS_FLUSH_WORKER_SCAN_COUNT", 100) # RENAME atomically claims the key, then we delete only after successful DB write # This prevents data loss if database flush fails class RedisFlushWorker(Worker): """ Worker that flushes pull metrics from Redis to database tables. This worker: 1. Scans Redis for pull_events:* keys 2. Processes pull events and aggregates data 3. Performs bulk upserts to TagPullStatistics and ManifestPullStatistics tables 4. Cleans up Redis keys after successful database writes """ def __init__(self): super(RedisFlushWorker, self).__init__() self.redis_client = None self._initialize_redis_client() self.add_operation(self._flush_pull_metrics, POLL_PERIOD) def _initialize_redis_client(self): """Initialize Redis client for pull metrics.""" try: redis_config = app.config.get("PULL_METRICS_REDIS", {}) redis_host = redis_config.get("host", "localhost") redis_port = redis_config.get("port", 6379) redis_db = redis_config.get("db", 1) redis_password = redis_config.get("password") redis_connection_timeout = app.config.get("REDIS_CONNECTION_TIMEOUT", 5) # Create Redis client self.redis_client = redis.StrictRedis( host=redis_host, port=redis_port, db=redis_db, password=redis_password, decode_responses=True, socket_connect_timeout=redis_connection_timeout, socket_timeout=redis_connection_timeout, ) # Test connection self.redis_client.ping() logger.info("RedisFlushWorker: Initialized Redis client for pull metrics") except redis.ConnectionError: logger.warning("RedisFlushWorker: Redis connection failed (will retry)") self.redis_client = None except redis.RedisError as re: logger.error(f"RedisFlushWorker: Redis initialization error: {re}") self.redis_client = None except Exception as e: logger.error(f"RedisFlushWorker: Failed to initialize Redis client: {e}") self.redis_client = None def _flush_pull_metrics(self): """Main method to flush pull metrics from Redis to database.""" if not self.redis_client: logger.warning("RedisFlushWorker: Redis client not initialized, skipping flush") return try: logger.debug("RedisFlushWorker: Starting pull metrics flush") start_time = time.time() all_keys = self._scan_redis_keys("pull_events:*", BATCH_SIZE) # Deduplicate keys (scan might return duplicates) all_keys = list(set(all_keys)) # Separate regular keys from orphaned processing keys pull_event_keys = [key for key in all_keys if ":processing:" not in key] orphaned_processing_keys = [key for key in all_keys if ":processing:" in key] # Process orphaned processing keys (from previous failed flushes) # These should not be deleted and be reprocessed in next cycle to avoid data loss if orphaned_processing_keys: logger.info( f"RedisFlushWorker: Found {len(orphaned_processing_keys)} orphaned processing keys, will process them" ) pull_event_keys.extend(orphaned_processing_keys) if not pull_event_keys: logger.debug("RedisFlushWorker: No pull event keys found") return logger.info(f"RedisFlushWorker: Processing {len(pull_event_keys)} Redis keys") # Process keys and aggregate data ( tag_updates, manifest_updates, database_dependent_keys, ) = self._process_redis_events(pull_event_keys) # Perform bulk database operations success = self._flush_to_database(tag_updates, manifest_updates) if success: # Clean up Redis keys after successful database writes self._cleanup_redis_keys(database_dependent_keys) elapsed_time = time.time() - start_time total_processed = len(database_dependent_keys) logger.info( f"RedisFlushWorker: Successfully processed {total_processed} keys " f"({len(tag_updates)} tag updates, {len(manifest_updates)} manifest updates) " f"in {elapsed_time:.2f}s" ) else: logger.warning( f"RedisFlushWorker: Database flush failed, keeping {len(database_dependent_keys)} keys for retry" ) except redis.RedisError as re: logger.error(f"RedisFlushWorker: Redis error during pull metrics flush: {re}") except Exception as e: logger.error(f"RedisFlushWorker: Error during pull metrics flush: {e}") def _scan_redis_keys(self, pattern: str, limit: int) -> List[str]: """ Scan Redis for keys matching the pattern. Args: pattern: Redis key pattern to match limit: Maximum number of keys to return Returns: List of matching Redis keys """ try: keys_set: Set[str] = set() cursor = 0 while len(keys_set) < limit: if self.redis_client is None: break cursor, batch_keys = self.redis_client.scan( cursor=cursor, match=pattern, count=REDIS_SCAN_COUNT ) if batch_keys: # Add keys to set to automatically deduplicate keys_set.update(batch_keys) # Break if we've scanned through all keys if cursor == 0: break # Convert set back to list and limit results keys_list = list(keys_set) return keys_list[:limit] # Ensure we don't exceed the limit except redis.RedisError as re: logger.error(f"RedisFlushWorker: Redis error during key scan: {re}") return [] except Exception as e: logger.error(f"RedisFlushWorker: Error scanning Redis keys: {e}") return [] def _process_redis_events(self, keys: List[str]) -> Tuple[List[Dict], List[Dict], Set[str]]: """ Process Redis events and aggregate data for database updates. Args: keys: List of Redis keys to process Returns: Tuple of (tag_updates, manifest_updates, database_dependent_keys) """ # Use dictionaries to aggregate updates by unique key tag_updates_dict: Dict[Tuple[int, str], Dict] = {} # Key: (repository_id, tag_name) manifest_updates_dict: Dict[ Tuple[int, str], Dict ] = {} # Key: (repository_id, manifest_digest) database_dependent_keys = ( set() ) # Keys that should only be cleaned up after successful DB write for key in keys: try: # Basic validation - ensure it's a pull_events key if not key.startswith("pull_events:"): continue # ATOMIC CLAIM: Use RENAME to atomically claim the key # This prevents race conditions while preserving data for retry if DB flush fails if self.redis_client is None: logger.warning("RedisFlushWorker: Redis client is None, skipping key") continue # Check if this is already a processing key (orphaned from previous run) is_processing_key = ":processing:" in key if is_processing_key: # This is an orphaned processing key - read it directly processing_key = key metrics_data = self.redis_client.hgetall(processing_key) else: # Atomically rename the key to a processing namespace # This prevents new increments from affecting our read # Use timestamp + UUID to ensure uniqueness even if multiple workers process same key simultaneously processing_key = ( f"{key}:processing:{int(time.time() * 1000)}:{uuid.uuid4().hex[:8]}" ) try: # New pulls will create a new key, which will be processed in next cycle self.redis_client.rename(key, processing_key) except redis.ResponseError as e: # RENAME raises ResponseError if source key doesn't exist error_msg = str(e).lower() if "no such key" in error_msg or "no such file" in error_msg: # Key doesn't exist (already processed or never created) continue else: logger.warning(f"RedisFlushWorker: RENAME failed for key {key}: {e}") continue # the original key is deleted and new pulls create new keys this avoids race conditions metrics_data = self.redis_client.hgetall(processing_key) if not metrics_data: # Clean up the processing key immediately self.redis_client.delete(processing_key) continue # Validate data before processing if not self._validate_redis_key_data(key, metrics_data): # Invalid key, clean up the processing key immediately logger.warning(f"RedisFlushWorker: Key {key} failed validation, cleaning up") self.redis_client.delete(processing_key) continue # Extract data from Redis hash repository_id = int(metrics_data.get("repository_id", 0)) tag_name = metrics_data.get("tag_name", "") manifest_digest = metrics_data.get("manifest_digest", "") pull_count = int(metrics_data.get("pull_count", 0)) last_pull_timestamp = int(metrics_data.get("last_pull_timestamp", 0)) pull_method = metrics_data.get("pull_method", "") if pull_count <= 0: # No pulls, clean up the processing key immediately self.redis_client.delete(processing_key) continue # Convert timestamp pull_timestamp = ( datetime.fromtimestamp(last_pull_timestamp) if last_pull_timestamp > 0 else None ) # Aggregate manifest stats (both tag and digest pulls) manifest_key = (repository_id, manifest_digest) if manifest_key in manifest_updates_dict: # Aggregate with existing entry existing = manifest_updates_dict[manifest_key] existing["pull_count"] += pull_count # Keep the latest timestamp if pull_timestamp and existing["last_pull_timestamp"]: if pull_timestamp > existing["last_pull_timestamp"]: existing["last_pull_timestamp"] = pull_timestamp elif pull_timestamp: existing["last_pull_timestamp"] = pull_timestamp else: # New entry manifest_updates_dict[manifest_key] = { "repository_id": repository_id, "manifest_digest": manifest_digest, "pull_count": pull_count, "last_pull_timestamp": pull_timestamp, } # Additionally aggregate tag stats for tag pulls if pull_method == "tag" and tag_name: tag_key = (repository_id, tag_name) if tag_key in tag_updates_dict: # Aggregate with existing entry existing = tag_updates_dict[tag_key] existing["pull_count"] += pull_count # Keep the latest timestamp and manifest if pull_timestamp and existing["last_pull_timestamp"]: if pull_timestamp > existing["last_pull_timestamp"]: existing["last_pull_timestamp"] = pull_timestamp existing["manifest_digest"] = manifest_digest elif pull_timestamp: existing["last_pull_timestamp"] = pull_timestamp existing["manifest_digest"] = manifest_digest else: # New entry tag_updates_dict[tag_key] = { "repository_id": repository_id, "tag_name": tag_name, "manifest_digest": manifest_digest, "pull_count": pull_count, "last_pull_timestamp": pull_timestamp, } # Mark processing key for cleanup after successful database write # This ensures we don't lose data if DB flush fails (key can be retried) database_dependent_keys.add(processing_key) except redis.RedisError as re: logger.error(f"RedisFlushWorker: Redis error processing key {key}: {re}") continue except Exception as e: logger.error(f"RedisFlushWorker: Error processing key {key}: {e}") continue # Convert aggregated dictionaries to lists tag_updates = list(tag_updates_dict.values()) manifest_updates = list(manifest_updates_dict.values()) return tag_updates, manifest_updates, database_dependent_keys def _flush_to_database(self, tag_updates: List[Dict], manifest_updates: List[Dict]) -> bool: """ Flush aggregated updates to the database. Args: tag_updates: List of tag update dictionaries manifest_updates: List of manifest update dictionaries Returns: True if successful, False otherwise """ try: tag_count = 0 manifest_count = 0 has_updates = bool(tag_updates or manifest_updates) # Process tag updates if tag_updates: tag_count = bulk_upsert_tag_statistics(tag_updates) logger.info( f"RedisFlushWorker: Successfully updated {tag_count}/{len(tag_updates)} tag statistics" ) # Process manifest updates if manifest_updates: manifest_count = bulk_upsert_manifest_statistics(manifest_updates) logger.info( f"RedisFlushWorker: Successfully updated {manifest_count}/{len(manifest_updates)} manifest statistics" ) # Consider it successful if: # 1. We processed some records, OR # 2. There were no updates to process (empty batch) return (tag_count > 0 or manifest_count > 0) or not has_updates except PullStatisticsException as e: logger.error(f"RedisFlushWorker: Pull statistics error during database flush: {e}") return False except Exception as e: logger.error(f"RedisFlushWorker: Error flushing to database: {e}") return False def _cleanup_redis_keys(self, keys: Set[str]): """ Clean up Redis keys after successful database write. Args: keys: Set of Redis keys to delete """ if not keys: return try: # Delete keys in batches to avoid blocking Redis key_list = list(keys) batch_size = 100 failed_deletions = [] for i in range(0, len(key_list), batch_size): batch = key_list[i : i + batch_size] if self.redis_client is not None: try: deleted_count = self.redis_client.delete(*batch) if deleted_count != len(batch): # Some keys may have been deleted by another process or expired logger.debug( f"RedisFlushWorker: Expected to delete {len(batch)} keys, actually deleted {deleted_count}" ) else: logger.debug( f"RedisFlushWorker: Successfully deleted {deleted_count} Redis keys" ) except Exception as batch_e: logger.warning( f"RedisFlushWorker: Failed to delete batch of keys: {batch_e}" ) failed_deletions.extend(batch) if failed_deletions: logger.warning( f"RedisFlushWorker: Failed to delete {len(failed_deletions)} Redis keys - they may be retried later" ) except redis.RedisError as re: logger.error(f"RedisFlushWorker: Redis error during key cleanup: {re}") except Exception as e: logger.error(f"RedisFlushWorker: Error cleaning up Redis keys: {e}") def _validate_redis_key_data(self, key: str, metrics_data: Dict) -> bool: """ Validate Redis key data for consistency and completeness. Args: key: Redis key being processed metrics_data: Data retrieved from Redis Returns: True if data is valid, False otherwise """ try: # Check required fields required_fields = [ "repository_id", "manifest_digest", "pull_count", "last_pull_timestamp", ] for field in required_fields: if field not in metrics_data: logger.debug(f"RedisFlushWorker: Key {key} missing required field: {field}") return False # Validate data types and ranges repository_id = int(metrics_data.get("repository_id", 0)) pull_count = int(metrics_data.get("pull_count", 0)) last_pull_timestamp = int(metrics_data.get("last_pull_timestamp", 0)) if repository_id <= 0: logger.debug( f"RedisFlushWorker: Key {key} has invalid repository_id: {repository_id}" ) return False if pull_count < 0: # 0 is valid for cleanup logger.debug(f"RedisFlushWorker: Key {key} has invalid pull_count: {pull_count}") return False if last_pull_timestamp < 0: logger.debug( f"RedisFlushWorker: Key {key} has invalid timestamp: {last_pull_timestamp}" ) return False manifest_digest = metrics_data.get("manifest_digest", "") if not manifest_digest: logger.debug(f"RedisFlushWorker: Key {key} missing manifest_digest") return False try: Digest.parse_digest(manifest_digest) except InvalidDigestException: logger.debug( f"RedisFlushWorker: Key {key} has invalid manifest_digest: {manifest_digest}" ) return False return True except (ValueError, TypeError) as e: logger.debug(f"RedisFlushWorker: Key {key} has invalid data format: {e}") return False def create_gunicorn_worker(): """Create the Gunicorn worker instance.""" worker = GunicornWorker(__name__, app, RedisFlushWorker(), features.IMAGE_PULL_STATS) return worker if __name__ == "__main__": if app.config.get("ACCOUNT_RECOVERY_MODE", False): logger.debug("Quay running in account recovery mode") while True: time.sleep(100000) # Check if Redis pull metrics feature is enabled if not features.IMAGE_PULL_STATS: logger.debug("Redis pull metrics disabled; skipping redisflushworker") while True: time.sleep(100000) # Check if Redis is configured if not app.config.get("PULL_METRICS_REDIS"): logger.debug("PULL_METRICS_REDIS not configured; skipping redis flush worker") while True: time.sleep(100000) if app.config.get("TESTING", False): logger.debug("Skipping redis flush worker during tests") while True: time.sleep(100000) logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False) worker = RedisFlushWorker() worker.start()