mirror of
https://github.com/quay/quay.git
synced 2026-01-26 06:21:37 +03:00
* chore(pre-commit): match black version with requirements-dev * run `make black` against repo * ci: switch to black 24.4.2 * fix: py312 * fix: flake8 errors * fix: flake8 conflicts * chore: add git blame ignore revs file
1811 lines
67 KiB
Python
1811 lines
67 KiB
Python
"""
|
|
Tests for RedisFlushWorker.
|
|
|
|
Tests the main worker that processes Redis pull events.
|
|
"""
|
|
|
|
import sys
|
|
from typing import List, Set
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import redis
|
|
|
|
from workers.pullstatsredisflushworker import RedisFlushWorker, create_gunicorn_worker
|
|
|
|
|
|
def test_redis_flush_worker_init():
    """Verify the worker schedules exactly one periodic flush operation."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        # Back the mocked app config with a plain dict; dict.get has the same
        # (key, default) signature as app.config.get.
        config = {
            "REDIS_FLUSH_INTERVAL_SECONDS": 300,
            "REDIS_FLUSH_WORKER_BATCH_SIZE": 1000,
            "REDIS_FLUSH_WORKER_SCAN_COUNT": 100,
            "REDIS_CONNECTION_TIMEOUT": 5,
            "PULL_METRICS_REDIS": {
                "host": "localhost",
                "port": 6379,
                "db": 1,
                "password": None,
            },
        }
        app_mock.config.get.side_effect = config.get

        flush_worker = RedisFlushWorker()

        # Exactly one operation should be registered.
        assert len(flush_worker._operations) == 1
        # Default frequency is 300, but test configs may override it to 30.
        assert flush_worker._operations[0][1] in [30, 300]
def test_initialize_redis_client_success():
    """A reachable Redis server yields a ready client on the worker."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        config = {
            "PULL_METRICS_REDIS": {
                "host": "localhost",
                "port": 6379,
                "db": 1,
                "password": None,
            },
            "REDIS_CONNECTION_TIMEOUT": 5,
        }
        app_mock.config.get.side_effect = config.get

        with patch("workers.pullstatsredisflushworker.redis") as redis_module_mock:
            # Client whose connectivity check succeeds.
            fake_client = MagicMock()
            fake_client.ping.return_value = True
            redis_module_mock.StrictRedis.return_value = fake_client

            flush_worker = RedisFlushWorker()

            # The worker should keep the client and have pinged it once.
            assert flush_worker.redis_client == fake_client
            redis_module_mock.StrictRedis.assert_called_once()
            fake_client.ping.assert_called_once()
def test_initialize_redis_client_connection_failure():
    """A ConnectionError during ping leaves the worker without a client."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        config = {
            "PULL_METRICS_REDIS": {
                "host": "localhost",
                "port": 6379,
                "db": 1,
                "password": None,
            },
            "REDIS_CONNECTION_TIMEOUT": 5,
        }
        app_mock.config.get.side_effect = config.get

        with patch("workers.pullstatsredisflushworker.redis") as redis_module_mock:
            import redis

            # ping() raises ConnectionError, simulating an unreachable server.
            fake_client = MagicMock()
            fake_client.ping.side_effect = redis.ConnectionError("Connection failed")
            redis_module_mock.StrictRedis.return_value = fake_client

            # The worker catches exceptions via the patched module, so the
            # real exception classes must be reachable on it.
            redis_module_mock.ConnectionError = redis.ConnectionError
            redis_module_mock.RedisError = redis.RedisError

            flush_worker = RedisFlushWorker()

            # Initialization failure should be swallowed; no client kept.
            assert flush_worker.redis_client is None
def test_initialize_redis_client_redis_error():
    """A generic RedisError during ping also leaves the worker clientless."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        config = {
            "PULL_METRICS_REDIS": {
                "host": "localhost",
                "port": 6379,
                "db": 1,
                "password": None,
            },
            "REDIS_CONNECTION_TIMEOUT": 5,
        }
        app_mock.config.get.side_effect = config.get

        with patch("workers.pullstatsredisflushworker.redis") as redis_module_mock:
            import redis

            # ping() raises a broad RedisError rather than a connection error.
            fake_client = MagicMock()
            fake_client.ping.side_effect = redis.RedisError("Redis error")
            redis_module_mock.StrictRedis.return_value = fake_client

            # Expose the real exception classes on the patched module so the
            # worker's except clauses can match them.
            redis_module_mock.ConnectionError = redis.ConnectionError
            redis_module_mock.RedisError = redis.RedisError

            flush_worker = RedisFlushWorker()

            assert flush_worker.redis_client is None
def test_initialize_redis_client_no_config():
    """Without PULL_METRICS_REDIS the worker still builds a default client."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        # Only the timeout is configured; no Redis connection settings.
        config = {
            "REDIS_CONNECTION_TIMEOUT": 5,
        }
        app_mock.config.get.side_effect = config.get

        with patch("workers.pullstatsredisflushworker.redis") as redis_module_mock:
            fake_client = MagicMock()
            fake_client.ping.return_value = True
            redis_module_mock.StrictRedis.return_value = fake_client

            flush_worker = RedisFlushWorker()

            # Defaults are applied, so a client is still created and kept.
            assert flush_worker.redis_client == fake_client
            redis_module_mock.StrictRedis.assert_called_once()
def test_process_redis_events():
    """Process one tag-pull key and one digest-pull key end to end."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()

        fake_client = MagicMock()
        flush_worker.redis_client = fake_client

        # Hash payloads returned for the two event keys, in order.
        fake_client.hgetall.side_effect = [
            {
                "repository_id": "123",
                "tag_name": "latest",
                "manifest_digest": "sha256:abc123",
                "pull_count": "5",
                "last_pull_timestamp": "1694168400",
                "pull_method": "tag",
            },
            {
                "repository_id": "456",
                "tag_name": "",
                "manifest_digest": "sha256:def456",
                "pull_count": "3",
                "last_pull_timestamp": "1694168460",
                "pull_method": "digest",
            },
        ]

        # The atomic RENAME claim succeeds for every key.
        fake_client.rename.return_value = None

        event_keys = [
            "pull_events:repo:123:tag:latest:sha256:abc123",
            "pull_events:repo:456:digest:sha256:def456",
        ]

        (
            tag_updates,
            manifest_updates,
            database_dependent_keys,
        ) = flush_worker._process_redis_events(event_keys)

        # Both keys carry valid data; the tag pull feeds both tables, the
        # digest pull feeds only the manifest table.
        assert len(database_dependent_keys) == 2
        assert len(tag_updates) == 1
        assert len(manifest_updates) == 2

        # Spot-check the tag update's parsed fields and types.
        first_tag = tag_updates[0]
        assert first_tag["repository_id"] == 123
        assert first_tag["tag_name"] == "latest"
        assert first_tag["pull_count"] == 5
        assert first_tag["manifest_digest"] == "sha256:abc123"
def test_cleanup_redis_keys_success():
    """Keys handed to _cleanup_redis_keys are deleted in one DELETE call."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()

        fake_client = MagicMock()
        fake_client.delete.return_value = 3  # all three keys removed
        flush_worker.redis_client = fake_client

        doomed_keys = {"key1", "key2", "key3"}
        flush_worker._cleanup_redis_keys(doomed_keys)

        # One DELETE covering every key; set ordering is irrelevant.
        fake_client.delete.assert_called_once()
        positional_args = fake_client.delete.call_args[0]
        assert set(positional_args) == doomed_keys
def test_cleanup_redis_keys_no_client():
    """Cleanup is a quiet no-op when there is no Redis client."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()
        flush_worker.redis_client = None

        # Nothing can be deleted, but no exception may escape either.
        flush_worker._cleanup_redis_keys({"key1", "key2"})
