mirror of
https://github.com/quay/quay.git
synced 2026-01-26 06:21:37 +03:00
783 lines
32 KiB
Python
783 lines
32 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
from typing import Callable
|
|
|
|
from peewee import Select, fn
|
|
|
|
import features
|
|
from app import app, proxy_cache_blob_queue, storage
|
|
from data.database import ImageStorage, ImageStoragePlacement
|
|
from data.database import Manifest as ManifestTable
|
|
from data.database import ManifestBlob, ManifestChild
|
|
from data.database import Tag as TagTable
|
|
from data.database import (
|
|
db,
|
|
db_disallow_replica_use,
|
|
db_transaction,
|
|
get_epoch_timestamp_ms,
|
|
)
|
|
from data.model import (
|
|
ManifestDoesNotExist,
|
|
QuotaExceededException,
|
|
RepositoryDoesNotExist,
|
|
TagDoesNotExist,
|
|
namespacequota,
|
|
oci,
|
|
repository,
|
|
)
|
|
from data.model.oci.manifest import is_child_manifest
|
|
from data.model.proxy_cache import get_proxy_cache_config_for_org
|
|
from data.model.quota import (
|
|
QuotaOperation,
|
|
get_namespace_id_from_repository,
|
|
is_blob_alive,
|
|
update_quota,
|
|
)
|
|
from data.model.repository import create_repository, get_repository
|
|
from data.registry_model.blobuploader import (
|
|
BlobDigestMismatchException,
|
|
BlobRangeMismatchException,
|
|
BlobTooLargeException,
|
|
BlobUploadException,
|
|
BlobUploadSettings,
|
|
complete_when_uploaded,
|
|
create_blob_upload,
|
|
)
|
|
from data.registry_model.datatypes import Manifest, RepositoryReference, Tag
|
|
from data.registry_model.registry_oci_model import OCIModel
|
|
from image.docker.schema1 import (
|
|
DOCKER_SCHEMA1_MANIFEST_CONTENT_TYPE,
|
|
DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE,
|
|
DockerSchema1Manifest,
|
|
)
|
|
from image.docker.schema2 import (
|
|
DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE,
|
|
DOCKER_SCHEMA2_MANIFESTLIST_CONTENT_TYPE,
|
|
)
|
|
from image.oci import OCI_IMAGE_INDEX_CONTENT_TYPE, OCI_IMAGE_MANIFEST_CONTENT_TYPE
|
|
from image.shared import ManifestException
|
|
from image.shared.interfaces import ManifestInterface
|
|
from image.shared.schemas import parse_manifest_from_bytes
|
|
from proxy import Proxy, UpstreamRegistryError
|
|
from util.bytes import Bytes
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Manifest media types handed to the upstream proxy client whenever this
# module checks for, or pulls, a manifest.
ACCEPTED_MEDIA_TYPES = [
    # OCI image manifest and image index
    OCI_IMAGE_MANIFEST_CONTENT_TYPE,
    OCI_IMAGE_INDEX_CONTENT_TYPE,
    # Docker schema 2 manifest list and manifest
    DOCKER_SCHEMA2_MANIFESTLIST_CONTENT_TYPE,
    DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE,
    # Docker schema 1, plain and signed
    DOCKER_SCHEMA1_MANIFEST_CONTENT_TYPE,
    DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE,
]
|
|
|
|
|
|
class ProxyModel(OCIModel):
|
|
def __init__(self, namespace_name, repo_name, user):
    """
    Builds a model bound to the proxy cache configuration of `namespace_name`.

    Stores the config, the acting user and the namespace name, and creates
    the upstream `Proxy` client for the repository being requested.
    """
    super().__init__()
    self._config = get_proxy_cache_config_for_org(namespace_name)
    self._namespace_name = namespace_name
    self._user = user

    # When Quay proxies a whole upstream registry, the config carries no
    # upstream namespace; `repo_name` is then expected to already contain
    # both the upstream namespace and the repo (treated as a nested repo).
    # Otherwise the configured namespace is prepended.
    upstream_ns = self._config.upstream_registry_namespace
    if upstream_ns is None or upstream_ns == "":
        upstream_repo = repo_name
    else:
        upstream_repo = f"{upstream_ns}/{repo_name}"

    self._proxy = Proxy(self._config, upstream_repo)
