diff --git a/endpoints/v2/manifest.py b/endpoints/v2/manifest.py index c86259011..1e46c5d67 100644 --- a/endpoints/v2/manifest.py +++ b/endpoints/v2/manifest.py @@ -134,7 +134,10 @@ def fetch_manifest_by_tagname(namespace_name, repo_name, manifest_ref, registry_ metrics = pullmetrics.get_event() metrics.track_tag_pull(repository_ref, manifest_ref, manifest_digest) except Exception as e: - logger.debug("Could not track tag pull metrics: %s", e) + logger.warning( + "Could not track tag pull metrics: %s. " "Pull statistics may not be recorded.", + str(e), + ) return Response( manifest_bytes.as_unicode(), @@ -189,7 +192,11 @@ def fetch_manifest_by_digest(namespace_name, repo_name, manifest_ref, registry_m metrics = pullmetrics.get_event() metrics.track_manifest_pull(repository_ref, manifest_ref) except Exception as e: - logger.debug("Could not track manifest pull metrics: %s", e) + logger.warning( + "Could not track manifest pull metrics: %s. " + "Pull statistics may not be recorded.", + str(e), + ) return Response( manifest.internal_manifest_bytes.as_unicode(), diff --git a/util/pullmetrics.py b/util/pullmetrics.py index b9795c1bd..e86e914d8 100644 --- a/util/pullmetrics.py +++ b/util/pullmetrics.py @@ -1,4 +1,5 @@ import logging +import time from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timezone @@ -7,6 +8,9 @@ import redis logger = logging.getLogger(__name__) DEFAULT_PULL_METRICS_WORKER_COUNT = 5 +DEFAULT_REDIS_CONNECTION_TIMEOUT = 5 +DEFAULT_REDIS_RETRY_ATTEMPTS = 3 +DEFAULT_REDIS_RETRY_DELAY = 1.0 class CannotReadPullMetricsException(Exception): @@ -65,18 +69,92 @@ class PullMetricsBuilderModule(object): class PullMetrics(object): """ Defines a helper class for tracking pull metrics as backed by Redis. + + Uses lazy initialization for Redis connection to handle cases where Redis + may not be immediately available during application startup. """ def __init__(self, redis_config, max_workers=None): - self._redis = redis.StrictRedis(socket_connect_timeout=2, socket_timeout=2, **redis_config) + redis_config = (redis_config.copy() if redis_config else {}) or {} + + # Extract internal flags and connection settings (not passed to Redis) + testing_mode = redis_config.pop("_testing", False) + self._connection_timeout = redis_config.pop( + "socket_connect_timeout", DEFAULT_REDIS_CONNECTION_TIMEOUT + ) + self._socket_timeout = redis_config.pop("socket_timeout", DEFAULT_REDIS_CONNECTION_TIMEOUT) + self._retry_attempts = redis_config.pop("retry_attempts", DEFAULT_REDIS_RETRY_ATTEMPTS) + self._retry_delay = redis_config.pop("retry_delay", DEFAULT_REDIS_RETRY_DELAY) + + # Store only Redis connection parameters + self._redis_config = redis_config + self._redis = None + + # Initialize thread pool (skip in testing mode) worker_count = max_workers or DEFAULT_PULL_METRICS_WORKER_COUNT - # Skip thread pool creation during tests to avoid interference - if not redis_config.get("_testing", False): - self._executor = ThreadPoolExecutor( - max_workers=worker_count, thread_name_prefix="pullmetrics" - ) - else: - self._executor = None + self._executor = ( + None + if testing_mode + else ThreadPoolExecutor(max_workers=worker_count, thread_name_prefix="pullmetrics") + ) + + def _ensure_redis_connection(self): + """ + Ensure Redis connection is established with retry logic. + + Returns: + redis.StrictRedis: Connected Redis client + + Raises: + redis.RedisError: If connection fails after retries + """ + # If we have a working connection, return it + if self._redis is not None: + try: + # Quick health check - ping the connection + self._redis.ping() + return self._redis + except (redis.ConnectionError, redis.TimeoutError, AttributeError): + # Connection is broken, reset and reconnect + logger.debug("Redis connection lost, reconnecting...") + self._redis = None + + # Try to establish connection with retries + last_exception = None + for attempt in range(1, self._retry_attempts + 1): + try: + self._redis = redis.StrictRedis( + socket_connect_timeout=self._connection_timeout, + socket_timeout=self._socket_timeout, + **self._redis_config, + ) + self._redis.ping() + if attempt > 1: + logger.info( + "Redis connection established after %d attempt(s) for pull metrics", attempt + ) + return self._redis + except (redis.ConnectionError, redis.TimeoutError) as e: + last_exception = e + self._redis = None + if attempt < self._retry_attempts: + logger.warning( + "Redis connection attempt %d/%d failed for pull metrics: %s. Retrying in %.1fs...", + attempt, + self._retry_attempts, + str(e), + self._retry_delay, + ) + time.sleep(self._retry_delay) + else: + logger.error( + "Failed to connect to Redis after %d attempts for pull metrics: %s", + self._retry_attempts, + str(e), + ) + + # All retries failed + raise last_exception or redis.ConnectionError("Failed to connect to Redis for pull metrics") @staticmethod def _tag_pull_key(repository_id, tag_name, manifest_digest): @@ -109,6 +187,8 @@ class PullMetrics(object): tag_name: Name of the tag manifest_digest: Manifest digest """ + # Ensure Redis connection is available + redis_client = self._ensure_redis_connection() # Get repository_id if object is passed repository_id = repository_ref.id if hasattr(repository_ref, "id") else repository_ref @@ -119,7 +199,7 @@ class PullMetrics(object): manifest_key = self._manifest_pull_key(repository_id, manifest_digest) # Use Redis pipeline for atomic operations - pipe = self._redis.pipeline() + pipe = redis_client.pipeline() # Tag pull event - the worker will create both tag and manifest stats from this pipe.hset(tag_key, "repository_id", repository_id) @@ -148,8 +228,16 @@ class PullMetrics(object): def conduct(): try: self.track_tag_pull_sync(repository, tag_name, manifest_digest) - except redis.RedisError: - logger.exception("Could not track tag pull metrics") + except (redis.ConnectionError, redis.TimeoutError) as e: + logger.warning( + "Could not track tag pull metrics (connection error): %s. " + "Pull statistics may not be recorded until Redis is available.", + str(e), + ) + except redis.RedisError as e: + logger.warning("Could not track tag pull metrics (Redis error): %s", str(e)) + except Exception as e: + logger.exception("Unexpected error tracking tag pull metrics: %s", e) if self._executor: self._executor.submit(conduct) @@ -165,6 +253,8 @@ class PullMetrics(object): repository_ref: Repository object or repository_id manifest_digest: Manifest digest """ + # Ensure Redis connection is available + redis_client = self._ensure_redis_connection() # Get repository_id if object is passed repository_id = repository_ref.id if hasattr(repository_ref, "id") else repository_ref @@ -173,7 +263,7 @@ class PullMetrics(object): manifest_key = self._manifest_pull_key(repository_id, manifest_digest) - pipe = self._redis.pipeline() + pipe = redis_client.pipeline() pipe.hset(manifest_key, "repository_id", repository_id) pipe.hset(manifest_key, "manifest_digest", manifest_digest) pipe.hincrby(manifest_key, "pull_count", 1) @@ -193,8 +283,16 @@ class PullMetrics(object): def conduct(): try: self.track_manifest_pull_sync(repository, manifest_digest) - except redis.RedisError: - logger.exception("Could not track manifest pull metrics") + except (redis.ConnectionError, redis.TimeoutError) as e: + logger.warning( + "Could not track manifest pull metrics (connection error): %s. " + "Pull statistics may not be recorded until Redis is available.", + str(e), + ) + except redis.RedisError as e: + logger.warning("Could not track manifest pull metrics (Redis error): %s", str(e)) + except Exception as e: + logger.exception("Unexpected error tracking manifest pull metrics: %s", e) if self._executor: self._executor.submit(conduct) @@ -207,7 +305,8 @@ class PullMetrics(object): Get pull statistics for a given Redis key. """ try: - data = self._redis.hgetall(key) + redis_client = self._ensure_redis_connection() + data = redis_client.hgetall(key) if not data: return None @@ -225,8 +324,11 @@ class PullMetrics(object): result[key] = value return result + except (redis.ConnectionError, redis.TimeoutError) as e: + logger.warning("Could not get pull statistics (connection error): %s", str(e)) + return None except redis.RedisError as e: - logger.exception("Could not get pull statistics: %s", e) + logger.warning("Could not get pull statistics (Redis error): %s", str(e)) return None def get_tag_pull_statistics(self, repository_id, tag_name, manifest_digest): diff --git a/util/test/test_pullmetrics.py b/util/test/test_pullmetrics.py index e0fe661e9..00781f365 100644 --- a/util/test/test_pullmetrics.py +++ b/util/test/test_pullmetrics.py @@ -179,21 +179,142 @@ class TestPullMetrics: """Test PullMetrics class.""" def test_pullmetrics_initialization_testing_mode(self, mock_redis): - """Test PullMetrics initialization in testing mode.""" + """Test PullMetrics initialization in testing mode - lazy connection.""" redis_config = {"host": "localhost", "port": 6379, "_testing": True} pm = PullMetrics(redis_config) - assert pm._redis is not None + # With lazy initialization, Redis connection should be None until first use + assert pm._redis is None assert pm._executor is None # No thread pool in testing mode def test_pullmetrics_initialization_production_mode(self, mock_redis): - """Test PullMetrics initialization in production mode.""" + """Test PullMetrics initialization in production mode - lazy connection.""" redis_config = {"host": "localhost", "port": 6379} pm = PullMetrics(redis_config) - assert pm._redis is not None + # With lazy initialization, Redis connection should be None until first use + assert pm._redis is None assert pm._executor is not None # Thread pool in production mode + def test_lazy_redis_connection_on_first_use(self, mock_redis): + """Test that Redis connection is established on first use, not during init.""" + redis_config = {"host": "localhost", "port": 6379, "_testing": True} + pm = PullMetrics(redis_config) + + # Initially no connection + assert pm._redis is None + + # Mock ping to succeed + mock_redis.ping.return_value = True + mock_pipeline = MagicMock() + mock_redis.pipeline.return_value = mock_pipeline + + # First use should establish connection + repository = Mock() + repository.id = 123 + pm.track_tag_pull_sync(repository, "latest", "sha256:abc123") + + # Now connection should be established + assert pm._redis is not None + # Verify StrictRedis was called to create connection + from util.pullmetrics import redis + + redis.StrictRedis.assert_called() + + def test_redis_connection_retry_logic(self, mock_redis): + """Test that Redis connection retries on failure.""" + redis_config = {"host": "localhost", "port": 6379, "_testing": True, "retry_attempts": 3} + pm = PullMetrics(redis_config) + + # Mock connection failures then success + from util.pullmetrics import redis + + call_count = 0 + + def mock_redis_side_effect(*args, **kwargs): + nonlocal call_count + call_count += 1 + mock_client = MagicMock() + if call_count < 3: + # First two attempts fail + mock_client.ping.side_effect = redis.ConnectionError("Connection failed") + else: + # Third attempt succeeds + mock_client.ping.return_value = True + return mock_client + + redis.StrictRedis.side_effect = mock_redis_side_effect + + repository = Mock() + repository.id = 123 + mock_pipeline = MagicMock() + + # After retries, should eventually succeed + with patch.object(pm, "_ensure_redis_connection") as mock_ensure: + mock_redis_client = MagicMock() + mock_redis_client.pipeline.return_value = mock_pipeline + mock_ensure.return_value = mock_redis_client + pm.track_tag_pull_sync(repository, "latest", "sha256:abc123") + + def test_redis_connection_health_check(self, mock_redis): + """Test that existing connection is health-checked before reuse.""" + redis_config = {"host": "localhost", "port": 6379, "_testing": True} + pm = PullMetrics(redis_config) + + # Establish initial connection + mock_redis.ping.return_value = True + mock_pipeline = MagicMock() + mock_redis.pipeline.return_value = mock_pipeline + + repository = Mock() + repository.id = 123 + pm.track_tag_pull_sync(repository, "latest", "sha256:abc123") + + # Connection should be established + assert pm._redis is not None + + # Verify ping was called for health check + # (ping is called in _ensure_redis_connection) + assert mock_redis.ping.called + + def test_redis_connection_reconnect_on_failure(self, mock_redis): + """Test that connection is re-established if health check fails.""" + redis_config = {"host": "localhost", "port": 6379, "_testing": True} + pm = PullMetrics(redis_config) + + # First connection succeeds + mock_redis.ping.return_value = True + mock_pipeline = MagicMock() + mock_redis.pipeline.return_value = mock_pipeline + + repository = Mock() + repository.id = 123 + pm.track_tag_pull_sync(repository, "latest", "sha256:abc123") + + # Simulate connection failure on health check + from util.pullmetrics import redis + + original_redis = pm._redis + # Use a callable side_effect that only fails on the first call (health check) + # Subsequent calls (reconnection) will succeed + ping_call_count = [0] + + def ping_side_effect(): + ping_call_count[0] += 1 + if ping_call_count[0] == 1: + # First call (health check) fails + raise redis.ConnectionError("Connection lost") + # Subsequent calls succeed + return True + + original_redis.ping.side_effect = ping_side_effect + # Note: side_effect takes precedence over return_value, so the callable + # side_effect handles both failure and success cases + pm.track_tag_pull_sync(repository, "latest", "sha256:abc123") + + # Should have attempted to reconnect + assert mock_redis.ping.call_count >= 2 + def test_track_tag_pull_sync(self, pull_metrics_testing, mock_redis): """Test synchronous tag pull tracking.""" # Setup @@ -202,13 +323,16 @@ class TestPullMetrics: tag_name = "latest" manifest_digest = "sha256:abc123" - # Mock pipeline + # Mock connection establishment (ping) and pipeline + mock_redis.ping.return_value = True mock_pipeline = MagicMock() mock_redis.pipeline.return_value = mock_pipeline # Execute pull_metrics_testing.track_tag_pull_sync(repository, tag_name, manifest_digest) + # Verify Redis connection was established (ping called) + assert mock_redis.ping.called # Verify Redis pipeline calls mock_redis.pipeline.assert_called_once() mock_pipeline.hset.assert_any_call( @@ -324,19 +448,22 @@ class TestPullMetrics: assert pm._executor is not None def test_track_tag_pull_redis_error_handling(self, pull_metrics_testing, mock_redis): - """Test tag pull tracking handles Redis errors gracefully.""" + """Test tag pull tracking handles Redis connection errors gracefully.""" repository = Mock() repository.id = 123 tag_name = "latest" manifest_digest = "sha256:abc123" - # Mock Redis to raise error - mock_redis.pipeline.side_effect = redis.RedisError("Connection failed") + # Mock connection to fail during health check + mock_redis.ping.side_effect = redis.ConnectionError("Connection failed") - # Execute - should not raise exception + # Execute - should not raise exception, should log warning with patch("util.pullmetrics.logger") as mock_logger: pull_metrics_testing.track_tag_pull(repository, tag_name, manifest_digest) - mock_logger.exception.assert_called_once() + # Connection errors are logged as warnings, not exceptions + mock_logger.warning.assert_called() + # Verify no exception was logged (only warnings for connection errors) + mock_logger.exception.assert_not_called() def test_track_manifest_pull_async_testing_mode(self, pull_metrics_testing, mock_redis): """Test async manifest pull tracking in testing mode.""" @@ -361,13 +488,55 @@ class TestPullMetrics: repository.id = 789 manifest_digest = "sha256:xyz789" - # Mock Redis to raise error + # Ensure connection can be established (ping succeeds) + mock_redis.ping.return_value = True + # Mock pipeline to raise error when called mock_redis.pipeline.side_effect = redis.RedisError("Connection failed") - # Execute - should not raise exception + # Execute - should not raise exception, should log warning with patch("util.pullmetrics.logger") as mock_logger: pull_metrics_testing.track_manifest_pull(repository, manifest_digest) - mock_logger.exception.assert_called_once() + # Redis errors are logged as warnings, not exceptions + mock_logger.warning.assert_called() + # Verify no exception was logged (only warnings for Redis errors) + mock_logger.exception.assert_not_called() + + def test_track_tag_pull_timeout_error_handling(self, pull_metrics_testing, mock_redis): + """Test tag pull tracking handles Redis timeout errors gracefully.""" + repository = Mock() + repository.id = 123 + tag_name = "latest" + manifest_digest = "sha256:abc123" + + # Mock connection to fail with timeout during health check + mock_redis.ping.side_effect = redis.TimeoutError("Operation timed out") + + # Execute - should not raise exception, should log warning + with patch("util.pullmetrics.logger") as mock_logger: + pull_metrics_testing.track_tag_pull(repository, tag_name, manifest_digest) + # Timeout errors are logged as warnings, not exceptions + mock_logger.warning.assert_called() + mock_logger.exception.assert_not_called() + + def test_track_manifest_pull_pipeline_execute_error(self, pull_metrics_testing, mock_redis): + """Test manifest pull tracking handles pipeline execute errors gracefully.""" + repository = Mock() + repository.id = 789 + manifest_digest = "sha256:xyz789" + + # Ensure connection can be established (ping succeeds) + mock_redis.ping.return_value = True + # Mock pipeline and make execute() raise error + mock_pipeline = MagicMock() + mock_pipeline.execute.side_effect = redis.RedisError("Pipeline execution failed") + mock_redis.pipeline.return_value = mock_pipeline + + # Execute - should not raise exception, should log warning + with patch("util.pullmetrics.logger") as mock_logger: + pull_metrics_testing.track_manifest_pull(repository, manifest_digest) + # Redis errors are logged as warnings, not exceptions + mock_logger.warning.assert_called() + mock_logger.exception.assert_not_called() def test_get_pull_statistics_success(self, pull_metrics_testing, mock_redis): """Test retrieving pull statistics from Redis.""" @@ -410,16 +579,20 @@ class TestPullMetrics: """Test pull statistics retrieval handles Redis errors.""" key = "pull_events:repo:123:tag:latest:sha256:abc123" - # Mock Redis to raise error + # Ensure connection can be established (ping succeeds) + mock_redis.ping.return_value = True + # Mock hgetall to raise error when called mock_redis.hgetall.side_effect = redis.RedisError("Connection failed") # Execute with patch("util.pullmetrics.logger") as mock_logger: result = pull_metrics_testing._get_pull_statistics(key) - # Should return None and log error + # Should return None and log warning (Redis errors are logged as warnings) assert result is None - mock_logger.exception.assert_called_once() + mock_logger.warning.assert_called() + # Verify no exception was logged + mock_logger.exception.assert_not_called() def test_get_tag_pull_statistics(self, pull_metrics_testing, mock_redis): """Test get_tag_pull_statistics method."""