1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00

fix: Add lazy initialization for Redis so the connection is retried after a pod restart and established automatically once Redis is available (PROJQUAY-9791) (#4538)

* Fix: Add lazy Redis connection with retry logic for pull metrics

- Implement lazy initialization to prevent startup failures when Redis is unavailable
- Add retry logic (3 attempts, 1s delay) for automatic reconnection
- Add health checks before each Redis operation
- Improve error logging from DEBUG to WARNING level
- Fix silent failures after pod restart when Redis is not immediately available

This fixes the issue where pull statistics tracking was permanently broken
after registry component restart if Redis wasn't available at startup.

---------

Co-authored-by: shudeshp <shudeshp@redhat.com>
This commit is contained in:
Shubhra Deshpande
2025-11-18 01:15:37 -05:00
committed by GitHub
parent a123b72d02
commit fb9bedc91c
3 changed files with 316 additions and 34 deletions

View File

@@ -134,7 +134,10 @@ def fetch_manifest_by_tagname(namespace_name, repo_name, manifest_ref, registry_
metrics = pullmetrics.get_event()
metrics.track_tag_pull(repository_ref, manifest_ref, manifest_digest)
except Exception as e:
logger.debug("Could not track tag pull metrics: %s", e)
logger.warning(
"Could not track tag pull metrics: %s. " "Pull statistics may not be recorded.",
str(e),
)
return Response(
manifest_bytes.as_unicode(),
@@ -189,7 +192,11 @@ def fetch_manifest_by_digest(namespace_name, repo_name, manifest_ref, registry_m
metrics = pullmetrics.get_event()
metrics.track_manifest_pull(repository_ref, manifest_ref)
except Exception as e:
logger.debug("Could not track manifest pull metrics: %s", e)
logger.warning(
"Could not track manifest pull metrics: %s. "
"Pull statistics may not be recorded.",
str(e),
)
return Response(
manifest.internal_manifest_bytes.as_unicode(),

View File

@@ -1,4 +1,5 @@
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
@@ -7,6 +8,9 @@ import redis
logger = logging.getLogger(__name__)
DEFAULT_PULL_METRICS_WORKER_COUNT = 5
DEFAULT_REDIS_CONNECTION_TIMEOUT = 5
DEFAULT_REDIS_RETRY_ATTEMPTS = 3
DEFAULT_REDIS_RETRY_DELAY = 1.0
class CannotReadPullMetricsException(Exception):
@@ -65,18 +69,92 @@ class PullMetricsBuilderModule(object):
class PullMetrics(object):
"""
Defines a helper class for tracking pull metrics as backed by Redis.
Uses lazy initialization for Redis connection to handle cases where Redis
may not be immediately available during application startup.
"""
def __init__(self, redis_config, max_workers=None):
self._redis = redis.StrictRedis(socket_connect_timeout=2, socket_timeout=2, **redis_config)
redis_config = (redis_config.copy() if redis_config else {}) or {}
# Extract internal flags and connection settings (not passed to Redis)
testing_mode = redis_config.pop("_testing", False)
self._connection_timeout = redis_config.pop(
"socket_connect_timeout", DEFAULT_REDIS_CONNECTION_TIMEOUT
)
self._socket_timeout = redis_config.pop("socket_timeout", DEFAULT_REDIS_CONNECTION_TIMEOUT)
self._retry_attempts = redis_config.pop("retry_attempts", DEFAULT_REDIS_RETRY_ATTEMPTS)
self._retry_delay = redis_config.pop("retry_delay", DEFAULT_REDIS_RETRY_DELAY)
# Store only Redis connection parameters
self._redis_config = redis_config
self._redis = None
# Initialize thread pool (skip in testing mode)
worker_count = max_workers or DEFAULT_PULL_METRICS_WORKER_COUNT
# Skip thread pool creation during tests to avoid interference
if not redis_config.get("_testing", False):
self._executor = ThreadPoolExecutor(
max_workers=worker_count, thread_name_prefix="pullmetrics"
)
else:
self._executor = None
self._executor = (
None
if testing_mode
else ThreadPoolExecutor(max_workers=worker_count, thread_name_prefix="pullmetrics")
)
def _ensure_redis_connection(self):
    """
    Return a Redis client that has just answered a ping, connecting lazily.

    A cached client is reused when its ``ping()`` succeeds. Otherwise a new
    client is built from the stored connection settings and pinged, up to
    ``self._retry_attempts`` times, sleeping ``self._retry_delay`` seconds
    between attempts (no sleep after the final attempt).

    Returns:
        redis.StrictRedis: Connected Redis client

    Raises:
        redis.ConnectionError / redis.TimeoutError: the error from the last
            attempt, if every attempt failed. A ``redis.ConnectionError`` is
            also raised when no attempt was made at all (retry count < 1).
    """
    # If we have a working connection, return it
    cached_client = self._redis
    if cached_client is not None:
        try:
            # Quick health check - ping the connection
            cached_client.ping()
            return cached_client
        except (redis.ConnectionError, redis.TimeoutError, AttributeError):
            # Connection is broken, reset and reconnect
            logger.debug("Redis connection lost, reconnecting...")
            self._redis = None

    # Try to establish connection with retries
    last_exception = None
    for attempt in range(1, self._retry_attempts + 1):
        try:
            # Build and verify the client in a local variable first, so that
            # self._redis is only ever set to a client that answered a ping.
            # Callers running on the worker pool share this attribute.
            candidate = redis.StrictRedis(
                socket_connect_timeout=self._connection_timeout,
                socket_timeout=self._socket_timeout,
                **self._redis_config,
            )
            candidate.ping()
        except (redis.ConnectionError, redis.TimeoutError) as e:
            last_exception = e
            self._redis = None
            if attempt < self._retry_attempts:
                logger.warning(
                    "Redis connection attempt %d/%d failed for pull metrics: %s. Retrying in %.1fs...",
                    attempt,
                    self._retry_attempts,
                    str(e),
                    self._retry_delay,
                )
                time.sleep(self._retry_delay)
            else:
                logger.error(
                    "Failed to connect to Redis after %d attempts for pull metrics: %s",
                    self._retry_attempts,
                    str(e),
                )
        else:
            self._redis = candidate
            if attempt > 1:
                logger.info(
                    "Redis connection established after %d attempt(s) for pull metrics", attempt
                )
            return candidate

    # All retries failed
    raise last_exception or redis.ConnectionError("Failed to connect to Redis for pull metrics")
@staticmethod
def _tag_pull_key(repository_id, tag_name, manifest_digest):
@@ -109,6 +187,8 @@ class PullMetrics(object):
tag_name: Name of the tag
manifest_digest: Manifest digest
"""
# Ensure Redis connection is available
redis_client = self._ensure_redis_connection()
# Get repository_id if object is passed
repository_id = repository_ref.id if hasattr(repository_ref, "id") else repository_ref
@@ -119,7 +199,7 @@ class PullMetrics(object):
manifest_key = self._manifest_pull_key(repository_id, manifest_digest)
# Use Redis pipeline for atomic operations
pipe = self._redis.pipeline()
pipe = redis_client.pipeline()
# Tag pull event - the worker will create both tag and manifest stats from this
pipe.hset(tag_key, "repository_id", repository_id)
@@ -148,8 +228,16 @@ class PullMetrics(object):
def conduct():
try:
self.track_tag_pull_sync(repository, tag_name, manifest_digest)
except redis.RedisError:
logger.exception("Could not track tag pull metrics")
except (redis.ConnectionError, redis.TimeoutError) as e:
logger.warning(
"Could not track tag pull metrics (connection error): %s. "
"Pull statistics may not be recorded until Redis is available.",
str(e),
)
except redis.RedisError as e:
logger.warning("Could not track tag pull metrics (Redis error): %s", str(e))
except Exception as e:
logger.exception("Unexpected error tracking tag pull metrics: %s", e)
if self._executor:
self._executor.submit(conduct)
@@ -165,6 +253,8 @@ class PullMetrics(object):
repository_ref: Repository object or repository_id
manifest_digest: Manifest digest
"""
# Ensure Redis connection is available
redis_client = self._ensure_redis_connection()
# Get repository_id if object is passed
repository_id = repository_ref.id if hasattr(repository_ref, "id") else repository_ref
@@ -173,7 +263,7 @@ class PullMetrics(object):
manifest_key = self._manifest_pull_key(repository_id, manifest_digest)
pipe = self._redis.pipeline()
pipe = redis_client.pipeline()
pipe.hset(manifest_key, "repository_id", repository_id)
pipe.hset(manifest_key, "manifest_digest", manifest_digest)
pipe.hincrby(manifest_key, "pull_count", 1)
@@ -193,8 +283,16 @@ class PullMetrics(object):
def conduct():
try:
self.track_manifest_pull_sync(repository, manifest_digest)
except redis.RedisError:
logger.exception("Could not track manifest pull metrics")
except (redis.ConnectionError, redis.TimeoutError) as e:
logger.warning(
"Could not track manifest pull metrics (connection error): %s. "
"Pull statistics may not be recorded until Redis is available.",
str(e),
)
except redis.RedisError as e:
logger.warning("Could not track manifest pull metrics (Redis error): %s", str(e))
except Exception as e:
logger.exception("Unexpected error tracking manifest pull metrics: %s", e)
if self._executor:
self._executor.submit(conduct)
@@ -207,7 +305,8 @@ class PullMetrics(object):
Get pull statistics for a given Redis key.
"""
try:
data = self._redis.hgetall(key)
redis_client = self._ensure_redis_connection()
data = redis_client.hgetall(key)
if not data:
return None
@@ -225,8 +324,11 @@ class PullMetrics(object):
result[key] = value
return result
except (redis.ConnectionError, redis.TimeoutError) as e:
logger.warning("Could not get pull statistics (connection error): %s", str(e))
return None
except redis.RedisError as e:
logger.exception("Could not get pull statistics: %s", e)
logger.warning("Could not get pull statistics (Redis error): %s", str(e))
return None
def get_tag_pull_statistics(self, repository_id, tag_name, manifest_digest):

View File

@@ -179,21 +179,142 @@ class TestPullMetrics:
"""Test PullMetrics class."""
def test_pullmetrics_initialization_testing_mode(self, mock_redis):
"""Test PullMetrics initialization in testing mode."""
"""Test PullMetrics initialization in testing mode - lazy connection."""
redis_config = {"host": "localhost", "port": 6379, "_testing": True}
pm = PullMetrics(redis_config)
assert pm._redis is not None
# With lazy initialization, Redis connection should be None until first use
assert pm._redis is None
assert pm._executor is None # No thread pool in testing mode
def test_pullmetrics_initialization_production_mode(self, mock_redis):
"""Test PullMetrics initialization in production mode."""
"""Test PullMetrics initialization in production mode - lazy connection."""
redis_config = {"host": "localhost", "port": 6379}
pm = PullMetrics(redis_config)
assert pm._redis is not None
# With lazy initialization, Redis connection should be None until first use
assert pm._redis is None
assert pm._executor is not None # Thread pool in production mode
def test_lazy_redis_connection_on_first_use(self, mock_redis):
    """The constructor leaves the client unset; the first tracked pull creates it."""
    from util.pullmetrics import redis

    pm = PullMetrics({"host": "localhost", "port": 6379, "_testing": True})

    # Nothing is connected right after construction
    assert pm._redis is None

    # Let the connection ping succeed and hand back a usable pipeline
    mock_redis.ping.return_value = True
    mock_redis.pipeline.return_value = MagicMock()

    # The first tracked pull is what triggers the connection
    repo = Mock(id=123)
    pm.track_tag_pull_sync(repo, "latest", "sha256:abc123")

    # A client is now cached, and it was built through StrictRedis
    assert pm._redis is not None
    redis.StrictRedis.assert_called()
def test_redis_connection_retry_logic(self, mock_redis):
    """Test that Redis connection retries on failure and succeeds on a later attempt."""
    # retry_delay=0 keeps the retry loop from really sleeping between attempts
    redis_config = {
        "host": "localhost",
        "port": 6379,
        "_testing": True,
        "retry_attempts": 3,
        "retry_delay": 0,
    }
    pm = PullMetrics(redis_config)

    # Mock connection failures then success
    from util.pullmetrics import redis

    call_count = 0

    def mock_redis_side_effect(*args, **kwargs):
        nonlocal call_count
        call_count += 1
        mock_client = MagicMock()
        if call_count < 3:
            # First two attempts fail
            mock_client.ping.side_effect = redis.ConnectionError("Connection failed")
        else:
            # Third attempt succeeds
            mock_client.ping.return_value = True
        return mock_client

    previous_side_effect = redis.StrictRedis.side_effect
    redis.StrictRedis.side_effect = mock_redis_side_effect
    try:
        # Run the real retry loop. Patching _ensure_redis_connection here
        # would bypass exactly the logic this test is meant to cover.
        client = pm._ensure_redis_connection()
    finally:
        redis.StrictRedis.side_effect = previous_side_effect

    # Two failed attempts plus the successful third one
    assert call_count == 3
    # The client that finally answered the ping is the one that is cached
    assert pm._redis is client
def test_redis_connection_health_check(self, mock_redis):
    """Test that existing connection is health-checked before reuse."""
    redis_config = {"host": "localhost", "port": 6379, "_testing": True}
    pm = PullMetrics(redis_config)

    mock_redis.ping.return_value = True
    mock_pipeline = MagicMock()
    mock_redis.pipeline.return_value = mock_pipeline

    repository = Mock()
    repository.id = 123

    # First operation establishes the connection (ping during connect)
    pm.track_tag_pull_sync(repository, "latest", "sha256:abc123")
    assert pm._redis is not None
    assert mock_redis.ping.called
    established_client = pm._redis
    pings_after_connect = mock_redis.ping.call_count

    # Second operation finds a cached client, so any further ping can only
    # come from the health check in _ensure_redis_connection
    pm.track_tag_pull_sync(repository, "latest", "sha256:abc123")
    assert mock_redis.ping.call_count > pings_after_connect
    # A healthy connection is reused rather than replaced
    assert pm._redis is established_client
def test_redis_connection_reconnect_on_failure(self, mock_redis):
    """Test that connection is re-established if health check fails."""
    redis_config = {"host": "localhost", "port": 6379, "_testing": True}
    pm = PullMetrics(redis_config)

    # First connection succeeds
    mock_redis.ping.return_value = True
    mock_pipeline = MagicMock()
    mock_redis.pipeline.return_value = mock_pipeline

    repository = Mock()
    repository.id = 123
    pm.track_tag_pull_sync(repository, "latest", "sha256:abc123")

    # Simulate connection failure on health check
    from util.pullmetrics import redis

    original_redis = pm._redis

    # Count only the pings issued after the failure is armed. The ping from
    # the initial connect above must not count towards the reconnect check.
    pings_since_armed = 0

    def ping_side_effect():
        nonlocal pings_since_armed
        pings_since_armed += 1
        if pings_since_armed == 1:
            # First call (health check) fails
            raise redis.ConnectionError("Connection lost")
        # Subsequent calls (reconnection) succeed
        return True

    # Note: side_effect takes precedence over return_value, so the callable
    # side_effect handles both failure and success cases
    original_redis.ping.side_effect = ping_side_effect

    pm.track_tag_pull_sync(repository, "latest", "sha256:abc123")

    # One failed health check plus at least one ping from the reconnect
    assert pings_since_armed >= 2
    # A client is cached again after the reconnect
    assert pm._redis is not None
def test_track_tag_pull_sync(self, pull_metrics_testing, mock_redis):
"""Test synchronous tag pull tracking."""
# Setup
@@ -202,13 +323,16 @@ class TestPullMetrics:
tag_name = "latest"
manifest_digest = "sha256:abc123"
# Mock pipeline
# Mock connection establishment (ping) and pipeline
mock_redis.ping.return_value = True
mock_pipeline = MagicMock()
mock_redis.pipeline.return_value = mock_pipeline
# Execute
pull_metrics_testing.track_tag_pull_sync(repository, tag_name, manifest_digest)
# Verify Redis connection was established (ping called)
assert mock_redis.ping.called
# Verify Redis pipeline calls
mock_redis.pipeline.assert_called_once()
mock_pipeline.hset.assert_any_call(
@@ -324,19 +448,22 @@ class TestPullMetrics:
assert pm._executor is not None
def test_track_tag_pull_redis_error_handling(self, pull_metrics_testing, mock_redis):
"""Test tag pull tracking handles Redis errors gracefully."""
"""Test tag pull tracking handles Redis connection errors gracefully."""
repository = Mock()
repository.id = 123
tag_name = "latest"
manifest_digest = "sha256:abc123"
# Mock Redis to raise error
mock_redis.pipeline.side_effect = redis.RedisError("Connection failed")
# Mock connection to fail during health check
mock_redis.ping.side_effect = redis.ConnectionError("Connection failed")
# Execute - should not raise exception
# Execute - should not raise exception, should log warning
with patch("util.pullmetrics.logger") as mock_logger:
pull_metrics_testing.track_tag_pull(repository, tag_name, manifest_digest)
mock_logger.exception.assert_called_once()
# Connection errors are logged as warnings, not exceptions
mock_logger.warning.assert_called()
# Verify no exception was logged (only warnings for connection errors)
mock_logger.exception.assert_not_called()
def test_track_manifest_pull_async_testing_mode(self, pull_metrics_testing, mock_redis):
"""Test async manifest pull tracking in testing mode."""
@@ -361,13 +488,55 @@ class TestPullMetrics:
repository.id = 789
manifest_digest = "sha256:xyz789"
# Mock Redis to raise error
# Ensure connection can be established (ping succeeds)
mock_redis.ping.return_value = True
# Mock pipeline to raise error when called
mock_redis.pipeline.side_effect = redis.RedisError("Connection failed")
# Execute - should not raise exception
# Execute - should not raise exception, should log warning
with patch("util.pullmetrics.logger") as mock_logger:
pull_metrics_testing.track_manifest_pull(repository, manifest_digest)
mock_logger.exception.assert_called_once()
# Redis errors are logged as warnings, not exceptions
mock_logger.warning.assert_called()
# Verify no exception was logged (only warnings for Redis errors)
mock_logger.exception.assert_not_called()
def test_track_tag_pull_timeout_error_handling(self, pull_metrics_testing, mock_redis):
    """A Redis timeout during tag pull tracking is reported as a warning, not raised."""
    repo = Mock(id=123)

    # Every ping times out, so the health check / connect step cannot succeed
    mock_redis.ping.side_effect = redis.TimeoutError("Operation timed out")

    with patch("util.pullmetrics.logger") as patched_logger:
        # Must return normally even though Redis never answers
        pull_metrics_testing.track_tag_pull(repo, "latest", "sha256:abc123")

        # Timeouts are surfaced through warning(), never through exception()
        patched_logger.warning.assert_called()
        patched_logger.exception.assert_not_called()
def test_track_manifest_pull_pipeline_execute_error(self, pull_metrics_testing, mock_redis):
    """A failing pipeline execute() during manifest pull tracking only logs a warning."""
    repo = Mock(id=789)

    # The connection itself is healthy (ping succeeds)...
    mock_redis.ping.return_value = True

    # ...but flushing the pipeline blows up
    failing_pipeline = MagicMock()
    failing_pipeline.execute.side_effect = redis.RedisError("Pipeline execution failed")
    mock_redis.pipeline.return_value = failing_pipeline

    with patch("util.pullmetrics.logger") as patched_logger:
        # Must return normally despite the pipeline error
        pull_metrics_testing.track_manifest_pull(repo, "sha256:xyz789")

        # Redis errors are surfaced through warning(), never through exception()
        patched_logger.warning.assert_called()
        patched_logger.exception.assert_not_called()
def test_get_pull_statistics_success(self, pull_metrics_testing, mock_redis):
"""Test retrieving pull statistics from Redis."""
@@ -410,16 +579,20 @@ class TestPullMetrics:
"""Test pull statistics retrieval handles Redis errors."""
key = "pull_events:repo:123:tag:latest:sha256:abc123"
# Mock Redis to raise error
# Ensure connection can be established (ping succeeds)
mock_redis.ping.return_value = True
# Mock hgetall to raise error when called
mock_redis.hgetall.side_effect = redis.RedisError("Connection failed")
# Execute
with patch("util.pullmetrics.logger") as mock_logger:
result = pull_metrics_testing._get_pull_statistics(key)
# Should return None and log error
# Should return None and log warning (Redis errors are logged as warnings)
assert result is None
mock_logger.exception.assert_called_once()
mock_logger.warning.assert_called()
# Verify no exception was logged
mock_logger.exception.assert_not_called()
def test_get_tag_pull_statistics(self, pull_metrics_testing, mock_redis):
"""Test get_tag_pull_statistics method."""