|
|
|
|
def lookup_repository(
    self,
    namespace_name,
    repo_name,
    kind_filter=None,
    raise_on_error=False,
    manifest_ref=None,
    model_cache=None,
):
    """
    Returns a reference to the repository `namespace_name/repo_name`, or None.

    An existing repository is returned as-is. A missing repository is created
    only when `manifest_ref` is given and the upstream registry confirms that
    the referenced manifest exists.

    When the upstream check fails, raises RepositoryDoesNotExist if
    `raise_on_error` is set, and returns None otherwise.
    """

    def as_reference(repo_row):
        # Same argument expressions (and evaluation order) for both the
        # "found" and the "just created" case.
        is_free_namespace = repo_row.namespace_user.stripe_id is None if repo_row else None
        repo_state = repo_row.state if repo_row is not None else None
        return RepositoryReference.for_repo_obj(
            repo_row,
            namespace_name,
            repo_name,
            is_free_namespace,
            state=repo_state,
        )

    found = get_repository(namespace_name, repo_name)
    if found is not None:
        return as_reference(found)

    # A repository is only created for images that exist upstream. Without a
    # manifest reference there is nothing to check upstream, so we refuse to
    # create the repo.
    if manifest_ref is None:
        return None

    try:
        self._proxy.manifest_exists(manifest_ref, ACCEPTED_MEDIA_TYPES)
    except UpstreamRegistryError as e:
        if not raise_on_error:
            return None
        raise RepositoryDoesNotExist(str(e))

    if app.config.get("CREATE_PRIVATE_REPO_ON_PUSH", True):
        visibility = "private"
    else:
        visibility = "public"

    created = create_repository(
        namespace_name, repo_name, self._user, visibility=visibility, proxy_cache=True
    )
    return as_reference(created)
