1
0
mirror of https://github.com/quay/quay.git synced 2025-04-18 10:44:06 +03:00
quay/workers/manifestbackfillworker.py
Kenny Lee Sin Cheong 5f63b3a7bb
chore: drop deprecated tables and remove unused code (PROJQUAY-522) (#2089)
* chore: drop deprecated tables and remove unused code

* isort imports

* migration: check for table existence before drop
2023-08-25 12:17:24 -04:00

122 lines
3.9 KiB
Python

import logging
import time
from peewee import fn
import features
from app import app
from data.database import Manifest
from image.shared.schemas import ManifestException, parse_manifest_from_bytes
from util.bytes import Bytes
from util.log import logfile_path
from util.migrate.allocator import yield_random_entries
from workers.gunicorn_worker import GunicornWorker
from workers.worker import Worker
logger = logging.getLogger(__name__)
WORKER_FREQUENCY = app.config.get("MANIFEST_BACKFILL_WORKER_FREQUENCY", 60 * 60)
class ManifestBackfillWorker(Worker):
"""
Worker which backfills the newly added layers compressed size and config media type
fields onto Manifest.
"""
def __init__(self):
super(ManifestBackfillWorker, self).__init__()
self.add_operation(self._backfill_manifests, WORKER_FREQUENCY)
def _backfill_manifests(self):
try:
Manifest.select().where(Manifest.layers_compressed_size >> None).get()
except Manifest.DoesNotExist:
logger.debug("Manifest backfill worker has completed; skipping")
return False
iterator = yield_random_entries(
lambda: Manifest.select().where(Manifest.layers_compressed_size >> None),
Manifest.id,
250,
Manifest.select(fn.Max(Manifest.id)).scalar(),
1,
)
for manifest_row, abt, _ in iterator:
if manifest_row.layers_compressed_size is not None:
logger.debug("Another worker preempted this worker")
abt.set()
continue
logger.debug("Setting layers compressed size for manifest %s", manifest_row.id)
layers_compressed_size = -1
config_media_type = None
manifest_bytes = Bytes.for_string_or_unicode(manifest_row.manifest_bytes)
try:
parsed = parse_manifest_from_bytes(
manifest_bytes, manifest_row.media_type.name, validate=False
)
layers_compressed_size = parsed.layers_compressed_size
if layers_compressed_size is None:
layers_compressed_size = 0
config_media_type = parsed.config_media_type or None
except ManifestException as me:
logger.warning(
"Got exception when trying to parse manifest %s: %s", manifest_row.id, me
)
assert layers_compressed_size is not None
updated = (
Manifest.update(
layers_compressed_size=layers_compressed_size,
config_media_type=config_media_type,
)
.where(Manifest.id == manifest_row.id, Manifest.layers_compressed_size >> None)
.execute()
)
if updated != 1:
logger.debug("Another worker preempted this worker")
abt.set()
continue
return True
def create_gunicorn_worker():
"""
follows the gunicorn application factory pattern, enabling
a quay worker to run as a gunicorn worker thread.
this is useful when utilizing gunicorn's hot reload in local dev.
utilizing this method will enforce a 1:1 quay worker to gunicorn worker ratio.
"""
worker = GunicornWorker(
__name__, app, ManifestBackfillWorker(), features.MANIFEST_SIZE_BACKFILL
)
return worker
def main():
logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)
if app.config.get("ACCOUNT_RECOVERY_MODE", False):
logger.debug("Quay running in account recovery mode")
while True:
time.sleep(100000)
if not features.MANIFEST_SIZE_BACKFILL:
logger.debug("Manifest backfill worker not enabled; skipping")
while True:
time.sleep(100000)
worker = ManifestBackfillWorker()
worker.start()
if __name__ == "__main__":
main()