quay/data/registry_model/test/test_registry_proxy_model.py
jbpratt 71d219cc35 fix(test): prevent MySQL deadlocks in parallel proxy model tests (PROJQUAY-0000) (#4605)
* fix(test): prevent MySQL deadlocks in parallel proxy model tests (PROJQUAY-0000)

Mark all registry proxy model test classes to run serially using
pytest-xdist group markers. These tests all use the same "quayio-cache"
organization and were causing MySQL deadlocks when run in parallel
across multiple workers with pytest -n auto.
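
For reference, a minimal sketch of how the marker is applied to the classes in this
file (illustrative class name; grouping takes effect when xdist schedules with
--dist loadgroup):

    @pytest.mark.xdist_group("registry_proxy_serial")
    class TestSomeProxyModelBehavior:
        ...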

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix(test): resolve Flask app naming conflict in quotaregistrysizeworker tests (PROJQUAY-0000)

Import Flask app with alias to avoid conflict with pytest 'app' fixture.
The test was using 'app.config' but 'app' resolved to a pytest fixture
definition instead of the Flask application object.

Follows the same pattern as test_securityscanningnotificationworker.py.
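
A hedged sketch of that aliasing pattern (alias name and test body are illustrative,
not the actual worker test code):

    from app import app as flask_app  # alias so the pytest `app` fixture isn't shadowed

    def test_reads_flask_config(app):  # `app` here resolves to the pytest fixture
        assert flask_app.config is not None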

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Signed-off-by: Brady Pratt <bpratt@redhat.com>

---------

Signed-off-by: Brady Pratt <bpratt@redhat.com>
Co-authored-by: Claude <noreply@anthropic.com>
2025-11-24 15:28:34 +05:30


import json
from datetime import timedelta
from unittest.mock import MagicMock, patch
import pytest
from playhouse.test_utils import assert_query_count
from app import storage
from data.database import (
ImageStorage,
ImageStorageLocation,
ImageStoragePlacement,
Manifest,
ManifestBlob,
ManifestChild,
Tag,
get_epoch_timestamp_ms,
)
from data.model import (
QuotaExceededException,
TagDoesNotExist,
namespacequota,
oci,
user,
)
from data.model.blob import store_blob_record_and_temp_link
from data.model.oci.manifest import get_or_create_manifest
from data.model.organization import create_organization
from data.model.proxy_cache import create_proxy_cache_config
from data.model.repository import create_repository
from data.model.storage import get_layer_path
from data.model.user import get_user
from data.registry_model import registry_model
from data.registry_model.datatypes import Manifest as ManifestType
from data.registry_model.registry_proxy_model import ProxyModel
from data.registry_model.test import testdata
from digest.digest_tools import sha256_digest
from image.docker.schema1 import DOCKER_SCHEMA1_MANIFEST_CONTENT_TYPE
from image.docker.schema2 import (
DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE,
DOCKER_SCHEMA2_MANIFESTLIST_CONTENT_TYPE,
)
from image.docker.schema2.manifest import (
DockerSchema2Manifest,
DockerSchema2ManifestBuilder,
)
from image.docker.schema2.test.test_config import (
CONFIG_BYTES,
CONFIG_DIGEST,
CONFIG_SIZE,
)
from image.shared import ManifestException
from image.shared.schemas import parse_manifest_from_bytes
from proxy.fixtures import proxy_manifest_response # noqa: F401,F403
from test.fixtures import * # noqa: F401,F403
from util.bytes import Bytes
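
# Canned upstream manifests: digests and raw schema2 manifest bodies for the ubi8
# 8.5 and 8.4 tags, used below as simulated registry responses.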
UBI8_8_5_DIGEST = "sha256:8ee9d7bbcfc19d383f9044316a5c5fbcbe2df6be3c97f6c7a5422527b29bdede"
UBI8_8_5_MANIFEST_SCHEMA2 = r"""{
"schemaVersion": 2,
"mediaType": "application/vnd.docker.distribution.manifest.v2+json",
"config": {
"mediaType": "application/vnd.docker.container.image.v1+json",
"size": 4365,
"digest": "sha256:cc0656847854310306093b3dc1a7d9e7fc06399da46853e0c921cd5ec1906bfd"
},
"layers": [
{
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
"size": 81428527,
"digest": "sha256:ce3c6836540f978b55c511d236429e26b7a45f5a6f1204ab8d4378afaf77332f"
},
{
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
"size": 1792,
"digest": "sha256:63f9f4c31162a6a5dacd999a0dc65007e15b2ca6b2d9360a1234c27de12e7f38"
}
]
}"""
UBI8_8_4_DIGEST = "sha256:5e334d76fc059f7b44ee8fc2da6a2e8b240582d0214364c8c88596d20b33d7f1"
UBI8_8_4_MANIFEST_SCHEMA2 = r"""{
"schemaVersion": 2,
"mediaType": "application/vnd.docker.distribution.manifest.v2+json",
"config": {
"mediaType": "application/vnd.docker.container.image.v1+json",
"size": 4366,
"digest": "sha256:53ce4390f2adb1681eb1a90ec8b48c49c015e0a8d336c197637e7f65e365fa9e"
},
"layers": [
{
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
"size": 83368932,
"digest": "sha256:262268b65bd5f33784d6a61514964887bc18bc00c60c588bc62bfae7edca46f1"
},
{
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
"size": 1795,
"digest": "sha256:06038631a24a25348b51d1bfc7d0a0ee555552a8998f8328f9b657d02dd4c64c"
}
]
}"""
@pytest.fixture
def create_repo():
def _create_repo(orgname, reponame, user):
r = create_repository(orgname, reponame, user)
assert r is not None
repo_ref = registry_model.lookup_repository(orgname, reponame)
assert repo_ref is not None
return repo_ref
return _create_repo
@patch("data.registry_model.registry_proxy_model.Proxy", MagicMock())
def test_registry_proxy_model_init_only_query_db_once(initialized_db):
orgname = "testorg"
user = get_user("devtable")
org = create_organization(orgname, f"{orgname}@devtable.com", user)
org.save()
create_proxy_cache_config(
org_name=orgname,
upstream_registry="quay.io",
expiration_s=3600,
)
with assert_query_count(1):
ProxyModel(
orgname,
"app-sre/ubi8-ubi",
user,
)
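
# All classes below share the "quayio-cache" organization; the xdist_group marker
# keeps them on a single pytest-xdist worker so parallel runs (`pytest -n auto`)
# do not deadlock in MySQL.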
@pytest.mark.xdist_group("registry_proxy_serial")
class TestLookUpRepositoryWithRepoCreationVisibilityFlag:
orgname = "quayio-cache"
upstream_repository = "library/nginx"
upstream_registry = "docker.io"
tag = "latest"
@pytest.fixture(autouse=True)
def setup(self, app):
self.user = get_user("devtable")
org = create_organization(self.orgname, f"{self.orgname}@devtable.com", self.user)
org.save()
self.config = create_proxy_cache_config(
org_name=self.orgname,
upstream_registry=self.upstream_registry,
expiration_s=3600,
)
self.proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
@patch("data.registry_model.registry_proxy_model.Proxy", MagicMock())
def test_lookup_repository_creates_private_repo_when_enabled(self):
with patch("data.registry_model.registry_proxy_model.app", MagicMock()) as app_mock:
app_mock.config = {"CREATE_PRIVATE_REPO_ON_PUSH": True}
repo_ref = self.proxy_model.lookup_repository(
self.orgname, self.upstream_repository, manifest_ref=self.tag
)
assert repo_ref is not None
assert repo_ref.is_public is False
@patch("data.registry_model.registry_proxy_model.Proxy", MagicMock())
def test_lookup_repository_creates_public_repo_when_disabled(self):
with patch("data.registry_model.registry_proxy_model.app", MagicMock()) as app_mock:
app_mock.config = {"CREATE_PRIVATE_REPO_ON_PUSH": False}
repo_ref = self.proxy_model.lookup_repository(
self.orgname, self.upstream_repository, manifest_ref=self.tag
)
assert repo_ref is not None
assert repo_ref.is_public is True

@pytest.mark.xdist_group("registry_proxy_serial")
class TestRegistryProxyModelGetSchema1ParsedManifest:
upstream_registry = "quay.io"
upstream_repository = "app-sre/ubi8-ubi"
orgname = "quayio-cache"
repository = f"{orgname}/{upstream_repository}"
tag = "8.4"
@pytest.fixture(autouse=True)
def setup(self, app, create_repo):
self.user = get_user("devtable")
self.org = create_organization(self.orgname, f"{self.orgname}@devtable.com", self.user)
self.org.save()
self.config = create_proxy_cache_config(
org_name=self.orgname,
upstream_registry=self.upstream_registry,
expiration_s=3600,
)
self.repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
@patch("data.registry_model.registry_proxy_model.Proxy", MagicMock())
def test_raises_exception_with_manifest_list(self):
manifest = parse_manifest_from_bytes(
Bytes.for_string_or_unicode(testdata.UBI8_LATEST["manifest"]),
testdata.UBI8_LATEST["content-type"],
)
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
with pytest.raises(ManifestException):
proxy_model.get_schema1_parsed_manifest(
manifest,
self.orgname,
self.upstream_repository,
self.tag,
storage,
raise_on_error=True,
)
@patch("data.registry_model.registry_proxy_model.Proxy", MagicMock())
def test_raises_exception_with_docker_v2_manifest_to_v1(self):
def get_blob(layer):
content = Bytes.for_string_or_unicode(layer).as_encoded_str()
digest = str(sha256_digest(content))
blob = store_blob_record_and_temp_link(
self.orgname,
self.upstream_repository,
digest,
ImageStorageLocation.get(name="local_us"),
len(content),
120,
)
storage.put_content(["local_us"], get_layer_path(blob), content)
return blob, digest
layer1 = json.dumps(
{
"config": {},
"rootfs": {"type": "layers", "diff_ids": []},
"history": [{}],
}
)
_, config_digest = get_blob(layer1)
layer2 = "hello world"
_, blob_digest = get_blob(layer2)
builder = DockerSchema2ManifestBuilder()
builder.set_config_digest(config_digest, len(layer1.encode("utf-8")))
builder.add_layer(blob_digest, len(layer2.encode("utf-8")))
manifest = builder.build()
created_manifest = get_or_create_manifest(self.repo_ref.id, manifest, storage)
assert created_manifest is not None
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
m = ManifestType.for_manifest(created_manifest.manifest, MagicMock())
with pytest.raises(ManifestException):
proxy_model.get_schema1_parsed_manifest(
m,
self.orgname,
self.upstream_repository,
self.tag,
storage,
raise_on_error=True,
)

@pytest.mark.xdist_group("registry_proxy_serial")
class TestRegistryProxyModelCreateManifestAndRetargetTag:
upstream_registry = "quay.io"
upstream_repository = "app-sre/ubi8-ubi"
orgname = "quayio-cache"
repository = f"{orgname}/{upstream_repository}"
tag = "8.4"
@pytest.fixture(autouse=True)
def setup(self, app):
self.user = get_user("devtable")
self.org = create_organization(self.orgname, f"{self.orgname}@devtable.com", self.user)
self.org.save()
self.config = create_proxy_cache_config(
org_name=self.orgname,
upstream_registry=self.upstream_registry,
expiration_s=3600,
)
@patch("data.registry_model.registry_proxy_model.Proxy", MagicMock())
def test_create_manifest_and_temp_tag_when_they_dont_exist(self, create_repo):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
input_manifest = parse_manifest_from_bytes(
Bytes.for_string_or_unicode(UBI8_8_4_MANIFEST_SCHEMA2),
DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE,
)
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
manifest, tag = proxy_model._create_manifest_and_retarget_tag(
repo_ref, input_manifest, self.tag
)
assert manifest is not None
assert tag is not None
assert manifest.internal_manifest_bytes.as_unicode() == UBI8_8_4_MANIFEST_SCHEMA2
assert manifest.digest == UBI8_8_4_DIGEST
@patch("data.registry_model.registry_proxy_model.Proxy", MagicMock())
def test_create_8_4_tag_for_existing_manifest(self, create_repo):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
input_manifest = parse_manifest_from_bytes(
Bytes.for_string_or_unicode(UBI8_8_4_MANIFEST_SCHEMA2),
DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE,
)
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
# first create the manifest with a temp tag
first_manifest, _ = proxy_model._create_manifest_with_temp_tag(repo_ref, input_manifest)
# now try to create it again, but using the actual tag
manifest, tag = proxy_model._create_manifest_and_retarget_tag(
repo_ref, input_manifest, self.tag
)
assert first_manifest is not None
assert manifest is not None
assert tag is not None
assert manifest.internal_manifest_bytes.as_unicode() == UBI8_8_4_MANIFEST_SCHEMA2
assert manifest.digest == UBI8_8_4_DIGEST
assert manifest.id == first_manifest.id
@patch("data.registry_model.registry_proxy_model.proxy_cache_blob_queue.put")
@patch.object(ProxyModel, "_create_blob")
def test__create_placeholder_blobs_direct(self, mock_create_blob, mock_queue_put, create_repo):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
input_manifest = parse_manifest_from_bytes(
Bytes.for_string_or_unicode(UBI8_8_4_MANIFEST_SCHEMA2),
DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE,
)
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
manifest_id = 123
repo_id = repo_ref.id
username = self.user.username
orgname = self.orgname
# Call the private method directly
proxy_model._create_placeholder_blobs(input_manifest, manifest_id, repo_id)
# Should call _create_blob for config and each layer
expected_blob_calls = 1 + len(input_manifest.manifest_dict["layers"])
assert mock_create_blob.call_count == expected_blob_calls
# Should call queue.put for each layer
assert mock_queue_put.call_count == len(input_manifest.manifest_dict["layers"])
# Check that the digests match what is expected
expected_digests = [layer["digest"] for layer in input_manifest.manifest_dict["layers"]]
expected_digests.append(input_manifest.config.digest)
actual_digests = [call.args[0] for call in mock_create_blob.call_args_list]
assert sorted(actual_digests) == sorted(expected_digests)
# Check the payloads sent to the queue for each layer
for i, layer in enumerate(input_manifest.manifest_dict["layers"]):
args, kwargs = mock_queue_put.call_args_list[i]
# First arg is a list: [namespace, repo_id, layer_digest]
assert args[0] == [self.orgname, str(repo_id), str(layer["digest"])]
# Second arg is a JSON string with the payload
payload = json.loads(args[1])
assert payload == {
"digest": str(layer["digest"]),
"repo_id": repo_id,
"username": username,
"namespace": orgname,
}
# available_after should be 5
assert kwargs["available_after"] == 5
@patch("data.registry_model.registry_proxy_model.proxy_cache_blob_queue.put")
@patch.object(ProxyModel, "_create_blob")
def test__create_placeholder_blobs_with_none_user(
self, mock_create_blob, mock_queue_put, create_repo
):
"""Test that _create_placeholder_blobs handles None user for public repositories (PROJQUAY-9346)"""
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
input_manifest = parse_manifest_from_bytes(
Bytes.for_string_or_unicode(UBI8_8_4_MANIFEST_SCHEMA2),
DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE,
)
# Create ProxyModel with None user (simulates public repository access)
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
None, # This is the key test - None user
)
manifest_id = 123
repo_id = repo_ref.id
orgname = self.orgname
# Call the private method directly - this should not fail with AttributeError
proxy_model._create_placeholder_blobs(input_manifest, manifest_id, repo_id)
# Should call _create_blob for config and each layer
expected_blob_calls = 1 + len(input_manifest.manifest_dict["layers"])
assert mock_create_blob.call_count == expected_blob_calls
# Should call queue.put for each layer
assert mock_queue_put.call_count == len(input_manifest.manifest_dict["layers"])
# Check the payloads sent to the queue for each layer - username should be None
for i, layer in enumerate(input_manifest.manifest_dict["layers"]):
args, kwargs = mock_queue_put.call_args_list[i]
# First arg is a list: [namespace, repo_id, layer_digest]
assert args[0] == [self.orgname, str(repo_id), str(layer["digest"])]
# Second arg is a JSON string with the payload
payload = json.loads(args[1])
assert payload == {
"digest": str(layer["digest"]),
"repo_id": repo_id,
"username": None, # This should be None for public repositories
"namespace": orgname,
}
# available_after should be 5
assert kwargs["available_after"] == 5
@patch("data.registry_model.registry_proxy_model.Proxy", MagicMock())
def test_create_placeholder_blobs_for_new_manifest(self, create_repo):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
input_manifest = parse_manifest_from_bytes(
Bytes.for_string_or_unicode(UBI8_8_4_MANIFEST_SCHEMA2),
DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE,
)
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
manifest, _ = proxy_model._create_manifest_and_retarget_tag(
repo_ref, input_manifest, self.tag
)
assert manifest is not None
blob_count = 1 # schema 2 manifests have one extra config blob
blob_count += len(input_manifest.manifest_dict["layers"])
mblobs = ManifestBlob.select().where(ManifestBlob.manifest == manifest.id)
assert blob_count == mblobs.count()
expected_digests = [layer["digest"] for layer in input_manifest.manifest_dict["layers"]]
expected_digests.append(input_manifest.config.digest)
created_digests = [mblob.blob.content_checksum for mblob in mblobs]
assert sorted(expected_digests) == sorted(created_digests)
@patch("data.registry_model.registry_proxy_model.Proxy", MagicMock())
def test_connect_existing_blobs_to_new_manifest(self, create_repo):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
input_manifest = parse_manifest_from_bytes(
Bytes.for_string_or_unicode(UBI8_8_4_MANIFEST_SCHEMA2),
DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE,
)
layer = input_manifest.manifest_dict["layers"][0]
blob = ImageStorage.create(
image_size=layer["size"],
uncompressed_size=layer["size"],
content_checksum=layer["digest"],
)
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
proxy_model._create_manifest_and_retarget_tag(repo_ref, input_manifest, self.tag)
blob_count = (
ImageStorage.select()
.where(ImageStorage.content_checksum == blob.content_checksum)
.count()
)
assert blob_count == 1
@patch("data.registry_model.registry_proxy_model.Proxy", MagicMock())
def test_create_sub_manifests_for_manifest_list(self, create_repo):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
input_manifest = parse_manifest_from_bytes(
Bytes.for_string_or_unicode(testdata.UBI8_LATEST["manifest"]),
testdata.UBI8_LATEST["content-type"],
sparse_manifest_support=True,
)
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
manifest, _ = proxy_model._create_manifest_and_retarget_tag(
repo_ref, input_manifest, self.tag
)
mchildren = ManifestChild.select().where(ManifestChild.manifest == manifest.id)
created_count = mchildren.count()
expected_count = len(list(input_manifest.child_manifests(content_retriever=None)))
assert expected_count == created_count
@patch("data.registry_model.registry_proxy_model.Proxy", MagicMock())
def test_create_temp_tags_for_newly_created_sub_manifests_on_manifest_list(self, create_repo):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
input_manifest = parse_manifest_from_bytes(
Bytes.for_string_or_unicode(testdata.UBI8_LATEST["manifest"]),
testdata.UBI8_LATEST["content-type"],
sparse_manifest_support=True,
)
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
manifest, _ = proxy_model._create_manifest_and_retarget_tag(
repo_ref, input_manifest, self.tag
)
mchildren = ManifestChild.select(ManifestChild.child_manifest_id).where(
ManifestChild.manifest == manifest.id
)
tags = Tag.select().join(
ManifestChild, on=(Tag.manifest_id == ManifestChild.child_manifest_id)
)
assert mchildren.count() == tags.count()
assert all([t.hidden for t in tags]), "all sub manifest tags must be hidden"
assert all(
[t.name != self.tag for t in tags]
), "sub manifest tags must have temp tag name, not parent manifest name"
@patch("data.registry_model.registry_proxy_model.Proxy", MagicMock())
def test_lookup_manifest_with_allow_hidden_and_allow_dead_in_create_manifest_with_temp_tag(
self, create_repo
):
"""
Test that _create_manifest_with_temp_tag calls oci.manifest.lookup_manifest
with allow_hidden=True, allow_dead=True when looking up child manifests.
This test specifically covers the change on line 586 where
oci.manifest.lookup_manifest is called with these specific parameters.
"""
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
input_manifest = parse_manifest_from_bytes(
Bytes.for_string_or_unicode(testdata.UBI8_LATEST["manifest"]),
testdata.UBI8_LATEST["content-type"],
sparse_manifest_support=True,
)
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
# Mock oci.manifest.lookup_manifest to verify it's called with correct parameters
with patch(
"data.registry_model.registry_proxy_model.oci.manifest.lookup_manifest"
) as mock_lookup:
# Mock the lookup to return None (simulating manifest not found)
mock_lookup.return_value = None
# Mock oci.tag.create_temporary_tag_if_necessary
with patch(
"data.registry_model.registry_proxy_model.oci.tag.create_temporary_tag_if_necessary"
) as mock_create_tag:
mock_tag = MagicMock()
mock_create_tag.return_value = mock_tag
# Mock oci.manifest.connect_manifests
with patch(
"data.registry_model.registry_proxy_model.oci.manifest.connect_manifests"
):
manifest, tag = proxy_model._create_manifest_with_temp_tag(
repo_ref, input_manifest
)
# Verify that lookup_manifest was called with allow_hidden=True, allow_dead=True
# for each child manifest in the manifest list
expected_calls = []
for child in input_manifest.child_manifests(content_retriever=None):
expected_calls.append(
(
(repo_ref.id, child.digest),
{"allow_hidden": True, "allow_dead": True},
)
)
# Check that lookup_manifest was called the expected number of times
assert mock_lookup.call_count == len(expected_calls)
# Verify each call had the correct parameters
for i, call in enumerate(mock_lookup.call_args_list):
args, kwargs = call
assert args[0] == repo_ref.id # repository_id
assert args[1] == expected_calls[i][0][1] # child.digest
assert kwargs["allow_hidden"] is True
assert kwargs["allow_dead"] is True
@patch("data.registry_model.registry_proxy_model.Proxy", MagicMock())
def test_connect_existing_manifest_to_manifest_list(self, create_repo):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
input_manifest = parse_manifest_from_bytes(
Bytes.for_string_or_unicode(testdata.UBI8_LINUX_AMD64["manifest"]),
testdata.UBI8_LINUX_AMD64["content-type"],
)
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
manifest, _ = proxy_model._create_manifest_with_temp_tag(repo_ref, input_manifest)
assert manifest is not None
input_list = parse_manifest_from_bytes(
Bytes.for_string_or_unicode(testdata.UBI8_LATEST["manifest"]),
testdata.UBI8_LATEST["content-type"],
sparse_manifest_support=True,
)
manifest_list, _ = proxy_model._create_manifest_and_retarget_tag(
repo_ref, input_list, self.tag
)
assert manifest_list is not None
conn_count = (
ManifestChild.select()
.where(
ManifestChild.manifest == manifest_list.id,
ManifestChild.child_manifest == manifest.id,
)
.count()
)
assert conn_count == 1

@pytest.mark.xdist_group("registry_proxy_serial")
class TestRegistryProxyModelLookupManifestByDigest:
upstream_registry = "quay.io"
upstream_repository = "app-sre/ubi8-ubi"
orgname = "quayio-cache"
repository = f"{orgname}/{upstream_repository}"
digest = UBI8_8_4_DIGEST
@pytest.fixture(autouse=True)
def setup(self, app):
self.user = get_user("devtable")
self.org = create_organization(self.orgname, f"{self.orgname}@devtable.com", self.user)
self.org.save()
self.config = create_proxy_cache_config(
org_name=self.orgname,
upstream_registry=self.upstream_registry,
expiration_s=3600,
)
def test_returns_cached_manifest_when_it_exists_upstream(
self, create_repo, proxy_manifest_response
):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
proxy_mock = proxy_manifest_response(
UBI8_8_4_DIGEST, UBI8_8_4_MANIFEST_SCHEMA2, DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
manifest = proxy_model.lookup_manifest_by_digest(repo_ref, UBI8_8_4_DIGEST)
assert manifest is not None
assert manifest.digest == UBI8_8_4_DIGEST
first_manifest = manifest
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
manifest = proxy_model.lookup_manifest_by_digest(repo_ref, UBI8_8_4_DIGEST)
assert manifest is not None
assert manifest.id == first_manifest.id
assert manifest.digest == first_manifest.digest
# one for each lookup_manifest_by_digest call
assert proxy_mock.manifest_exists.call_count == 2
# single call from first call to lookup_manifest_by_digest
assert proxy_mock.get_manifest.call_count == 1
def test_renew_tag_when_manifest_is_cached_and_exists_upstream(
self, create_repo, proxy_manifest_response
):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
proxy_mock = proxy_manifest_response(
UBI8_8_4_DIGEST, UBI8_8_4_MANIFEST_SCHEMA2, DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
manifest = proxy_model.lookup_manifest_by_digest(repo_ref, UBI8_8_4_DIGEST)
assert manifest is not None
first_tag = oci.tag.get_tag_by_manifest_id(repo_ref.id, manifest.id)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
manifest = proxy_model.lookup_manifest_by_digest(repo_ref, UBI8_8_4_DIGEST)
assert manifest is not None
tag = oci.tag.get_tag_by_manifest_id(repo_ref.id, manifest.id)
assert tag.lifetime_end_ms > first_tag.lifetime_end_ms
def test_recreate_tag_when_manifest_is_cached_and_exists_upstream(
self, create_repo, proxy_manifest_response
):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
proxy_mock = proxy_manifest_response(
UBI8_8_4_DIGEST, UBI8_8_4_MANIFEST_SCHEMA2, DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
manifest = proxy_model.lookup_manifest_by_digest(repo_ref, UBI8_8_4_DIGEST)
assert manifest is not None
Tag.delete().where(Tag.manifest_id == manifest.id).execute()
for child_manifest in oci.tag.get_child_manifests(repo_ref.id, manifest.id):
Tag.delete().where(Tag.manifest_id == child_manifest).execute()
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
manifest = proxy_model.lookup_manifest_by_digest(repo_ref, UBI8_8_4_DIGEST)
assert manifest is not None
def test_renew_manifest_and_parent_tags_when_manifest_has_multiple_parents(
self, create_repo, proxy_manifest_response
):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
with patch(
"data.registry_model.registry_proxy_model.Proxy",
MagicMock(
return_value=proxy_manifest_response(
"bullseye",
testdata.PYTHON_BULLSEYE["manifest"],
testdata.PYTHON_BULLSEYE["content-type"],
)
),
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
bullseye = proxy_model.get_repo_tag(repo_ref, "bullseye")
assert bullseye is not None
with patch(
"data.registry_model.registry_proxy_model.Proxy",
MagicMock(
return_value=proxy_manifest_response(
"latest",
testdata.PYTHON_LATEST["manifest"],
testdata.PYTHON_LATEST["content-type"],
)
),
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
latest = proxy_model.get_repo_tag(repo_ref, "latest")
assert latest is not None
with patch(
"data.registry_model.registry_proxy_model.Proxy",
MagicMock(
return_value=proxy_manifest_response(
testdata.PYTHON_LINUX_AMD64["digest"],
testdata.PYTHON_LINUX_AMD64["manifest"],
testdata.PYTHON_LINUX_AMD64["content-type"],
)
),
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
manifest = proxy_model.lookup_manifest_by_digest(
repo_ref, testdata.PYTHON_LINUX_AMD64["digest"]
)
assert manifest is not None
updated_bullseye = oci.tag.get_tag_by_manifest_id(repo_ref.id, bullseye.manifest.id)
updated_latest = oci.tag.get_tag_by_manifest_id(repo_ref.id, latest.manifest.id)
assert updated_bullseye.lifetime_end_ms > bullseye.lifetime_end_ms
assert updated_latest.lifetime_end_ms > latest.lifetime_end_ms
def test_renew_manifest_and_parent_tag_when_manifest_is_child_of_manifest_list(
self, create_repo, proxy_manifest_response
):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
input_list = parse_manifest_from_bytes(
Bytes.for_string_or_unicode(testdata.UBI8_LATEST["manifest"]),
testdata.UBI8_LATEST["content-type"],
sparse_manifest_support=True,
)
proxy_mock = proxy_manifest_response(
"latest",
testdata.UBI8_LATEST["manifest"],
testdata.UBI8_LATEST["content-type"],
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
tag = proxy_model.get_repo_tag(repo_ref, "latest")
assert tag is not None
manifest_list = tag.manifest
assert manifest_list is not None
child = (
ManifestChild.select(ManifestChild.child_manifest_id)
.join(Manifest, on=(ManifestChild.child_manifest_id == Manifest.id))
.where(
(ManifestChild.manifest_id == manifest_list.id)
& (Manifest.digest == testdata.UBI8_LINUX_AMD64["digest"])
)
)
manifest_tag = Tag.select().where(Tag.manifest == child).get()
manifest_list_tag = tag
proxy_mock = proxy_manifest_response(
testdata.UBI8_LINUX_AMD64["digest"],
testdata.UBI8_LINUX_AMD64["manifest"],
testdata.UBI8_LINUX_AMD64["content-type"],
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
manifest = proxy_model.lookup_manifest_by_digest(
repo_ref, testdata.UBI8_LINUX_AMD64["digest"]
)
updated_tag = oci.tag.get_tag_by_manifest_id(repo_ref.id, manifest.id)
updated_list_tag = oci.tag.get_tag_by_manifest_id(repo_ref.id, manifest_list.id)
assert updated_tag.id == manifest_tag.id
assert updated_list_tag.id == manifest_list_tag.id
assert updated_tag.lifetime_end_ms > manifest_tag.lifetime_end_ms
assert updated_list_tag.lifetime_end_ms > manifest_list_tag.lifetime_end_ms
def test_renew_manifest_list_tag_when_upstream_manifest_changed(
self, create_repo, proxy_manifest_response
):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
def create_image(tag: str, manifest_info: dict):
proxy_mock = proxy_manifest_response(
tag, manifest_info["manifest"], manifest_info["content-type"]
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
tag = proxy_model.get_repo_tag(repo_ref, tag)
return tag
def pull_manifest_by_digest(manifest_info: dict):
proxy_mock = proxy_manifest_response(
manifest_info["digest"], manifest_info["manifest"], manifest_info["content-type"]
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
manifest = proxy_model.lookup_manifest_by_digest(repo_ref, manifest_info["digest"])
return manifest
# start off with latest pointing to 1.22
_1_22 = create_image("1.22", testdata.NGINX_1_22)
latest = create_image("latest", testdata.NGINX_1_22)
assert _1_22 is not None
assert latest is not None
assert latest.manifest.digest == _1_22.manifest.digest
manifest = pull_manifest_by_digest(testdata.NGINX_1_22_LINUX_AMD64)
assert manifest is not None
# now update latest to point to 1.23.1
_1_23_1 = create_image("1.23.1", testdata.NGINX_1_23_1)
new_latest = create_image("latest", testdata.NGINX_1_23_1)
assert _1_23_1 is not None
assert new_latest is not None
assert new_latest.manifest.digest == _1_23_1.manifest.digest
manifest = pull_manifest_by_digest(testdata.NGINX_1_23_1_LINUX_AMD64)
assert manifest is not None
# now switch back to 1.22
latest = create_image("latest", testdata.NGINX_1_22)
assert _1_22 is not None
assert latest is not None
assert latest.manifest.digest == _1_22.manifest.digest
manifest = pull_manifest_by_digest(testdata.NGINX_1_22_LINUX_AMD64)
assert manifest is not None
def test_update_relevant_manifest_fields_when_manifest_is_placeholder(
self, create_repo, proxy_manifest_response
):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
proxy_mock = proxy_manifest_response(
"latest", testdata.UBI8_LATEST["manifest"], testdata.UBI8_LATEST["content-type"]
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
tag = proxy_model.get_repo_tag(repo_ref, "latest")
assert tag is not None
assert tag.manifest.digest == testdata.UBI8_LATEST["digest"]
assert tag.manifest.is_manifest_list
proxy_mock = proxy_manifest_response(
testdata.UBI8_LINUX_AMD64["digest"],
testdata.UBI8_LINUX_AMD64["manifest"],
testdata.UBI8_LINUX_AMD64["content-type"],
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
manifest = proxy_model.lookup_manifest_by_digest(
repo_ref, testdata.UBI8_LINUX_AMD64["digest"]
)
mbytes = manifest.internal_manifest_bytes.as_unicode()
assert mbytes != ""
assert manifest.digest == testdata.UBI8_LINUX_AMD64["digest"]
assert manifest.layers_compressed_size == 772795
def test_renew_tag_when_cache_is_expired_and_manifest_is_up_to_date_with_upstream(
self, create_repo, proxy_manifest_response
):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
proxy_mock = proxy_manifest_response(
UBI8_8_4_DIGEST, UBI8_8_4_MANIFEST_SCHEMA2, DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
manifest = proxy_model.lookup_manifest_by_digest(repo_ref, UBI8_8_4_DIGEST)
assert manifest is not None
before_ms = get_epoch_timestamp_ms() - timedelta(hours=24).total_seconds() * 1000
Tag.update(
lifetime_start_ms=before_ms,
lifetime_end_ms=before_ms + 5,
).where(Tag.manifest == manifest.id).execute()
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
manifest = proxy_model.lookup_manifest_by_digest(repo_ref, UBI8_8_4_DIGEST)
assert manifest is not None
tag = Tag.get(manifest_id=manifest.id)
now_ms = get_epoch_timestamp_ms()
assert tag.lifetime_end_ms > now_ms
def test_return_None_when_local_cache_is_expired_and_manifest_no_longer_exists_upstream(
self, create_repo, proxy_manifest_response
):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
proxy_mock = proxy_manifest_response(
UBI8_8_4_DIGEST, UBI8_8_4_MANIFEST_SCHEMA2, DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
manifest = proxy_model.lookup_manifest_by_digest(repo_ref, UBI8_8_4_DIGEST)
assert manifest is not None
# expire the tag by setting start and end time to the past
before_ms = get_epoch_timestamp_ms() - timedelta(hours=24).total_seconds() * 1000
Tag.update(
lifetime_start_ms=before_ms,
lifetime_end_ms=before_ms + 5,
).where(Tag.manifest == manifest.id).execute()
proxy_mock = proxy_manifest_response("not-existing-ref", "", "")
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
manifest = proxy_model.lookup_manifest_by_digest(repo_ref, UBI8_8_4_DIGEST)
assert manifest is None
def test_return_None_when_manifest_is_placeholder_and_upstream_is_down(
self, create_repo, proxy_manifest_response
):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
proxy_mock = proxy_manifest_response(
"latest", testdata.UBI8_LATEST["manifest"], DOCKER_SCHEMA2_MANIFESTLIST_CONTENT_TYPE
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
tag = proxy_model.get_repo_tag(repo_ref, "latest")
assert tag is not None
assert tag.manifest is not None
proxy_mock = proxy_manifest_response("does-not-exist", "", "")
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
manifest = proxy_model.lookup_manifest_by_digest(
repo_ref, testdata.UBI8_LINUX_AMD64["digest"]
)
assert manifest is None
def test_returns_cached_manifest_when_upstream_errors_and_cache_is_not_expired(
self, create_repo, proxy_manifest_response
):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
proxy_mock = proxy_manifest_response(
UBI8_8_4_DIGEST, UBI8_8_4_MANIFEST_SCHEMA2, DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
manifest = proxy_model.lookup_manifest_by_digest(repo_ref, UBI8_8_4_DIGEST)
assert manifest is not None
assert manifest.digest == UBI8_8_4_DIGEST
first_manifest = manifest
proxy_mock = proxy_manifest_response("not-existing-ref", "", "")
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
manifest = proxy_model.lookup_manifest_by_digest(repo_ref, UBI8_8_4_DIGEST)
assert manifest is not None
assert manifest.id == first_manifest.id
assert manifest.digest == first_manifest.digest
assert proxy_mock.manifest_exists.call_count == 1
assert proxy_mock.get_manifest.call_count == 0
def test_does_not_bump_tag_expiration_when_manifest_is_cached_and_upstream_errors(
self, create_repo, proxy_manifest_response
):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
proxy_mock = proxy_manifest_response(
UBI8_8_4_DIGEST, UBI8_8_4_MANIFEST_SCHEMA2, DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
manifest = proxy_model.lookup_manifest_by_digest(repo_ref, UBI8_8_4_DIGEST)
assert manifest is not None
first_tag = Tag.get(manifest_id=manifest.id)
proxy_mock = proxy_manifest_response("not-existing-ref", "", "")
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
manifest = proxy_model.lookup_manifest_by_digest(repo_ref, UBI8_8_4_DIGEST)
assert manifest is not None
tag = Tag.get(manifest_id=manifest.id)
assert tag.id == first_tag.id
assert tag.lifetime_end_ms == first_tag.lifetime_end_ms

@pytest.mark.xdist_group("registry_proxy_serial")
class TestRegistryProxyModelGetRepoTag:
upstream_registry = "quay.io" # alternatively, use quay.io/someorg
upstream_repository = "app-sre/ubi8-ubi"
orgname = "quayio-cache"
repository = f"{orgname}/{upstream_repository}"
tag = "latest"
@pytest.fixture(autouse=True)
def setup(self, app):
self.user = get_user("devtable")
self.org = create_organization(self.orgname, f"{self.orgname}@devtable.com", self.user)
self.org.save()
self.config = create_proxy_cache_config(
org_name=self.orgname,
upstream_registry=self.upstream_registry,
expiration_s=3600,
)
def test_caches_manifest_on_first_pull(self, create_repo, proxy_manifest_response):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
proxy_mock = proxy_manifest_response(
self.tag, UBI8_8_4_MANIFEST_SCHEMA2, DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
tag = proxy_model.get_repo_tag(repo_ref, self.tag)
assert tag is not None
assert tag.manifest is not None
assert tag.manifest.internal_manifest_bytes.as_unicode() == UBI8_8_4_MANIFEST_SCHEMA2
def test_updates_manifest_and_bumps_tag_expiration_when_upstream_manifest_changed(
self, create_repo, proxy_manifest_response
):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
proxy_mock = proxy_manifest_response(
self.tag, UBI8_8_4_MANIFEST_SCHEMA2, DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
tag = proxy_model.get_repo_tag(repo_ref, self.tag)
assert tag is not None
assert tag.name == self.tag
first_manifest = tag.manifest
proxy_mock = proxy_manifest_response(
self.tag, UBI8_8_5_MANIFEST_SCHEMA2, DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
tag = proxy_model.get_repo_tag(repo_ref, self.tag)
assert tag is not None
assert tag.name == self.tag
assert tag.manifest.id != first_manifest.id
assert tag.manifest.digest == UBI8_8_5_DIGEST
def test_renews_expired_tag_when_manifest_is_up_to_date_with_upstream(
self, create_repo, proxy_manifest_response
):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
proxy_mock = proxy_manifest_response(
self.tag, UBI8_8_5_MANIFEST_SCHEMA2, DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
tag = proxy_model.get_repo_tag(repo_ref, self.tag)
assert tag is not None
assert tag.name == self.tag
# expire the tag by setting start and end time to the past
before_ms = get_epoch_timestamp_ms() - timedelta(hours=24).total_seconds() * 1000
Tag.update(
lifetime_start_ms=before_ms,
lifetime_end_ms=before_ms + 5,
).where(Tag.id == tag.id).execute()
expired_tag = tag
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
tag = proxy_model.get_repo_tag(repo_ref, self.tag)
assert tag is not None
assert expired_tag.id == tag.id
assert expired_tag.manifest.id == tag.manifest.id
assert not tag.expired
new_expiration_ms = get_epoch_timestamp_ms() + self.config.expiration_s * 1000
# subtract a few milliseconds so the test doesn't flake
assert tag.lifetime_end_ms >= new_expiration_ms - 500
def test_passes_through_upstream_error_when_image_isnt_cached(
self, create_repo, proxy_manifest_response
):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
proxy_mock = proxy_manifest_response("not-existing-ref", "", "")
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
with pytest.raises(TagDoesNotExist):
proxy_model.get_repo_tag(repo_ref, self.tag)
def test_passes_through_upstream_error_when_local_cache_is_expired(
self, create_repo, proxy_manifest_response
):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
proxy_mock = proxy_manifest_response(
self.tag, UBI8_8_5_MANIFEST_SCHEMA2, DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
tag = proxy_model.get_repo_tag(repo_ref, self.tag)
assert tag is not None
# expire the tag by setting start and end time to the past
before_ms = get_epoch_timestamp_ms() - timedelta(hours=24).total_seconds() * 1000
Tag.update(
lifetime_start_ms=before_ms,
lifetime_end_ms=before_ms + 5,
).where(Tag.id == tag.id).execute()
proxy_mock = proxy_manifest_response("not-existing-ref", "", "")
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
tag = proxy_model.get_repo_tag(repo_ref, self.tag)
assert tag is None
def test_returns_None_when_manifest_no_longer_exists_upstream_and_local_cache_is_expired(
self, create_repo, proxy_manifest_response
):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
proxy_mock = proxy_manifest_response(
self.tag, UBI8_8_5_MANIFEST_SCHEMA2, DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
tag = proxy_model.get_repo_tag(repo_ref, self.tag)
assert tag is not None
# expire the tag by setting start and end time to the past
before_ms = get_epoch_timestamp_ms() - timedelta(hours=24).total_seconds() * 1000
Tag.update(
lifetime_start_ms=before_ms,
lifetime_end_ms=before_ms + 5,
).where(Tag.id == tag.id).execute()
proxy_mock = proxy_manifest_response("not-existing-ref", "", "")
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
tag = proxy_model.get_repo_tag(repo_ref, self.tag)
assert tag is None
def test_bumps_tag_expiration_when_upstream_is_alive_and_cache_is_up_to_date(
self, create_repo, proxy_manifest_response
):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
proxy_mock = proxy_manifest_response(
self.tag, UBI8_8_5_MANIFEST_SCHEMA2, DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
tag = proxy_model.get_repo_tag(repo_ref, self.tag)
assert tag is not None
assert tag.name == self.tag
first_tag = tag
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
tag = proxy_model.get_repo_tag(repo_ref, self.tag)
assert tag is not None
assert tag.lifetime_end_ms > first_tag.lifetime_end_ms
def test_doesnt_bump_tag_expiration_when_upstream_is_dead(
self, create_repo, proxy_manifest_response
):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
proxy_mock = proxy_manifest_response(
self.tag, UBI8_8_5_MANIFEST_SCHEMA2, DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
tag = proxy_model.get_repo_tag(repo_ref, self.tag)
assert tag is not None
first_tag = tag
proxy_mock = proxy_manifest_response("not-existing-ref", "", "")
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
tag = proxy_model.get_repo_tag(repo_ref, self.tag)
assert tag is not None
assert tag.lifetime_end_ms == first_tag.lifetime_end_ms
def test_missing_layers_in_proxy_cache(self, proxy_manifest_response):
"""
Test to verify that layers are missing in proxy cache when they exist locally
but were not explicitly requested by the client.
Note: this is not the expected behavior; a fix is expected with PROJQUAY-8440.
"""
proxy_mock = proxy_manifest_response(
UBI8_8_4_DIGEST, UBI8_8_4_MANIFEST_SCHEMA2, DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
manifest = DockerSchema2Manifest(
Bytes.for_string_or_unicode(self.create_manifest_with_valid_layers())
)
assert manifest is not None
assert len(manifest.filesystem_layers) == 4
missing_layers = []
for layer in manifest.filesystem_layers:
exists = (
ImageStorage.select().where(ImageStorage.content_checksum == layer.digest).exists()
)
if not exists:
missing_layers.append(layer.digest)
# Assert that we have missing layers
assert len(missing_layers) > 0
def create_manifest_with_valid_layers(self):
return json.dumps(
{
"schemaVersion": 2,
"mediaType": "application/vnd.docker.distribution.manifest.v2+json",
"config": {
"mediaType": "application/vnd.docker.container.image.v1+json",
"size": CONFIG_SIZE,
"digest": CONFIG_DIGEST,
},
"layers": [
{
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
"size": 1234,
"digest": "sha256:ec4b8955958665577945c89419d1af06b5f7636b4ac3da7f12184802ad867736",
},
{
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
"size": 32654,
"digest": "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f",
},
{
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
"size": 16724,
"digest": "sha256:3c3a4604a545cdc127456d94e421cd355bca5b528f4a9c1905b15da2eb4a4c6b",
},
{
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
"size": 73109,
"digest": "sha256:ec4b8955958665577945c89419d1af06b5f7636b4ac3da7f12184802ad867736",
},
],
}
).encode("utf-8")
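
# Unlike the classes above, this class uses its own "proxy-cache" organization and
# carries no xdist_group marker.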
class TestPruningLRUProxiedImagesToAllowBlobUpload:
upstream_registry = "docker.io"
upstream_repository = "library/busybox"
orgname = "proxy-cache"
repository = f"{orgname}/{upstream_repository}"
tag = "1.35.0"
@pytest.fixture(autouse=True)
def setup(self, app):
self.user = get_user("devtable")
self.org = create_organization(self.orgname, f"{self.orgname}@devtable.com", self.user)
self.org.save()
self.config = create_proxy_cache_config(
org_name=self.orgname,
upstream_registry=self.upstream_registry,
expiration_s=3600,
)
@patch("data.registry_model.registry_proxy_model.Proxy", MagicMock())
def test_auto_pruning_skipped_if_no_quota_set(self, create_repo):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
input_manifest = parse_manifest_from_bytes(
Bytes.for_string_or_unicode(UBI8_8_4_MANIFEST_SCHEMA2),
DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE,
)
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
try:
proxy_model._check_image_upload_possible_or_prune(repo_ref, input_manifest)
except QuotaExceededException:
assert False, "No exception should be raised here"
@patch("data.registry_model.registry_proxy_model.Proxy", MagicMock())
def test_auto_pruning_skipped_for_manifest_list(self, create_repo, initialized_db):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
input_manifest = parse_manifest_from_bytes(
Bytes.for_string_or_unicode(testdata.PYTHON_LATEST["manifest"]),
DOCKER_SCHEMA2_MANIFESTLIST_CONTENT_TYPE,
)
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
try:
proxy_model._check_image_upload_possible_or_prune(repo_ref, input_manifest)
except QuotaExceededException:
assert False, "No exception should be raised here"
# verify ns size did not change
assert namespacequota.get_namespace_size(self.orgname) == 0
@patch("data.registry_model.registry_proxy_model.Proxy", MagicMock())
def test_raises_quota_exceed_when_blob_is_bigger_than_max_quota(self, create_repo):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
input_manifest = parse_manifest_from_bytes(
Bytes.for_string_or_unicode(testdata.PYTHON_LINUX_AMD64["manifest"]),
DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE,
)
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
limit_bytes = 2048
namespace = user.get_user_or_org(self.orgname)
namespacequota.create_namespace_quota(namespace, limit_bytes)
with pytest.raises(QuotaExceededException):
proxy_model._check_image_upload_possible_or_prune(repo_ref, input_manifest)
@patch("data.registry_model.registry_proxy_model.Proxy", MagicMock())
def test_auto_pruning_when_quota_limit_reached(
self, create_repo, proxy_manifest_response, initialized_db
):
repo_ref = create_repo(self.orgname, self.upstream_repository, self.user)
limit_bytes = 83375093
namespace = user.get_user_or_org(self.orgname)
namespacequota.create_namespace_quota(namespace, limit_bytes)
proxy_mock = proxy_manifest_response(
"8.4", UBI8_8_4_MANIFEST_SCHEMA2, DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.upstream_repository,
self.user,
)
first_manifest = proxy_model.get_repo_tag(repo_ref, "8.4")
assert first_manifest is not None
first_tag = oci.tag.get_tag(repo_ref.id, "8.4")
assert first_tag is not None
assert namespacequota.get_namespace_size(self.orgname) == 83375093
# pull a different tag when the quota limit is reached and verify
# that the previous tag was removed
input_manifest = parse_manifest_from_bytes(
Bytes.for_string_or_unicode(testdata.UBI8_LINUX_AMD64["manifest"]),
DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE,
)
assert proxy_model._check_image_upload_possible_or_prune(repo_ref, input_manifest) is None
first_tag = oci.tag.get_tag(repo_ref.id, "8.4")
assert first_tag is None