1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
Files
quay/data/model/oci/blob.py
Kenny Lee Sin Cheong 5f63b3a7bb chore: drop deprecated tables and remove unused code (PROJQUAY-522) (#2089)
* chore: drop deprecated tables and remove unused code

* isort imports

* migration: check for table existence before drop
2023-08-25 12:17:24 -04:00

48 lines
1.5 KiB
Python

from data.database import ImageStorage, ManifestBlob, UploadedBlob
from data.model import BlobDoesNotExist
from data.model.storage import InvalidImageException, get_storage_by_uuid
def get_repository_blob_by_digest(repository, blob_digest):
    """
    Look up the blob with the given digest that is linked to the given repository.

    Blobs recorded as uploads for the repository are consulted first; only when that
    finds nothing are the blobs referenced by the repository's manifests consulted.

    Returns the result of `get_storage_by_uuid` for the matching storage row, or None
    when neither lookup finds a match.
    """
    match = _lookup_blob_uploaded(repository, blob_digest)
    if match is None:
        # Nothing among the uploaded blobs; fall back to the repository's manifests.
        match = _lookup_blob_in_repository(repository, blob_digest)

    if match is None:
        return None

    # The lookups select only the uuid column, so load the storage via its uuid.
    return get_storage_by_uuid(match.uuid)
def _lookup_blob_uploaded(repository, blob_digest):
    """
    Find an ImageStorage row joined to an UploadedBlob of `repository` whose
    content checksum equals `blob_digest`.

    Only the `uuid` column is selected. Returns the row, or None if no row matches.
    """
    uploaded_query = (
        ImageStorage.select(ImageStorage.uuid)
        .join(UploadedBlob)
        .where(
            UploadedBlob.repository == repository,
            ImageStorage.content_checksum == blob_digest,
        )
    )

    try:
        found = uploaded_query.get()
    except ImageStorage.DoesNotExist:
        found = None

    return found
def _lookup_blob_in_repository(repository, blob_digest):
    """
    Find an ImageStorage row joined to a ManifestBlob of `repository` whose
    content checksum equals `blob_digest`.

    Only the `uuid` column is selected. Returns the row, or None if no row matches.
    """
    manifest_query = (
        ImageStorage.select(ImageStorage.uuid)
        .join(ManifestBlob)
        .where(
            ManifestBlob.repository == repository,
            ImageStorage.content_checksum == blob_digest,
        )
    )

    try:
        found = manifest_query.get()
    except ImageStorage.DoesNotExist:
        found = None

    return found