mirror of https://github.com/huggingface/diffusers.git
synced 2026-04-05 15:21:48 +08:00

Compare commits (1 commit): fix-torcha...modular-pi
| Author | SHA1 | Date |
|---|---|---|
|  | 1c3b90986a |  |
@@ -35,6 +35,10 @@ Strive to write code as simple and explicit as possible.
 - Use `self.progress_bar(timesteps)` for progress tracking
 - Don't subclass an existing pipeline for a variant — DO NOT use an existing pipeline class (e.g., `FluxPipeline`) to override another pipeline (e.g., `FluxImg2ImgPipeline`) which will be a part of the core codebase (`src`)
 
+### Modular Pipelines
+
+- See [modular.md](modular.md) for modular pipeline conventions, patterns, and gotchas.
+
 ## Skills
 
 Task-specific guides live in `.ai/skills/` and are loaded on demand by AI agents. Available skills include:
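For reference, a minimal sketch of the `self.progress_bar(timesteps)` convention named above, as it might appear inside a `DiffusionPipeline` subclass's `__call__` (the `unet` and `scheduler` calls are illustrative, not taken from this diff):

```python
# Excerpt of a hypothetical __call__ body: iterate timesteps through the
# pipeline's own progress_bar helper instead of wrapping them in a bare tqdm.
for i, t in enumerate(self.progress_bar(timesteps)):
    noise_pred = self.unet(latents, t).sample
    latents = self.scheduler.step(noise_pred, t, latents, return_dict=False)[0]
```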
@@ -1,11 +1,6 @@
-# Modular Pipeline Conversion Reference
+# Modular pipeline conventions and rules
 
-## When to use
-
-Modular pipelines break a monolithic `__call__` into composable blocks. Convert when:
-- The model supports multiple workflows (T2V, I2V, V2V, etc.)
-- Users need to swap guidance strategies (CFG, CFG-Zero*, PAG)
-- You want to share blocks across pipeline variants
+Shared reference for modular pipeline conventions, patterns, and gotchas.
 
 ## File structure
 
@@ -14,7 +9,7 @@ src/diffusers/modular_pipelines/<model>/
 __init__.py                # Lazy imports
 modular_pipeline.py        # Pipeline class (tiny, mostly config)
 encoders.py                # Text encoder + image/video VAE encoder blocks
-before_denoise.py          # Pre-denoise setup blocks
+before_denoise.py          # Pre-denoise setup blocks (timesteps, latent prep, noise)
 denoise.py                 # The denoising loop blocks
 decoders.py                # VAE decode block
 modular_blocks_<model>.py  # Block assembly (AutoBlocks)
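For context, a sketch of what the `__init__.py  # Lazy imports` entry typically expands to, following diffusers' `_LazyModule` convention (the module and class names below are hypothetical):

```python
# Hypothetical <model>/__init__.py: defer heavy imports until attribute access.
from typing import TYPE_CHECKING

from ...utils import _LazyModule

_import_structure = {
    "encoders": ["MyModelTextEncoderStep"],
    "modular_pipeline": ["MyModelModularPipeline"],
}

if TYPE_CHECKING:
    from .encoders import MyModelTextEncoderStep
    from .modular_pipeline import MyModelModularPipeline
else:
    import sys

    # Replace this module object with a lazy proxy that imports on demand.
    sys.modules[__name__] = _LazyModule(__name__, globals()["__file__"], _import_structure, module_spec=__spec__)
```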
@@ -81,15 +76,21 @@ for i, t in enumerate(timesteps):
     latents = components.scheduler.step(noise_pred, t, latents, generator=generator)[0]
 ```
 
-## Key pattern: Chunk loops for video models
+## Key pattern: Denoising loop
 
-Use `LoopSequentialPipelineBlocks` for outer loop:
+All models use `LoopSequentialPipelineBlocks` for the denoising loop (iterating over timesteps):
 
 ```python
-class ChunkDenoiseStep(LoopSequentialPipelineBlocks):
-    block_classes = [PrepareChunkStep, NoiseGenStep, DenoiseInnerStep, UpdateStep]
+class MyModelDenoiseLoopWrapper(LoopSequentialPipelineBlocks):
+    block_classes = [LoopBeforeDenoiser, LoopDenoiser, LoopAfterDenoiser]
 ```
 
-Note: blocks inside `LoopSequentialPipelineBlocks` receive `(components, block_state, k)` where `k` is the loop iteration index.
+Autoregressive video models (e.g. Helios) also use it for an outer chunk loop:
+
+```python
+class HeliosChunkDenoiseStep(LoopSequentialPipelineBlocks):
+    block_classes = [ChunkHistorySlice, ChunkNoiseGen, ChunkDenoiseInner, ChunkUpdate]
+```
+
+Note: sub-blocks inside `LoopSequentialPipelineBlocks` receive `(components, block_state, i, t)` for denoise loops or `(components, block_state, k)` for chunk loops.
 
 ## Key pattern: Workflow selection
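A minimal sketch of a loop sub-block matching the calling convention in the note above; the base class usage and the `block_state` fields are illustrative, not the exact diffusers API:

```python
# Hypothetical sub-block: the LoopSequentialPipelineBlocks wrapper invokes it
# once per timestep as (components, block_state, i, t).
import torch

from diffusers.modular_pipelines import ModularPipelineBlocks


class LoopAfterDenoiser(ModularPipelineBlocks):
    @torch.no_grad()
    def __call__(self, components, block_state, i, t):
        # Step the scheduler using the prediction left on block_state by the
        # previous sub-block in the loop.
        block_state.latents = components.scheduler.step(
            block_state.noise_pred, t, block_state.latents, return_dict=False
        )[0]
        return components, block_state
```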
@@ -136,6 +137,26 @@ ComponentSpec(
 )
 ```
 
+## Gotchas
+
+1. **Importing from standard pipelines.** The modular and standard pipeline systems are parallel — modular blocks must not import from `diffusers.pipelines.*`. For shared utility methods (e.g. `_pack_latents`, `retrieve_timesteps`), either redefine as standalone functions or use `# Copied from diffusers.pipelines.<model>...` headers. See `wan/before_denoise.py` and `helios/before_denoise.py` for examples.
+
+2. **Cross-importing between modular pipelines.** Don't import utilities from another model's modular pipeline (e.g. SD3 importing from `qwenimage.inputs`). If a utility is shared, move it to `modular_pipeline_utils.py` or copy it with a `# Copied from` header.
+
+3. **Accepting `guidance_scale` as a pipeline input.** Users configure the guider separately (see [guider docs](https://huggingface.co/docs/diffusers/main/en/api/guiders)). Different guider types have different parameters; forwarding them through the pipeline doesn't scale. Don't manually set `components.guider.guidance_scale = ...` inside blocks. Same applies to computing `do_classifier_free_guidance` — that logic belongs in the guider.
+
+4. **Accepting pre-computed outputs as inputs to skip encoding.** In standard pipelines we accept `prompt_embeds`, `negative_prompt_embeds`, `image_latents`, etc. so users can skip encoding steps. In modular pipelines this is unnecessary — users just pop out the encoder block and run it separately. Encoder blocks should only accept raw inputs (`prompt`, `image`, etc.).
+
+5. **VAE encoding inside prepare-latents.** Image encoding should be its own block in `encoders.py` (e.g. `MyModelVaeEncoderStep`). The prepare-latents block should accept `image_latents`, not raw images. This lets users run encoding standalone. See `WanVaeEncoderStep` for reference.
+
+6. **Instantiating components inline.** If a class like `VideoProcessor` is needed, register it as a `ComponentSpec` and access via `components.video_processor`. Don't create new instances inside block `__call__`.
+
+7. **Deeply nested block structure.** Prefer flat sequences over nesting Auto blocks inside Sequential blocks inside Auto blocks. Put the `Auto` selection at the top level and make each workflow variant a flat `InsertableDict` of leaf blocks. See `flux2/modular_blocks_flux2_klein.py` for the pattern.
+
+8. **Using `InputParam.template()` / `OutputParam.template()` when semantics don't match.** Templates carry predefined descriptions — e.g. the `"latents"` output template means "Denoised latents". Don't use it for initial noisy latents from a prepare-latents step. Use a plain `InputParam(...)` / `OutputParam(...)` with an accurate description instead.
+
+9. **Test model paths pointing to contributor repos.** Tiny test models must live under `hf-internal-testing/`, not personal repos like `username/tiny-model`. Move the model before merge.
+
 ## Conversion checklist
 
 - [ ] Read original pipeline's `__call__` end-to-end, map stages
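As a sketch of gotcha 6, a hypothetical block excerpt that registers `VideoProcessor` through `ComponentSpec` instead of constructing it inside `__call__`; the import paths and config values are illustrative and may differ by diffusers version:

```python
# Hypothetical component registration: declare the processor once so blocks
# access a shared instance via components.video_processor.
from diffusers.configuration_utils import FrozenDict
from diffusers.modular_pipelines import ComponentSpec
from diffusers.video_processor import VideoProcessor

expected_components = [
    ComponentSpec(
        "video_processor",
        VideoProcessor,
        config=FrozenDict({"vae_scale_factor": 8}),  # illustrative config
        default_creation_method="from_config",
    ),
]
```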
@@ -5,6 +5,7 @@ Review-specific rules for Claude. Focus on correctness — style is handled by r
 Before reviewing, read and apply the guidelines in:
 - [AGENTS.md](AGENTS.md) — coding style, copied code
 - [models.md](models.md) — model conventions, attention pattern, implementation rules, dependencies, gotchas
+- [modular.md](modular.md) — modular pipeline conventions, patterns, common mistakes
 - [skills/parity-testing/SKILL.md](skills/parity-testing/SKILL.md) — testing rules, comparison utilities
 - [skills/parity-testing/pitfalls.md](skills/parity-testing/pitfalls.md) — known pitfalls (dtype mismatches, config assumptions, etc.)
 
@@ -82,7 +82,7 @@ See [../../models.md](../../models.md) for the attention pattern, implementation
 
 ## Modular Pipeline Conversion
 
-See [modular-conversion.md](modular-conversion.md) for the full guide on converting standard pipelines to modular format, including block types, build order, guider abstraction, and conversion checklist.
+See [modular.md](../../modular.md) for the full guide on modular pipeline conventions, block types, build order, guider abstraction, gotchas, and conversion checklist.
 
 ---
 
@@ -22,7 +22,7 @@ from typing import Set
 import safetensors.torch
 import torch
 
-from ..utils import get_logger, is_accelerate_available, is_torchao_available
+from ..utils import get_logger, is_accelerate_available
 from ._common import _GO_LC_SUPPORTED_PYTORCH_LAYERS
 from .hooks import HookRegistry, ModelHook
 
@@ -35,54 +35,6 @@ if is_accelerate_available():
 logger = get_logger(__name__)  # pylint: disable=invalid-name
 
 
-def _is_torchao_tensor(tensor: torch.Tensor) -> bool:
-    if not is_torchao_available():
-        return False
-    from torchao.utils import TorchAOBaseTensor
-
-    return isinstance(tensor, TorchAOBaseTensor)
-
-
-def _get_torchao_inner_tensor_names(tensor: torch.Tensor) -> list[str]:
-    """Get names of all internal tensor data attributes from a TorchAO tensor."""
-    cls = type(tensor)
-    names = list(getattr(cls, "tensor_data_names", []))
-    for attr_name in getattr(cls, "optional_tensor_data_names", []):
-        if getattr(tensor, attr_name, None) is not None:
-            names.append(attr_name)
-    return names
-
-
-def _swap_torchao_tensor(param: torch.Tensor, source: torch.Tensor) -> None:
-    """Move a TorchAO parameter to the device of `source` via `swap_tensors`.
-
-    `param.data = source` does not work for `_make_wrapper_subclass` tensors because the `.data` setter only replaces
-    the outer wrapper storage while leaving the subclass's internal attributes (e.g. `.qdata`, `.scale`) on the
-    original device. `swap_tensors` swaps the full tensor contents in-place, preserving the parameter's identity so
-    that any dict keyed by `id(param)` remains valid.
-
-    Refer to https://github.com/huggingface/diffusers/pull/13276#discussion_r2944471548 for the full discussion.
-    """
-    torch.utils.swap_tensors(param, source)
-
-
-def _restore_torchao_tensor(param: torch.Tensor, source: torch.Tensor) -> None:
-    """Restore internal tensor data of a TorchAO parameter from `source` without mutating `source`.
-
-    Unlike `_swap_torchao_tensor` this copies attribute references one-by-one via `setattr` so that `source` is **not**
-    modified. Use this when `source` is a cached tensor that must remain unchanged (e.g. a pinned CPU copy in
-    `cpu_param_dict`).
-    """
-    for attr_name in _get_torchao_inner_tensor_names(source):
-        setattr(param, attr_name, getattr(source, attr_name))
-
-
-def _record_stream_torchao_tensor(param: torch.Tensor, stream) -> None:
-    """Record stream for all internal tensors of a TorchAO parameter."""
-    for attr_name in _get_torchao_inner_tensor_names(param):
-        getattr(param, attr_name).record_stream(stream)
-
-
 # fmt: off
 _GROUP_OFFLOADING = "group_offloading"
 _LAYER_EXECUTION_TRACKER = "layer_execution_tracker"
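The `_swap_torchao_tensor` docstring above is the key subtlety: `swap_tensors` exchanges the full tensor contents while preserving object identity. A standalone sketch of the same mechanism with plain parameters (shapes and values illustrative; requires a recent PyTorch with `torch.utils.swap_tensors`):

```python
# Demonstrates torch.utils.swap_tensors: the parameter object keeps its identity
# (id(param) is unchanged, so dicts keyed by id(param) stay valid) while its full
# contents, not just the outer .data storage, are exchanged with `moved`.
import torch

param = torch.nn.Parameter(torch.zeros(4))
key = id(param)

moved = torch.nn.Parameter(torch.ones(4))  # e.g. the tensor after .to(device)
torch.utils.swap_tensors(param, moved)

assert id(param) == key           # identity preserved
assert torch.all(param == 1.0)    # contents swapped in place
```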
@@ -172,13 +124,6 @@ class ModuleGroup:
             else torch.cuda
         )
 
-    @staticmethod
-    def _to_cpu(tensor, low_cpu_mem_usage):
-        # For TorchAO tensors, `.data` returns an incomplete wrapper without internal attributes
-        # (e.g. `.qdata`, `.scale`), so we must call `.cpu()` on the tensor directly.
-        t = tensor.cpu() if _is_torchao_tensor(tensor) else tensor.data.cpu()
-        return t if low_cpu_mem_usage else t.pin_memory()
-
     def _init_cpu_param_dict(self):
         cpu_param_dict = {}
         if self.stream is None:
@@ -186,15 +131,17 @@
 
         for module in self.modules:
             for param in module.parameters():
-                cpu_param_dict[param] = self._to_cpu(param, self.low_cpu_mem_usage)
+                cpu_param_dict[param] = param.data.cpu() if self.low_cpu_mem_usage else param.data.cpu().pin_memory()
             for buffer in module.buffers():
-                cpu_param_dict[buffer] = self._to_cpu(buffer, self.low_cpu_mem_usage)
+                cpu_param_dict[buffer] = (
+                    buffer.data.cpu() if self.low_cpu_mem_usage else buffer.data.cpu().pin_memory()
+                )
 
         for param in self.parameters:
-            cpu_param_dict[param] = self._to_cpu(param, self.low_cpu_mem_usage)
+            cpu_param_dict[param] = param.data.cpu() if self.low_cpu_mem_usage else param.data.cpu().pin_memory()
 
         for buffer in self.buffers:
-            cpu_param_dict[buffer] = self._to_cpu(buffer, self.low_cpu_mem_usage)
+            cpu_param_dict[buffer] = buffer.data.cpu() if self.low_cpu_mem_usage else buffer.data.cpu().pin_memory()
 
         return cpu_param_dict
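A small sketch of why `_init_cpu_param_dict` pins its CPU copies (device names illustrative): a `.to(..., non_blocking=True)` from pageable host memory silently degrades to a synchronous copy, so pinned (page-locked) memory is what makes the async transfer path real.

```python
# Pinned host memory enables a truly asynchronous host-to-device copy.
import torch

if torch.cuda.is_available():
    cpu_tensor = torch.randn(1024, 1024).pin_memory()       # page-locked host memory
    gpu_tensor = cpu_tensor.to("cuda", non_blocking=True)   # async H2D copy
    torch.cuda.synchronize()  # wait before relying on gpu_tensor's contents
```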
@@ -210,16 +157,9 @@
             pinned_dict = None
 
     def _transfer_tensor_to_device(self, tensor, source_tensor, default_stream):
-        moved = source_tensor.to(self.onload_device, non_blocking=self.non_blocking)
-        if _is_torchao_tensor(tensor):
-            _swap_torchao_tensor(tensor, moved)
-        else:
-            tensor.data = moved
+        tensor.data = source_tensor.to(self.onload_device, non_blocking=self.non_blocking)
         if self.record_stream:
-            if _is_torchao_tensor(tensor):
-                _record_stream_torchao_tensor(tensor, default_stream)
-            else:
-                tensor.data.record_stream(default_stream)
+            tensor.data.record_stream(default_stream)
 
     def _process_tensors_from_modules(self, pinned_memory=None, default_stream=None):
         for group_module in self.modules:
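For context, a sketch of the `record_stream` pattern that `_transfer_tensor_to_device` relies on (stream handling simplified): copy on a side stream, then tell the caching allocator the tensor is also used on the stream that will consume it, so its memory is not reused prematurely.

```python
# record_stream marks the tensor as in use on another stream, so the caching
# allocator defers reusing its memory until that stream's queued work finishes.
import torch

if torch.cuda.is_available():
    copy_stream = torch.cuda.Stream()
    default_stream = torch.cuda.default_stream()
    x = torch.randn(1 << 20).pin_memory()
    with torch.cuda.stream(copy_stream):
        y = x.to("cuda", non_blocking=True)   # H2D copy on the side stream
    y.record_stream(default_stream)           # y will also be used on default
    default_stream.wait_stream(copy_stream)   # order compute after the copy
    z = y * 2                                 # consumed on the default stream
```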
@@ -238,19 +178,7 @@
             source = pinned_memory[buffer] if pinned_memory else buffer.data
             self._transfer_tensor_to_device(buffer, source, default_stream)
 
-    def _check_disk_offload_torchao(self):
-        all_tensors = list(self.tensor_to_key.keys())
-        has_torchao = any(_is_torchao_tensor(t) for t in all_tensors)
-        if has_torchao:
-            raise ValueError(
-                "Disk offloading is not supported for TorchAO quantized tensors because safetensors "
-                "cannot serialize TorchAO subclass tensors. Use memory offloading instead by not "
-                "setting `offload_to_disk_path`."
-            )
-
     def _onload_from_disk(self):
-        self._check_disk_offload_torchao()
-
         if self.stream is not None:
             # Wait for previous Host->Device transfer to complete
             self.stream.synchronize()
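The removed `_check_disk_offload_torchao` guard encodes a real constraint: safetensors serializes plain tensor storage and cannot handle wrapper-subclass tensors. A standalone sketch of the happy path it protects (file name illustrative):

```python
# Plain tensors round-trip through safetensors; a TorchAO wrapper subclass
# would fail at save time, hence the ValueError guard above.
import safetensors.torch
import torch

tensors = {"weight": torch.randn(4, 4)}
safetensors.torch.save_file(tensors, "group.safetensors")
loaded = safetensors.torch.load_file("group.safetensors")
```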
@@ -293,8 +221,6 @@
             self._process_tensors_from_modules(None)
 
     def _offload_to_disk(self):
-        self._check_disk_offload_torchao()
-
         # TODO: we can potentially optimize this code path by checking if _all_ the desired
         # safetensor files exist on the disk and if so, skip this step entirely, reducing IO
         # overhead. Currently, we just check if the given `safetensors_file_path` exists and if not
@@ -319,35 +245,18 @@
 
             for group_module in self.modules:
                 for param in group_module.parameters():
-                    if _is_torchao_tensor(param):
-                        _restore_torchao_tensor(param, self.cpu_param_dict[param])
-                    else:
-                        param.data = self.cpu_param_dict[param]
+                    param.data = self.cpu_param_dict[param]
             for param in self.parameters:
-                if _is_torchao_tensor(param):
-                    _restore_torchao_tensor(param, self.cpu_param_dict[param])
-                else:
-                    param.data = self.cpu_param_dict[param]
+                param.data = self.cpu_param_dict[param]
             for buffer in self.buffers:
-                if _is_torchao_tensor(buffer):
-                    _restore_torchao_tensor(buffer, self.cpu_param_dict[buffer])
-                else:
-                    buffer.data = self.cpu_param_dict[buffer]
+                buffer.data = self.cpu_param_dict[buffer]
         else:
             for group_module in self.modules:
                 group_module.to(self.offload_device, non_blocking=False)
             for param in self.parameters:
-                if _is_torchao_tensor(param):
-                    moved = param.to(self.offload_device, non_blocking=False)
-                    _swap_torchao_tensor(param, moved)
-                else:
-                    param.data = param.data.to(self.offload_device, non_blocking=False)
+                param.data = param.data.to(self.offload_device, non_blocking=False)
             for buffer in self.buffers:
-                if _is_torchao_tensor(buffer):
-                    moved = buffer.to(self.offload_device, non_blocking=False)
-                    _swap_torchao_tensor(buffer, moved)
-                else:
-                    buffer.data = buffer.data.to(self.offload_device, non_blocking=False)
+                buffer.data = buffer.data.to(self.offload_device, non_blocking=False)
 
     @torch.compiler.disable()
     def onload_(self):