|
|
|
|
def _check_image_upload_possible_or_prune(
|
|
self, repo_ref: RepositoryReference, upstream_manifest: ManifestInterface
|
|
) -> None:
|
|
"""
|
|
Checks whether the given image fits within the quota size for the namespace
|
|
the repository is part of. If it doesn't, it prunes older tags in the namespace
|
|
by marking them expired which is eventually garbage collected by the gc worker.
|
|
|
|
Raises QuotaExceededException if the given tag is larger than the max quota
|
|
allotted for the namespace or if there are not enough tags to prune to free up space.
|
|
"""
|
|
if upstream_manifest.is_manifest_list:
|
|
return
|
|
|
|
curr_ns_size = namespacequota.get_namespace_size(repo_ref.namespace_name)
|
|
|
|
# if quota limit is not set for the namespace, skip auto pruning of images
|
|
quotas = namespacequota.get_namespace_quota_list(repo_ref.namespace_name)
|
|
if quotas:
|
|
# currently only one quota per namespace is supported
|
|
ns_quota_limit = quotas[0].limit_bytes
|
|
else:
|
|
logger.info("No quota configured")
|
|
return
|
|
|
|
image_size = upstream_manifest.layers_compressed_size
|
|
if image_size > ns_quota_limit:
|
|
raise QuotaExceededException
|
|
|
|
if (curr_ns_size + image_size) <= ns_quota_limit:
|
|
return
|
|
|
|
reclaimable_size = 0
|
|
namespace_id = get_namespace_id_from_repository(repo_ref.id)
|
|
tags = oci.tag.get_tag_with_least_lifetime_end_for_ns(repo_ref.namespace_name)
|
|
if tags is not None:
|
|
for tag in tags:
|
|
is_manifest_list = (
|
|
ManifestChild.select(1).where(ManifestChild.manifest == tag.manifest).exists()
|
|
)
|
|
|
|
# Get all the blobs under this manifest. If a manifest list get all the blobs
|
|
# under the child manifests as well
|
|
blobs = None
|
|
if is_manifest_list:
|
|
blobs = (
|
|
ImageStorage.select(ImageStorage.id, ImageStorage.image_size)
|
|
.join(ManifestBlob, on=(ManifestBlob.blob == ImageStorage.id))
|
|
.join(
|
|
ManifestChild,
|
|
on=(ManifestChild.child_manifest == ManifestBlob.manifest),
|
|
)
|
|
.where(ManifestChild.manifest == tag.manifest)
|
|
)
|
|
else:
|
|
blobs = (
|
|
ImageStorage.select(ImageStorage.id, ImageStorage.image_size)
|
|
.join(ManifestBlob, on=(ManifestBlob.blob == ImageStorage.id))
|
|
.where(ManifestBlob.manifest == tag.manifest)
|
|
)
|
|
|
|
# We remove duplicates within the loop to prevent using "distinct" in the above query.
|
|
# If the blob is not being referenced by an alive tag we'll get that size back
|
|
# when it's GC'd, so add it to the reclaimable total.
|
|
seen_blobs = []
|
|
for blob in blobs:
|
|
if blob.id not in seen_blobs and not is_blob_alive(
|
|
namespace_id, tag.id, blob.id
|
|
):
|
|
size = blob.image_size if blob.image_size is not None else 0
|
|
reclaimable_size = reclaimable_size + size
|
|
seen_blobs.append(blob.id)
|
|
|
|
updated = oci.tag.remove_tag_from_timemachine(
|
|
tag.repository_id,
|
|
tag.name,
|
|
tag.manifest,
|
|
include_submanifests=is_manifest_list,
|
|
is_alive=True,
|
|
)
|
|
|
|
# If we get enough size back from deleting this tag, exit
|
|
if updated and reclaimable_size > image_size:
|
|
return
|
|
|
|
# if we got here, then there aren't enough tags in the namespace to expire, so we raise an exception
|
|
raise QuotaExceededException
|
|
|
|
def lookup_manifest_by_digest(
    self,
    repository_ref,
    manifest_digest,
    allow_dead=False,
    allow_hidden=False,
    require_available=False,
    raise_on_error=True,
):
    """
    Returns the manifest with the given digest in the given repository, or
    None if none.

    A manifest that is not cached yet is fetched from upstream and created
    with a temp tag. A cached manifest is refreshed against upstream, and the
    expiration of its tag (and of the tags of any parent manifest lists in
    this repository) is renewed.

    The `allow_dead`, `allow_hidden`, `require_available` and `raise_on_error`
    arguments are not read by this implementation.

    Raises ManifestDoesNotExist when the manifest cannot be obtained.
    Raises QuotaExceededException if the given tag is larger than the max quota
    allotted for the namespace or if there are not enough tags to prune.
    """
    cached_manifest = super().lookup_manifest_by_digest(
        repository_ref, manifest_digest, allow_dead=True, require_available=False
    )

    if cached_manifest is None:
        # Nothing cached: pull from upstream and keep it alive with a temp tag.
        try:
            created_manifest, _ = self._create_and_tag_manifest(
                repository_ref, manifest_digest, self._create_manifest_with_temp_tag
            )
        except (UpstreamRegistryError, ManifestDoesNotExist) as e:
            raise ManifestDoesNotExist(str(e))
        return created_manifest

    db_tag = oci.tag.get_tag_by_manifest_id(repository_ref.id, cached_manifest.id)
    if db_tag is None:
        oci.manifest.lookup_manifest(
            repository_ref.id, manifest_digest, allow_dead=True, require_available=True
        )
        db_tag = oci.tag.get_tag_by_manifest_id(repository_ref.id, cached_manifest.id)

    existing_tag = Tag.for_tag(
        db_tag, self._legacy_image_id_handler, manifest_row=db_tag.manifest
    )

    try:
        tag, new_tag = self._update_manifest_for_tag(
            repository_ref,
            existing_tag,
            existing_tag.manifest,
            manifest_digest,
            self._create_manifest_with_temp_tag,
        )
    except ManifestDoesNotExist:
        raise
    except UpstreamRegistryError:
        # When the upstream fetch fails, the cached manifest is only served
        # if its tag is not yet expired and it is not a placeholder. The
        # tag's expiration is not bumped here - that only happens once the
        # tag is known to exist upstream.
        is_placeholder = cached_manifest.internal_manifest_bytes.as_unicode() == ""
        if existing_tag.expired or is_placeholder:
            return None
        return cached_manifest

    if tag.expired or not new_tag:
        with db_disallow_replica_use():
            if self._config.expiration_s:
                new_expiration = get_epoch_timestamp_ms() + self._config.expiration_s * 1000
            else:
                new_expiration = None
            oci.tag.set_tag_end_ms(db_tag, new_expiration)

            # If the manifest is a child of a manifest list in this repo,
            # renew the parent manifest list tag(s) too: pick, per tag name,
            # the highest tag id among the tags pointing at a parent of this
            # manifest (via the sub-manifest -> parent manifest link).
            parent_tags = (
                TagTable.select(fn.MAX(TagTable.id).alias("id"))
                .join(ManifestChild, on=(TagTable.manifest_id == ManifestChild.manifest_id))
                .where(
                    (ManifestChild.repository_id == repository_ref.id)
                    & (ManifestChild.child_manifest_id == cached_manifest.id)
                )
                .group_by(TagTable.name)
            )
            tag_ids = list(parent_tags)
            TagTable.update(lifetime_end_ms=new_expiration).where(
                TagTable.id.in_(tag_ids)
            ).execute()

    return super().lookup_manifest_by_digest(
        repository_ref,
        manifest_digest,
        allow_dead=True,
        allow_hidden=True,
        require_available=False,
        raise_on_error=True,
    )
