# Asynchronous server and parallel execution of models
> Example/demo server that keeps a single model in memory while safely running parallel inference requests by creating per-request lightweight views and cloning only small, stateful components (schedulers, RNG state, small mutable attributes). Works with StableDiffusion3 pipelines.

> We recommend running 10 to 50 inferences in parallel for the best throughput, with individual requests typically taking anywhere from about 25–30 seconds to 1 minute 30 seconds. (Only do this if you have a GPU with 35 GB of VRAM or more; otherwise, keep it to one or two parallel inferences to avoid decoding or saving errors caused by memory shortages.)
## ⚠️ IMPORTANT
* This example demonstrates how to run pipelines such as Stable Diffusion 3/3.5 concurrently while keeping a single copy of the heavy model parameters on the GPU.
## Necessary components
All the components needed to create the inference server are in the current directory:
```
server-async/
├── utils/
│   ├── __init__.py
│   ├── scheduler.py             # BaseAsyncScheduler wrapper and async_retrieve_timesteps for safe per-request inference
│   ├── requestscopedpipeline.py # RequestScopedPipeline for inference with a single in-memory model
│   └── utils.py                 # Image/video saving utilities and service configuration
├── Pipelines.py                 # Pipeline loader classes (SD3)
├── serverasync.py               # FastAPI app with lifespan management and async inference endpoints
├── test.py                      # Client test script for inference requests
├── requirements.txt             # Dependencies
└── README.md                    # This documentation
```
## What `diffusers-async` adds / Why we needed it
Core problem: a naive server that calls `pipe.__call__` concurrently can hit **race conditions** (e.g., `scheduler.set_timesteps` mutates shared state) or explode memory by deep-copying the whole pipeline per request.

`diffusers-async` / this example addresses that by:

* **Request-scoped views**: `RequestScopedPipeline` creates a shallow copy of the pipeline per request, so heavy weights (UNet, VAE, text encoder) remain shared and *are not duplicated*.
* **Per-request mutable state**: small stateful objects (scheduler, RNG state, small lists/dicts, callbacks) are cloned per request. The system uses `BaseAsyncScheduler.clone_for_request(...)` for scheduler cloning, falling back to a safe `deepcopy` or other heuristics.
* **Tokenizer concurrency safety**: `RequestScopedPipeline` manages an internal tokenizer lock with automatic tokenizer detection and wrapping, so Rust tokenizers are safe to use under concurrency and race-condition errors like `Already borrowed` no longer occur.
* **`async_retrieve_timesteps(..., return_scheduler=True)`**: a fully backward-compatible helper that returns `(timesteps, num_inference_steps, scheduler)` without mutating the shared scheduler (see the sketch after this list). If you don't pass `return_scheduler=True`, the behavior is identical to the original API.
* **Robust attribute handling**: the wrapper avoids writing to read-only properties (e.g., `components`) and auto-detects small mutable attributes to clone, while a configurable tensor size threshold prevents large tensors from being duplicated.
* **Enhanced scheduler wrapping**: `BaseAsyncScheduler` automatically wraps schedulers, adding `__getattr__`/`__setattr__` proxying and debugging methods (`__repr__`, `__str__`).

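A minimal sketch of the `return_scheduler=True` path (the argument names here mirror diffusers' `retrieve_timesteps` and are assumptions; see `utils/scheduler.py` for the exact signature):

```python
from utils.scheduler import async_retrieve_timesteps  # import path follows this example's layout


def prepare_timesteps(pipe, steps: int = 30, device: str = "cuda"):
    # Returns a request-local scheduler instead of mutating pipe.scheduler in place.
    timesteps, num_steps, request_scheduler = async_retrieve_timesteps(
        pipe.scheduler,
        num_inference_steps=steps,
        device=device,
        return_scheduler=True,
    )
    return timesteps, num_steps, request_scheduler


# Without return_scheduler=True the helper behaves exactly like the original API:
# timesteps, num_steps = async_retrieve_timesteps(pipe.scheduler, num_inference_steps=30, device="cuda")
```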
## How the server works (high-level flow)
1. A **single model instance** is loaded into memory (GPU/MPS) when the server starts.
2. On each HTTP inference request, the server calls `RequestScopedPipeline.generate(...)`, which:
   * automatically wraps the base scheduler in `BaseAsyncScheduler` (if it is not already wrapped),
   * obtains a *local scheduler* (via `clone_for_request()` or `deepcopy`),
   * does `local_pipe = copy.copy(base_pipe)` (shallow copy),
   * sets `local_pipe.scheduler = local_scheduler` (if possible),
   * clones only small mutable attributes (callbacks, RNG, small latents) with auto-detection,
   * wraps tokenizers with thread-safe locks to prevent race conditions,
   * optionally enters a `model_cpu_offload_context()` for memory offload hooks,
   * calls the pipeline on the local view (`local_pipe(...)`).
3. **Result**: inference completes, images are moved to CPU and saved (if requested), and internal buffers are freed (GC + `torch.cuda.empty_cache()`).
4. Multiple requests run in parallel while sharing heavy weights and isolating mutable state (a condensed code sketch of this flow follows the list).

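In code, the per-request view construction is roughly the following (a condensed illustration of the steps above, not the actual implementation; names outside those mentioned in this README are illustrative):

```python
import copy


def run_on_local_view(base_pipe, wrapped_scheduler, **call_kwargs):
    """Condensed illustration of the per-request flow described above."""
    # Clone only the small, stateful scheduler for this request
    # (clone_for_request(), with deepcopy as the fallback in the real helper).
    local_scheduler = wrapped_scheduler.clone_for_request()

    # Shallow-copy the pipeline: heavy modules (UNet/transformer, VAE,
    # text encoders) stay shared and are not duplicated.
    local_pipe = copy.copy(base_pipe)
    local_pipe.scheduler = local_scheduler

    # Run inference on the request-local view.
    return local_pipe(**call_kwargs)
```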
## How to set up and run the server
### 1) Install dependencies
Recommended: create a virtualenv / conda environment.
```bash
pip install diffusers
pip install -r requirements.txt
```
### 2) Start the server
Run the `serverasync.py` file, which already has everything you need:

```bash
python serverasync.py
```
The server will start on `http://localhost:8500` by default with the following features:

- FastAPI application with async lifespan management
- Automatic model loading and pipeline initialization
- Request counting and active inference tracking
- Memory cleanup after each inference
- CORS middleware for cross-origin requests
### 3) Test the server
Use the included test script:
```bash
python test.py
```
Or send a request manually: `POST /api/diffusers/inference` with JSON body:

```json
{
  "prompt": "A futuristic cityscape, vibrant colors",
  "num_inference_steps": 30,
  "num_images_per_prompt": 1
}
```
Response example:

```json
{
  "response": ["http://localhost:8500/images/img123.png"]
}
```
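For reference, a minimal Python client matching the request and response shapes above might look like this (it assumes the `requests` package is installed and the server runs on the default host/port):

```python
import requests

# Request body matching the JSON example above.
payload = {
    "prompt": "A futuristic cityscape, vibrant colors",
    "num_inference_steps": 30,
    "num_images_per_prompt": 1,
}

resp = requests.post("http://localhost:8500/api/diffusers/inference", json=payload)
resp.raise_for_status()

# The server answers with URLs of the generated images.
for url in resp.json()["response"]:
    print(url)
```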
### 4) Server endpoints
- `GET /` - Welcome message
- `POST /api/diffusers/inference` - Main inference endpoint
- `GET /images/{filename}` - Serve generated images
- `GET /api/status` - Server status and memory info
## Advanced Configuration
### RequestScopedPipeline Parameters
```python
RequestScopedPipeline(
    pipeline,                          # Base pipeline to wrap
    mutable_attrs=None,                # Custom list of attributes to clone
    auto_detect_mutables=True,         # Enable automatic detection of mutable attributes
    tensor_numel_threshold=1_000_000,  # Tensor size (numel) threshold for cloning
    tokenizer_lock=None,               # Custom threading lock for tokenizers
    wrap_scheduler=True,               # Auto-wrap scheduler in BaseAsyncScheduler
)
```
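For example, a sketch of constructing the wrapper with non-default settings (the values and the import path shown here are illustrative; adjust them to your setup):

```python
import threading

import torch
from diffusers import StableDiffusion3Pipeline

from utils.requestscopedpipeline import RequestScopedPipeline  # import path follows this example's layout

# Load the single shared pipeline once at startup.
base_pipe = StableDiffusion3Pipeline.from_pretrained(
    "stabilityai/stable-diffusion-3.5-medium", torch_dtype=torch.float16
).to("cuda")

# Illustrative non-default settings: an explicit tokenizer lock and a larger
# tensor-size threshold (these values are examples, not recommendations).
request_pipe = RequestScopedPipeline(
    base_pipe,
    auto_detect_mutables=True,
    tensor_numel_threshold=5_000_000,
    tokenizer_lock=threading.Lock(),
    wrap_scheduler=True,
)
```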
### BaseAsyncScheduler Features
* Transparent proxy to the original scheduler via `__getattr__` and `__setattr__`
* `clone_for_request()` method for safe per-request scheduler cloning (see the sketch below)
* Enhanced debugging with `__repr__` and `__str__` methods
* Full compatibility with existing scheduler APIs

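A short sketch of wrapping and cloning a scheduler (the `BaseAsyncScheduler` constructor and `clone_for_request()` arguments shown here are assumptions; check `utils/scheduler.py` for the exact interface):

```python
from diffusers import FlowMatchEulerDiscreteScheduler

from utils.scheduler import BaseAsyncScheduler  # import path follows this example's layout

# Wrap a scheduler once at startup (a freshly constructed one here for brevity;
# in the server this is the shared pipeline's scheduler).
shared_scheduler = BaseAsyncScheduler(FlowMatchEulerDiscreteScheduler())

# Clone it per request so set_timesteps() never touches shared state.
local_scheduler = shared_scheduler.clone_for_request()

# Attribute access is proxied to the wrapped scheduler via __getattr__/__setattr__,
# and __repr__/__str__ are overridden for easier debugging.
print(local_scheduler)
```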
### Server Configuration
The server configuration can be modified in `serverasync.py` through the `ServerConfigModels` dataclass:
```python
@dataclass
class ServerConfigModels:
    model: str = 'stabilityai/stable-diffusion-3.5-medium'
    type_models: str = 't2im'
    host: str = '0.0.0.0'
    port: int = 8500
```
## Troubleshooting (quick)
* `Already borrowed`: previously a Rust tokenizer concurrency error.
  ✅ Now fixed: `RequestScopedPipeline` automatically detects tokenizers and wraps them with thread locks, so these race conditions no longer happen.

* `can't set attribute 'components'`: the pipeline exposes a read-only `components` property.
  ✅ `RequestScopedPipeline` now detects read-only properties and skips setting them automatically.

* Scheduler issues:
  * If the scheduler doesn't implement `clone_for_request` and `deepcopy` fails, we log the failure and fall back; prefer `async_retrieve_timesteps(..., return_scheduler=True)` to avoid mutating the shared scheduler.
  ✅ Note: `async_retrieve_timesteps` is fully backward compatible; if you don't pass `return_scheduler=True`, the behavior is unchanged.

* Memory issues with large tensors:
  ✅ The configurable `tensor_numel_threshold` prevents large tensors from being cloned while still cloning small mutable ones.

* Automatic tokenizer detection:
  ✅ The system automatically identifies tokenizer components by checking for tokenizer methods, class names, and attributes, then applies thread-safe wrappers.