Compare commits

..

1 Commits

Author SHA1 Message Date
yiyi@huggingface.co
1c3b90986a [docs] add modular pipeline conventions and gotchas
Create .ai/modular.md as a shared reference for modular pipeline
conventions, patterns, and common mistakes — parallel to the existing
models.md for model conventions.

Consolidates content from the former modular-conversion.md skill file
and adds gotchas identified from reviewing recent modular pipeline PRs
(LTX #13378, SD3 #13324).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 08:26:54 +00:00
5 changed files with 54 additions and 119 deletions

View File

@@ -35,6 +35,10 @@ Strive to write code as simple and explicit as possible.
- Use `self.progress_bar(timesteps)` for progress tracking
- Don't subclass an existing pipeline for a variant — DO NOT use an existing pipeline class (e.g., `FluxPipeline`) to override another pipeline (e.g., `FluxImg2ImgPipeline`) which will be a part of the core codebase (`src`)
### Modular Pipelines
- See [modular.md](modular.md) for modular pipeline conventions, patterns, and gotchas.
## Skills
Task-specific guides live in `.ai/skills/` and are loaded on demand by AI agents. Available skills include:

View File

@@ -1,11 +1,6 @@
# Modular Pipeline Conversion Reference
# Modular pipeline conventions and rules
## When to use
Modular pipelines break a monolithic `__call__` into composable blocks. Convert when:
- The model supports multiple workflows (T2V, I2V, V2V, etc.)
- Users need to swap guidance strategies (CFG, CFG-Zero*, PAG)
- You want to share blocks across pipeline variants
Shared reference for modular pipeline conventions, patterns, and gotchas.
## File structure
@@ -14,7 +9,7 @@ src/diffusers/modular_pipelines/<model>/
__init__.py # Lazy imports
modular_pipeline.py # Pipeline class (tiny, mostly config)
encoders.py # Text encoder + image/video VAE encoder blocks
before_denoise.py # Pre-denoise setup blocks
before_denoise.py # Pre-denoise setup blocks (timesteps, latent prep, noise)
denoise.py # The denoising loop blocks
decoders.py # VAE decode block
modular_blocks_<model>.py # Block assembly (AutoBlocks)
@@ -81,15 +76,21 @@ for i, t in enumerate(timesteps):
latents = components.scheduler.step(noise_pred, t, latents, generator=generator)[0]
```
## Key pattern: Chunk loops for video models
## Key pattern: Denoising loop
Use `LoopSequentialPipelineBlocks` for outer loop:
All models use `LoopSequentialPipelineBlocks` for the denoising loop (iterating over timesteps):
```python
class ChunkDenoiseStep(LoopSequentialPipelineBlocks):
block_classes = [PrepareChunkStep, NoiseGenStep, DenoiseInnerStep, UpdateStep]
class MyModelDenoiseLoopWrapper(LoopSequentialPipelineBlocks):
block_classes = [LoopBeforeDenoiser, LoopDenoiser, LoopAfterDenoiser]
```
Note: blocks inside `LoopSequentialPipelineBlocks` receive `(components, block_state, k)` where `k` is the loop iteration index.
Autoregressive video models (e.g. Helios) also use it for an outer chunk loop:
```python
class HeliosChunkDenoiseStep(LoopSequentialPipelineBlocks):
block_classes = [ChunkHistorySlice, ChunkNoiseGen, ChunkDenoiseInner, ChunkUpdate]
```
Note: sub-blocks inside `LoopSequentialPipelineBlocks` receive `(components, block_state, i, t)` for denoise loops or `(components, block_state, k)` for chunk loops.
## Key pattern: Workflow selection
@@ -136,6 +137,26 @@ ComponentSpec(
)
```
## Gotchas
1. **Importing from standard pipelines.** The modular and standard pipeline systems are parallel — modular blocks must not import from `diffusers.pipelines.*`. For shared utility methods (e.g. `_pack_latents`, `retrieve_timesteps`), either redefine as standalone functions or use `# Copied from diffusers.pipelines.<model>...` headers. See `wan/before_denoise.py` and `helios/before_denoise.py` for examples.
2. **Cross-importing between modular pipelines.** Don't import utilities from another model's modular pipeline (e.g. SD3 importing from `qwenimage.inputs`). If a utility is shared, move it to `modular_pipeline_utils.py` or copy it with a `# Copied from` header.
3. **Accepting `guidance_scale` as a pipeline input.** Users configure the guider separately (see [guider docs](https://huggingface.co/docs/diffusers/main/en/api/guiders)). Different guider types have different parameters; forwarding them through the pipeline doesn't scale. Don't manually set `components.guider.guidance_scale = ...` inside blocks. Same applies to computing `do_classifier_free_guidance` — that logic belongs in the guider.
4. **Accepting pre-computed outputs as inputs to skip encoding.** In standard pipelines we accept `prompt_embeds`, `negative_prompt_embeds`, `image_latents`, etc. so users can skip encoding steps. In modular pipelines this is unnecessary — users just pop out the encoder block and run it separately. Encoder blocks should only accept raw inputs (`prompt`, `image`, etc.).
5. **VAE encoding inside prepare-latents.** Image encoding should be its own block in `encoders.py` (e.g. `MyModelVaeEncoderStep`). The prepare-latents block should accept `image_latents`, not raw images. This lets users run encoding standalone. See `WanVaeEncoderStep` for reference.
6. **Instantiating components inline.** If a class like `VideoProcessor` is needed, register it as a `ComponentSpec` and access via `components.video_processor`. Don't create new instances inside block `__call__`.
7. **Deeply nested block structure.** Prefer flat sequences over nesting Auto blocks inside Sequential blocks inside Auto blocks. Put the `Auto` selection at the top level and make each workflow variant a flat `InsertableDict` of leaf blocks. See `flux2/modular_blocks_flux2_klein.py` for the pattern.
8. **Using `InputParam.template()` / `OutputParam.template()` when semantics don't match.** Templates carry predefined descriptions — e.g. the `"latents"` output template means "Denoised latents". Don't use it for initial noisy latents from a prepare-latents step. Use a plain `InputParam(...)` / `OutputParam(...)` with an accurate description instead.
9. **Test model paths pointing to contributor repos.** Tiny test models must live under `hf-internal-testing/`, not personal repos like `username/tiny-model`. Move the model before merge.
## Conversion checklist
- [ ] Read original pipeline's `__call__` end-to-end, map stages

View File

@@ -5,6 +5,7 @@ Review-specific rules for Claude. Focus on correctness — style is handled by r
Before reviewing, read and apply the guidelines in:
- [AGENTS.md](AGENTS.md) — coding style, copied code
- [models.md](models.md) — model conventions, attention pattern, implementation rules, dependencies, gotchas
- [modular.md](modular.md) — modular pipeline conventions, patterns, common mistakes
- [skills/parity-testing/SKILL.md](skills/parity-testing/SKILL.md) — testing rules, comparison utilities
- [skills/parity-testing/pitfalls.md](skills/parity-testing/pitfalls.md) — known pitfalls (dtype mismatches, config assumptions, etc.)

View File

@@ -82,7 +82,7 @@ See [../../models.md](../../models.md) for the attention pattern, implementation
## Modular Pipeline Conversion
See [modular-conversion.md](modular-conversion.md) for the full guide on converting standard pipelines to modular format, including block types, build order, guider abstraction, and conversion checklist.
See [modular.md](../../modular.md) for the full guide on modular pipeline conventions, block types, build order, guider abstraction, gotchas, and conversion checklist.
---

View File

@@ -22,7 +22,7 @@ from typing import Set
import safetensors.torch
import torch
from ..utils import get_logger, is_accelerate_available, is_torchao_available
from ..utils import get_logger, is_accelerate_available
from ._common import _GO_LC_SUPPORTED_PYTORCH_LAYERS
from .hooks import HookRegistry, ModelHook
@@ -35,54 +35,6 @@ if is_accelerate_available():
logger = get_logger(__name__) # pylint: disable=invalid-name
def _is_torchao_tensor(tensor: torch.Tensor) -> bool:
if not is_torchao_available():
return False
from torchao.utils import TorchAOBaseTensor
return isinstance(tensor, TorchAOBaseTensor)
def _get_torchao_inner_tensor_names(tensor: torch.Tensor) -> list[str]:
"""Get names of all internal tensor data attributes from a TorchAO tensor."""
cls = type(tensor)
names = list(getattr(cls, "tensor_data_names", []))
for attr_name in getattr(cls, "optional_tensor_data_names", []):
if getattr(tensor, attr_name, None) is not None:
names.append(attr_name)
return names
def _swap_torchao_tensor(param: torch.Tensor, source: torch.Tensor) -> None:
"""Move a TorchAO parameter to the device of `source` via `swap_tensors`.
`param.data = source` does not work for `_make_wrapper_subclass` tensors because the `.data` setter only replaces
the outer wrapper storage while leaving the subclass's internal attributes (e.g. `.qdata`, `.scale`) on the
original device. `swap_tensors` swaps the full tensor contents in-place, preserving the parameter's identity so
that any dict keyed by `id(param)` remains valid.
Refer to https://github.com/huggingface/diffusers/pull/13276#discussion_r2944471548 for the full discussion.
"""
torch.utils.swap_tensors(param, source)
def _restore_torchao_tensor(param: torch.Tensor, source: torch.Tensor) -> None:
"""Restore internal tensor data of a TorchAO parameter from `source` without mutating `source`.
Unlike `_swap_torchao_tensor` this copies attribute references one-by-one via `setattr` so that `source` is **not**
modified. Use this when `source` is a cached tensor that must remain unchanged (e.g. a pinned CPU copy in
`cpu_param_dict`).
"""
for attr_name in _get_torchao_inner_tensor_names(source):
setattr(param, attr_name, getattr(source, attr_name))
def _record_stream_torchao_tensor(param: torch.Tensor, stream) -> None:
"""Record stream for all internal tensors of a TorchAO parameter."""
for attr_name in _get_torchao_inner_tensor_names(param):
getattr(param, attr_name).record_stream(stream)
# fmt: off
_GROUP_OFFLOADING = "group_offloading"
_LAYER_EXECUTION_TRACKER = "layer_execution_tracker"
@@ -172,13 +124,6 @@ class ModuleGroup:
else torch.cuda
)
@staticmethod
def _to_cpu(tensor, low_cpu_mem_usage):
# For TorchAO tensors, `.data` returns an incomplete wrapper without internal attributes
# (e.g. `.qdata`, `.scale`), so we must call `.cpu()` on the tensor directly.
t = tensor.cpu() if _is_torchao_tensor(tensor) else tensor.data.cpu()
return t if low_cpu_mem_usage else t.pin_memory()
def _init_cpu_param_dict(self):
cpu_param_dict = {}
if self.stream is None:
@@ -186,15 +131,17 @@ class ModuleGroup:
for module in self.modules:
for param in module.parameters():
cpu_param_dict[param] = self._to_cpu(param, self.low_cpu_mem_usage)
cpu_param_dict[param] = param.data.cpu() if self.low_cpu_mem_usage else param.data.cpu().pin_memory()
for buffer in module.buffers():
cpu_param_dict[buffer] = self._to_cpu(buffer, self.low_cpu_mem_usage)
cpu_param_dict[buffer] = (
buffer.data.cpu() if self.low_cpu_mem_usage else buffer.data.cpu().pin_memory()
)
for param in self.parameters:
cpu_param_dict[param] = self._to_cpu(param, self.low_cpu_mem_usage)
cpu_param_dict[param] = param.data.cpu() if self.low_cpu_mem_usage else param.data.cpu().pin_memory()
for buffer in self.buffers:
cpu_param_dict[buffer] = self._to_cpu(buffer, self.low_cpu_mem_usage)
cpu_param_dict[buffer] = buffer.data.cpu() if self.low_cpu_mem_usage else buffer.data.cpu().pin_memory()
return cpu_param_dict
@@ -210,16 +157,9 @@ class ModuleGroup:
pinned_dict = None
def _transfer_tensor_to_device(self, tensor, source_tensor, default_stream):
moved = source_tensor.to(self.onload_device, non_blocking=self.non_blocking)
if _is_torchao_tensor(tensor):
_swap_torchao_tensor(tensor, moved)
else:
tensor.data = moved
tensor.data = source_tensor.to(self.onload_device, non_blocking=self.non_blocking)
if self.record_stream:
if _is_torchao_tensor(tensor):
_record_stream_torchao_tensor(tensor, default_stream)
else:
tensor.data.record_stream(default_stream)
tensor.data.record_stream(default_stream)
def _process_tensors_from_modules(self, pinned_memory=None, default_stream=None):
for group_module in self.modules:
@@ -238,19 +178,7 @@ class ModuleGroup:
source = pinned_memory[buffer] if pinned_memory else buffer.data
self._transfer_tensor_to_device(buffer, source, default_stream)
def _check_disk_offload_torchao(self):
all_tensors = list(self.tensor_to_key.keys())
has_torchao = any(_is_torchao_tensor(t) for t in all_tensors)
if has_torchao:
raise ValueError(
"Disk offloading is not supported for TorchAO quantized tensors because safetensors "
"cannot serialize TorchAO subclass tensors. Use memory offloading instead by not "
"setting `offload_to_disk_path`."
)
def _onload_from_disk(self):
self._check_disk_offload_torchao()
if self.stream is not None:
# Wait for previous Host->Device transfer to complete
self.stream.synchronize()
@@ -293,8 +221,6 @@ class ModuleGroup:
self._process_tensors_from_modules(None)
def _offload_to_disk(self):
self._check_disk_offload_torchao()
# TODO: we can potentially optimize this code path by checking if the _all_ the desired
# safetensor files exist on the disk and if so, skip this step entirely, reducing IO
# overhead. Currently, we just check if the given `safetensors_file_path` exists and if not
@@ -319,35 +245,18 @@ class ModuleGroup:
for group_module in self.modules:
for param in group_module.parameters():
if _is_torchao_tensor(param):
_restore_torchao_tensor(param, self.cpu_param_dict[param])
else:
param.data = self.cpu_param_dict[param]
for param in self.parameters:
if _is_torchao_tensor(param):
_restore_torchao_tensor(param, self.cpu_param_dict[param])
else:
param.data = self.cpu_param_dict[param]
for param in self.parameters:
param.data = self.cpu_param_dict[param]
for buffer in self.buffers:
if _is_torchao_tensor(buffer):
_restore_torchao_tensor(buffer, self.cpu_param_dict[buffer])
else:
buffer.data = self.cpu_param_dict[buffer]
buffer.data = self.cpu_param_dict[buffer]
else:
for group_module in self.modules:
group_module.to(self.offload_device, non_blocking=False)
for param in self.parameters:
if _is_torchao_tensor(param):
moved = param.to(self.offload_device, non_blocking=False)
_swap_torchao_tensor(param, moved)
else:
param.data = param.data.to(self.offload_device, non_blocking=False)
param.data = param.data.to(self.offload_device, non_blocking=False)
for buffer in self.buffers:
if _is_torchao_tensor(buffer):
moved = buffer.to(self.offload_device, non_blocking=False)
_swap_torchao_tensor(buffer, moved)
else:
buffer.data = buffer.data.to(self.offload_device, non_blocking=False)
buffer.data = buffer.data.to(self.offload_device, non_blocking=False)
@torch.compiler.disable()
def onload_(self):