|
|
|
|
def get_repo_tag(self, repository_ref, tag_name, raise_on_error=True):
    """
    Returns the latest, *active* tag with the given name in the repository,
    or None if none.

    When no such tag is cached, the manifest is fetched from upstream by tag
    name and both manifest and tag are created. When the tag is cached, its
    manifest is refreshed against upstream (filling in placeholders and
    retargeting stale tags) and the tag's expiration is renewed.

    Raises TagDoesNotExist when the tag cannot be obtained.
    Raises QuotaExceededException if the given tag is larger than the max quota
    allotted for the namespace or if there are not enough tags to prune.
    """
    db_tag = oci.tag.get_current_tag(repository_ref.id, tag_name)
    existing_tag = Tag.for_tag(db_tag, self._legacy_image_id_handler)

    if existing_tag is None:
        try:
            _, created_tag = self._create_and_tag_manifest(
                repository_ref, tag_name, self._create_manifest_and_retarget_tag
            )
        except (UpstreamRegistryError, ManifestDoesNotExist) as e:
            raise TagDoesNotExist(str(e))
        return created_tag

    try:
        tag, new_tag = self._update_manifest_for_tag(
            repository_ref,
            existing_tag,
            existing_tag.manifest,
            tag_name,
            self._create_manifest_and_retarget_tag,
        )
    except ManifestDoesNotExist as e:
        raise TagDoesNotExist(str(e))
    except UpstreamRegistryError:
        # When the upstream fetch fails, the cached tag is only served if it
        # is not yet expired and its manifest is not a placeholder. The
        # expiration is not bumped here - that only happens once the tag is
        # known to exist upstream.
        is_placeholder = existing_tag.manifest.internal_manifest_bytes.as_unicode() == ""
        if existing_tag.expired or is_placeholder:
            return None
        return existing_tag

    # A freshly created, unexpired tag needs no renewal.
    if not tag.expired and new_tag:
        return tag

    # Always bump the expiration when retrieving tags that are both cached
    # and present upstream, as a means to auto-renew the cache.
    with db_disallow_replica_use():
        if self._config.expiration_s:
            new_expiration = get_epoch_timestamp_ms() + self._config.expiration_s * 1000
        else:
            new_expiration = None
        oci.tag.set_tag_end_ms(db_tag, new_expiration)
        return super().get_repo_tag(repository_ref, tag_name, raise_on_error=True)
