1
0
mirror of https://github.com/quay/quay.git synced 2025-07-31 18:44:32 +03:00

Have the BlobUpload cleanup worker run on a single instance only (#239)

Instead of running simultaneously across multiple nodes, we change the
worker to run only from a single instance. This is better for the DB and
the previous behavior was not necessary given the size of the BlobUpload
table.

Fixes https://issues.redhat.com/browse/PROJQUAY-365
This commit is contained in:
Joseph Schorr
2020-02-27 13:16:40 -05:00
committed by GitHub
parent 1b023b8c2f
commit 424c1a19d7
4 changed files with 40 additions and 23 deletions

View File

@@ -169,31 +169,18 @@ def _temp_link_blob(repository_id, storage, link_expiration_s):
def get_stale_blob_upload(stale_timespan): def get_stale_blob_upload(stale_timespan):
""" """
Returns a random blob upload which was created before the stale timespan. Returns a blob upload which was created before the stale timespan.
""" """
stale_threshold = datetime.now() - stale_timespan stale_threshold = datetime.now() - stale_timespan
try: try:
candidates = ( candidates = (
BlobUpload.select()
.where(BlobUpload.created <= stale_threshold)
.limit(500)
.distinct()
.alias("candidates")
)
found = (
BlobUpload.select(candidates.c.id).from_(candidates).order_by(db_random_func()).get()
)
if not found:
return None
return (
BlobUpload.select(BlobUpload, ImageStorageLocation) BlobUpload.select(BlobUpload, ImageStorageLocation)
.join(ImageStorageLocation) .join(ImageStorageLocation)
.where(BlobUpload.id == found.id) .where(BlobUpload.created <= stale_threshold)
.get()
) )
return candidates.get()
except BlobUpload.DoesNotExist: except BlobUpload.DoesNotExist:
return None return None

View File

@@ -1,29 +1,45 @@
import logging import logging
import logging.config import logging.config
from datetime import timedelta from datetime import timedelta, datetime
from app import app, storage from app import app, storage
from data.database import UseThenDisconnect from data.database import UseThenDisconnect
from workers.blobuploadcleanupworker.models_pre_oci import pre_oci_model as model from workers.blobuploadcleanupworker.models_pre_oci import pre_oci_model as model
from workers.worker import Worker from workers.worker import Worker
from util.log import logfile_path from util.log import logfile_path
from util.locking import GlobalLock, LockNotAcquiredException
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
DELETION_DATE_THRESHOLD = timedelta(days=2) DELETION_DATE_THRESHOLD = timedelta(days=2)
BLOBUPLOAD_CLEANUP_FREQUENCY = app.config.get("BLOBUPLOAD_CLEANUP_FREQUENCY", 60 * 60) BLOBUPLOAD_CLEANUP_FREQUENCY = app.config.get("BLOBUPLOAD_CLEANUP_FREQUENCY", 60 * 60)
LOCK_TTL = 60 * 20 # 20 minutes
class BlobUploadCleanupWorker(Worker): class BlobUploadCleanupWorker(Worker):
def __init__(self): def __init__(self):
super(BlobUploadCleanupWorker, self).__init__() super(BlobUploadCleanupWorker, self).__init__()
self.add_operation(self._cleanup_uploads, BLOBUPLOAD_CLEANUP_FREQUENCY) self.add_operation(self._try_cleanup_uploads, BLOBUPLOAD_CLEANUP_FREQUENCY)
def _cleanup_uploads(self): def _try_cleanup_uploads(self):
""" """
Performs garbage collection on the blobupload table. Performs garbage collection on the blobupload table.
""" """
try:
with GlobalLock("BLOB_CLEANUP", lock_ttl=LOCK_TTL):
self._cleanup_uploads()
except LockNotAcquiredException:
logger.debug("Could not acquire global lock for blob upload cleanup worker")
return
def _cleanup_uploads(self):
"""
Performs cleanup on the blobupload table.
"""
logger.debug("Performing blob upload cleanup")
while True: while True:
# Find all blob uploads older than the threshold (typically a week) and delete them. # Find all blob uploads older than the threshold (typically a week) and delete them.
with UseThenDisconnect(app.config): with UseThenDisconnect(app.config):
@@ -34,6 +50,8 @@ class BlobUploadCleanupWorker(Worker):
# Remove the stale upload from storage. # Remove the stale upload from storage.
logger.debug("Removing stale blob upload %s", stale_upload.uuid) logger.debug("Removing stale blob upload %s", stale_upload.uuid)
assert stale_upload.created <= (datetime.utcnow() - DELETION_DATE_THRESHOLD)
try: try:
storage.cancel_chunked_upload( storage.cancel_chunked_upload(
[stale_upload.location_name], stale_upload.uuid, stale_upload.storage_metadata [stale_upload.location_name], stale_upload.uuid, stale_upload.storage_metadata

View File

@@ -3,7 +3,9 @@ from collections import namedtuple
from six import add_metaclass from six import add_metaclass
class BlobUpload(namedtuple("BlobUpload", ["uuid", "storage_metadata", "location_name"])): class BlobUpload(
namedtuple("BlobUpload", ["uuid", "storage_metadata", "location_name", "created"])
):
""" """
BlobUpload represents a single upload of a blob in progress or previously started. BlobUpload represents a single upload of a blob in progress or previously started.
""" """

View File

@@ -14,7 +14,12 @@ class PreOCIModel(BlobUploadCleanupWorkerDataInterface):
if blob_upload is None: if blob_upload is None:
return None return None
return BlobUpload(blob_upload.uuid, blob_upload.storage_metadata, blob_upload.location.name) return BlobUpload(
blob_upload.uuid,
blob_upload.storage_metadata,
blob_upload.location.name,
blob_upload.created,
)
def delete_blob_upload(self, blob_upload): def delete_blob_upload(self, blob_upload):
blob_upload = model.blob.get_blob_upload_by_uuid(blob_upload.uuid) blob_upload = model.blob.get_blob_upload_by_uuid(blob_upload.uuid)
@@ -30,7 +35,12 @@ class PreOCIModel(BlobUploadCleanupWorkerDataInterface):
blob_upload = model.blob.initiate_upload("devtable", "simple", "foobarbaz", "local_us", {}) blob_upload = model.blob.initiate_upload("devtable", "simple", "foobarbaz", "local_us", {})
blob_upload.created = datetime.now() - timedelta(days=60) blob_upload.created = datetime.now() - timedelta(days=60)
blob_upload.save() blob_upload.save()
return BlobUpload(blob_upload.uuid, blob_upload.storage_metadata, blob_upload.location.name) return BlobUpload(
blob_upload.uuid,
blob_upload.storage_metadata,
blob_upload.location.name,
blob_upload.created,
)
def blob_upload_exists(self, upload_uuid): def blob_upload_exists(self, upload_uuid):
blob_upload = model.blob.get_blob_upload_by_uuid(upload_uuid) blob_upload = model.blob.get_blob_upload_by_uuid(upload_uuid)