# Asynchronous server and parallel execution of models
Example/demo server that keeps a single model in memory while safely running parallel inference requests, by creating lightweight per-request views and cloning only the small, stateful components (schedulers, RNG state, small mutable attributes). Works with StableDiffusion3 pipelines. For good throughput we recommend running 10 to 50 inferences in parallel, with each request typically taking between roughly 25-30 seconds and 1 minute 30 seconds. (This is only recommended if you have a GPU with 35 GB of VRAM or more; otherwise, keep it to one or two parallel inferences to avoid decoding or saving errors caused by running out of memory.)
> ⚠️ IMPORTANT
> - The example demonstrates how to run pipelines like `StableDiffusion3-3.5` concurrently while keeping a single copy of the heavy model parameters on GPU.
## Necessary components
All the components needed to create the inference server are in the current directory:
```
server-async/
├── utils/
│   ├── __init__.py
│   ├── scheduler.py             # BaseAsyncScheduler wrapper and async_retrieve_timesteps for safe inference
│   ├── requestscopedpipeline.py # RequestScopedPipeline for inference with a single in-memory model
│   └── utils.py                 # Image/video saving utilities and service configuration
├── Pipelines.py                 # Pipeline loader classes (SD3)
├── serverasync.py               # FastAPI app with lifespan management and async inference endpoints
├── test.py                      # Client test script for inference requests
├── requirements.txt             # Dependencies
└── README.md                    # This documentation
```
## What diffusers-async adds / Why we needed it
**Core problem:** a naive server that calls `pipe.__call__` concurrently can hit race conditions (e.g., `scheduler.set_timesteps` mutates shared state) or exhaust memory by deep-copying the whole pipeline per request.
diffusers-async / this example addresses that by:
- **Request-scoped views:** `RequestScopedPipeline` creates a shallow copy of the pipeline per request, so heavy weights (UNet, VAE, text encoder) remain shared and are not duplicated.
- **Per-request mutable state:** small stateful objects (scheduler, RNG state, small lists/dicts, callbacks) are cloned per request. The system uses `BaseAsyncScheduler.clone_for_request(...)` for scheduler cloning, falling back to a safe `deepcopy` or other heuristics.
- **Tokenizer concurrency safety:** `RequestScopedPipeline` now manages an internal tokenizer lock with automatic tokenizer detection and wrapping. This makes Rust tokenizers safe to use under concurrency; race-condition errors like `Already borrowed` no longer occur.
- **`async_retrieve_timesteps(..., return_scheduler=True)`:** a fully backward-compatible helper that returns `(timesteps, num_inference_steps, scheduler)` without mutating the shared scheduler. For users not passing `return_scheduler=True`, the behavior is identical to the original API (see the sketch after this list).
- **Robust attribute handling:** the wrapper avoids writing to read-only properties (e.g., `components`) and auto-detects small mutable attributes to clone while avoiding duplication of large tensors. A configurable tensor-size threshold prevents cloning of large tensors.
- **Enhanced scheduler wrapping:** `BaseAsyncScheduler` automatically wraps schedulers and adds improved `__getattr__`, `__setattr__`, and debugging methods (`__repr__`, `__str__`).
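A minimal sketch of the helper's two calling modes, assuming `async_retrieve_timesteps` mirrors the signature of diffusers' `retrieve_timesteps` plus the extra `return_scheduler` flag, and using the import path implied by the layout above:

```python
import torch
from diffusers import StableDiffusion3Pipeline
from utils.scheduler import async_retrieve_timesteps  # helper from this example

pipe = StableDiffusion3Pipeline.from_pretrained(
    "stabilityai/stable-diffusion-3.5-medium", torch_dtype=torch.float16
).to("cuda")

# Original behavior: returns (timesteps, num_inference_steps) and configures pipe.scheduler.
timesteps, num_steps = async_retrieve_timesteps(
    pipe.scheduler, num_inference_steps=30, device="cuda"
)

# return_scheduler=True: the shared scheduler is left untouched; a per-request clone
# is configured and returned alongside the timesteps.
timesteps, num_steps, local_scheduler = async_retrieve_timesteps(
    pipe.scheduler, num_inference_steps=30, device="cuda", return_scheduler=True
)
```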
## How the server works (high-level flow)
1. A single model instance is loaded into memory (GPU/MPS) when the server starts.
2. On each HTTP inference request, the server uses `RequestScopedPipeline.generate(...)`, which:
   - automatically wraps the base scheduler in `BaseAsyncScheduler` (if not already wrapped),
   - obtains a local scheduler (via `clone_for_request()` or `deepcopy`),
   - does `local_pipe = copy.copy(base_pipe)` (shallow copy),
   - sets `local_pipe.scheduler = local_scheduler` (if possible),
   - clones only small mutable attributes (callbacks, RNG, small latents) with auto-detection,
   - wraps tokenizers with thread-safe locks to prevent race conditions,
   - optionally enters a `model_cpu_offload_context()` for memory offload hooks,
   - calls the pipeline on the local view (`local_pipe(...)`).
3. Result: inference completes, images are moved to CPU and saved (if requested), and internal buffers are freed (GC + `torch.cuda.empty_cache()`).
4. Multiple requests can run in parallel while sharing heavy weights and isolating mutable state.
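As a rough sketch of driving this flow concurrently outside the server, assuming `RequestScopedPipeline.generate(...)` forwards keyword arguments to the underlying pipeline's `__call__`:

```python
import asyncio
import torch
from diffusers import StableDiffusion3Pipeline
from utils.requestscopedpipeline import RequestScopedPipeline  # from this example

base_pipe = StableDiffusion3Pipeline.from_pretrained(
    "stabilityai/stable-diffusion-3.5-medium", torch_dtype=torch.float16
).to("cuda")
rsp = RequestScopedPipeline(base_pipe)

async def infer(prompt: str):
    # Run the blocking pipeline call in a worker thread; the heavy weights stay
    # shared while each call gets its own scheduler and mutable state.
    return await asyncio.to_thread(rsp.generate, prompt=prompt, num_inference_steps=30)

async def main():
    # Two requests executing in parallel against the single in-memory model.
    results = await asyncio.gather(
        infer("A futuristic cityscape, vibrant colors"),
        infer("A watercolor painting of a lighthouse at dawn"),
    )
    print(f"finished {len(results)} requests")

asyncio.run(main())
```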
## How to set up and run the server
### 1) Install dependencies
Recommended: create a virtualenv / conda environment.
```bash
pip install diffusers
pip install -r requirements.txt
```
### 2) Start the server
Use the `serverasync.py` file, which already has everything you need:
```bash
python serverasync.py
```
The server will start on http://localhost:8500 by default with the following features:
- FastAPI application with async lifespan management
- Automatic model loading and pipeline initialization
- Request counting and active inference tracking
- Memory cleanup after each inference
- CORS middleware for cross-origin requests
### 3) Test the server
Use the included test script:
```bash
python test.py
```
Or send a manual request:
`POST /api/diffusers/inference` with JSON body:
```json
{
  "prompt": "A futuristic cityscape, vibrant colors",
  "num_inference_steps": 30,
  "num_images_per_prompt": 1
}
```
Response example:
```json
{
  "response": ["http://localhost:8500/images/img123.png"]
}
```
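A hypothetical Python client equivalent to the manual request above (it assumes only the request and response shapes shown):

```python
import requests

resp = requests.post(
    "http://localhost:8500/api/diffusers/inference",
    json={
        "prompt": "A futuristic cityscape, vibrant colors",
        "num_inference_steps": 30,
        "num_images_per_prompt": 1,
    },
    timeout=300,  # generation can take tens of seconds under load
)
resp.raise_for_status()
print(resp.json()["response"])  # list of image URLs served by the FastAPI app
```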
### 4) Server endpoints
- `GET /` - Welcome message
- `POST /api/diffusers/inference` - Main inference endpoint
- `GET /images/{filename}` - Serve generated images
- `GET /api/status` - Server status and memory info
## Advanced Configuration
### RequestScopedPipeline Parameters
```python
RequestScopedPipeline(
    pipeline,                          # Base pipeline to wrap
    mutable_attrs=None,                # Custom list of attributes to clone
    auto_detect_mutables=True,         # Enable automatic detection of mutable attributes
    tensor_numel_threshold=1_000_000,  # Tensor size threshold for cloning
    tokenizer_lock=None,               # Custom threading lock for tokenizers
    wrap_scheduler=True,               # Auto-wrap scheduler in BaseAsyncScheduler
)
```
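For instance, a hypothetical configuration with custom values; `base_pipe` is the loaded `StableDiffusion3Pipeline` from the earlier sketch, and the attribute names passed to `mutable_attrs` are illustrative only:

```python
import threading
from utils.requestscopedpipeline import RequestScopedPipeline

rsp = RequestScopedPipeline(
    base_pipe,                          # already-loaded pipeline (heavy weights stay shared)
    mutable_attrs=["scheduler"],        # hypothetical: explicitly clone only these attributes
    auto_detect_mutables=True,          # still auto-detect other small mutable state
    tensor_numel_threshold=500_000,     # clone only tensors smaller than this
    tokenizer_lock=threading.Lock(),    # share one lock across all detected tokenizers
    wrap_scheduler=True,
)
```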
### BaseAsyncScheduler Features
- Transparent proxy to the original scheduler via `__getattr__` and `__setattr__`
- `clone_for_request()` method for safe per-request scheduler cloning
- Enhanced debugging with `__repr__` and `__str__` methods
- Full compatibility with existing scheduler APIs
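A hypothetical usage sketch based on the features above; the exact arguments accepted by `clone_for_request()` may differ in `utils/scheduler.py`, and `base_pipe` is the pipeline loaded in the earlier sketches:

```python
from utils.scheduler import BaseAsyncScheduler  # wrapper from this example

wrapped = BaseAsyncScheduler(base_pipe.scheduler)  # transparent proxy around the real scheduler
print(wrapped)                                     # readable __repr__ / __str__ for debugging

local = wrapped.clone_for_request()                # per-request clone; the shared scheduler is untouched
local.set_timesteps(30, device="cuda")             # safe: mutates only the clone
```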
### Server Configuration
The server configuration can be modified in `serverasync.py` through the `ServerConfigModels` dataclass:
```python
from dataclasses import dataclass

@dataclass
class ServerConfigModels:
    model: str = 'stabilityai/stable-diffusion-3.5-medium'
    type_models: str = 't2im'
    host: str = '0.0.0.0'
    port: int = 8500
```
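For example, to point the server at a different checkpoint or port (values are illustrative):

```python
config = ServerConfigModels(
    model="stabilityai/stable-diffusion-3.5-large",  # any SD3-family checkpoint
    type_models="t2im",
    host="127.0.0.1",
    port=8600,
)
```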
## Troubleshooting (quick)
- `Already borrowed` - previously a Rust tokenizer concurrency error. ✅ This is now fixed: `RequestScopedPipeline` automatically detects and wraps tokenizers with thread locks, so the race condition no longer occurs.
- `can't set attribute 'components'` - the pipeline exposes a read-only `components` property. ✅ `RequestScopedPipeline` now detects read-only properties and skips setting them automatically.
- Scheduler issues: if the scheduler doesn't implement `clone_for_request` and `deepcopy` fails, we log and fall back, but prefer `async_retrieve_timesteps(..., return_scheduler=True)` to avoid mutating the shared scheduler. ✅ Note: `async_retrieve_timesteps` is fully backward-compatible; if you don't pass `return_scheduler=True`, the behavior is unchanged.
- Memory issues with large tensors: ✅ The system now has a configurable `tensor_numel_threshold` to prevent cloning of large tensors while still cloning small mutable ones.
- Automatic tokenizer detection: ✅ The system automatically identifies tokenizer components by checking for tokenizer methods, class names, and attributes, then applies thread-safe wrappers.