|
|
|
|
def _create_and_tag_manifest(
    self,
    repo_ref: RepositoryReference,
    manifest_ref: str,
    create_manifest_fn: Callable[
        [RepositoryReference, ManifestInterface, str | None], tuple[Manifest | None, Tag | None]
    ],
) -> tuple[Manifest | None, Tag | None]:
    """
    Pulls `manifest_ref` from upstream and stores it through `create_manifest_fn`.

    Returns the (manifest, tag) pair produced by `create_manifest_fn`.

    Raises a UpstreamRegistryError exception when the upstream registry
    returns anything other than a 200 status code.
    Raises a ManifestDoesNotExist when the manifest pull from upstream errors,
    or the retrieved manifest is invalid (for docker manifest schema v1).
    """
    # Existence check first; its result is not needed here, only its errors.
    self._proxy.manifest_exists(manifest_ref, ACCEPTED_MEDIA_TYPES)

    pulled_manifest = self._pull_upstream_manifest(repo_ref.name, manifest_ref)
    created_manifest, created_tag = create_manifest_fn(repo_ref, pulled_manifest, manifest_ref)
    return created_manifest, created_tag
|
|
|
|
def _update_manifest_for_tag(
    self,
    repo_ref: RepositoryReference,
    tag: Tag,
    manifest: Manifest,
    manifest_ref: str,
    create_manifest_fn,
) -> tuple[Tag, bool]:
    """
    Brings the cached manifest behind `tag` in line with the upstream registry.

    Three outcomes are possible:
      * up to date and filled in: the given tag is returned unchanged (the
        quota check runs first when the tag is expired);
      * up to date but a placeholder: the manifest row is filled in with the
        upstream bytes and placeholder blobs are created;
      * stale: a new manifest is created through `create_manifest_fn`, which
        also retargets the tag.

    A manifest is stale when its digest differs from the upstream digest.
    A manifest is a placeholder when its db entry exists, but its manifest
    bytes are empty.

    Raises UpstreamRegistryError if the upstream registry returns anything
    other than 200.
    Raises ManifestDoesNotExist if the manifest could not be pulled from
    upstream.

    Returns the tag and a boolean telling whether that tag was newly created.
    """
    pulled = None
    upstream_digest = self._proxy.manifest_exists(manifest_ref, ACCEPTED_MEDIA_TYPES)

    # manifest_exists returns an empty/None digest when the upstream registry
    # omits the docker-content-digest header; fall back to pulling the
    # manifest and using its digest.
    if not upstream_digest:
        pulled = self._pull_upstream_manifest(repo_ref.name, manifest_ref)
        upstream_digest = pulled.digest

    logger.debug(f"Found upstream manifest with digest {upstream_digest}, {manifest_ref=}")

    is_current = manifest.digest == upstream_digest
    is_placeholder = manifest.internal_manifest_bytes.as_unicode() == ""

    if is_current and not is_placeholder:
        if tag.expired:
            # The expired tag is about to be revived, so make sure the image
            # still fits within the namespace quota.
            if pulled is None:
                pulled = self._pull_upstream_manifest(repo_ref.name, manifest_ref)
            self._check_image_upload_possible_or_prune(repo_ref, pulled)
        return tag, False

    if pulled is None:
        pulled = self._pull_upstream_manifest(repo_ref.name, manifest_ref)

    if is_current and is_placeholder:
        self._check_image_upload_possible_or_prune(repo_ref, pulled)
        with db_disallow_replica_use():
            with db_transaction():
                fill_in = ManifestTable.update(
                    manifest_bytes=pulled.bytes.as_unicode(),
                    layers_compressed_size=pulled.layers_compressed_size,
                ).where(ManifestTable.id == manifest.id)
                fill_in.execute()
                self._create_placeholder_blobs(pulled, manifest.id, repo_ref.id)
                # Re-read the tag so the returned one carries the updated manifest.
                refreshed = oci.tag.get_tag_by_manifest_id(repo_ref.id, manifest.id)
                return Tag.for_tag(refreshed, self._legacy_image_id_handler), False

    # If we got here the manifest is stale: create a new manifest entry in
    # the db and retarget the tag.
    _, new_tag = create_manifest_fn(repo_ref, pulled, manifest_ref)
    return new_tag, True
