mirror of
https://github.com/quay/quay.git
synced 2026-01-26 06:21:37 +03:00
* proxycache: Download blob not cached when pulling manifests with blob available locally (PROJQUAY-6708) * Skip downloading blobs without placeholders --------- Co-authored-by: Kenny Lee Sin Cheong <kenny.lee28@gmail.com> Co-authored-by: Brandon Caton <bcaton@redhat.com>
127 lines
3.9 KiB
Python
127 lines
3.9 KiB
Python
import logging
|
|
import time
|
|
|
|
import features
|
|
from app import app, proxy_cache_blob_queue
|
|
from data.database import ImageStorage, ImageStoragePlacement, ManifestBlob
|
|
from data.model import repository, user
|
|
from data.registry_model.datatypes import RepositoryReference
|
|
from data.registry_model.registry_proxy_model import ProxyModel
|
|
from util.log import logfile_path
|
|
from workers.gunicorn_worker import GunicornWorker
|
|
from workers.queueworker import (
|
|
JobException,
|
|
QueueWorker,
|
|
WorkerSleepException,
|
|
WorkerUnhealthyException,
|
|
)
|
|
from workers.worker import with_exponential_backoff
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
POLL_PERIOD_SECONDS = 10
|
|
RESERVATION_SECONDS = 60 * 20
|
|
|
|
|
|
class ProxyCacheBlobWorker(QueueWorker):
|
|
def process_queue_item(self, job_details):
|
|
repo_id = job_details["repo_id"]
|
|
namespace_name = job_details["namespace"]
|
|
digest = job_details["digest"]
|
|
username = job_details["username"]
|
|
|
|
repo_name = repository.lookup_repository(repo_id).name
|
|
repo_ref = RepositoryReference.for_id(repo_id)
|
|
user_ref = user.get_username(username)
|
|
|
|
registry_proxy_model = ProxyModel(
|
|
namespace_name,
|
|
repo_name,
|
|
user_ref,
|
|
)
|
|
|
|
logger.debug(
|
|
"Starting proxy cache blob download for digest %s for repo id %s",
|
|
digest,
|
|
repo_id,
|
|
)
|
|
|
|
if self._should_download_blob(digest, repo_id, registry_proxy_model):
|
|
try:
|
|
registry_proxy_model._download_blob(
|
|
repo_ref,
|
|
digest,
|
|
)
|
|
except:
|
|
logger.exception(
|
|
"Exception when downloading blob %s for repo id %s for proxy cache",
|
|
digest,
|
|
repo_id,
|
|
)
|
|
|
|
return
|
|
|
|
def _should_download_blob(self, digest, repo_id, registry_proxy_model):
|
|
blob = registry_proxy_model._get_shared_storage(digest)
|
|
if blob is None:
|
|
try:
|
|
blob = (
|
|
ImageStorage.select()
|
|
.join(ManifestBlob)
|
|
.where(
|
|
ManifestBlob.repository_id == repo_id,
|
|
ImageStorage.content_checksum == digest,
|
|
)
|
|
.get()
|
|
)
|
|
except ImageStorage.DoesNotExist:
|
|
# There should be placeholder blobs after manifest requests
|
|
return False
|
|
|
|
try:
|
|
ImageStoragePlacement.select().where(ImageStoragePlacement.storage == blob).get()
|
|
except ImageStoragePlacement.DoesNotExist:
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
def create_gunicorn_worker():
|
|
"""
|
|
follows the gunicorn application factory pattern, enabling
|
|
a quay worker to run as a gunicorn worker thread.
|
|
|
|
this is useful when utilizing gunicorn's hot reload in local dev.
|
|
|
|
utilizing this method will enforce a 1:1 quay worker to gunicorn worker ratio.
|
|
"""
|
|
proxy_cache_blob_worker = ProxyCacheBlobWorker(
|
|
proxy_cache_blob_queue,
|
|
poll_period_seconds=POLL_PERIOD_SECONDS,
|
|
reservation_seconds=RESERVATION_SECONDS,
|
|
)
|
|
worker = GunicornWorker(__name__, app, repl_worker, features.PROXY_CACHE)
|
|
return worker
|
|
|
|
|
|
if __name__ == "__main__":
|
|
logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)
|
|
|
|
if app.config.get("ACCOUNT_RECOVERY_MODE", False):
|
|
logger.debug("Quay running in account recovery mode")
|
|
while True:
|
|
time.sleep(100000)
|
|
|
|
if not features.PROXY_CACHE:
|
|
while True:
|
|
time.sleep(100000)
|
|
|
|
logger.debug("Starting proxy cache blob worker")
|
|
worker = ProxyCacheBlobWorker(
|
|
proxy_cache_blob_queue,
|
|
poll_period_seconds=POLL_PERIOD_SECONDS,
|
|
reservation_seconds=RESERVATION_SECONDS,
|
|
)
|
|
worker.start()
|