def test_validate_redis_key_data():
    """Exercise _validate_redis_key_data on one valid and several bad payloads."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()

        # A fully populated, well-formed payload passes validation.
        good_payload = {
            "repository_id": "123",
            "manifest_digest": "sha256:abc123",
            "pull_count": "5",
            "last_pull_timestamp": "1694168400",
        }
        assert flush_worker._validate_redis_key_data("test_key", good_payload) is True

        bad_payloads = [
            # manifest_digest missing entirely
            {
                "repository_id": "123",
                "pull_count": "5",
                "last_pull_timestamp": "1694168400",
            },
            # repository_id of zero is not a valid id
            {
                "repository_id": "0",
                "manifest_digest": "sha256:abc123",
                "pull_count": "5",
                "last_pull_timestamp": "1694168400",
            },
            # digest lacks a recognizable algorithm prefix
            {
                "repository_id": "123",
                "manifest_digest": "invalid_digest",
                "pull_count": "5",
                "last_pull_timestamp": "1694168400",
            },
            # digest missing the colon separator
            {
                "repository_id": "123",
                "manifest_digest": "sha256123123",
                "pull_count": "5",
                "last_pull_timestamp": "1694168400",
            },
        ]
        for payload in bad_payloads:
            assert flush_worker._validate_redis_key_data("test_key", payload) is False
def test_scan_redis_keys_no_client():
    """Scanning without a Redis client yields an empty list."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()
        flush_worker.redis_client = None

        assert flush_worker._scan_redis_keys("pull_events:*", 10) == []
def test_scan_redis_keys_success():
    """A single-pass SCAN returns every matching key."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()
        fake_client = MagicMock()
        # Cursor 0 on the first reply terminates the scan immediately.
        fake_client.scan.return_value = (0, ["key1", "key2", "key3"])
        flush_worker.redis_client = fake_client

        found = flush_worker._scan_redis_keys("pull_events:*", 10)

        assert len(found) == 3
        assert set(found) == {"key1", "key2", "key3"}
        fake_client.scan.assert_called_once_with(cursor=0, match="pull_events:*", count=100)
def test_scan_redis_keys_empty_results():
    """A SCAN that matches nothing produces an empty key list."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()
        fake_client = MagicMock()
        fake_client.scan.return_value = (0, [])
        flush_worker.redis_client = fake_client

        found = flush_worker._scan_redis_keys("pull_events:*", 10)

        assert found == []
        fake_client.scan.assert_called_once_with(cursor=0, match="pull_events:*", count=100)
def test_scan_redis_keys_multiple_batches():
    """SCAN is driven to completion across several cursor iterations."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()
        fake_client = MagicMock()
        # Cursor sequence: 0 -> 10 -> 20 -> 0 (scan terminates at cursor 0).
        fake_client.scan.side_effect = [
            (10, ["key1", "key2"]),
            (20, ["key3", "key4"]),
            (0, ["key5"]),
        ]
        flush_worker.redis_client = fake_client

        found = flush_worker._scan_redis_keys("pull_events:*", 10)

        # Every key from every batch must be collected.
        assert len(found) == 5
        assert set(found) == {"key1", "key2", "key3", "key4", "key5"}

        # Each call must resume from the cursor returned by the previous one.
        expected_kwargs = [
            {"cursor": 0, "match": "pull_events:*", "count": 100},
            {"cursor": 10, "match": "pull_events:*", "count": 100},
            {"cursor": 20, "match": "pull_events:*", "count": 100},
        ]
        assert fake_client.scan.call_count == 3
        for recorded, expected in zip(fake_client.scan.call_args_list, expected_kwargs):
            assert recorded.kwargs == expected
def test_scan_redis_keys_limit_reached():
    """The scan stops collecting once the requested key limit is hit."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()
        fake_client = MagicMock()
        fake_client.scan.side_effect = [
            (10, ["key1", "key2", "key3"]),  # first batch
            (20, ["key4", "key5", "key6"]),  # second batch would exceed the limit
        ]
        flush_worker.redis_client = fake_client

        # Request at most 4 keys.
        found = flush_worker._scan_redis_keys("pull_events:*", 4)

        # Exactly the limit is returned, drawn from the scanned batches.
        assert len(found) == 4
        assert set(found) <= {"key1", "key2", "key3", "key4", "key5", "key6"}

        # SCAN must have run at least once to gather anything.
        assert fake_client.scan.call_count >= 1
def test_scan_redis_keys_deduplication():
    """Duplicate keys within and across SCAN batches are collapsed."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()
        fake_client = MagicMock()
        fake_client.scan.side_effect = [
            (10, ["key1", "key2", "key1"]),  # key1 repeated inside a batch
            (0, ["key2", "key3"]),  # key2 repeated across batches
        ]
        flush_worker.redis_client = fake_client

        found = flush_worker._scan_redis_keys("pull_events:*", 10)

        # Only the three distinct keys survive.
        assert len(found) == 3
        assert set(found) == {"key1", "key2", "key3"}
def test_scan_redis_keys_redis_error():
    """A RedisError during SCAN degrades gracefully to an empty result."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()
        fake_client = MagicMock()
        fake_client.scan.side_effect = redis.RedisError("Connection lost")
        flush_worker.redis_client = fake_client

        found = flush_worker._scan_redis_keys("pull_events:*", 10)

        # The error must be swallowed, not propagated.
        assert found == []
def test_scan_redis_keys_general_exception():
    """Even a non-Redis exception during SCAN yields an empty result."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()
        fake_client = MagicMock()
        fake_client.scan.side_effect = Exception("Unexpected error")
        flush_worker.redis_client = fake_client

        found = flush_worker._scan_redis_keys("pull_events:*", 10)

        # The error must be swallowed, not propagated.
        assert found == []
def test_scan_redis_keys_empty_batch_in_middle():
    """An empty mid-scan batch must not terminate the cursor loop."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()
        fake_client = MagicMock()
        fake_client.scan.side_effect = [
            (10, ["key1", "key2"]),  # keys present
            (20, []),  # empty batch, cursor still nonzero
            (0, ["key3"]),  # final batch
        ]
        flush_worker.redis_client = fake_client

        found = flush_worker._scan_redis_keys("pull_events:*", 10)

        # Keys on both sides of the empty batch are collected.
        assert len(found) == 3
        assert set(found) == {"key1", "key2", "key3"}
def test_flush_to_database_successful_tag_updates():
    """A tag-only batch calls the tag upsert and skips the manifest upsert."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        with patch(
            "workers.pullstatsredisflushworker.bulk_upsert_tag_statistics"
        ) as tag_upsert_mock, patch(
            "workers.pullstatsredisflushworker.bulk_upsert_manifest_statistics"
        ) as manifest_upsert_mock:
            tag_upsert_mock.return_value = 2
            manifest_upsert_mock.return_value = 0

            flush_worker = RedisFlushWorker()

            tag_rows = [
                {"repository_id": 123, "tag_name": "latest", "pull_count": 5},
                {"repository_id": 124, "tag_name": "v1.0", "pull_count": 3},
            ]
            manifest_rows = []

            flushed = flush_worker._flush_to_database(tag_rows, manifest_rows)

            assert flushed is True
            tag_upsert_mock.assert_called_once_with(tag_rows)
            manifest_upsert_mock.assert_not_called()
def test_flush_to_database_successful_manifest_updates():
    """A manifest-only batch calls the manifest upsert and skips the tag upsert."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        with patch(
            "workers.pullstatsredisflushworker.bulk_upsert_tag_statistics"
        ) as tag_upsert_mock, patch(
            "workers.pullstatsredisflushworker.bulk_upsert_manifest_statistics"
        ) as manifest_upsert_mock:
            tag_upsert_mock.return_value = 0
            manifest_upsert_mock.return_value = 3

            flush_worker = RedisFlushWorker()

            tag_rows = []
            manifest_rows = [
                {"repository_id": 123, "manifest_digest": "sha256:abc", "pull_count": 5},
                {"repository_id": 124, "manifest_digest": "sha256:def", "pull_count": 3},
                {"repository_id": 125, "manifest_digest": "sha256:ghi", "pull_count": 2},
            ]

            flushed = flush_worker._flush_to_database(tag_rows, manifest_rows)

            assert flushed is True
            tag_upsert_mock.assert_not_called()
            manifest_upsert_mock.assert_called_once_with(manifest_rows)
