1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
Files
quay/util/test/test_pullmetrics.py
Ryan Wallace a06cc6fa43 chore: update all black versions to 24.4.2 and run make black (#4754)
* chore(pre-commit): match black version with requirements-dev

* run `make black` against repo

* ci: switch to black 24.4.2

* fix: py312

* fix: flake8 errors

* fix: flake8 conflicts

* chore: add git blame ignore revs file
2025-12-19 11:29:53 -06:00

707 lines
27 KiB
Python

"""
Tests for util/pullmetrics.py module.
This test suite covers:
- PullMetrics class initialization
- Tag pull tracking (sync and async)
- Manifest pull tracking (sync and async)
- Redis key generation
- Pull statistics retrieval from Redis
- Error handling
"""
from datetime import datetime, timezone
from unittest.mock import MagicMock, Mock, patch
import pytest
import redis
from util.pullmetrics import (
CannotReadPullMetricsException,
PullMetrics,
PullMetricsBuilder,
PullMetricsBuilderModule,
)
class TestPullMetricsKeyGeneration:
    """Checks the Redis key strings produced by the PullMetrics key helpers."""

    def test_tag_pull_key_format(self):
        """A tag pull key contains the repository id, tag name and manifest digest."""
        expected = "pull_events:repo:123:tag:latest:sha256:abc123"
        assert PullMetrics._tag_pull_key(123, "latest", "sha256:abc123") == expected

    def test_manifest_pull_key_format(self):
        """A manifest pull key contains the repository id and manifest digest."""
        expected = "pull_events:repo:456:digest:sha256:def456"
        assert PullMetrics._manifest_pull_key(456, "sha256:def456") == expected

    def test_tag_pull_key_with_special_characters(self):
        """Dots and dashes in the tag name are carried into the key unchanged."""
        expected = "pull_events:repo:789:tag:v1.2.3-alpha:sha256:xyz789"
        assert PullMetrics._tag_pull_key(789, "v1.2.3-alpha", "sha256:xyz789") == expected
class TestPullMetricsBuilder:
    """Test PullMetricsBuilder class."""

    def test_builder_initialization(self):
        """The builder stores the Redis config and defaults ``_max_workers`` to None."""
        redis_config = {"host": "localhost", "port": 6379}
        builder = PullMetricsBuilder(redis_config)
        assert builder._redis_config == redis_config
        assert builder._max_workers is None

    def test_builder_initialization_with_workers(self):
        """An explicit ``max_workers`` argument is stored on the builder."""
        redis_config = {"host": "localhost", "port": 6379}
        builder = PullMetricsBuilder(redis_config, max_workers=10)
        assert builder._max_workers == 10

    def test_builder_get_event(self, mock_redis):
        """``get_event()`` returns a PullMetrics instance.

        The ``mock_redis`` fixture already patches
        ``util.pullmetrics.redis.StrictRedis`` for the duration of the test, so
        no additional patching is needed here.
        """
        redis_config = {"host": "localhost", "port": 6379, "_testing": True}
        builder = PullMetricsBuilder(redis_config)
        event = builder.get_event()
        assert isinstance(event, PullMetrics)
class TestPullMetricsBuilderModule:
    """Exercises PullMetricsBuilderModule, the Flask-extension style wrapper."""

    @staticmethod
    def _make_app(config):
        """Return a stand-in app object exposing only ``config`` and ``extensions``."""
        fake_app = Mock()
        fake_app.config = config
        fake_app.extensions = {}
        return fake_app

    def test_module_initialization_without_app(self):
        """Constructing the module with no app leaves ``app`` and ``state`` unset."""
        ext = PullMetricsBuilderModule()
        assert ext.app is None
        assert ext.state is None

    def test_module_initialization_with_app(self):
        """Constructing the module with an app builds state and registers the extension."""
        fake_app = self._make_app(
            {
                "PULL_METRICS_REDIS": {"host": "localhost", "port": 6379},
                "TESTING": True,
            }
        )
        ext = PullMetricsBuilderModule(fake_app)
        assert ext.app == fake_app
        assert ext.state is not None
        assert isinstance(ext.state, PullMetricsBuilder)
        assert "pullmetrics" in fake_app.extensions

    def test_module_init_app(self):
        """``init_app`` returns the builder and stores it under ``extensions["pullmetrics"]``."""
        fake_app = self._make_app(
            {
                "PULL_METRICS_REDIS": {"host": "localhost", "port": 6379},
                "TESTING": True,
            }
        )
        ext = PullMetricsBuilderModule()
        result = ext.init_app(fake_app)
        assert isinstance(result, PullMetricsBuilder)
        assert "pullmetrics" in fake_app.extensions
        assert fake_app.extensions["pullmetrics"] == result

    def test_module_init_app_with_old_config_key(self):
        """The legacy PULL_METRICS_REDIS_HOSTNAME key is used as the Redis host."""
        fake_app = self._make_app(
            {
                "PULL_METRICS_REDIS_HOSTNAME": "redis.example.com",
                "TESTING": True,
            }
        )
        ext = PullMetricsBuilderModule()
        result = ext.init_app(fake_app)
        assert isinstance(result, PullMetricsBuilder)
        assert result._redis_config["host"] == "redis.example.com"

    def test_module_init_app_with_custom_worker_count(self):
        """PULL_METRICS_WORKER_COUNT is passed through to the builder's ``_max_workers``."""
        fake_app = self._make_app(
            {
                "PULL_METRICS_REDIS": {"host": "localhost", "port": 6379},
                "PULL_METRICS_WORKER_COUNT": 10,
                "TESTING": True,
            }
        )
        ext = PullMetricsBuilderModule()
        result = ext.init_app(fake_app)
        assert result._max_workers == 10

    def test_module_getattr(self):
        """Attribute access on the module is delegated to the underlying builder state."""
        fake_app = self._make_app(
            {
                "PULL_METRICS_REDIS": {"host": "localhost", "port": 6379},
                "TESTING": True,
            }
        )
        ext = PullMetricsBuilderModule(fake_app)
        # ``_redis_config`` is an attribute of the builder, reached through the module.
        assert ext._redis_config is not None
@pytest.fixture
def mock_redis():
    """Patch ``StrictRedis`` as seen by ``util.pullmetrics`` and yield the fake client.

    The patch stays active until the requesting test finishes.
    """
    fake_client = MagicMock()
    with patch("util.pullmetrics.redis.StrictRedis") as strict_redis_cls:
        # Every StrictRedis(...) call made while patched returns the same fake.
        strict_redis_cls.return_value = fake_client
        yield fake_client
@pytest.fixture
def pull_metrics_testing(mock_redis):
    """Provide a PullMetrics built with ``_testing=True`` (no thread pool)."""
    return PullMetrics({"host": "localhost", "port": 6379, "_testing": True})
@pytest.fixture
def pull_metrics_production(mock_redis):
    """Create PullMetrics instance in production mode (with thread pool).

    The instance is shut down on teardown so its executor is not left behind.
    Because this fixture depends on ``mock_redis``, teardown runs while the
    ``StrictRedis`` patch is still active.
    """
    redis_config = {"host": "localhost", "port": 6379}
    pm = PullMetrics(redis_config)
    yield pm
    pm.shutdown()
class TestPullMetrics:
    """Test PullMetrics class.

    Every test requests ``mock_redis`` (directly or through
    ``pull_metrics_testing``), so ``redis.StrictRedis`` is patched while the
    test runs.
    """

    def test_pullmetrics_initialization_testing_mode(self, mock_redis):
        """Test PullMetrics initialization in testing mode - lazy connection."""
        redis_config = {"host": "localhost", "port": 6379, "_testing": True}
        pm = PullMetrics(redis_config)
        # With lazy initialization, Redis connection should be None until first use
        assert pm._redis is None
        assert pm._executor is None  # No thread pool in testing mode

    def test_pullmetrics_initialization_production_mode(self, mock_redis):
        """Test PullMetrics initialization in production mode - lazy connection."""
        redis_config = {"host": "localhost", "port": 6379}
        pm = PullMetrics(redis_config)
        try:
            # With lazy initialization, Redis connection should be None until first use
            assert pm._redis is None
            assert pm._executor is not None  # Thread pool in production mode
        finally:
            # Release the executor created in production mode.
            pm.shutdown()

    def test_lazy_redis_connection_on_first_use(self, mock_redis):
        """Test that Redis connection is established on first use, not during init."""
        redis_config = {"host": "localhost", "port": 6379, "_testing": True}
        pm = PullMetrics(redis_config)
        # Initially no connection
        assert pm._redis is None
        # Mock ping to succeed
        mock_redis.ping.return_value = True
        # First use should establish connection
        repository = Mock()
        repository.id = 123
        pm.track_tag_pull_sync(repository, "latest", "sha256:abc123")
        # Now connection should be established
        assert pm._redis is not None
        # StrictRedis is the mock installed by the mock_redis fixture
        redis.StrictRedis.assert_called()

    def test_redis_connection_retry_logic(self, mock_redis):
        """Test that Redis connection retries on failure.

        NOTE(review): ``_ensure_redis_connection`` is patched out below, so the
        fail-twice-then-succeed ``StrictRedis`` side effect is presumably never
        triggered and the retry path itself is not exercised by this test.
        TODO: drive the real method and assert on the attempt count once any
        retry delay can be controlled from the test.
        """
        redis_config = {"host": "localhost", "port": 6379, "_testing": True, "retry_attempts": 3}
        pm = PullMetrics(redis_config)
        attempts = 0

        def strict_redis_side_effect(*args, **kwargs):
            nonlocal attempts
            attempts += 1
            client = MagicMock()
            if attempts < 3:
                # First two attempts fail
                client.ping.side_effect = redis.ConnectionError("Connection failed")
            else:
                # Third attempt succeeds
                client.ping.return_value = True
            return client

        redis.StrictRedis.side_effect = strict_redis_side_effect
        repository = Mock()
        repository.id = 123
        with patch.object(pm, "_ensure_redis_connection") as mock_ensure:
            mock_ensure.return_value = MagicMock()
            # Only checks that tracking does not raise.
            pm.track_tag_pull_sync(repository, "latest", "sha256:abc123")

    def test_redis_connection_health_check(self, mock_redis):
        """Test that existing connection is health-checked before reuse."""
        redis_config = {"host": "localhost", "port": 6379, "_testing": True}
        pm = PullMetrics(redis_config)
        # Establish initial connection
        mock_redis.ping.return_value = True
        repository = Mock()
        repository.id = 123
        pm.track_tag_pull_sync(repository, "latest", "sha256:abc123")
        # Connection should be established
        assert pm._redis is not None
        # Verify ping was called for health check
        assert mock_redis.ping.called

    def test_redis_connection_reconnect_on_failure(self, mock_redis):
        """Test that connection is re-established if health check fails."""
        redis_config = {"host": "localhost", "port": 6379, "_testing": True}
        pm = PullMetrics(redis_config)
        # First connection succeeds
        mock_redis.ping.return_value = True
        repository = Mock()
        repository.id = 123
        pm.track_tag_pull_sync(repository, "latest", "sha256:abc123")
        # Simulate connection failure on health check
        original_redis = pm._redis
        ping_calls = 0

        def ping_side_effect():
            # Fail only the first call (the health check); later calls,
            # made while reconnecting, succeed.
            nonlocal ping_calls
            ping_calls += 1
            if ping_calls == 1:
                raise redis.ConnectionError("Connection lost")
            return True

        # side_effect takes precedence over return_value, so this callable
        # handles both the failure and the success cases.
        original_redis.ping.side_effect = ping_side_effect
        pm.track_tag_pull_sync(repository, "latest", "sha256:abc123")
        # Should have attempted to reconnect
        assert mock_redis.ping.call_count >= 2

    def test_track_tag_pull_sync(self, pull_metrics_testing, mock_redis):
        """Test synchronous tag pull tracking."""
        # Setup
        repository = Mock()
        repository.id = 123
        tag_name = "latest"
        manifest_digest = "sha256:abc123"
        # Mock connection establishment (ping) and Lua script execution
        mock_redis.ping.return_value = True
        mock_redis.eval.return_value = "1"  # Return pull_count as string
        # Execute
        pull_metrics_testing.track_tag_pull_sync(repository, tag_name, manifest_digest)
        # Verify Redis connection was established (ping called)
        assert mock_redis.ping.called
        # Verify Lua script was called with correct arguments
        mock_redis.eval.assert_called_once()
        call_args = mock_redis.eval.call_args
        assert call_args[0][0] == pull_metrics_testing._TRACK_TAG_PULL_SCRIPT
        assert call_args[0][1] == 1  # number of keys
        assert call_args[0][2] == "pull_events:repo:123:tag:latest:sha256:abc123"  # KEYS[1]
        assert call_args[0][3] == "123"  # ARGV[1] - repository_id
        assert call_args[0][4] == "latest"  # ARGV[2] - tag_name
        assert call_args[0][5] == "sha256:abc123"  # ARGV[3] - manifest_digest
        # ARGV[4] (positional index 6) is the timestamp (dynamic, skip check)
        assert call_args[0][7] == "tag"  # ARGV[5] - pull_method

    def test_track_tag_pull_sync_with_repository_id(self, pull_metrics_testing, mock_redis):
        """Test synchronous tag pull tracking with repository ID instead of object."""
        # Setup
        repository_id = 456
        tag_name = "v1.0"
        manifest_digest = "sha256:def456"
        # Mock Lua script execution
        mock_redis.eval.return_value = "1"  # Return pull_count as string
        # Execute
        pull_metrics_testing.track_tag_pull_sync(repository_id, tag_name, manifest_digest)
        # Verify Lua script was called with correct repository_id
        mock_redis.eval.assert_called_once()
        call_args = mock_redis.eval.call_args
        assert call_args[0][3] == "456"  # ARGV[1] - repository_id as string

    def test_track_manifest_pull_sync(self, pull_metrics_testing, mock_redis):
        """Test synchronous manifest pull tracking."""
        # Setup
        repository = Mock()
        repository.id = 789
        manifest_digest = "sha256:xyz789"
        # Mock Lua script execution
        mock_redis.eval.return_value = "1"  # Return pull_count as string
        # Execute
        pull_metrics_testing.track_manifest_pull_sync(repository, manifest_digest)
        # Verify Lua script was called with correct arguments
        mock_redis.eval.assert_called_once()
        call_args = mock_redis.eval.call_args
        assert call_args[0][0] == pull_metrics_testing._TRACK_MANIFEST_PULL_SCRIPT
        assert call_args[0][1] == 1  # number of keys
        assert call_args[0][2] == "pull_events:repo:789:digest:sha256:xyz789"  # KEYS[1]
        assert call_args[0][3] == "789"  # ARGV[1] - repository_id
        assert call_args[0][4] == "sha256:xyz789"  # ARGV[2] - manifest_digest
        # ARGV[3] (positional index 5) is the timestamp (dynamic, skip check)
        assert call_args[0][6] == "digest"  # ARGV[4] - pull_method

    def test_track_manifest_pull_sync_with_repository_id(self, pull_metrics_testing, mock_redis):
        """Test synchronous manifest pull tracking with repository ID."""
        # Setup
        repository_id = 999
        manifest_digest = "sha256:test999"
        # Mock Lua script execution
        mock_redis.eval.return_value = "1"  # Return pull_count as string
        # Execute
        pull_metrics_testing.track_manifest_pull_sync(repository_id, manifest_digest)
        # Verify Lua script was called with correct repository_id
        mock_redis.eval.assert_called_once()
        call_args = mock_redis.eval.call_args
        assert call_args[0][3] == "999"  # ARGV[1] - repository_id as string

    def test_track_tag_pull_async_testing_mode(self, pull_metrics_testing, mock_redis):
        """Test async tag pull tracking in testing mode (runs synchronously)."""
        repository = Mock()
        repository.id = 123
        tag_name = "latest"
        manifest_digest = "sha256:abc123"
        # Mock Lua script execution
        mock_redis.eval.return_value = "1"  # Return pull_count as string
        # Execute
        pull_metrics_testing.track_tag_pull(repository, tag_name, manifest_digest)
        # In testing mode, should run synchronously
        mock_redis.eval.assert_called_once()

    def test_track_tag_pull_async_production_mode(self, mock_redis):
        """Test async tag pull tracking in production mode (runs in thread pool)."""
        redis_config = {"host": "localhost", "port": 6379}
        pm = PullMetrics(redis_config)
        try:
            repository = Mock()
            repository.id = 123
            tag_name = "latest"
            manifest_digest = "sha256:abc123"
            # Execute
            pm.track_tag_pull(repository, tag_name, manifest_digest)
            # Should submit to executor
            assert pm._executor is not None
        finally:
            # Shut down before the StrictRedis patch is removed, so that any
            # background work does not outlive the test.
            pm.shutdown()

    def test_track_tag_pull_redis_error_handling(self, pull_metrics_testing, mock_redis):
        """Test tag pull tracking handles Redis connection errors gracefully."""
        repository = Mock()
        repository.id = 123
        tag_name = "latest"
        manifest_digest = "sha256:abc123"
        # Mock connection to fail during health check
        mock_redis.ping.side_effect = redis.ConnectionError("Connection failed")
        # Execute - should not raise exception, should log warning
        with patch("util.pullmetrics.logger") as mock_logger:
            pull_metrics_testing.track_tag_pull(repository, tag_name, manifest_digest)
            # Connection errors are logged as warnings, not exceptions
            mock_logger.warning.assert_called()
            # Verify no exception was logged (only warnings for connection errors)
            mock_logger.exception.assert_not_called()

    def test_track_manifest_pull_async_testing_mode(self, pull_metrics_testing, mock_redis):
        """Test async manifest pull tracking in testing mode."""
        repository = Mock()
        repository.id = 789
        manifest_digest = "sha256:xyz789"
        # Mock Lua script execution
        mock_redis.eval.return_value = "1"  # Return pull_count as string
        # Execute
        pull_metrics_testing.track_manifest_pull(repository, manifest_digest)
        # In testing mode, should run synchronously
        mock_redis.eval.assert_called_once()

    def test_track_manifest_pull_redis_error_handling(self, pull_metrics_testing, mock_redis):
        """Test manifest pull tracking handles Redis errors gracefully."""
        repository = Mock()
        repository.id = 789
        manifest_digest = "sha256:xyz789"
        # Ensure connection can be established (ping succeeds)
        mock_redis.ping.return_value = True
        # Mock Lua script execution to raise error
        mock_redis.eval.side_effect = redis.RedisError("Connection failed")
        # Execute - should not raise exception, should log error
        with patch("util.pullmetrics.logger") as mock_logger:
            pull_metrics_testing.track_manifest_pull(repository, manifest_digest)
            # Redis errors are logged as errors, not exceptions
            mock_logger.error.assert_called()
            # Verify no exception was logged (only errors for Redis errors)
            mock_logger.exception.assert_not_called()

    def test_track_tag_pull_timeout_error_handling(self, pull_metrics_testing, mock_redis):
        """Test tag pull tracking handles Redis timeout errors gracefully."""
        repository = Mock()
        repository.id = 123
        tag_name = "latest"
        manifest_digest = "sha256:abc123"
        # Mock connection to fail with timeout during health check
        mock_redis.ping.side_effect = redis.TimeoutError("Operation timed out")
        # Execute - should not raise exception, should log warning
        with patch("util.pullmetrics.logger") as mock_logger:
            pull_metrics_testing.track_tag_pull(repository, tag_name, manifest_digest)
            # Timeout errors are logged as warnings, not exceptions
            mock_logger.warning.assert_called()
            mock_logger.exception.assert_not_called()

    def test_track_manifest_pull_pipeline_execute_error(self, pull_metrics_testing, mock_redis):
        """Test manifest pull tracking handles Lua script execution errors gracefully.

        The test name still mentions "pipeline", but the failure injected here
        is on ``eval`` (the Lua script call).
        """
        repository = Mock()
        repository.id = 789
        manifest_digest = "sha256:xyz789"
        # Ensure connection can be established (ping succeeds)
        mock_redis.ping.return_value = True
        # Mock Lua script execution to raise error
        mock_redis.eval.side_effect = redis.RedisError("Lua script execution failed")
        # Execute - should not raise exception, should log error
        with patch("util.pullmetrics.logger") as mock_logger:
            pull_metrics_testing.track_manifest_pull(repository, manifest_digest)
            # Redis errors are logged as errors, not exceptions
            mock_logger.error.assert_called()
            mock_logger.exception.assert_not_called()

    def test_get_pull_statistics_success(self, pull_metrics_testing, mock_redis):
        """Test retrieving pull statistics from Redis."""
        key = "pull_events:repo:123:tag:latest:sha256:abc123"
        # Mock Redis response (bytes keys and values)
        mock_redis.hgetall.return_value = {
            b"repository_id": b"123",
            b"tag_name": b"latest",
            b"manifest_digest": b"sha256:abc123",
            b"pull_count": b"10",
            b"last_pull_timestamp": b"1704110400",
        }
        # Execute
        result = pull_metrics_testing._get_pull_statistics(key)
        # Verify: values come back as str, except pull_count which is an int
        assert result is not None
        assert result["repository_id"] == "123"
        assert result["tag_name"] == "latest"
        assert result["manifest_digest"] == "sha256:abc123"
        assert result["pull_count"] == 10
        assert result["last_pull_timestamp"] == "1704110400"

    def test_get_pull_statistics_empty(self, pull_metrics_testing, mock_redis):
        """Test retrieving non-existent pull statistics."""
        key = "pull_events:repo:123:tag:nonexistent:sha256:abc123"
        # Mock Redis response (empty)
        mock_redis.hgetall.return_value = {}
        # Execute
        result = pull_metrics_testing._get_pull_statistics(key)
        # Verify
        assert result is None

    def test_get_pull_statistics_redis_error(self, pull_metrics_testing, mock_redis):
        """Test pull statistics retrieval handles Redis errors."""
        key = "pull_events:repo:123:tag:latest:sha256:abc123"
        # Ensure connection can be established (ping succeeds)
        mock_redis.ping.return_value = True
        # Mock hgetall to raise error when called
        mock_redis.hgetall.side_effect = redis.RedisError("Connection failed")
        # Execute
        with patch("util.pullmetrics.logger") as mock_logger:
            result = pull_metrics_testing._get_pull_statistics(key)
            # Should return None and log warning (Redis errors are logged as warnings)
            assert result is None
            mock_logger.warning.assert_called()
            # Verify no exception was logged
            mock_logger.exception.assert_not_called()

    def test_get_tag_pull_statistics(self, pull_metrics_testing, mock_redis):
        """Test get_tag_pull_statistics method."""
        repository_id = 123
        tag_name = "latest"
        manifest_digest = "sha256:abc123"
        # Mock Redis response
        mock_redis.hgetall.return_value = {
            b"repository_id": b"123",
            b"tag_name": b"latest",
            b"manifest_digest": b"sha256:abc123",
            b"pull_count": b"15",
        }
        # Execute
        result = pull_metrics_testing.get_tag_pull_statistics(
            repository_id, tag_name, manifest_digest
        )
        # Verify
        assert result is not None
        assert result["pull_count"] == 15
        mock_redis.hgetall.assert_called_once_with("pull_events:repo:123:tag:latest:sha256:abc123")

    def test_get_manifest_pull_statistics(self, pull_metrics_testing, mock_redis):
        """Test get_manifest_pull_statistics method."""
        repository_id = 456
        manifest_digest = "sha256:def456"
        # Mock Redis response
        mock_redis.hgetall.return_value = {
            b"repository_id": b"456",
            b"manifest_digest": b"sha256:def456",
            b"pull_count": b"20",
        }
        # Execute
        result = pull_metrics_testing.get_manifest_pull_statistics(repository_id, manifest_digest)
        # Verify
        assert result is not None
        assert result["pull_count"] == 20
        mock_redis.hgetall.assert_called_once_with("pull_events:repo:456:digest:sha256:def456")

    def test_shutdown_with_executor(self, mock_redis):
        """Test shutdown method with thread pool executor."""
        redis_config = {"host": "localhost", "port": 6379}
        pm = PullMetrics(redis_config)
        # Swap in a mock executor so the shutdown call can be inspected
        pm._executor = Mock()
        # Execute
        pm.shutdown()
        # Verify
        pm._executor.shutdown.assert_called_once_with(wait=True)

    def test_shutdown_without_executor(self, pull_metrics_testing):
        """Test shutdown method without thread pool executor."""
        # Execute - should not raise exception
        pull_metrics_testing.shutdown()
        # No executor, so nothing to verify
class TestPullMetricsIntegration:
    """Workflow tests that chain a tracking call with a statistics read.

    These run against the ``mock_redis`` fixture, not a real Redis server: the
    stored hash returned by ``hgetall`` is canned, so they check that the two
    public calls cooperate rather than that Redis persists anything.
    """

    def test_full_workflow_tag_pull(self, pull_metrics_testing, mock_redis):
        """Test full workflow of tracking and retrieving tag pulls."""
        repository = Mock()
        repository.id = 100
        tag_name = "integration-test"
        manifest_digest = "sha256:integration123"
        # Mock the Lua script result and the hash read back afterwards
        mock_redis.eval.return_value = "1"  # Return pull_count as string
        mock_redis.hgetall.return_value = {
            b"repository_id": b"100",
            b"tag_name": b"integration-test",
            b"manifest_digest": b"sha256:integration123",
            b"pull_count": b"1",
        }
        # Track a pull
        pull_metrics_testing.track_tag_pull_sync(repository, tag_name, manifest_digest)
        # The tracking step must have reached Redis
        mock_redis.eval.assert_called_once()
        # Retrieve statistics
        stats = pull_metrics_testing.get_tag_pull_statistics(
            repository.id, tag_name, manifest_digest
        )
        # Verify
        assert stats is not None
        assert stats["tag_name"] == "integration-test"
        assert stats["pull_count"] == 1
        mock_redis.hgetall.assert_called_with(
            "pull_events:repo:100:tag:integration-test:sha256:integration123"
        )

    def test_full_workflow_manifest_pull(self, pull_metrics_testing, mock_redis):
        """Test full workflow of tracking and retrieving manifest pulls."""
        repository = Mock()
        repository.id = 200
        manifest_digest = "sha256:integration456"
        # Mock the Lua script result and the hash read back afterwards
        mock_redis.eval.return_value = "1"  # Return pull_count as string
        mock_redis.hgetall.return_value = {
            b"repository_id": b"200",
            b"manifest_digest": b"sha256:integration456",
            b"pull_count": b"1",
        }
        # Track a pull
        pull_metrics_testing.track_manifest_pull_sync(repository, manifest_digest)
        # The tracking step must have reached Redis
        mock_redis.eval.assert_called_once()
        # Retrieve statistics
        stats = pull_metrics_testing.get_manifest_pull_statistics(repository.id, manifest_digest)
        # Verify
        assert stats is not None
        assert stats["manifest_digest"] == "sha256:integration456"
        assert stats["pull_count"] == 1
        mock_redis.hgetall.assert_called_with("pull_events:repo:200:digest:sha256:integration456")