1
0
mirror of https://github.com/quay/quay.git synced 2025-04-19 21:42:17 +03:00
quay/util/asyncwrapper.py
Kenny Lee Sin Cheong 5f63b3a7bb
chore: drop deprecated tables and remove unused code (PROJQUAY-522) (#2089)
* chore: drop deprecated tables and remove unused code

* isort imports

* migration: check for table existence before drop
2023-08-25 12:17:24 -04:00

72 lines
2.4 KiB
Python

import queue
from concurrent.futures import CancelledError, Executor, Future
from functools import wraps
class AsyncExecutorWrapper(object):
"""
This class will wrap a syncronous library transparently in a way which will move all calls off
to an asynchronous Executor, and will change all returned values to be Future objects.
"""
SYNC_FLAG_FIELD = "__AsyncExecutorWrapper__sync__"
def __init__(self, delegate, executor):
"""
Wrap the specified synchronous delegate instance, and submit() all method calls to the
specified Executor instance.
"""
self._delegate = delegate
self._executor = executor
def __getattr__(self, attr_name):
maybe_callable = getattr(self._delegate, attr_name) # Will raise proper attribute error
if callable(maybe_callable):
# Build a callable which when executed places the request
# onto a queue
@wraps(maybe_callable)
def wrapped_method(*args, **kwargs):
if getattr(maybe_callable, self.SYNC_FLAG_FIELD, False):
sync_result = Future()
try:
sync_result.set_result(maybe_callable(*args, **kwargs))
except Exception as ex:
sync_result.set_exception(ex)
return sync_result
try:
return self._executor.submit(maybe_callable, *args, **kwargs)
except queue.Full as ex:
queue_full = Future()
queue_full.set_exception(ex)
return queue_full
return wrapped_method
else:
return maybe_callable
@classmethod
def sync(cls, f):
"""
Annotate the given method to flag it as synchronous so that AsyncExecutorWrapper will return
the result immediately without submitting it to the executor.
"""
setattr(f, cls.SYNC_FLAG_FIELD, True)
return f
class NullExecutorCancelled(CancelledError):
def __init__(self):
super(NullExecutorCancelled, self).__init__("Null executor always fails.")
class NullExecutor(Executor):
"""
Executor instance which always returns a Future completed with a CancelledError exception.
"""
def submit(self, _, *args, **kwargs):
always_fail = Future()
always_fail.set_exception(NullExecutorCancelled())
return always_fail