|
|
|
|
def _create_manifest_and_retarget_tag(
    self, repository_ref: RepositoryReference, manifest: ManifestInterface, tag_name: str
) -> tuple[Manifest | None, Tag | None]:
    """
    Stores `manifest` in the repository and points `tag_name` at it.

    The namespace quota is checked first; older tags are pruned when the
    image does not fit. Raises QuotaExceededException if there are not
    enough tags to prune.

    Placeholders are created for the objects the manifest references:
    placeholder sub-manifests for manifest lists, placeholder blobs for
    regular manifests. Placeholders get "filled" with the objects' contents
    on upcoming client requests, as part of the flow described in the OCI
    distribution specification.

    Returns the (manifest, tag) pair, or (None, None) on error.
    """
    self._check_image_upload_possible_or_prune(repository_ref, manifest)

    with db_disallow_replica_use():
        with db_transaction():
            manifest_row = oci.manifest.lookup_manifest(
                repository_ref.id, manifest.digest, allow_dead=True
            )
            if manifest_row is None:
                manifest_row = oci.manifest.create_manifest(
                    repository_ref.id, manifest, raise_on_error=True
                )
                if manifest_row is None:
                    return None, None

            # 0 means a tag never expires - a 0 expiration is passed on as None.
            expiration = self._config.expiration_s or None
            tag_row = oci.tag.retarget_tag(
                tag_name,
                manifest_row,
                raise_on_error=True,
                expiration_seconds=expiration,
            )
            if tag_row is None:
                return None, None

            wrapped_manifest = Manifest.for_manifest(manifest_row, self._legacy_image_id_handler)
            wrapped_tag = Tag.for_tag(
                tag_row, self._legacy_image_id_handler, manifest_row=manifest_row
            )

            if not manifest.is_manifest_list:
                self._create_placeholder_blobs(manifest, manifest_row.id, repository_ref.id)
                return wrapped_manifest, wrapped_tag

            # Manifest list: make sure every child manifest exists and is
            # linked to the list.
            unlinked_children = []
            for child in manifest.child_manifests(content_retriever=None):
                child_row = oci.manifest.lookup_manifest(
                    repository_ref.id, child.digest, allow_dead=True
                )
                if child_row is None:
                    child_row = oci.manifest.create_manifest(repository_ref.id, child)
                    oci.tag.create_temporary_tag_if_necessary(
                        child_row, self._config.expiration_s or None
                    )
                try:
                    ManifestChild.get(manifest=manifest_row.id, child_manifest=child_row.id)
                except ManifestChild.DoesNotExist:
                    unlinked_children.append(child_row)

            oci.manifest.connect_manifests(unlinked_children, manifest_row, repository_ref.id)

            return wrapped_manifest, wrapped_tag
|
|
|
|
def _create_manifest_with_temp_tag(
    self,
    repository_ref: RepositoryReference,
    manifest: ManifestInterface,
    manifest_ref: str | None = None,
) -> tuple[Manifest | None, Tag | None]:
    """
    Stores `manifest` in the repository under a temporary tag.

    Regular manifests get placeholder blobs. For manifest lists, every child
    manifest is looked up (or created), connected to the list, and given a
    temporary tag of its own.

    The namespace quota is checked first; older tags are pruned when the
    image does not fit. Raises QuotaExceededException if there are not
    enough tags to prune.

    `manifest_ref` is accepted for signature compatibility and is not read.
    """
    self._check_image_upload_possible_or_prune(repository_ref, manifest)

    with db_disallow_replica_use():
        with db_transaction():
            manifest_row = oci.manifest.create_manifest(repository_ref.id, manifest)
            expiration = self._config.expiration_s or None
            temp_tag_row = oci.tag.create_temporary_tag_if_necessary(manifest_row, expiration)
            tag = Tag.for_tag(temp_tag_row, self._legacy_image_id_handler)
            wrapped_manifest = Manifest.for_manifest(manifest_row, self._legacy_image_id_handler)

            if not manifest.is_manifest_list:
                self._create_placeholder_blobs(manifest, manifest_row.id, repository_ref.id)
                return wrapped_manifest, tag

            child_rows = []
            for child in manifest.child_manifests(content_retriever=None):
                child_row = oci.manifest.lookup_manifest(
                    repository_ref.id, child.digest, allow_hidden=True, allow_dead=True
                )
                if child_row is None:
                    child_row = oci.manifest.create_manifest(repository_ref.id, child)
                child_rows.append(child_row)

            oci.manifest.connect_manifests(child_rows, manifest_row, repository_ref.id)
            for child_row in child_rows:
                oci.tag.create_temporary_tag_if_necessary(child_row, expiration)

            return wrapped_manifest, tag
