diff --git a/data/model/blob.py b/data/model/blob.py index 9a1f7ce06..19889555d 100644 --- a/data/model/blob.py +++ b/data/model/blob.py @@ -169,31 +169,18 @@ def _temp_link_blob(repository_id, storage, link_expiration_s): def get_stale_blob_upload(stale_timespan): """ - Returns a random blob upload which was created before the stale timespan. + Returns a blob upload which was created before the stale timespan. """ stale_threshold = datetime.now() - stale_timespan try: candidates = ( - BlobUpload.select() - .where(BlobUpload.created <= stale_threshold) - .limit(500) - .distinct() - .alias("candidates") - ) - - found = ( - BlobUpload.select(candidates.c.id).from_(candidates).order_by(db_random_func()).get() - ) - if not found: - return None - - return ( BlobUpload.select(BlobUpload, ImageStorageLocation) .join(ImageStorageLocation) - .where(BlobUpload.id == found.id) - .get() + .where(BlobUpload.created <= stale_threshold) ) + + return candidates.get() except BlobUpload.DoesNotExist: return None diff --git a/workers/blobuploadcleanupworker/blobuploadcleanupworker.py b/workers/blobuploadcleanupworker/blobuploadcleanupworker.py index 50d6ea015..10fb0ba33 100644 --- a/workers/blobuploadcleanupworker/blobuploadcleanupworker.py +++ b/workers/blobuploadcleanupworker/blobuploadcleanupworker.py @@ -1,29 +1,45 @@ import logging import logging.config -from datetime import timedelta +from datetime import timedelta, datetime from app import app, storage from data.database import UseThenDisconnect from workers.blobuploadcleanupworker.models_pre_oci import pre_oci_model as model from workers.worker import Worker from util.log import logfile_path +from util.locking import GlobalLock, LockNotAcquiredException + logger = logging.getLogger(__name__) DELETION_DATE_THRESHOLD = timedelta(days=2) BLOBUPLOAD_CLEANUP_FREQUENCY = app.config.get("BLOBUPLOAD_CLEANUP_FREQUENCY", 60 * 60) +LOCK_TTL = 60 * 20 # 20 minutes class BlobUploadCleanupWorker(Worker): def __init__(self): super(BlobUploadCleanupWorker, self).__init__() - self.add_operation(self._cleanup_uploads, BLOBUPLOAD_CLEANUP_FREQUENCY) + self.add_operation(self._try_cleanup_uploads, BLOBUPLOAD_CLEANUP_FREQUENCY) - def _cleanup_uploads(self): + def _try_cleanup_uploads(self): """ Performs garbage collection on the blobupload table. """ + try: + with GlobalLock("BLOB_CLEANUP", lock_ttl=LOCK_TTL): + self._cleanup_uploads() + except LockNotAcquiredException: + logger.debug("Could not acquire global lock for blob upload cleanup worker") + return + + def _cleanup_uploads(self): + """ + Performs cleanup on the blobupload table. + """ + logger.debug("Performing blob upload cleanup") + while True: # Find all blob uploads older than the threshold (typically a week) and delete them. with UseThenDisconnect(app.config): @@ -34,6 +50,8 @@ class BlobUploadCleanupWorker(Worker): # Remove the stale upload from storage. logger.debug("Removing stale blob upload %s", stale_upload.uuid) + assert stale_upload.created <= (datetime.utcnow() - DELETION_DATE_THRESHOLD) + try: storage.cancel_chunked_upload( [stale_upload.location_name], stale_upload.uuid, stale_upload.storage_metadata diff --git a/workers/blobuploadcleanupworker/models_interface.py b/workers/blobuploadcleanupworker/models_interface.py index 18cd33ef5..7329b6723 100644 --- a/workers/blobuploadcleanupworker/models_interface.py +++ b/workers/blobuploadcleanupworker/models_interface.py @@ -3,7 +3,9 @@ from collections import namedtuple from six import add_metaclass -class BlobUpload(namedtuple("BlobUpload", ["uuid", "storage_metadata", "location_name"])): +class BlobUpload( + namedtuple("BlobUpload", ["uuid", "storage_metadata", "location_name", "created"]) +): """ BlobUpload represents a single upload of a blob in progress or previously started. """ diff --git a/workers/blobuploadcleanupworker/models_pre_oci.py b/workers/blobuploadcleanupworker/models_pre_oci.py index e4a745c9e..3c888baa9 100644 --- a/workers/blobuploadcleanupworker/models_pre_oci.py +++ b/workers/blobuploadcleanupworker/models_pre_oci.py @@ -14,7 +14,12 @@ class PreOCIModel(BlobUploadCleanupWorkerDataInterface): if blob_upload is None: return None - return BlobUpload(blob_upload.uuid, blob_upload.storage_metadata, blob_upload.location.name) + return BlobUpload( + blob_upload.uuid, + blob_upload.storage_metadata, + blob_upload.location.name, + blob_upload.created, + ) def delete_blob_upload(self, blob_upload): blob_upload = model.blob.get_blob_upload_by_uuid(blob_upload.uuid) @@ -30,7 +35,12 @@ class PreOCIModel(BlobUploadCleanupWorkerDataInterface): blob_upload = model.blob.initiate_upload("devtable", "simple", "foobarbaz", "local_us", {}) blob_upload.created = datetime.now() - timedelta(days=60) blob_upload.save() - return BlobUpload(blob_upload.uuid, blob_upload.storage_metadata, blob_upload.location.name) + return BlobUpload( + blob_upload.uuid, + blob_upload.storage_metadata, + blob_upload.location.name, + blob_upload.created, + ) def blob_upload_exists(self, upload_uuid): blob_upload = model.blob.get_blob_upload_by_uuid(upload_uuid)