def test_flush_to_database_successful_both_updates():
    """A mixed batch drives both the tag and the manifest upserts."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        with patch(
            "workers.pullstatsredisflushworker.bulk_upsert_tag_statistics"
        ) as tag_upsert_mock, patch(
            "workers.pullstatsredisflushworker.bulk_upsert_manifest_statistics"
        ) as manifest_upsert_mock:
            tag_upsert_mock.return_value = 1
            manifest_upsert_mock.return_value = 2

            flush_worker = RedisFlushWorker()

            tag_rows = [
                {"repository_id": 123, "tag_name": "latest", "pull_count": 5},
            ]
            manifest_rows = [
                {"repository_id": 123, "manifest_digest": "sha256:abc", "pull_count": 5},
                {"repository_id": 124, "manifest_digest": "sha256:def", "pull_count": 3},
            ]

            flushed = flush_worker._flush_to_database(tag_rows, manifest_rows)

            assert flushed is True
            tag_upsert_mock.assert_called_once_with(tag_rows)
            manifest_upsert_mock.assert_called_once_with(manifest_rows)
def test_flush_to_database_empty_updates():
    """An empty batch is treated as success without touching the database."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        with patch(
            "workers.pullstatsredisflushworker.bulk_upsert_tag_statistics"
        ) as tag_upsert_mock, patch(
            "workers.pullstatsredisflushworker.bulk_upsert_manifest_statistics"
        ) as manifest_upsert_mock:

            flush_worker = RedisFlushWorker()

            tag_rows = []
            manifest_rows = []

            flushed = flush_worker._flush_to_database(tag_rows, manifest_rows)

            # Nothing to write counts as success.
            assert flushed is True
            tag_upsert_mock.assert_not_called()
            manifest_upsert_mock.assert_not_called()
def test_flush_to_database_exception_handling():
    """An exception from the upsert layer maps to a False flush result."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        with patch(
            "workers.pullstatsredisflushworker.bulk_upsert_tag_statistics"
        ) as tag_upsert_mock, patch(
            "workers.pullstatsredisflushworker.bulk_upsert_manifest_statistics"
        ) as manifest_upsert_mock:
            tag_upsert_mock.side_effect = Exception("Database error")

            flush_worker = RedisFlushWorker()

            tag_rows = [
                {"repository_id": 123, "tag_name": "latest", "pull_count": 5},
            ]
            manifest_rows = []

            flushed = flush_worker._flush_to_database(tag_rows, manifest_rows)

            # The failure is reported via the return value, not raised.
            assert flushed is False
            tag_upsert_mock.assert_called_once_with(tag_rows)
def test_flush_to_database_partial_success():
    """Zero rows written by both upserts is reported as failure."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        with patch(
            "workers.pullstatsredisflushworker.bulk_upsert_tag_statistics"
        ) as tag_upsert_mock, patch(
            "workers.pullstatsredisflushworker.bulk_upsert_manifest_statistics"
        ) as manifest_upsert_mock:
            # Both upserts "succeed" but affect no rows.
            tag_upsert_mock.return_value = 0
            manifest_upsert_mock.return_value = 0

            flush_worker = RedisFlushWorker()

            tag_rows = [
                {"repository_id": 123, "tag_name": "latest", "pull_count": 5},
            ]
            manifest_rows = [
                {"repository_id": 123, "manifest_digest": "sha256:abc", "pull_count": 5},
            ]

            flushed = flush_worker._flush_to_database(tag_rows, manifest_rows)

            # No actual updates landed, so the flush must report failure.
            assert flushed is False
            tag_upsert_mock.assert_called_once_with(tag_rows)
            manifest_upsert_mock.assert_called_once_with(manifest_rows)
def test_create_gunicorn_worker():
    """create_gunicorn_worker wires the module name and feature flag through."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        with patch("workers.pullstatsredisflushworker.GunicornWorker") as gunicorn_cls_mock:
            with patch("workers.pullstatsredisflushworker.features") as features_mock:
                features_mock.IMAGE_PULL_STATS = True
                gunicorn_cls_mock.return_value = MagicMock()

                produced = create_gunicorn_worker()

                assert produced is not None
                gunicorn_cls_mock.assert_called_once()
                # First positional arg is the worker module name; the fourth
                # is the IMAGE_PULL_STATS feature flag.
                positional = gunicorn_cls_mock.call_args[0]
                assert positional[0] == "workers.pullstatsredisflushworker"
                assert positional[3] is True
def test_flush_pull_metrics_no_redis_client():
    """_flush_pull_metrics exits quietly when the Redis client is missing."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()
        flush_worker.redis_client = None

        # The absence of an exception is the whole assertion here.
        flush_worker._flush_pull_metrics()
def test_flush_pull_metrics_no_keys_found():
    """_flush_pull_metrics returns early when the scan finds no keys."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()
        fake_client = MagicMock()
        fake_client.scan.return_value = (0, [])
        flush_worker.redis_client = fake_client

        flush_worker._flush_pull_metrics()

        # The scan happened, and nothing further was required.
        fake_client.scan.assert_called_once()
def test_flush_pull_metrics_successful_processing():
    """Happy-path flush: scan, process, persist, then delete the keys."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        with patch(
            "workers.pullstatsredisflushworker.bulk_upsert_tag_statistics"
        ) as tag_upsert_mock, patch(
            "workers.pullstatsredisflushworker.bulk_upsert_manifest_statistics"
        ) as manifest_upsert_mock:
            tag_upsert_mock.return_value = 1
            manifest_upsert_mock.return_value = 1

            flush_worker = RedisFlushWorker()
            fake_client = MagicMock()

            # One pending event key backed by a complete, valid hash.
            fake_client.scan.return_value = (0, ["pull_events:repo:123:tag:latest:sha256:abc"])
            fake_client.hgetall.return_value = {
                "repository_id": "123",
                "tag_name": "latest",
                "manifest_digest": "sha256:abc123",
                "pull_count": "5",
                "last_pull_timestamp": "1694168400",
                "pull_method": "tag",
            }
            # Deletion of the processed key succeeds.
            fake_client.delete.return_value = 1

            flush_worker.redis_client = fake_client

            flush_worker._flush_pull_metrics()

            # Both statistics tables were updated...
            tag_upsert_mock.assert_called_once()
            manifest_upsert_mock.assert_called_once()

            # ...and the Redis key was cleaned up afterwards.
            fake_client.delete.assert_called()
def test_flush_pull_metrics_with_redis_error():
    """A RedisError while scanning must not escape _flush_pull_metrics."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()
        fake_client = MagicMock()
        fake_client.scan.side_effect = redis.RedisError("Connection lost")
        flush_worker.redis_client = fake_client

        # Completing without an exception is the assertion.
        flush_worker._flush_pull_metrics()
def test_cleanup_redis_keys_batch_processing():
    """250 keys are deleted in three DELETE batches (100 + 100 + 50)."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()
        fake_client = MagicMock()
        fake_client.delete.return_value = 100  # each batch "succeeds"
        flush_worker.redis_client = fake_client

        doomed_keys = {f"key{i}" for i in range(250)}
        flush_worker._cleanup_redis_keys(doomed_keys)

        # Batching at 100 keys per DELETE gives exactly three calls.
        assert fake_client.delete.call_count == 3