|
|
|
|
def get_repo_blob_by_digest(self, repository_ref, blob_digest, include_placements=False):
    """
    Returns the blob with the given digest in the repository, or None if no
    storage row for it is linked to the repository.

    A placeholder blob - one that has no ImageStoragePlacement row yet - is
    downloaded from the upstream registry before being returned. Blob upload
    failures during that download are re-raised as UpstreamRegistryError.

    Note that there may be multiple records in the same repository for the
    same blob digest, so the return value of this function may change.
    """
    storage_row = self._get_shared_storage(blob_digest)
    if storage_row is None:
        repo_blob_query = (
            ImageStorage.select()
            .join(ManifestBlob)
            .where(
                ManifestBlob.repository_id == repository_ref.id,
                ImageStorage.content_checksum == blob_digest,
            )
        )
        try:
            storage_row = repo_blob_query.get()
        except ImageStorage.DoesNotExist:
            return None

    placement_query = ImageStoragePlacement.select().where(
        ImageStoragePlacement.storage == storage_row
    )
    try:
        placement_query.get()
    except ImageStoragePlacement.DoesNotExist:
        # No placement yet: the blob is a placeholder, fetch it from upstream.
        try:
            self._download_blob(repository_ref, blob_digest)
        except BlobDigestMismatchException:
            raise UpstreamRegistryError("blob digest mismatch")
        except BlobTooLargeException as e:
            raise UpstreamRegistryError(f"blob too large, max allowed is {e.max_allowed}")
        except BlobRangeMismatchException:
            raise UpstreamRegistryError("range mismatch")
        except BlobUploadException:
            raise UpstreamRegistryError("invalid blob upload")

    return super().get_repo_blob_by_digest(repository_ref, blob_digest, include_placements)
|
|
|
|
def _download_blob(self, repo_ref: RepositoryReference, digest: str) -> None:
    """
    Downloads the blob from the upstream registry and performs a monolithic
    (single chunk) upload of it into Quay's own storage.
    """
    # Fall back to the push temp tag expiration when the proxy cache config
    # has no (or a zero) expiration.
    expiration = self._config.expiration_s or app.config["PUSH_TEMP_TAG_EXPIRATION_SEC"]
    upload_settings = BlobUploadSettings(
        maximum_blob_size=app.config["MAXIMUM_LAYER_SIZE"],
        committed_blob_expiration=expiration,
    )
    uploader = create_blob_upload(repo_ref, storage, upload_settings)

    with self._proxy.get_blob(digest) as resp:
        # -1 is passed as the length when upstream sends no content-length.
        content_length = int(resp.headers.get("content-length", -1))
        with complete_when_uploaded(uploader):
            uploader.upload_chunk(app.config, resp.raw, 0, content_length)
            uploader.commit_to_blob(app.config, digest)
|
|
|
|
def convert_manifest(
    self,
    manifest,
    namespace_name,
    repo_name,
    tag_name,
    allowed_mediatypes,
    storage,
):
    """
    Always returns None: this model performs no manifest conversion, and
    none of the arguments are read.
    """
    return None
|
|
|
|
def get_schema1_parsed_manifest(
    self, manifest, namespace_name, repo_name, tag_name, storage, raise_on_error=False
):
    """
    Never produces a schema 1 manifest.

    Returns None, or raises ManifestException when `raise_on_error` is set.
    """
    if not raise_on_error:
        return None
    raise ManifestException("manifest is not acceptable by the client")
