mirror of
https://github.com/quay/quay.git
synced 2026-01-26 06:21:37 +03:00
* chore: drop deprecated tables and remove unused code * isort imports * migration: check for table existence before drop
48 lines
1.5 KiB
Python
48 lines
1.5 KiB
Python
from data.database import ImageStorage, ManifestBlob, UploadedBlob
|
|
from data.model import BlobDoesNotExist
|
|
from data.model.storage import InvalidImageException, get_storage_by_uuid
|
|
|
|
|
|
def get_repository_blob_by_digest(repository, blob_digest):
|
|
"""
|
|
Find the content-addressable blob linked to the specified repository and returns it or None if
|
|
none.
|
|
"""
|
|
# First try looking for a recently uploaded blob. If none found that is matching,
|
|
# check the repository itself.
|
|
storage = _lookup_blob_uploaded(repository, blob_digest)
|
|
if storage is None:
|
|
storage = _lookup_blob_in_repository(repository, blob_digest)
|
|
|
|
return get_storage_by_uuid(storage.uuid) if storage is not None else None
|
|
|
|
|
|
def _lookup_blob_uploaded(repository, blob_digest):
|
|
try:
|
|
return (
|
|
ImageStorage.select(ImageStorage.uuid)
|
|
.join(UploadedBlob)
|
|
.where(
|
|
UploadedBlob.repository == repository,
|
|
ImageStorage.content_checksum == blob_digest,
|
|
)
|
|
.get()
|
|
)
|
|
except ImageStorage.DoesNotExist:
|
|
return None
|
|
|
|
|
|
def _lookup_blob_in_repository(repository, blob_digest):
|
|
try:
|
|
return (
|
|
ImageStorage.select(ImageStorage.uuid)
|
|
.join(ManifestBlob)
|
|
.where(
|
|
ManifestBlob.repository == repository,
|
|
ImageStorage.content_checksum == blob_digest,
|
|
)
|
|
.get()
|
|
)
|
|
except ImageStorage.DoesNotExist:
|
|
return None
|