def test_cleanup_redis_keys_partial_deletion():
    """A DELETE that removes fewer keys than requested is tolerated."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()
        fake_client = MagicMock()
        # Redis reports only 50 of the 100 keys as actually deleted.
        fake_client.delete.return_value = 50
        flush_worker.redis_client = fake_client

        doomed_keys = {f"key{i}" for i in range(100)}
        flush_worker._cleanup_redis_keys(doomed_keys)

        # Cleanup still completes in one call without raising.
        fake_client.delete.assert_called_once()
def test_cleanup_redis_keys_batch_exception():
    """A failing DELETE batch must not stop the remaining batches."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()
        fake_client = MagicMock()
        # First batch blows up; second batch succeeds.
        fake_client.delete.side_effect = [
            Exception("First batch error"),
            100,
        ]
        flush_worker.redis_client = fake_client

        doomed_keys = {f"key{i}" for i in range(150)}
        flush_worker._cleanup_redis_keys(doomed_keys)

        # Both batches were attempted despite the first failure.
        assert fake_client.delete.call_count == 2
def test_cleanup_redis_keys_redis_error():
    """A RedisError from DELETE is swallowed by _cleanup_redis_keys."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()
        fake_client = MagicMock()
        fake_client.delete.side_effect = redis.RedisError("Connection error")
        flush_worker.redis_client = fake_client

        # Completing without an exception is the assertion.
        flush_worker._cleanup_redis_keys({"key1", "key2"})
def test_process_redis_events_invalid_key_format():
    """Keys outside the pull_events: namespace are skipped entirely."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()
        fake_client = MagicMock()
        flush_worker.redis_client = fake_client

        # Neither key matches the expected pull_events: prefix.
        bad_keys = ["invalid_key", "another_invalid"]
        tag_updates, manifest_updates, claimed_keys = flush_worker._process_redis_events(bad_keys)

        # No updates may be produced from unrecognized keys.
        assert len(tag_updates) == 0
        assert len(manifest_updates) == 0
def test_process_redis_events_empty_hash_data():
    """A key whose hash is empty is dropped rather than deferred."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()
        fake_client = MagicMock()
        fake_client.hgetall.return_value = {}  # nothing stored under the key
        flush_worker.redis_client = fake_client

        event_keys = ["pull_events:repo:123:tag:latest:sha256:abc"]
        # The atomic RENAME claim succeeds.
        fake_client.rename.return_value = None
        tag_updates, manifest_updates, claimed_keys = flush_worker._process_redis_events(event_keys)

        # An empty payload is cleaned up immediately, not kept for the DB step.
        assert len(claimed_keys) == 0
def test_process_redis_events_zero_pull_count():
    """A payload with zero pulls produces no updates and no deferred keys."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()
        fake_client = MagicMock()
        fake_client.hgetall.return_value = {
            "repository_id": "123",
            "tag_name": "latest",
            "manifest_digest": "sha256:abc123",
            "pull_count": "0",  # no pulls recorded
            "last_pull_timestamp": "1694168400",
            "pull_method": "tag",
        }
        flush_worker.redis_client = fake_client

        event_keys = ["pull_events:repo:123:tag:latest:sha256:abc"]
        # The atomic RENAME claim succeeds.
        fake_client.rename.return_value = None
        tag_updates, manifest_updates, claimed_keys = flush_worker._process_redis_events(event_keys)

        # Zero-count entries are cleaned up immediately with nothing to write.
        assert len(claimed_keys) == 0
        assert len(tag_updates) == 0
        assert len(manifest_updates) == 0