|
|
|
|
def _create_blob(self, digest: str, size: int, manifest_id: int, repo_id: int):
    """
    Ensures a storage row exists for `digest` and that it is linked to the
    given manifest within the given repository.

    Returns the ImageStorage row (existing or newly created).
    """
    try:
        storage_row = ImageStorage.get(content_checksum=digest)
    except ImageStorage.DoesNotExist:
        # TODO: which size should we really be setting here?
        storage_row = ImageStorage.create(
            content_checksum=digest, image_size=size, compressed_size=size
        )

    link = dict(manifest_id=manifest_id, blob=storage_row, repository_id=repo_id)
    try:
        ManifestBlob.get(**link)
    except ManifestBlob.DoesNotExist:
        ManifestBlob.create(**link)

        # Add blob sizes if quota management is enabled
        update_quota(
            repo_id, manifest_id, {storage_row.id: storage_row.image_size}, QuotaOperation.ADD
        )

    return storage_row
|
|
|
|
def _create_placeholder_blobs(
|
|
self, manifest: ManifestInterface, manifest_id: int, repo_id: int
|
|
):
|
|
if manifest.is_manifest_list:
|
|
return
|
|
|
|
if manifest.schema_version == 2:
|
|
self._create_blob(
|
|
manifest.config.digest,
|
|
manifest.config.size,
|
|
manifest_id,
|
|
repo_id,
|
|
)
|
|
|
|
for layer in manifest.filesystem_layers:
|
|
self._create_blob(layer.digest, layer.compressed_size, manifest_id, repo_id)
|
|
username = self._user.username if self._user else None
|
|
queue_id = proxy_cache_blob_queue.put(
|
|
[self._namespace_name, str(repo_id), str(layer.digest)],
|
|
json.dumps(
|
|
{
|
|
"digest": str(layer.digest),
|
|
"repo_id": repo_id,
|
|
"username": username,
|
|
"namespace": self._namespace_name,
|
|
}
|
|
),
|
|
available_after=5,
|
|
)
|
|
|
|
def _upstream_namespace(self, repo: str) -> str:
|
|
upstream_namespace = self._config.upstream_registry_namespace
|
|
if upstream_namespace is None:
|
|
parts = repo.split("/")
|
|
upstream_namespace = parts[0]
|
|
return upstream_namespace
|
|
|
|
def _upstream_repo(self, repo: str) -> str:
|
|
upstream_repo_name = repo
|
|
if self._config.upstream_registry_namespace is None:
|
|
parts = repo.split("/")
|
|
if len(parts) == 1:
|
|
return repo
|
|
upstream_repo_name = parts[1]
|
|
return upstream_repo_name
|
|
|
|
def _pull_upstream_manifest(self, repo: str, manifest_ref: str) -> ManifestInterface:
    """
    Fetches `manifest_ref` from the upstream registry and parses it.

    Raises ManifestDoesNotExist when the upstream fetch fails, or when the
    parsed manifest does not pass the schema 1 namespace/repository check.
    """
    try:
        raw_manifest, content_type = self._proxy.get_manifest(
            manifest_ref, ACCEPTED_MEDIA_TYPES
        )
    except UpstreamRegistryError as e:
        raise ManifestDoesNotExist(str(e))

    expected_repo = self._upstream_repo(repo)
    expected_namespace = self._upstream_namespace(repo)

    # TODO: do we need the compatibility check from v2._parse_manifest?
    parsed = parse_manifest_from_bytes(
        Bytes.for_string_or_unicode(raw_manifest),
        content_type,
        sparse_manifest_support=True,
    )
    if not self._validate_schema1_manifest(expected_namespace, expected_repo, parsed):
        raise ManifestDoesNotExist("invalid schema 1 manifest")
    return parsed
|
|
|
|
def _validate_schema1_manifest(
|
|
self, namespace: str, repo: str, manifest: DockerSchema1Manifest
|
|
) -> bool:
|
|
if manifest.schema_version != 1:
|
|
return True
|
|
|
|
if (
|
|
manifest.namespace == ""
|
|
and features.LIBRARY_SUPPORT
|
|
and namespace == app.config["LIBRARY_NAMESPACE"]
|
|
):
|
|
pass
|
|
elif manifest.namespace != namespace:
|
|
return False
|
|
|
|
if manifest.repo_name != repo:
|
|
return False
|
|
|
|
return True
|