Mirror of https://github.com/huggingface/diffusers.git, synced 2026-01-27 17:22:53 +03:00
* Basic implementation of request scheduling
* Basic editing in SD and Flux Pipelines
* Small Fix
* Fix
* Update for more pipelines
* Add examples/server-async
* Add examples/server-async
* Updated RequestScopedPipeline to handle a single tokenizer lock to avoid race conditions
* Fix
* Fix _TokenizerLockWrapper
* Fix _TokenizerLockWrapper
* Delete _TokenizerLockWrapper
* Fix tokenizer
* Update examples/server-async
* Fix server-async
* Optimizations in examples/server-async
* We keep the implementation simple in examples/server-async
* Update examples/server-async/README.md
* Update examples/server-async/README.md for changes to tokenizer locks and backward-compatible retrieve_timesteps
* The changes to the diffusers core have been undone and all logic is being moved to examples/server-async
* Update examples/server-async/utils/*
* Fix BaseAsyncScheduler
* Rollback in the core of the diffusers
* Update examples/server-async/README.md
* Complete rollback of diffusers core files
* Simple implementation of an asynchronous server compatible with SD3-3.5 and Flux Pipelines
* Update examples/server-async/README.md
* Fixed import errors in 'examples/server-async/serverasync.py'
* Flux Pipeline Discard
* Update examples/server-async/README.md
* Apply style fixes
* Add thread-safe wrappers for components in pipeline

  Refactor requestscopedpipeline.py to add thread-safe wrappers for tokenizer, VAE, and image processor. Introduce locking mechanisms to ensure thread safety during concurrent access.
* Add wrappers.py
* Apply style fixes

---------

Co-authored-by: Sayak Paul <spsayakpaul@gmail.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
87 lines · 2.2 KiB · Python
class ThreadSafeTokenizerWrapper:
    """Proxy around a tokenizer that serializes its stateful methods behind a shared lock."""

    def __init__(self, tokenizer, lock):
        self._tokenizer = tokenizer
        self._lock = lock

        # Methods that touch tokenizer state and must not run concurrently.
        self._thread_safe_methods = {
            "__call__",
            "encode",
            "decode",
            "tokenize",
            "encode_plus",
            "batch_encode_plus",
            "batch_decode",
        }

    def __getattr__(self, name):
        attr = getattr(self._tokenizer, name)

        if name in self._thread_safe_methods and callable(attr):
            # Return a wrapper that acquires the lock around the underlying call.
            def wrapped_method(*args, **kwargs):
                with self._lock:
                    return attr(*args, **kwargs)

            return wrapped_method

        return attr

    def __call__(self, *args, **kwargs):
        # Calling the wrapper directly also tokenizes under the lock.
        with self._lock:
            return self._tokenizer(*args, **kwargs)

    def __setattr__(self, name, value):
        # Private attributes live on the wrapper; everything else is forwarded.
        if name.startswith("_"):
            super().__setattr__(name, value)
        else:
            setattr(self._tokenizer, name, value)

    def __dir__(self):
        return dir(self._tokenizer)

class ThreadSafeVAEWrapper:
    """Proxy around a VAE that serializes encode/decode/forward behind a shared lock."""

    def __init__(self, vae, lock):
        self._vae = vae
        self._lock = lock

    def __getattr__(self, name):
        attr = getattr(self._vae, name)
        if name in {"decode", "encode", "forward"} and callable(attr):

            def wrapped(*args, **kwargs):
                with self._lock:
                    return attr(*args, **kwargs)

            return wrapped
        return attr

    def __setattr__(self, name, value):
        # Private attributes live on the wrapper; everything else is forwarded.
        if name.startswith("_"):
            super().__setattr__(name, value)
        else:
            setattr(self._vae, name, value)

class ThreadSafeImageProcessorWrapper:
    """Proxy around an image processor that serializes preprocess/postprocess behind a shared lock."""

    def __init__(self, proc, lock):
        self._proc = proc
        self._lock = lock

    def __getattr__(self, name):
        attr = getattr(self._proc, name)
        if name in {"postprocess", "preprocess"} and callable(attr):

            def wrapped(*args, **kwargs):
                with self._lock:
                    return attr(*args, **kwargs)

            return wrapped
        return attr

    def __setattr__(self, name, value):
        # Private attributes live on the wrapper; everything else is forwarded.
        if name.startswith("_"):
            super().__setattr__(name, value)
        else:
            setattr(self._proc, name, value)
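
For context, the sketch below shows one way these wrappers might be applied so that several request threads can share a single pipeline. It is a minimal illustration, not the example's actual wiring: the make_thread_safe helper is hypothetical, the single shared lock is an illustrative choice, and the tokenizer / vae / image_processor attribute names assume an SD3-style pipeline. In examples/server-async itself, this wrapping is performed inside RequestScopedPipeline.

import threading

from wrappers import (
    ThreadSafeImageProcessorWrapper,
    ThreadSafeTokenizerWrapper,
    ThreadSafeVAEWrapper,
)


def make_thread_safe(pipe):
    # Hypothetical helper: one lock shared by all wrapped components, so
    # tokenizer, VAE and image-processor calls are serialized against each
    # other. This is the most conservative choice for a sketch; per-component
    # locks would allow more overlap.
    lock = threading.Lock()
    pipe.tokenizer = ThreadSafeTokenizerWrapper(pipe.tokenizer, lock)
    pipe.vae = ThreadSafeVAEWrapper(pipe.vae, lock)
    pipe.image_processor = ThreadSafeImageProcessorWrapper(pipe.image_processor, lock)
    return pipe

Note that these wrappers only protect the shared components; per-request state (scheduler, timesteps) is handled separately by RequestScopedPipeline in the example server.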