# -*- coding: utf-8 -*- import hashlib import json import os import uuid from datetime import datetime, timedelta from io import BytesIO import pytest from mock import patch from playhouse.test_utils import assert_query_count from app import docker_v2_signing_key, model_cache, storage from data import model from data.cache import cache_key from data.cache.impl import InMemoryDataModelCache from data.cache.test.test_cache import TEST_CACHE_CONFIG from data.database import ( ImageStorageLocation, Manifest, ManifestBlob, ManifestLabel, Repository, Tag, User, get_epoch_timestamp_ms, ) from data.model import QuotaExceededException, namespacequota from data.model.blob import store_blob_record_and_temp_link from data.model.oci.retriever import RepositoryContentRetriever from data.model.storage import get_layer_path from data.registry_model.blobuploader import BlobUploadSettings, upload_blob from data.registry_model.datatypes import RepositoryReference from data.registry_model.registry_oci_model import OCIModel from digest.digest_tools import sha256_digest from image.docker.schema1 import ( DOCKER_SCHEMA1_CONTENT_TYPES, DockerSchema1Manifest, DockerSchema1ManifestBuilder, ) from image.docker.schema2.list import DockerSchema2ManifestListBuilder from image.docker.schema2.manifest import DockerSchema2ManifestBuilder from image.oci.index import OCIIndexBuilder from image.shared.types import ManifestImageLayer from test.fixtures import * from util.bytes import Bytes @pytest.fixture( params=[ OCIModel(), ] ) def registry_model(request, initialized_db): return request.param @pytest.fixture() def oci_model(initialized_db): return OCIModel() @pytest.mark.parametrize( "names, expected", [ (["unknown"], None), (["latest"], {"latest"}), (["latest", "prod"], {"latest", "prod"}), (["latest", "prod", "another"], {"latest", "prod"}), (["foo", "prod"], {"prod"}), ], ) def test_find_matching_tag(names, expected, registry_model): repo = model.repository.get_repository("devtable", "simple") repository_ref = RepositoryReference.for_repo_obj(repo) found = registry_model.find_matching_tag(repository_ref, names) if expected is None: assert found is None else: assert found.name in expected assert found.repository.name == "simple" @pytest.mark.parametrize( "repo_namespace, repo_name, expected", [ ("devtable", "simple", {"latest", "prod"}), ("buynlarge", "orgrepo", {"latest", "prod"}), ], ) def test_get_most_recent_tag(repo_namespace, repo_name, expected, registry_model): repo = model.repository.get_repository(repo_namespace, repo_name) repository_ref = RepositoryReference.for_repo_obj(repo) found = registry_model.get_most_recent_tag(repository_ref) if expected is None: assert found is None else: assert found.name in expected @pytest.mark.parametrize( "repo_namespace, repo_name, expected", [ ("devtable", "simple", True), ("buynlarge", "orgrepo", True), ("buynlarge", "unknownrepo", False), ], ) def test_lookup_repository(repo_namespace, repo_name, expected, registry_model): repo_ref = registry_model.lookup_repository(repo_namespace, repo_name) if expected: assert repo_ref else: assert repo_ref is None @pytest.mark.parametrize( "repo_namespace, repo_name, expected", [ ("devtable", "simple", True), ("buynlarge", "orgrepo", True), ("buynlarge", "unknownrepo", False), ], ) def test_lookup_repository_with_cache(repo_namespace, repo_name, expected, registry_model): model_cache = InMemoryDataModelCache(TEST_CACHE_CONFIG) model_cache.empty_for_testing() cache_key_for_repository_lookup = cache_key.for_repository_lookup( repo_namespace, repo_name, None, None, {} ) repo_ref = registry_model.lookup_repository(repo_namespace, repo_name, model_cache=model_cache) cached_result = model_cache.cache.get(cache_key_for_repository_lookup.key) if expected: res_obj = OCIModel.get_repository_response_to_object(json.loads(cached_result)) assert repo_ref._db_id is res_obj.id assert repo_ref.state == res_obj.state assert repo_ref.namespace_name == repo_namespace else: assert repo_ref is None @pytest.mark.parametrize( "repo_namespace, repo_name", [ ("devtable", "simple"), ("buynlarge", "orgrepo"), ], ) def test_lookup_manifests(repo_namespace, repo_name, registry_model): repo = model.repository.get_repository(repo_namespace, repo_name) repository_ref = RepositoryReference.for_repo_obj(repo) found_tag = registry_model.find_matching_tag(repository_ref, ["latest"]) found_manifest = registry_model.get_manifest_for_tag(found_tag) found = registry_model.lookup_manifest_by_digest(repository_ref, found_manifest.digest) assert found._db_id == found_manifest._db_id assert found.digest == found_manifest.digest schema1_parsed = registry_model.get_schema1_parsed_manifest(found, "foo", "bar", "baz", storage) assert schema1_parsed is not None def test_lookup_unknown_manifest(registry_model): repo = model.repository.get_repository("devtable", "simple") repository_ref = RepositoryReference.for_repo_obj(repo) found = registry_model.lookup_manifest_by_digest(repository_ref, "sha256:deadbeef") assert found is None def test_manifest_labels(registry_model): repo = model.repository.get_repository("devtable", "simple") repository_ref = RepositoryReference.for_repo_obj(repo) found_tag = registry_model.find_matching_tag(repository_ref, ["latest"]) found_manifest = registry_model.get_manifest_for_tag(found_tag) # Create a new label. created = registry_model.create_manifest_label(found_manifest, "foo", "bar", "api") assert created.key == "foo" assert created.value == "bar" assert created.source_type_name == "api" assert created.media_type_name == "text/plain" # Ensure we can look it up. assert registry_model.get_manifest_label(found_manifest, created.uuid) == created # Ensure it is in our list of labels. assert created in registry_model.list_manifest_labels(found_manifest) assert created in registry_model.list_manifest_labels(found_manifest, key_prefix="fo") # Ensure it is *not* in our filtered list. assert created not in registry_model.list_manifest_labels(found_manifest, key_prefix="ba") # Delete the label and ensure it is gone. assert registry_model.delete_manifest_label(found_manifest, created.uuid) assert registry_model.get_manifest_label(found_manifest, created.uuid) is None assert created not in registry_model.list_manifest_labels(found_manifest) def test_manifest_label_handlers(registry_model): repo = model.repository.get_repository("devtable", "simple") repository_ref = RepositoryReference.for_repo_obj(repo) found_tag = registry_model.get_repo_tag(repository_ref, "latest") found_manifest = registry_model.get_manifest_for_tag(found_tag) # Ensure the tag has no expiration. assert found_tag.lifetime_end_ts is None # Create a new label with an expires-after. registry_model.create_manifest_label(found_manifest, "quay.expires-after", "2h", "api") # Ensure the tag now has an expiration. updated_tag = registry_model.get_repo_tag(repository_ref, "latest") assert updated_tag.lifetime_end_ts == (updated_tag.lifetime_start_ts + (60 * 60 * 2)) def test_immutable_label_handler(registry_model): """Test that quay.immutable=true label sets tag as immutable.""" repo = model.repository.get_repository("devtable", "simple") repository_ref = RepositoryReference.for_repo_obj(repo) found_tag = registry_model.get_repo_tag(repository_ref, "latest") found_manifest = registry_model.get_manifest_for_tag(found_tag) # Ensure tag is not immutable assert not found_tag.immutable # Create label with quay.immutable=true registry_model.create_manifest_label(found_manifest, "quay.immutable", "true", "api") # Ensure tag is now immutable updated_tag = registry_model.get_repo_tag(repository_ref, "latest") assert updated_tag.immutable @pytest.mark.parametrize( "label_value", ["True", "TRUE", " true ", " TRUE "], ) def test_immutable_label_case_insensitive(registry_model, label_value): """Test quay.immutable label works with various case formats.""" repo = model.repository.get_repository("devtable", "simple") repository_ref = RepositoryReference.for_repo_obj(repo) found_tag = registry_model.get_repo_tag(repository_ref, "latest") found_manifest = registry_model.get_manifest_for_tag(found_tag) # Ensure tag is not immutable initially assert not found_tag.immutable # Create label with various case/whitespace formats registry_model.create_manifest_label(found_manifest, "quay.immutable", label_value, "api") # Ensure tag is now immutable updated_tag = registry_model.get_repo_tag(repository_ref, "latest") assert updated_tag.immutable @pytest.mark.parametrize( "label_value", ["false", "False", "FALSE", "0", "no", "", "anything", "yes"], ) def test_immutable_label_false_does_not_set(registry_model, label_value): """Test quay.immutable with non-true values does NOT set immutability.""" repo = model.repository.get_repository("devtable", "simple") repository_ref = RepositoryReference.for_repo_obj(repo) found_tag = registry_model.get_repo_tag(repository_ref, "latest") found_manifest = registry_model.get_manifest_for_tag(found_tag) # Ensure tag is not immutable initially assert not found_tag.immutable # Create label with non-true value registry_model.create_manifest_label(found_manifest, "quay.immutable", label_value, "api") # Ensure tag is still NOT immutable updated_tag = registry_model.get_repo_tag(repository_ref, "latest") assert not updated_tag.immutable def test_batch_labels(registry_model): repo = model.repository.get_repository("devtable", "history") repository_ref = RepositoryReference.for_repo_obj(repo) found_tag = registry_model.find_matching_tag(repository_ref, ["latest"]) found_manifest = registry_model.get_manifest_for_tag(found_tag) with registry_model.batch_create_manifest_labels(found_manifest) as add_label: add_label("foo", "1", "api") add_label("bar", "2", "api") add_label("baz", "3", "api") # Ensure we can look them up. assert len(registry_model.list_manifest_labels(found_manifest)) == 3 @pytest.mark.parametrize( "repo_namespace, repo_name", [ ("devtable", "simple"), ("devtable", "complex"), ("devtable", "history"), ("buynlarge", "orgrepo"), ], ) def test_repository_tags(repo_namespace, repo_name, registry_model): repository_ref = registry_model.lookup_repository(repo_namespace, repo_name) tags = registry_model.list_all_active_repository_tags(repository_ref) assert len(tags) tags_map = registry_model.get_legacy_tags_map(repository_ref, storage) for tag in tags: found_tag = registry_model.get_repo_tag(repository_ref, tag.name) assert found_tag == tag retriever = RepositoryContentRetriever(repository_ref.id, storage) legacy_image = tag.manifest.lookup_legacy_image(0, retriever) found_image = registry_model.get_legacy_image( repository_ref, found_tag.manifest.legacy_image_root_id, storage ) if found_image is not None: assert found_image.docker_image_id == legacy_image.docker_image_id assert tags_map[tag.name] == found_image.docker_image_id @pytest.mark.parametrize( "namespace, name, expected_tag_count, has_expired", [ ("devtable", "simple", 2, False), ("devtable", "history", 2, True), ("devtable", "gargantuan", 8, False), ("public", "publicrepo", 1, False), ], ) @pytest.mark.parametrize( "with_size_fallback", [ False, True, ], ) def test_repository_tag_history( namespace, name, expected_tag_count, has_expired, registry_model, with_size_fallback ): # Pre-cache media type loads to ensure consistent query count. Manifest.media_type.get_name(1) repository_ref = registry_model.lookup_repository(namespace, name) with assert_query_count(2 if with_size_fallback else 1): # If size fallback is requested, delete the sizes on the manifest rows. if with_size_fallback: Manifest.update(layers_compressed_size=None).execute() history, has_more = registry_model.list_repository_tag_history(repository_ref) assert not has_more assert len(history) == expected_tag_count for tag in history: # Retrieve the manifest to ensure it doesn't issue extra queries. tag.manifest # Verify that looking up the size doesn't issue extra queries. tag.manifest_layers_size if has_expired: # Ensure the latest tag is marked expired, since there is an expired one. with assert_query_count(1): assert registry_model.has_expired_tag(repository_ref, "latest") def test_repository_tag_history_future_expires(registry_model): # Set the expiration of a tag to the future. repository_ref = registry_model.lookup_repository("devtable", "simple") tag = registry_model.get_repo_tag(repository_ref, "latest") registry_model.change_repository_tag_expiration(tag, datetime.utcnow() + timedelta(days=7)) # List the tag history and ensure the tag is returned with the correct expiration. history, has_more = registry_model.list_repository_tag_history(repository_ref) assert not has_more assert history for tag in history: if tag.name == "latest": assert tag.lifetime_end_ms is not None @pytest.mark.parametrize( "repositories, expected_tag_count", [ ([], 0), ([("devtable", "simple"), ("devtable", "building")], 1), ], ) def test_get_most_recent_tag_lifetime_start(repositories, expected_tag_count, registry_model): last_modified_map = registry_model.get_most_recent_tag_lifetime_start( [registry_model.lookup_repository(name, namespace) for name, namespace in repositories] ) assert len(last_modified_map) == expected_tag_count for repo_id, last_modified in list(last_modified_map.items()): tag = registry_model.get_most_recent_tag(RepositoryReference.for_id(repo_id)) assert last_modified == tag.lifetime_start_ms // 1000 @pytest.mark.parametrize( "repo_namespace, repo_name", [ ("devtable", "simple"), ("devtable", "complex"), ("devtable", "history"), ("buynlarge", "orgrepo"), ], ) @pytest.mark.parametrize( "via_manifest", [ False, True, ], ) def test_delete_tags(repo_namespace, repo_name, via_manifest, registry_model): repository_ref = registry_model.lookup_repository(repo_namespace, repo_name) tags = registry_model.list_all_active_repository_tags(repository_ref) assert len(tags) # Save history before the deletions. previous_history, _ = registry_model.list_repository_tag_history(repository_ref, size=1000) assert len(previous_history) >= len(tags) # Delete every tag in the repository. for tag in tags: if via_manifest: assert registry_model.delete_tag(model_cache, repository_ref, tag.name) else: manifest = registry_model.get_manifest_for_tag(tag) if manifest is not None: registry_model.delete_tags_for_manifest(model_cache, manifest) # Make sure the tag is no longer found. with assert_query_count(1): found_tag = registry_model.get_repo_tag(repository_ref, tag.name) assert found_tag is None # Ensure all tags have been deleted. tags = registry_model.list_all_active_repository_tags(repository_ref) assert not len(tags) # Ensure that the tags all live in history. history, _ = registry_model.list_repository_tag_history(repository_ref, size=1000) assert len(history) == len(previous_history) @pytest.mark.parametrize( "use_manifest", [ True, False, ], ) def test_retarget_tag_history(use_manifest, registry_model): repository_ref = registry_model.lookup_repository("devtable", "history") history, _ = registry_model.list_repository_tag_history(repository_ref) if use_manifest: manifest_or_legacy_image = registry_model.lookup_manifest_by_digest( repository_ref, history[0].manifest_digest, allow_dead=True ) else: manifest_or_legacy_image = registry_model.get_legacy_image( repository_ref, history[0].manifest.legacy_image_root_id, storage ) # Retarget the tag. assert manifest_or_legacy_image updated_tag = registry_model.retarget_tag( repository_ref, "latest", manifest_or_legacy_image, storage, docker_v2_signing_key, is_reversion=True, ) # Ensure the tag has changed targets. if use_manifest: assert updated_tag.manifest_digest == manifest_or_legacy_image.digest else: assert updated_tag.manifest.legacy_image_root_id == manifest_or_legacy_image.docker_image_id # Ensure history has been updated. new_history, _ = registry_model.list_repository_tag_history(repository_ref) assert len(new_history) == len(history) + 1 def test_change_repository_tag_expiration(registry_model): repository_ref = registry_model.lookup_repository("devtable", "simple") tag = registry_model.get_repo_tag(repository_ref, "latest") assert tag.lifetime_end_ts is None new_datetime = datetime.utcnow() + timedelta(days=2) previous, okay = registry_model.change_repository_tag_expiration(tag, new_datetime) assert okay assert previous is None tag = registry_model.get_repo_tag(repository_ref, "latest") assert tag.lifetime_end_ts is not None @pytest.fixture() def clear_rows(initialized_db): # Remove all new-style rows so we can backfill. Tag.delete().execute() ManifestLabel.delete().execute() ManifestBlob.delete().execute() Manifest.delete().execute() @pytest.mark.parametrize( "namespace, expect_enabled", [ ("devtable", True), ("buynlarge", True), ("disabled", False), ], ) def test_is_namespace_enabled(namespace, expect_enabled, registry_model): assert registry_model.is_namespace_enabled(namespace) == expect_enabled @pytest.mark.parametrize( "repo_namespace, repo_name", [ ("devtable", "simple"), ("devtable", "complex"), ("devtable", "history"), ("buynlarge", "orgrepo"), ], ) def test_layers_and_blobs(repo_namespace, repo_name, registry_model): repository_ref = registry_model.lookup_repository(repo_namespace, repo_name) tags = registry_model.list_all_active_repository_tags(repository_ref) assert tags for tag in tags: manifest = registry_model.get_manifest_for_tag(tag) assert manifest parsed = manifest.get_parsed_manifest() assert parsed layers = registry_model.list_parsed_manifest_layers(repository_ref, parsed, storage) assert layers layers = registry_model.list_parsed_manifest_layers( repository_ref, parsed, storage, include_placements=True ) assert layers for index, manifest_layer in enumerate(layers): assert manifest_layer.blob.storage_path assert manifest_layer.blob.placements repo_blob = registry_model.get_repo_blob_by_digest( repository_ref, manifest_layer.blob.digest ) assert repo_blob.digest == manifest_layer.blob.digest assert manifest_layer.estimated_size(1) is not None assert isinstance(manifest_layer.layer_info, ManifestImageLayer) blobs = registry_model.get_manifest_local_blobs(manifest, storage, include_placements=True) assert {b.digest for b in blobs} == set(parsed.local_blob_digests) def test_manifest_remote_layers(oci_model): # Create a config blob for testing. config_json = json.dumps( { "config": {}, "rootfs": {"type": "layers", "diff_ids": []}, "history": [ { "created": "2018-04-03T18:37:09.284840891Z", "created_by": "do something", }, ], } ) app_config = {"TESTING": True} repository_ref = oci_model.lookup_repository("devtable", "simple") with upload_blob(repository_ref, storage, BlobUploadSettings(500, 500)) as upload: upload.upload_chunk(app_config, BytesIO(config_json.encode("utf-8"))) blob = upload.commit_to_blob(app_config) # Create the manifest in the repo. builder = DockerSchema2ManifestBuilder() builder.set_config_digest(blob.digest, blob.compressed_size) builder.add_layer("sha256:abcd", 1234, urls=["http://hello/world"]) manifest = builder.build() created_manifest, _ = oci_model.create_manifest_and_retarget_tag( repository_ref, manifest, "sometag", storage ) assert created_manifest layers = oci_model.list_parsed_manifest_layers( repository_ref, created_manifest.get_parsed_manifest(), storage ) assert len(layers) == 1 assert layers[0].layer_info.is_remote assert layers[0].layer_info.urls == ["http://hello/world"] assert layers[0].blob is None def test_blob_uploads(registry_model): repository_ref = registry_model.lookup_repository("devtable", "simple") blob_upload = registry_model.create_blob_upload( repository_ref, str(uuid.uuid4()), "local_us", {"some": "metadata"} ) assert blob_upload assert blob_upload.storage_metadata == {"some": "metadata"} assert blob_upload.location_name == "local_us" # Ensure we can find the blob upload. assert registry_model.lookup_blob_upload(repository_ref, blob_upload.upload_id) == blob_upload # Update and ensure the changes are saved. assert registry_model.update_blob_upload( blob_upload, 1, {"new": "metadata"}, 2, 3, blob_upload.sha_state, ) updated = registry_model.lookup_blob_upload(repository_ref, blob_upload.upload_id) assert updated assert updated.uncompressed_byte_count == 1 assert updated.storage_metadata == {"new": "metadata"} assert updated.byte_count == 2 assert updated.chunk_count == 3 # Delete the upload. registry_model.delete_blob_upload(blob_upload) # Ensure it can no longer be found. assert not registry_model.lookup_blob_upload(repository_ref, blob_upload.upload_id) def test_commit_blob_upload(registry_model): repository_ref = registry_model.lookup_repository("devtable", "simple") blob_upload = registry_model.create_blob_upload( repository_ref, str(uuid.uuid4()), "local_us", {"some": "metadata"} ) # Commit the blob upload and make sure it is written as a blob. digest = "sha256:" + hashlib.sha256(b"hello").hexdigest() blob = registry_model.commit_blob_upload(blob_upload, digest, 60) assert blob.digest == digest # Ensure the upload can no longer be found. assert not registry_model.lookup_blob_upload(repository_ref, blob_upload.upload_id) def test_mount_blob_into_repository(registry_model): repository_ref = registry_model.lookup_repository("devtable", "simple") latest_tag = registry_model.get_repo_tag(repository_ref, "latest") manifest = registry_model.get_manifest_for_tag(latest_tag) target_repository_ref = registry_model.lookup_repository("devtable", "complex") blobs = registry_model.get_manifest_local_blobs(manifest, storage, include_placements=True) assert blobs for blob in blobs: # Ensure the blob doesn't exist under the repository. assert not registry_model.get_repo_blob_by_digest(target_repository_ref, blob.digest) # Mount the blob into the repository. assert registry_model.mount_blob_into_repository(blob, target_repository_ref, 60) # Ensure it now exists. found = registry_model.get_repo_blob_by_digest(target_repository_ref, blob.digest) assert found == blob class SomeException(Exception): pass def test_get_cached_repo_blob(registry_model): model_cache = InMemoryDataModelCache(TEST_CACHE_CONFIG) repository_ref = registry_model.lookup_repository("devtable", "simple") latest_tag = registry_model.get_repo_tag(repository_ref, "latest") manifest = registry_model.get_manifest_for_tag(latest_tag) blobs = registry_model.get_manifest_local_blobs(manifest, storage, include_placements=True) assert blobs blob = blobs[0] # Load a blob to add it to the cache. found = registry_model.get_cached_repo_blob(model_cache, "devtable", "simple", blob.digest) assert found.digest == blob.digest assert found.uuid == blob.uuid assert found.compressed_size == blob.compressed_size assert found.uncompressed_size == blob.uncompressed_size assert found.placements == blob.placements # Disconnect from the database by overwriting the connection. def fail(x, y): raise SomeException("Not connected!") with patch( "data.registry_model.registry_oci_model.model.oci.blob.get_repository_blob_by_digest", fail, ): # Make sure we can load again, which should hit the cache. cached = registry_model.get_cached_repo_blob(model_cache, "devtable", "simple", blob.digest) assert cached.digest == blob.digest assert cached.uuid == blob.uuid assert cached.compressed_size == blob.compressed_size assert cached.uncompressed_size == blob.uncompressed_size assert cached.placements == blob.placements # Try another blob, which should fail since the DB is not connected and the cache # does not contain the blob. with pytest.raises(SomeException): registry_model.get_cached_repo_blob( model_cache, "devtable", "simple", "some other digest" ) def test_create_manifest_and_retarget_tag(registry_model): repository_ref = registry_model.lookup_repository("devtable", "simple") latest_tag = registry_model.get_repo_tag(repository_ref, "latest") manifest = registry_model.get_manifest_for_tag(latest_tag).get_parsed_manifest() builder = DockerSchema1ManifestBuilder("devtable", "simple", "anothertag") builder.add_layer(manifest.blob_digests[0], '{"id": "%s"}' % "someid") sample_manifest = builder.build(docker_v2_signing_key) assert sample_manifest is not None another_manifest, tag = registry_model.create_manifest_and_retarget_tag( repository_ref, sample_manifest, "anothertag", storage, verify_quota=True ) assert another_manifest is not None assert tag is not None assert tag.name == "anothertag" assert another_manifest.get_parsed_manifest().manifest_dict == sample_manifest.manifest_dict def test_create_manifest_and_retarget_tag_with_quota(registry_model): CONFIG_LAYER_JSON = json.dumps( { "config": {}, "rootfs": {"type": "layers", "diff_ids": []}, "history": [], } ) # Create the blobs and manifest object repository_ref = registry_model.lookup_repository("devtable", "simple") builder = DockerSchema2ManifestBuilder() _, config_digest = _populate_blob_with_content( CONFIG_LAYER_JSON, "devtable", repository_ref.name ) builder.set_config_digest(config_digest, len(CONFIG_LAYER_JSON.encode("utf-8"))) blob_content = "blobcontent" _, blob_digest = _populate_blob_with_content(blob_content, "devtable", repository_ref.name) builder.add_layer(blob_digest, len(blob_content)) manifest = builder.build() assert manifest is not None # Create quota and limit user = User.select().where(User.username == "devtable").get() new_quota = namespacequota.create_namespace_quota(user, 1) namespacequota.create_namespace_quota_limit(new_quota, "reject", 100) rejected = False try: registry_model.create_manifest_and_retarget_tag( repository_ref, manifest, "newtag", storage, verify_quota=True, ) except QuotaExceededException: rejected = True assert rejected # Assert that a temporary tag outside the time machine window was created since the manifest was rejected # Get the tag that has beeen created in the last second tag = ( Tag.select() .where( Tag.lifetime_start_ms > get_epoch_timestamp_ms() - 1000, Tag.repository == repository_ref._db_id, ) .get() ) assert tag.name.startswith("$temp-") assert tag.lifetime_end_ms is not None assert tag.lifetime_end_ms < get_epoch_timestamp_ms() - user.removed_tag_expiration_s * 1000 assert tag.hidden assert not tag.reversion def test_get_schema1_parsed_manifest(registry_model): repository_ref = registry_model.lookup_repository("devtable", "simple") latest_tag = registry_model.get_repo_tag(repository_ref, "latest") manifest = registry_model.get_manifest_for_tag(latest_tag) assert registry_model.get_schema1_parsed_manifest(manifest, "", "", "", storage) def test_convert_manifest(registry_model): repository_ref = registry_model.lookup_repository("devtable", "simple") latest_tag = registry_model.get_repo_tag(repository_ref, "latest") manifest = registry_model.get_manifest_for_tag(latest_tag) mediatypes = DOCKER_SCHEMA1_CONTENT_TYPES assert registry_model.convert_manifest(manifest, "", "", "", mediatypes, storage) mediatypes = [] assert registry_model.convert_manifest(manifest, "", "", "", mediatypes, storage) is None def test_create_manifest_and_retarget_tag_with_labels(registry_model): repository_ref = registry_model.lookup_repository("devtable", "simple") latest_tag = registry_model.get_repo_tag(repository_ref, "latest") manifest = registry_model.get_manifest_for_tag(latest_tag).get_parsed_manifest() json_metadata = { "id": "someid", "config": { "Labels": { "quay.expires-after": "2w", }, }, } builder = DockerSchema1ManifestBuilder("devtable", "simple", "anothertag") builder.add_layer(manifest.blob_digests[0], json.dumps(json_metadata)) sample_manifest = builder.build(docker_v2_signing_key) assert sample_manifest is not None another_manifest, tag = registry_model.create_manifest_and_retarget_tag( repository_ref, sample_manifest, "anothertag", storage ) assert another_manifest is not None assert tag is not None assert tag.name == "anothertag" assert another_manifest.get_parsed_manifest().manifest_dict == sample_manifest.manifest_dict # Ensure the labels were applied. assert tag.lifetime_end_ms is not None # Create another tag and retarget it to an existing manifest; it should have an end date. # This is from a Quay's tag api, so it will not attempt to create a manifest first. yet_another_tag = registry_model.retarget_tag( repository_ref, "yet_another_tag", another_manifest, storage, docker_v2_signing_key ) assert yet_another_tag.lifetime_end_ms is not None def test_create_manifest_and_retarget_tag_with_labels_with_existing_manifest(oci_model): # Create a config blob for testing. config_json = json.dumps( { "config": { "Labels": { "quay.expires-after": "2w", }, }, "rootfs": {"type": "layers", "diff_ids": []}, "history": [ { "created": "2018-04-03T18:37:09.284840891Z", "created_by": "do something", }, ], } ) app_config = {"TESTING": True} repository_ref = oci_model.lookup_repository("devtable", "simple") with upload_blob(repository_ref, storage, BlobUploadSettings(500, 500)) as upload: upload.upload_chunk(app_config, BytesIO(config_json.encode("utf-8"))) blob = upload.commit_to_blob(app_config) # Create the manifest in the repo. builder = DockerSchema2ManifestBuilder() builder.set_config_digest(blob.digest, blob.compressed_size) builder.add_layer("sha256:abcd", 1234, urls=["http://hello/world"]) manifest = builder.build() some_manifest, some_tag = oci_model.create_manifest_and_retarget_tag( repository_ref, manifest, "some_tag", storage ) assert some_manifest is not None assert some_tag is not None assert some_tag.lifetime_end_ms is not None # Create tag and retarget it to an existing manifest; it should have an end date. # This is from a push, so it will attempt to create a manifest first. some_other_manifest, some_other_tag = oci_model.create_manifest_and_retarget_tag( repository_ref, manifest, "some_other_tag", storage ) assert some_other_manifest is not None assert some_other_manifest == some_manifest assert some_other_tag is not None assert some_other_tag.lifetime_end_ms is not None # Create another tag and retarget it to an existing manifest; it should have an end date. # This is from a Quay's tag api, so it will not attempt to create a manifest first. yet_another_tag = oci_model.retarget_tag( repository_ref, "yet_another_tag", some_other_manifest, storage, docker_v2_signing_key ) assert yet_another_tag.lifetime_end_ms is not None def test_manifest_list_expiration_multiple_tags(oci_model): """ Test that when a manifest list is pushed with multiple tags, all tags get the expiration from child manifest labels. This verifies the fix for PROJQUAY-7245. """ repository_ref = oci_model.lookup_repository("devtable", "simple") # Create a config blob with expiry label for child manifest config_json = json.dumps( { "config": { "Labels": { "quay.expires-after": "1w", }, }, "rootfs": {"type": "layers", "diff_ids": []}, "history": [ { "created": "2018-04-03T18:37:09.284840891Z", "created_by": "do something", }, ], } ) app_config = {"TESTING": True} with upload_blob(repository_ref, storage, BlobUploadSettings(500, 500)) as upload: upload.upload_chunk(app_config, BytesIO(config_json.encode("utf-8"))) config_blob = upload.commit_to_blob(app_config) # Create a child manifest with the expiry label child_builder = DockerSchema2ManifestBuilder() child_builder.set_config_digest(config_blob.digest, config_blob.compressed_size) # Use a valid SHA256 digest (64 hex chars) layer_digest = "sha256:" + hashlib.sha256(b"child_layer_content").hexdigest() child_builder.add_layer(layer_digest, 1234, urls=["http://example/layer"]) child_manifest = child_builder.build() # Create the child manifest in the registry (hidden, no tag) child_created, _ = oci_model.create_manifest_and_retarget_tag( repository_ref, child_manifest, "temp_child_tag", storage ) assert child_created is not None # Build a manifest list referencing the child manifest list_builder = DockerSchema2ManifestListBuilder() list_builder.add_manifest(child_manifest, "amd64", "linux") manifest_list = list_builder.build() # Push the manifest list with the first tag manifest1, tag1 = oci_model.create_manifest_and_retarget_tag( repository_ref, manifest_list, "multi-arch-v1", storage ) assert manifest1 is not None assert tag1 is not None assert tag1.lifetime_end_ms is not None, "First tag should have expiration set" # Push the same manifest list with a second tag manifest2, tag2 = oci_model.create_manifest_and_retarget_tag( repository_ref, manifest_list, "multi-arch-v2", storage ) assert manifest2 is not None assert manifest2.digest == manifest1.digest, "Should be the same manifest" assert tag2 is not None assert tag2.lifetime_end_ms is not None, "Second tag should also have expiration set" # Also test retarget_tag (used by Quay's tag API) tag3 = oci_model.retarget_tag( repository_ref, "multi-arch-v3", manifest1, storage, docker_v2_signing_key ) assert tag3 is not None assert tag3.lifetime_end_ms is not None, "Third tag via retarget_tag should have expiration" def _populate_blob(digest): location = ImageStorageLocation.get(name="local_us") store_blob_record_and_temp_link("devtable", "simple", digest, location, 1, 120) def _populate_blob_with_content(content, org_name, repository_name): content = Bytes.for_string_or_unicode(content).as_encoded_str() digest = str(sha256_digest(content)) location = ImageStorageLocation.get(name="local_us") blob = store_blob_record_and_temp_link( org_name, repository_name, digest, location, len(content), 120 ) storage.put_content(["local_us"], get_layer_path(blob), content) return blob, digest def test_known_issue_schema1(registry_model): test_dir = os.path.dirname(os.path.abspath(__file__)) path = os.path.join(test_dir, "../../../image/docker/test/validate_manifest_known_issue.json") with open(path, "r") as f: manifest_bytes = f.read() manifest = DockerSchema1Manifest(Bytes.for_string_or_unicode(manifest_bytes)) for blob_digest in manifest.local_blob_digests: _populate_blob(blob_digest) digest = manifest.digest assert digest == "sha256:44518f5a4d1cb5b7a6347763116fb6e10f6a8563b6c40bb389a0a982f0a9f47a" # Create the manifest in the database. repository_ref = registry_model.lookup_repository("devtable", "simple") created_manifest, _ = registry_model.create_manifest_and_retarget_tag( repository_ref, manifest, "latest", storage ) assert created_manifest assert created_manifest.digest == manifest.digest assert ( created_manifest.internal_manifest_bytes.as_encoded_str() == manifest.bytes.as_encoded_str() ) # Look it up again and validate. found = registry_model.lookup_manifest_by_digest( repository_ref, manifest.digest, allow_dead=True ) assert found assert found.digest == digest assert found.internal_manifest_bytes.as_encoded_str() == manifest.bytes.as_encoded_str() assert found.get_parsed_manifest().digest == digest def test_unicode_emoji(registry_model): builder = DockerSchema1ManifestBuilder("devtable", "simple", "latest") builder.add_layer( "sha256:abcde", json.dumps( { "id": "someid", "author": "😱", }, ensure_ascii=False, ), ) manifest = builder.build(ensure_ascii=False) manifest._validate() for blob_digest in manifest.local_blob_digests: _populate_blob(blob_digest) # Create the manifest in the database. repository_ref = registry_model.lookup_repository("devtable", "simple") created_manifest, _ = registry_model.create_manifest_and_retarget_tag( repository_ref, manifest, "latest", storage ) assert created_manifest assert created_manifest.digest == manifest.digest assert ( created_manifest.internal_manifest_bytes.as_encoded_str() == manifest.bytes.as_encoded_str() ) # Look it up again and validate. found = registry_model.lookup_manifest_by_digest( repository_ref, manifest.digest, allow_dead=True ) assert found assert found.digest == manifest.digest assert found.internal_manifest_bytes.as_encoded_str() == manifest.bytes.as_encoded_str() assert found.get_parsed_manifest().digest == manifest.digest @pytest.mark.parametrize( "test_cached", [ False, True, ], ) def test_lookup_active_repository_tags(test_cached, oci_model): repository_ref = oci_model.lookup_repository("devtable", "simple") latest_tag = oci_model.get_repo_tag(repository_ref, "latest") manifest = oci_model.get_manifest_for_tag(latest_tag) existing_tags = oci_model.list_all_active_repository_tags(repository_ref) tag_count = 500 # Create a bunch of tags. tags_expected = set() for index in range(0, tag_count): tags_expected.add("somenewtag%s" % index) oci_model.retarget_tag( repository_ref, "somenewtag%s" % index, manifest, storage, docker_v2_signing_key ) for tag in existing_tags: tags_expected.add(tag.name) tags_expected = sorted(tags_expected) assert tags_expected tags_to_process = tags_expected.copy() # List the tags. tag_limit = 11 tags_found = set() tag_name = None while True: if test_cached: model_cache = InMemoryDataModelCache(TEST_CACHE_CONFIG) tags, has_more = oci_model.lookup_cached_active_repository_tags( model_cache, repository_ref, tag_name, tag_limit ) else: tags, has_more = oci_model.lookup_active_repository_tags( repository_ref, tag_name, tag_limit ) assert len(tags) <= tag_limit for tag in tags: # ensure last tag name is not part of result set if tag_name is not None: assert tag_name is not tag.name # make sure we don't have duplicates between result pages assert tag.name not in tags_found # make sure the tags are in lexical order assert tag.name == tags_to_process.pop(0) tags_found.add(tag.name) if not has_more: break tag_name = tags[-1].name # Make sure we've found all the tags. assert set(tags_found) == set(tags_expected) assert not tags_to_process def test_create_manifest_with_temp_tag(initialized_db, registry_model): builder = DockerSchema1ManifestBuilder("devtable", "simple", "latest") builder.add_layer( "sha256:abcde", json.dumps( { "id": "someid", "author": "some user", }, ensure_ascii=False, ), ) manifest = builder.build(ensure_ascii=False) for blob_digest in manifest.local_blob_digests: _populate_blob(blob_digest) # Create the manifest in the database. repository_ref = registry_model.lookup_repository("devtable", "simple") created = registry_model.create_manifest_with_temp_tag(repository_ref, manifest, 300, storage) assert created.digest == manifest.digest # Ensure it cannot be found normally, since it is simply temp-tagged. assert registry_model.lookup_manifest_by_digest(repository_ref, manifest.digest) is None # Ensure it can be found, which means it is temp-tagged. found = registry_model.lookup_manifest_by_digest( repository_ref, manifest.digest, allow_dead=True ) assert found is not None def test_find_manifests_for_sec_notification(initialized_db, registry_model): # First try for manifests inside a repository without any events, which should not # return any results. repository_ref = registry_model.lookup_repository("devtable", "simple") found_manifest = False for tag in registry_model.list_all_active_repository_tags(repository_ref): manifest = registry_model.get_manifest_for_tag(tag) found_manifest = True assert len(list(registry_model.find_manifests_for_sec_notification(manifest.digest))) == 0 assert found_manifest # Add a security notification to the repository. model.notification.create_repo_notification( repository_ref.id, "vulnerability_found", "webhook", {}, { "vulnerability": { "priority": "Critical", }, }, ) # Now ensure the manifests are found. for tag in registry_model.list_all_active_repository_tags(repository_ref): manifest = registry_model.get_manifest_for_tag(tag) assert len(list(registry_model.find_manifests_for_sec_notification(manifest.digest))) > 0 def test_lookup_secscan_notification_severities(initialized_db, registry_model): repository_ref = registry_model.lookup_repository("devtable", "simple") assert len(list(registry_model.lookup_secscan_notification_severities(repository_ref))) == 0 # Add some vuln events. model.notification.create_repo_notification( repository_ref.id, "vulnerability_found", "webhook", {}, { "vulnerability": { "priority": "Critical", }, }, ) model.notification.create_repo_notification( repository_ref.id, "vulnerability_found", "webhook", {}, { "vulnerability": { "priority": "Low", }, }, ) assert set(registry_model.lookup_secscan_notification_severities(repository_ref)) == { "Low", "Critical", } def test_tag_names_for_manifest(initialized_db, registry_model): verified_tag = False for repository in Repository.select(): repo_ref = RepositoryReference.for_repo_obj(repository) for tag in registry_model.list_all_active_repository_tags(repo_ref): manifest = registry_model.get_manifest_for_tag(tag) tag_names = set(registry_model.tag_names_for_manifest(manifest, 1000)) assert tag.name in tag_names verified_tag = True for found_name in tag_names: found_tag = registry_model.get_repo_tag(repo_ref, found_name) assert registry_model.get_manifest_for_tag(found_tag) == manifest assert verified_tag