1
0
mirror of https://github.com/quay/quay.git synced 2026-01-29 08:42:15 +03:00
Files
quay/workers/manifestsubjectbackfillworker.py
Kenny Lee Sin Cheong 4546163e83 registry: implements the OCI 1.1 referrers API (PROJQUAY-7280) (#2597)
* registry: implements the OCI 1.1 referrers API

Migrations:
- Adds a subject column for lookup
- Adds a subject_backfilled column to track status of the backfilling
of existing manifests
- Adds a manifest_json column making use of postgres' JSONB support,
for future use.

ManifestSubjectBackfillWorker: indexes existing manifests to record any
subject field they already contain.

* Deprecate IGNORE_UNKNOWN_MEDIATYPES

* Cleanup
2024-06-07 13:28:13 -04:00

117 lines
3.6 KiB
Python

import json
import logging
import time
from peewee import fn
import features
from app import app
from data.database import Manifest
from image.shared.schemas import ManifestException, parse_manifest_from_bytes
from util.bytes import Bytes
from util.log import logfile_path
from util.migrate.allocator import yield_random_entries
from workers.gunicorn_worker import GunicornWorker
from workers.worker import Worker
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Value passed to Worker.add_operation as the scheduling frequency for the
# backfill operation; read from the app config, falling back to 60
# (presumably seconds -- confirm against Worker.add_operation).
WORKER_FREQUENCY = app.config.get(
    "MANIFEST_SUBJECT_BACKFILL_WORKER_FREQUENCY",
    60,
)
class ManifestSubjectBackfillWorker(Worker):
    """
    Worker which backfills the newly added subject fields onto Manifest.

    Each run looks for Manifest rows whose ``subject_backfilled`` flag is
    False or NULL, parses the stored manifest bytes, and writes the parsed
    ``subject`` back to the row while setting ``subject_backfilled`` to True.
    """

    def __init__(self):
        super().__init__()
        self.add_operation(self._backfill_manifest_subject, WORKER_FREQUENCY)

    @staticmethod
    def _pending_backfill():
        """
        Build the predicate matching rows that still need to be backfilled.

        Both operands are parenthesized on purpose: ``|`` and ``>>`` bind more
        tightly than ``==`` in Python, so the unparenthesized form is parsed as
        ``col == (False | (col >> None))`` rather than "False OR NULL".
        A fresh expression is built on every call.
        """
        # peewee builds SQL from `==`, so `is False` / `not col` cannot be used here.
        return (Manifest.subject_backfilled == False) | (  # noqa: E712
            Manifest.subject_backfilled >> None
        )

    def _backfill_manifest_subject(self):
        """
        Backfill ``subject`` on manifests that have not been processed yet.

        Returns:
            False when no manifest is left to backfill, True after a pass over
            the pending manifests.
        """
        # Cheap existence probe: bail out early once every row has been handled.
        try:
            Manifest.select().where(self._pending_backfill()).get()
        except Manifest.DoesNotExist:
            logger.debug("Manifest subject backfill worker has completed; skipping")
            return False

        # NOTE(review): yield_random_entries is defined elsewhere; it is given a
        # query factory, the id column, 250, the current max id and 1, and yields
        # (row, abort_signal, _) tuples -- confirm the meaning of the numeric
        # arguments against util.migrate.allocator.
        iterator = yield_random_entries(
            lambda: Manifest.select().where(self._pending_backfill()),
            Manifest.id,
            250,
            Manifest.select(fn.Max(Manifest.id)).scalar(),
            1,
        )

        for manifest_row, abt, _ in iterator:
            if manifest_row.subject_backfilled:
                logger.debug("Another worker preempted this worker")
                abt.set()
                continue

            logger.debug("Setting manifest subject for manifest %s", manifest_row.id)
            manifest_bytes = Bytes.for_string_or_unicode(manifest_row.manifest_bytes)

            try:
                parsed = parse_manifest_from_bytes(
                    manifest_bytes, manifest_row.media_type.name, validate=False
                )
                subject = parsed.subject
            except ManifestException as me:
                logger.warning(
                    "Got exception when trying to parse manifest %s: %s", manifest_row.id, me
                )
                # Without this, `subject` would be unbound on the first iteration
                # or would still hold the previous manifest's subject. The row is
                # still marked as backfilled below so it is not retried forever.
                subject = None

            # The pending predicate in the WHERE clause makes the update a no-op
            # if the row was marked as backfilled in the meantime.
            updated = (
                Manifest.update(
                    subject=subject,
                    subject_backfilled=True,
                )
                .where(Manifest.id == manifest_row.id, self._pending_backfill())
                .execute()
            )
            if updated != 1:
                logger.debug("Another worker preempted this worker")
                abt.set()
                continue

        return True
def create_gunicorn_worker():
    """
    Gunicorn application factory for the manifest subject backfill worker.

    Wraps a ManifestSubjectBackfillWorker in a GunicornWorker so that it can
    run as a gunicorn worker thread, which is handy with gunicorn's hot reload
    in local dev. Using this factory enforces a 1:1 ratio of quay workers to
    gunicorn workers.
    """
    backfill_worker = ManifestSubjectBackfillWorker()
    return GunicornWorker(
        __name__,
        app,
        backfill_worker,
        features.MANIFEST_SUBJECT_BACKFILL,
    )
def main():
    """
    Script entry point: configure logging and run the backfill worker.

    If the app is in account recovery mode, or the MANIFEST_SUBJECT_BACKFILL
    feature is disabled, the process does no work and sleeps forever instead
    of exiting (presumably so a process supervisor does not keep restarting
    it -- confirm against the deployment setup).
    """
    # `import logging` alone does not guarantee that the `logging.config`
    # submodule is loaded; import it explicitly rather than relying on some
    # other module having done so.
    import logging.config

    logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)

    if app.config.get("ACCOUNT_RECOVERY_MODE", False):
        logger.debug("Quay running in account recovery mode")
        while True:
            time.sleep(100000)

    if not features.MANIFEST_SUBJECT_BACKFILL:
        logger.debug("Manifest backfill worker not enabled; skipping")
        while True:
            time.sleep(100000)

    worker = ManifestSubjectBackfillWorker()
    worker.start()
# Run the worker only when this file is executed as a script, not when it is
# imported (e.g. through create_gunicorn_worker).
if __name__ == "__main__":
    main()