def test_process_redis_events_redis_error_during_processing():
    """A RedisError while reading a key's hash is handled, not raised."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()
        fake_client = MagicMock()
        fake_client.hgetall.side_effect = redis.RedisError("Connection error")
        flush_worker.redis_client = fake_client

        event_keys = ["pull_events:repo:123:tag:latest:sha256:abc"]
        tag_updates, manifest_updates, claimed_keys = flush_worker._process_redis_events(event_keys)

        # The failed key simply contributes nothing.
        assert len(tag_updates) == 0
        assert len(manifest_updates) == 0
def test_process_redis_events_digest_pull():
    """A digest pull (empty tag name) updates only manifest statistics."""
    with patch("workers.pullstatsredisflushworker.app") as app_mock:
        app_mock.config.get.return_value = 300

        flush_worker = RedisFlushWorker()
        fake_client = MagicMock()
        fake_client.hgetall.return_value = {
            "repository_id": "123",
            "tag_name": "",  # digest pulls carry no tag
            "manifest_digest": "sha256:abc123",
            "pull_count": "5",
            "last_pull_timestamp": "1694168400",
            "pull_method": "digest",
        }
        flush_worker.redis_client = fake_client

        event_keys = ["pull_events:repo:123:digest:sha256:abc"]
        # The atomic RENAME claim succeeds.
        fake_client.rename.return_value = None
        tag_updates, manifest_updates, claimed_keys = flush_worker._process_redis_events(event_keys)

        # Only the manifest table is touched; the key awaits DB confirmation.
        assert len(tag_updates) == 0
        assert len(manifest_updates) == 1
        assert len(claimed_keys) == 1
def test_flush_to_database_pull_statistics_exception():
    """_flush_to_database returns False when the tag upsert raises PullStatisticsException."""
    from data.model.pull_statistics import PullStatisticsException

    with patch("workers.pullstatsredisflushworker.app") as mock_app, patch(
        "workers.pullstatsredisflushworker.bulk_upsert_tag_statistics"
    ) as mock_tag_upsert:
        mock_app.config.get.return_value = 300
        mock_tag_upsert.side_effect = PullStatisticsException("DB error")

        worker = RedisFlushWorker()
        tag_rows = [{"repository_id": 123, "tag_name": "latest", "pull_count": 5}]

        # A database-layer failure is reported via the boolean result, not an exception.
        result = worker._flush_to_database(tag_rows, [])

        assert result is False
|
|
|
|
|
|
def test_initialize_redis_client_general_exception():
    """An unexpected exception during the initial PING leaves redis_client unset."""
    config = {
        "PULL_METRICS_REDIS": {
            "host": "localhost",
            "port": 6379,
            "db": 1,
            "password": None,
        },
        "REDIS_CONNECTION_TIMEOUT": 5,
    }

    with patch("workers.pullstatsredisflushworker.app") as mock_app, patch(
        "workers.pullstatsredisflushworker.redis"
    ) as mock_redis_module:
        mock_app.config.get.side_effect = lambda key, default=None: config.get(key, default)

        # PING blows up with a non-Redis error during initialization.
        failing_client = MagicMock()
        failing_client.ping.side_effect = Exception("Unexpected error")
        mock_redis_module.StrictRedis.return_value = failing_client

        # The worker catches redis-specific errors by type, so keep the real classes
        # on the patched module.
        mock_redis_module.ConnectionError = redis.ConnectionError
        mock_redis_module.RedisError = redis.RedisError

        worker = RedisFlushWorker()

        assert worker.redis_client is None
|
|
|
|
|
|
def test_validate_redis_key_data_type_error():
    """Non-numeric repository_id / pull_count strings fail validation."""
    with patch("workers.pullstatsredisflushworker.app") as mock_app:
        mock_app.config.get.return_value = 300
        worker = RedisFlushWorker()

        bad_payload = {
            "repository_id": "not_a_number",
            "manifest_digest": "sha256:abc123",
            "pull_count": "not_a_number",
            "last_pull_timestamp": "1694168400",
        }

        # Type-conversion failures must be reported as invalid, not raised.
        assert worker._validate_redis_key_data("test_key", bad_payload) is False
|
|
|
|
|
|
def test_validate_redis_key_data_negative_values():
    """Negative pull counts or timestamps are rejected by validation."""
    with patch("workers.pullstatsredisflushworker.app") as mock_app:
        mock_app.config.get.return_value = 300
        worker = RedisFlushWorker()

        base_payload = {
            "repository_id": "123",
            "manifest_digest": "sha256:abc123",
            "pull_count": "5",
            "last_pull_timestamp": "1694168400",
        }

        # Negative pull count
        negative_count = {**base_payload, "pull_count": "-1"}
        assert worker._validate_redis_key_data("test_key", negative_count) is False

        # Negative timestamp
        negative_timestamp = {**base_payload, "last_pull_timestamp": "-1"}
        assert worker._validate_redis_key_data("test_key", negative_timestamp) is False
|
|
|
|
|
|
def test_validate_redis_key_data_empty_digest():
    """An empty manifest_digest fails validation."""
    with patch("workers.pullstatsredisflushworker.app") as mock_app:
        mock_app.config.get.return_value = 300
        worker = RedisFlushWorker()

        payload_without_digest = {
            "repository_id": "123",
            "manifest_digest": "",
            "pull_count": "5",
            "last_pull_timestamp": "1694168400",
        }

        assert worker._validate_redis_key_data("test_key", payload_without_digest) is False
|
|
|
|
|
|
def test_process_redis_events_shared_digest_across_repositories():
    """Test _process_redis_events with same manifest digest in different repositories.

    Verifies that tag updates stay keyed per (repository, tag) while manifest
    counts aggregate per (repository, digest) — a digest shared across
    repositories must never merge counts across repository boundaries.
    """
    with patch("workers.pullstatsredisflushworker.app") as mock_app:
        mock_app.config.get.return_value = 300

        worker = RedisFlushWorker()

        # Mock Redis client
        mock_client = MagicMock()
        worker.redis_client = mock_client

        # Mock Redis data - same manifest digest in different repositories
        shared_digest = "sha256:92a29b8e530685cb620b9aced7c2d447d091885f1c5a3ace8d98fb5855687d05"
        # Order matters: side_effect entries are consumed in the same order as `keys`.
        mock_client.hgetall.side_effect = [
            # Repository 1: alpine tag pulls
            {
                "repository_id": "1",
                "tag_name": "alpine",
                "manifest_digest": shared_digest,
                "pull_count": "3",
                "last_pull_timestamp": "1694168400",
                "pull_method": "tag",
            },
            # Repository 1: digest pull
            {
                "repository_id": "1",
                "tag_name": "",
                "manifest_digest": shared_digest,
                "pull_count": "1",
                "last_pull_timestamp": "1694168460",
                "pull_method": "digest",
            },
            # Repository 2: alpine tag pulls
            {
                "repository_id": "2",
                "tag_name": "alpine",
                "manifest_digest": shared_digest,
                "pull_count": "1",
                "last_pull_timestamp": "1694168520",
                "pull_method": "tag",
            },
        ]

        # Test keys
        keys = [
            "pull_events:repo:1:tag:alpine:sha256:92a29b8e530685cb620b9aced7c2d447d091885f1c5a3ace8d98fb5855687d05",
            "pull_events:repo:1:digest:sha256:92a29b8e530685cb620b9aced7c2d447d091885f1c5a3ace8d98fb5855687d05",
            "pull_events:repo:2:tag:alpine:sha256:92a29b8e530685cb620b9aced7c2d447d091885f1c5a3ace8d98fb5855687d05",
        ]

        # Mock RENAME operation for all keys
        mock_client.rename.return_value = None

        # Test processing
        (
            tag_updates,
            manifest_updates,
            database_dependent_keys,
        ) = worker._process_redis_events(keys)

        # Verify results
        assert len(database_dependent_keys) == 3

        # Verify tag updates - should be separate per repository
        assert len(tag_updates) == 2  # One for each repository's alpine tag

        # Check repository 1 alpine tag
        repo1_tag = next(
            t for t in tag_updates if t["repository_id"] == 1 and t["tag_name"] == "alpine"
        )
        assert repo1_tag["pull_count"] == 3
        assert repo1_tag["manifest_digest"] == shared_digest

        # Check repository 2 alpine tag
        repo2_tag = next(
            t for t in tag_updates if t["repository_id"] == 2 and t["tag_name"] == "alpine"
        )
        assert repo2_tag["pull_count"] == 1
        assert repo2_tag["manifest_digest"] == shared_digest

        # Verify manifest updates - should be aggregated per repository
        assert len(manifest_updates) == 2  # One for each repository

        # Check repository 1 manifest (tag + digest pulls aggregated)
        repo1_manifest = next(m for m in manifest_updates if m["repository_id"] == 1)
        assert repo1_manifest["pull_count"] == 4  # 3 tag pulls + 1 digest pull
        assert repo1_manifest["manifest_digest"] == shared_digest

        # Check repository 2 manifest (only tag pulls)
        repo2_manifest = next(m for m in manifest_updates if m["repository_id"] == 2)
        assert repo2_manifest["pull_count"] == 1  # Only 1 tag pull
        assert repo2_manifest["manifest_digest"] == shared_digest
|
|
|
|
|
|
def test_flush_pull_metrics_database_failure():
    """Test _flush_pull_metrics when database flush fails.

    When the tag upsert raises, the Redis keys must NOT be deleted so the
    data can be retried on the next flush cycle.
    """
    with patch("workers.pullstatsredisflushworker.app") as mock_app:
        mock_app.config.get.side_effect = lambda key, default=None: {
            "REDIS_FLUSH_INTERVAL_SECONDS": 300,
            "REDIS_FLUSH_WORKER_BATCH_SIZE": 1000,
            "REDIS_FLUSH_WORKER_SCAN_COUNT": 100,
            "REDIS_CONNECTION_TIMEOUT": 5,
            "PULL_METRICS_REDIS": {
                "host": "localhost",
                "port": 6379,
                "db": 1,
                "password": None,
            },
        }.get(key, default)

        with patch(
            "workers.pullstatsredisflushworker.bulk_upsert_tag_statistics"
        ) as mock_tag_upsert:
            with patch(
                "workers.pullstatsredisflushworker.bulk_upsert_manifest_statistics"
            ) as mock_manifest_upsert:
                # Simulate the database layer failing on the tag upsert.
                mock_tag_upsert.side_effect = Exception("Database error")

                with patch("workers.pullstatsredisflushworker.redis") as mock_redis_module:
                    mock_client = MagicMock()
                    mock_client.ping.return_value = True
                    mock_redis_module.StrictRedis.return_value = mock_client

                    worker = RedisFlushWorker()

                    # Mock scan to return a key
                    mock_client.scan.return_value = (
                        0,
                        ["pull_events:repo:123:tag:latest:sha256:abc"],
                    )

                    # Mock hgetall to return valid data
                    mock_client.hgetall.return_value = {
                        "repository_id": "123",
                        "tag_name": "latest",
                        "manifest_digest": "sha256:abc123",
                        "pull_count": "5",
                        "last_pull_timestamp": "1694168400",
                        "pull_method": "tag",
                    }

                    worker._flush_pull_metrics()

                    # Verify database operations were called
                    mock_tag_upsert.assert_called_once()
                    # manifest_upsert should also be called since we have tag data
                    # Note: manifest_upsert may not be called if tag_upsert fails first
                    # mock_manifest_upsert.assert_called_once()

                    # Verify cleanup was NOT called due to database failure
                    mock_client.delete.assert_not_called()
|
|
|
|
|
|
def test_flush_pull_metrics_general_exception():
    """_flush_pull_metrics swallows unexpected errors from the Redis SCAN."""
    with patch("workers.pullstatsredisflushworker.app") as mock_app:
        mock_app.config.get.return_value = 300
        worker = RedisFlushWorker()

        broken_client = MagicMock()
        broken_client.scan.side_effect = Exception("Unexpected error")
        worker.redis_client = broken_client

        # Must not raise despite the SCAN failure.
        worker._flush_pull_metrics()
|
|
|
|
|
|
def test_process_redis_events_invalid_data_validation():
    """Keys whose payload fails validation are dropped and never marked DB-dependent."""
    with patch("workers.pullstatsredisflushworker.app") as mock_app:
        mock_app.config.get.return_value = 300
        worker = RedisFlushWorker()

        redis_mock = MagicMock()
        redis_mock.rename.return_value = None  # RENAME succeeds during processing
        redis_mock.hgetall.return_value = {
            "repository_id": "0",  # Invalid repository_id
            "tag_name": "latest",
            "manifest_digest": "sha256:abc123",
            "pull_count": "5",
            "last_pull_timestamp": "1694168400",
            "pull_method": "tag",
        }
        worker.redis_client = redis_mock

        tag_updates, manifest_updates, db_dependent = worker._process_redis_events(
            ["pull_events:repo:123:tag:latest:sha256:abc"]
        )

        # Invalid payloads are cleaned up immediately and produce no updates.
        assert len(db_dependent) == 0
        assert len(tag_updates) == 0
        assert len(manifest_updates) == 0
|
|
|
|
|
|
def test_process_redis_events_manifest_aggregation_timestamp_handling():
    """Test manifest aggregation with different timestamp scenarios.

    Two digest pulls of the same manifest are summed; the later of the two
    timestamps must win.
    """
    with patch("workers.pullstatsredisflushworker.app") as mock_app:
        mock_app.config.get.return_value = 300

        worker = RedisFlushWorker()
        mock_client = MagicMock()
        worker.redis_client = mock_client

        # Mock Redis data with different timestamps
        mock_client.hgetall.side_effect = [
            {
                "repository_id": "123",
                "tag_name": "",
                "manifest_digest": "sha256:abc123",
                "pull_count": "3",
                "last_pull_timestamp": "1694168400",  # Earlier timestamp
                "pull_method": "digest",
            },
            {
                "repository_id": "123",
                "tag_name": "",
                "manifest_digest": "sha256:abc123",
                "pull_count": "2",
                "last_pull_timestamp": "1694168500",  # Later timestamp
                "pull_method": "digest",
            },
        ]

        keys = [
            "pull_events:repo:123:digest:sha256:abc123",
            "pull_events:repo:123:digest:sha256:abc123",
        ]

        # Mock RENAME operation
        mock_client.rename.return_value = None
        tag_updates, manifest_updates, db_dependent = worker._process_redis_events(keys)

        # Should aggregate manifest pulls
        assert len(manifest_updates) == 1
        manifest = manifest_updates[0]
        assert manifest["pull_count"] == 5  # 3 + 2
        assert manifest["last_pull_timestamp"] is not None
        # Should keep the later timestamp
        assert manifest["last_pull_timestamp"].timestamp() == 1694168500
|
|
|
|
|
|
def test_process_redis_events_manifest_aggregation_no_existing_timestamp():
    """Test manifest aggregation when existing entry has no timestamp.

    A "0" timestamp is treated as absent; the timestamp from the later entry
    must be used for the aggregated row.
    """
    with patch("workers.pullstatsredisflushworker.app") as mock_app:
        mock_app.config.get.return_value = 300

        worker = RedisFlushWorker()
        mock_client = MagicMock()
        worker.redis_client = mock_client

        # Mock Redis data - first entry with no timestamp, second with timestamp
        mock_client.hgetall.side_effect = [
            {
                "repository_id": "123",
                "tag_name": "",
                "manifest_digest": "sha256:abc123",
                "pull_count": "3",
                "last_pull_timestamp": "0",  # No timestamp
                "pull_method": "digest",
            },
            {
                "repository_id": "123",
                "tag_name": "",
                "manifest_digest": "sha256:abc123",
                "pull_count": "2",
                "last_pull_timestamp": "1694168500",  # Has timestamp
                "pull_method": "digest",
            },
        ]

        keys = [
            "pull_events:repo:123:digest:sha256:abc123",
            "pull_events:repo:123:digest:sha256:abc123",
        ]

        # Mock RENAME operation
        mock_client.rename.return_value = None
        tag_updates, manifest_updates, db_dependent = worker._process_redis_events(keys)

        # Should aggregate manifest pulls
        assert len(manifest_updates) == 1
        manifest = manifest_updates[0]
        assert manifest["pull_count"] == 5  # 3 + 2
        assert manifest["last_pull_timestamp"] is not None
        # Should use the timestamp from the second entry
        assert manifest["last_pull_timestamp"].timestamp() == 1694168500
|
|
|
|
|
|
def test_process_redis_events_tag_aggregation_timestamp_handling():
    """Test tag aggregation with different timestamp scenarios.

    Two pulls of the same tag pointing at different manifests are summed;
    the later timestamp — and the manifest digest that came with it — win.
    """
    with patch("workers.pullstatsredisflushworker.app") as mock_app:
        mock_app.config.get.return_value = 300

        worker = RedisFlushWorker()
        mock_client = MagicMock()
        worker.redis_client = mock_client

        # Mock Redis data with different timestamps for same tag
        mock_client.hgetall.side_effect = [
            {
                "repository_id": "123",
                "tag_name": "latest",
                "manifest_digest": "sha256:abc123",
                "pull_count": "3",
                "last_pull_timestamp": "1694168400",  # Earlier timestamp
                "pull_method": "tag",
            },
            {
                "repository_id": "123",
                "tag_name": "latest",
                "manifest_digest": "sha256:def456",  # Different manifest
                "pull_count": "2",
                "last_pull_timestamp": "1694168500",  # Later timestamp
                "pull_method": "tag",
            },
        ]

        keys = [
            "pull_events:repo:123:tag:latest:sha256:abc123",
            "pull_events:repo:123:tag:latest:sha256:def456",
        ]

        # Mock RENAME operation
        mock_client.rename.return_value = None
        tag_updates, manifest_updates, db_dependent = worker._process_redis_events(keys)

        # Should aggregate tag pulls
        assert len(tag_updates) == 1
        tag = tag_updates[0]
        assert tag["pull_count"] == 5  # 3 + 2
        assert tag["last_pull_timestamp"] is not None
        # Should keep the later timestamp and manifest
        assert tag["last_pull_timestamp"].timestamp() == 1694168500
        assert tag["manifest_digest"] == "sha256:def456"
|
|
|
|
|
|
def test_process_redis_events_tag_aggregation_no_existing_timestamp():
    """Test tag aggregation when existing entry has no timestamp.

    A "0" timestamp is treated as absent; the timestamp (and manifest) from
    the second entry must be used for the aggregated row.
    """
    with patch("workers.pullstatsredisflushworker.app") as mock_app:
        mock_app.config.get.return_value = 300

        worker = RedisFlushWorker()
        mock_client = MagicMock()
        worker.redis_client = mock_client

        # Mock Redis data - first entry with no timestamp, second with timestamp
        mock_client.hgetall.side_effect = [
            {
                "repository_id": "123",
                "tag_name": "latest",
                "manifest_digest": "sha256:abc123",
                "pull_count": "3",
                "last_pull_timestamp": "0",  # No timestamp
                "pull_method": "tag",
            },
            {
                "repository_id": "123",
                "tag_name": "latest",
                "manifest_digest": "sha256:def456",
                "pull_count": "2",
                "last_pull_timestamp": "1694168500",  # Has timestamp
                "pull_method": "tag",
            },
        ]

        keys = [
            "pull_events:repo:123:tag:latest:sha256:abc123",
            "pull_events:repo:123:tag:latest:sha256:def456",
        ]

        # Mock RENAME operation
        mock_client.rename.return_value = None
        tag_updates, manifest_updates, db_dependent = worker._process_redis_events(keys)

        # Should aggregate tag pulls
        assert len(tag_updates) == 1
        tag = tag_updates[0]
        assert tag["pull_count"] == 5  # 3 + 2
        assert tag["last_pull_timestamp"] is not None
        # Should use the timestamp from the second entry
        assert tag["last_pull_timestamp"].timestamp() == 1694168500
        assert tag["manifest_digest"] == "sha256:def456"
|
|
|
|
|
|
def test_process_redis_events_general_exception_during_processing():
    """Non-Redis exceptions while reading a key are tolerated and skip that key."""
    with patch("workers.pullstatsredisflushworker.app") as mock_app:
        mock_app.config.get.return_value = 300
        worker = RedisFlushWorker()

        broken_client = MagicMock()
        broken_client.hgetall.side_effect = Exception("Unexpected error")
        worker.redis_client = broken_client

        tag_updates, manifest_updates, _db_dependent = worker._process_redis_events(
            ["pull_events:repo:123:tag:latest:sha256:abc"]
        )

        # The failing key contributes nothing; no exception escapes.
        assert len(tag_updates) == 0
        assert len(manifest_updates) == 0
|
|
|
|
|
|
def test_cleanup_redis_keys_empty_keys():
    """_cleanup_redis_keys is a no-op when given an empty key set."""
    with patch("workers.pullstatsredisflushworker.app") as mock_app:
        mock_app.config.get.return_value = 300
        worker = RedisFlushWorker()

        redis_mock = MagicMock()
        worker.redis_client = redis_mock

        worker._cleanup_redis_keys(set())

        # No DELETE should be issued when there is nothing to clean.
        redis_mock.delete.assert_not_called()
|
|
|
|
|
|
def test_cleanup_redis_keys_general_exception():
    """_cleanup_redis_keys tolerates unexpected errors raised by DELETE."""
    with patch("workers.pullstatsredisflushworker.app") as mock_app:
        mock_app.config.get.return_value = 300
        worker = RedisFlushWorker()

        redis_mock = MagicMock()
        redis_mock.delete.side_effect = Exception("Unexpected error")
        worker.redis_client = redis_mock

        # Must not raise even though the delete fails.
        worker._cleanup_redis_keys({"key1", "key2"})
|
|
|
|
|
|
def test_flush_pull_metrics_with_cleanup_keys_only():
    """Keys with empty payloads are deleted without any database writes."""
    config = {
        "REDIS_FLUSH_INTERVAL_SECONDS": 300,
        "REDIS_FLUSH_WORKER_BATCH_SIZE": 1000,
        "REDIS_FLUSH_WORKER_SCAN_COUNT": 100,
        "REDIS_CONNECTION_TIMEOUT": 5,
        "PULL_METRICS_REDIS": {
            "host": "localhost",
            "port": 6379,
            "db": 1,
            "password": None,
        },
    }

    with patch("workers.pullstatsredisflushworker.app") as mock_app, patch(
        "workers.pullstatsredisflushworker.redis"
    ) as mock_redis_module:
        mock_app.config.get.side_effect = lambda key, default=None: config.get(key, default)

        redis_mock = MagicMock()
        redis_mock.ping.return_value = True
        mock_redis_module.StrictRedis.return_value = redis_mock

        worker = RedisFlushWorker()

        # One scanned key whose hash payload is empty -> immediately cleanable.
        redis_mock.scan.return_value = (0, ["pull_events:repo:123:tag:latest:sha256:abc"])
        redis_mock.hgetall.return_value = {}

        worker._flush_pull_metrics()

        # The empty key is deleted even though nothing reached the database.
        redis_mock.delete.assert_called()
|
|
|
|
|
|
def test_process_redis_events_manifest_aggregation_with_none_timestamp():
    """Test manifest aggregation when new entry has None timestamp.

    When the second entry carries a "0" (absent) timestamp, the already-seen
    timestamp must be preserved.
    """
    with patch("workers.pullstatsredisflushworker.app") as mock_app:
        mock_app.config.get.return_value = 300

        worker = RedisFlushWorker()
        mock_client = MagicMock()
        worker.redis_client = mock_client

        # Mock Redis data - first entry with timestamp, second with None timestamp
        mock_client.hgetall.side_effect = [
            {
                "repository_id": "123",
                "tag_name": "",
                "manifest_digest": "sha256:abc123",
                "pull_count": "3",
                "last_pull_timestamp": "1694168400",  # Has timestamp
                "pull_method": "digest",
            },
            {
                "repository_id": "123",
                "tag_name": "",
                "manifest_digest": "sha256:abc123",
                "pull_count": "2",
                "last_pull_timestamp": "0",  # No timestamp (None)
                "pull_method": "digest",
            },
        ]

        keys = [
            "pull_events:repo:123:digest:sha256:abc123",
            "pull_events:repo:123:digest:sha256:abc123",
        ]

        # Mock RENAME operation
        mock_client.rename.return_value = None
        tag_updates, manifest_updates, db_dependent = worker._process_redis_events(keys)

        # Should aggregate manifest pulls
        assert len(manifest_updates) == 1
        manifest = manifest_updates[0]
        assert manifest["pull_count"] == 5  # 3 + 2
        assert manifest["last_pull_timestamp"] is not None
        # Should keep the existing timestamp
        assert manifest["last_pull_timestamp"].timestamp() == 1694168400
|
|
|
|
|
|
def test_process_redis_events_tag_aggregation_with_none_timestamp():
    """Test tag aggregation when new entry has None timestamp.

    When the second entry carries a "0" (absent) timestamp, the already-seen
    timestamp and manifest digest must be preserved.
    """
    with patch("workers.pullstatsredisflushworker.app") as mock_app:
        mock_app.config.get.return_value = 300

        worker = RedisFlushWorker()
        mock_client = MagicMock()
        worker.redis_client = mock_client

        # Mock Redis data - first entry with timestamp, second with None timestamp
        mock_client.hgetall.side_effect = [
            {
                "repository_id": "123",
                "tag_name": "latest",
                "manifest_digest": "sha256:abc123",
                "pull_count": "3",
                "last_pull_timestamp": "1694168400",  # Has timestamp
                "pull_method": "tag",
            },
            {
                "repository_id": "123",
                "tag_name": "latest",
                "manifest_digest": "sha256:def456",
                "pull_count": "2",
                "last_pull_timestamp": "0",  # No timestamp (None)
                "pull_method": "tag",
            },
        ]

        keys = [
            "pull_events:repo:123:tag:latest:sha256:abc123",
            "pull_events:repo:123:tag:latest:sha256:def456",
        ]

        # Mock RENAME operation
        mock_client.rename.return_value = None
        tag_updates, manifest_updates, db_dependent = worker._process_redis_events(keys)

        # Should aggregate tag pulls
        assert len(tag_updates) == 1
        tag = tag_updates[0]
        assert tag["pull_count"] == 5  # 3 + 2
        assert tag["last_pull_timestamp"] is not None
        # Should keep the existing timestamp and manifest
        assert tag["last_pull_timestamp"].timestamp() == 1694168400
        assert tag["manifest_digest"] == "sha256:abc123"
|
|
|
|
|
|
def test_cleanup_redis_keys_redis_error_during_cleanup():
    """_cleanup_redis_keys tolerates Redis errors raised by DELETE."""
    with patch("workers.pullstatsredisflushworker.app") as mock_app:
        mock_app.config.get.return_value = 300
        worker = RedisFlushWorker()

        redis_mock = MagicMock()
        redis_mock.delete.side_effect = redis.RedisError("Connection error")
        worker.redis_client = redis_mock

        # Must not raise despite the Redis failure.
        worker._cleanup_redis_keys({"key1", "key2"})
|
|
|
|
|
|
def test_create_gunicorn_worker_uses_feature_flag():
    """create_gunicorn_worker passes the enabled IMAGE_PULL_STATS flag to GunicornWorker."""
    with patch("workers.pullstatsredisflushworker.app") as mock_app, patch(
        "workers.pullstatsredisflushworker.GunicornWorker"
    ) as mock_gunicorn_worker, patch(
        "workers.pullstatsredisflushworker.features"
    ) as mock_features:
        mock_app.config.get.return_value = 300
        mock_features.IMAGE_PULL_STATS = True
        mock_gunicorn_worker.return_value = MagicMock()

        result = create_gunicorn_worker()

        assert result is not None
        mock_gunicorn_worker.assert_called_once()
        positional_args = mock_gunicorn_worker.call_args[0]
        assert positional_args[0] == "workers.pullstatsredisflushworker"
        assert positional_args[3] is True  # Feature flag value is forwarded
|
|
|
|
|
|
def test_create_gunicorn_worker_with_feature_disabled():
    """create_gunicorn_worker passes the disabled IMAGE_PULL_STATS flag to GunicornWorker."""
    with patch("workers.pullstatsredisflushworker.app") as mock_app, patch(
        "workers.pullstatsredisflushworker.GunicornWorker"
    ) as mock_gunicorn_worker, patch(
        "workers.pullstatsredisflushworker.features"
    ) as mock_features:
        mock_app.config.get.return_value = 300
        mock_features.IMAGE_PULL_STATS = False
        mock_gunicorn_worker.return_value = MagicMock()

        result = create_gunicorn_worker()

        assert result is not None
        mock_gunicorn_worker.assert_called_once()
        positional_args = mock_gunicorn_worker.call_args[0]
        assert positional_args[0] == "workers.pullstatsredisflushworker"
        assert positional_args[3] is False  # Feature flag value is forwarded
|
|
|
|
|
|
def test_process_redis_events_validation_failure():
    """Payloads with a malformed manifest digest are discarded during processing."""
    with patch("workers.pullstatsredisflushworker.app") as mock_app:
        mock_app.config.get.return_value = 300
        worker = RedisFlushWorker()

        redis_mock = MagicMock()
        redis_mock.rename.return_value = None  # RENAME succeeds during processing
        redis_mock.hgetall.return_value = {
            "repository_id": "123",
            "tag_name": "latest",
            "manifest_digest": "invalid_digest",  # Invalid digest
            "pull_count": "5",
            "last_pull_timestamp": "1694168400",
            "pull_method": "tag",
        }
        worker.redis_client = redis_mock

        tag_updates, manifest_updates, db_dependent = worker._process_redis_events(
            ["pull_events:repo:123:tag:latest:sha256:abc"]
        )

        # Invalid data is cleaned up immediately and never marked DB-dependent.
        assert len(db_dependent) == 0
        assert len(tag_updates) == 0
        assert len(manifest_updates) == 0
|
|
|
|
|
|
def test_process_redis_events_manifest_aggregation_both_timestamps_none():
    """Test manifest aggregation when both entries have None timestamps.

    Counts still sum, and the aggregated timestamp stays None when no entry
    supplied one.
    """
    with patch("workers.pullstatsredisflushworker.app") as mock_app:
        mock_app.config.get.return_value = 300

        worker = RedisFlushWorker()
        mock_client = MagicMock()
        worker.redis_client = mock_client

        # Mock Redis data - both entries with no timestamp
        mock_client.hgetall.side_effect = [
            {
                "repository_id": "123",
                "tag_name": "",
                "manifest_digest": "sha256:abc123",
                "pull_count": "3",
                "last_pull_timestamp": "0",  # No timestamp
                "pull_method": "digest",
            },
            {
                "repository_id": "123",
                "tag_name": "",
                "manifest_digest": "sha256:abc123",
                "pull_count": "2",
                "last_pull_timestamp": "0",  # No timestamp
                "pull_method": "digest",
            },
        ]

        keys = [
            "pull_events:repo:123:digest:sha256:abc123",
            "pull_events:repo:123:digest:sha256:abc123",
        ]

        # Mock RENAME operation
        mock_client.rename.return_value = None
        tag_updates, manifest_updates, db_dependent = worker._process_redis_events(keys)

        # Should aggregate manifest pulls
        assert len(manifest_updates) == 1
        manifest = manifest_updates[0]
        assert manifest["pull_count"] == 5  # 3 + 2
        assert manifest["last_pull_timestamp"] is None  # Both were None
|
|
|
|
|
|
def test_process_redis_events_tag_aggregation_both_timestamps_none():
    """Test tag aggregation when both entries have None timestamps.

    Counts still sum; the aggregated timestamp stays None and the first
    entry's manifest digest is retained.
    """
    with patch("workers.pullstatsredisflushworker.app") as mock_app:
        mock_app.config.get.return_value = 300

        worker = RedisFlushWorker()
        mock_client = MagicMock()
        worker.redis_client = mock_client

        # Mock Redis data - both entries with no timestamp
        mock_client.hgetall.side_effect = [
            {
                "repository_id": "123",
                "tag_name": "latest",
                "manifest_digest": "sha256:abc123",
                "pull_count": "3",
                "last_pull_timestamp": "0",  # No timestamp
                "pull_method": "tag",
            },
            {
                "repository_id": "123",
                "tag_name": "latest",
                "manifest_digest": "sha256:def456",
                "pull_count": "2",
                "last_pull_timestamp": "0",  # No timestamp
                "pull_method": "tag",
            },
        ]

        keys = [
            "pull_events:repo:123:tag:latest:sha256:abc123",
            "pull_events:repo:123:tag:latest:sha256:def456",
        ]

        # Mock RENAME operation
        mock_client.rename.return_value = None
        tag_updates, manifest_updates, db_dependent = worker._process_redis_events(keys)

        # Should aggregate tag pulls
        assert len(tag_updates) == 1
        tag = tag_updates[0]
        assert tag["pull_count"] == 5  # 3 + 2
        assert tag["last_pull_timestamp"] is None  # Both were None
        assert tag["manifest_digest"] == "sha256:abc123"  # First manifest
|
|
|
|
|
|
def test_cleanup_redis_keys_general_exception_during_cleanup():
    """_cleanup_redis_keys tolerates unexpected errors during cleanup.

    NOTE(review): duplicates test_cleanup_redis_keys_general_exception —
    consider removing one of the two.
    """
    with patch("workers.pullstatsredisflushworker.app") as mock_app:
        mock_app.config.get.return_value = 300
        worker = RedisFlushWorker()

        redis_mock = MagicMock()
        redis_mock.delete.side_effect = Exception("Unexpected error")
        worker.redis_client = redis_mock

        # Must not raise even though the delete fails.
        worker._cleanup_redis_keys({"key1", "key2"})
|