Compare commits

...

42 Commits

Author SHA1 Message Date
yiyi@huggingface.co
fab1013e4d add test for qwen-image 2026-02-12 05:36:04 +00:00
yiyi@huggingface.co
b0b8fcfef7 add a test suite 2026-02-12 00:58:34 +00:00
yiyi@huggingface.co
1f8dc96f17 add workflow support for sdxl 2026-02-12 00:58:23 +00:00
yiyi@huggingface.co
ba41614e75 update qwen image docstring note 2026-02-12 00:57:45 +00:00
yiyi@huggingface.co
3c3b56c86a treat loop sequential pipeline blocks as leaf 2026-02-12 00:57:15 +00:00
YiYi Xu
32677c795b Merge branch 'main' into modular-workflow 2026-02-11 12:00:08 -10:00
Jared Wen
d32483913a [Fix] Allow prompt and prior_token_ids to be provided simultaneously in GlmImagePipeline (#13092)
* allow loose input

Signed-off-by: JaredforReal <w13431838023@gmail.com>

* add tests

Signed-off-by: JaredforReal <w13431838023@gmail.com>

* format test_glm_image

Signed-off-by: JaredforReal <w13431838023@gmail.com>

---------

Signed-off-by: JaredforReal <w13431838023@gmail.com>
2026-02-11 08:29:36 -10:00
David El Malih
64e2adf8f5 docs: improve docstring scheduling_edm_dpmsolver_multistep.py (#13122)
Improve docstring scheduling edm dpmsolver multistep
2026-02-11 08:59:33 -08:00
Dhruv Nair
c3a4cd14b8 [CI] Refactor Wan Model Tests (#13082)
* update

* update

* update

* update

* update

* update

* update

* update
2026-02-11 14:42:58 +05:30
Sayak Paul
4d00980e25 [lora] fix non-diffusers lora key handling for flux2 (#13119)
fix non-diffusers lora key handling for flux2
2026-02-11 08:06:36 +05:30
Álvaro Somoza
5bf248ddd8 [SkyReelsV2] Fix ftfy import (#13113)
fix
2026-02-10 12:56:13 +05:30
Dhruv Nair
bedc67c75f [Docs] Add guide for AutoModel with custom code (#13099)
update
2026-02-10 12:19:44 +05:30
YiYi Xu
b73cc50e48 Merge branch 'main' into modular-workflow 2026-01-31 09:51:11 -10:00
yiyixuxu
20c35da75c up up 2026-01-25 12:11:37 +01:00
yiyixuxu
6a549f5f55 initial support: workflow 2026-01-25 11:40:52 +01:00
Sayak Paul
412e51c856 include auto-docstring check in the modular ci. (#13004) 2026-01-23 22:34:24 -10:00
github-actions[bot]
23d06423ab Apply style fixes 2026-01-19 09:23:31 +00:00
YiYi Xu
aba551c868 Merge branch 'main' into modular-doc-improv 2026-01-18 23:20:36 -10:00
yiyixuxu
1f9576a2ca fix 2026-01-19 09:56:14 +01:00
yiyixuxu
d75fbc43c7 Merge branch 'modular-doc-improv' of github.com:huggingface/diffusers into modular-doc-improv 2026-01-19 09:54:46 +01:00
yiyixuxu
b7127ce7a7 revert change in z 2026-01-19 09:54:40 +01:00
YiYi Xu
7e9d2b954e Apply suggestions from code review 2026-01-18 22:44:44 -10:00
yiyixuxu
94525200fd remove space in make docstring 2026-01-19 09:35:39 +01:00
yiyixuxu
f056af1fbb make style 2026-01-19 09:27:40 +01:00
yiyixuxu
8d45ff5bf6 apply auto docstring 2026-01-19 09:22:04 +01:00
yiyixuxu
fb15752d55 up up up 2026-01-19 08:10:31 +01:00
yiyixuxu
1f2dbc9dd2 up 2026-01-19 04:10:17 +01:00
yiyixuxu
002c3e8239 add template method 2026-01-19 03:24:34 +01:00
yiyixuxu
de03d7f100 refactor based on dhruv's feedback: remove the class method 2026-01-18 00:35:01 +01:00
yiyixuxu
25c968a38f add TODO in the description for empty docstring 2026-01-17 09:57:56 +01:00
yiyixuxu
aea0d046f6 address feedbacks 2026-01-17 09:36:58 +01:00
yiyixuxu
1c90ce33f2 up 2026-01-10 12:21:26 +01:00
yiyixuxu
507953f415 more more 2026-01-10 12:19:14 +01:00
yiyixuxu
f0555af1c6 up up up 2026-01-10 12:15:53 +01:00
yiyixuxu
2a81f2ec54 style 2026-01-10 12:15:36 +01:00
yiyixuxu
d20f413f78 more auto docstring 2026-01-10 12:11:28 +01:00
yiyixuxu
ff09bf1a63 add modular_auto_docstring! 2026-01-10 11:55:03 +01:00
yiyixuxu
34a743e2dc style 2026-01-10 10:57:27 +01:00
yiyixuxu
43ab14845d update outputs 2026-01-10 10:56:54 +01:00
YiYi Xu
fbfe5c8d6b Merge branch 'main' into modular-doc-improv 2026-01-09 23:54:23 -10:00
yiyixuxu
b29873dee7 up up 2026-01-10 10:52:53 +01:00
yiyixuxu
7b499de6d0 up 2026-01-10 03:35:15 +01:00
28 changed files with 1614 additions and 463 deletions

View File

@@ -29,8 +29,31 @@ text_encoder = AutoModel.from_pretrained(
)
```
## Custom models
[`AutoModel`] also loads models from the [Hub](https://huggingface.co/models) that aren't included in Diffusers. Set `trust_remote_code=True` in [`AutoModel.from_pretrained`] to load custom models.
A custom model repository needs a Python module containing the model class and a `config.json` with an `auto_map` entry that maps `"AutoModel"` to `"module_file.ClassName"`.
```
custom/custom-transformer-model/
├── config.json
├── my_model.py
└── diffusion_pytorch_model.safetensors
```
The `config.json` includes the `auto_map` field pointing to the custom class.
```json
{
"auto_map": {
"AutoModel": "my_model.MyCustomModel"
}
}
```
Then load it with `trust_remote_code=True`.
```py
import torch
from diffusers import AutoModel
@@ -40,7 +63,39 @@ transformer = AutoModel.from_pretrained(
)
```
For a real-world example, [Overworld/Waypoint-1-Small](https://huggingface.co/Overworld/Waypoint-1-Small/tree/main/transformer) hosts a custom `WorldModel` class split across several modules in its `transformer` subfolder.
```
transformer/
├── config.json # auto_map: "model.WorldModel"
├── model.py
├── attn.py
├── nn.py
├── cache.py
├── quantize.py
├── __init__.py
└── diffusion_pytorch_model.safetensors
```
```py
import torch
from diffusers import AutoModel
transformer = AutoModel.from_pretrained(
"Overworld/Waypoint-1-Small", subfolder="transformer", trust_remote_code=True, torch_dtype=torch.bfloat16, device_map="cuda"
)
```
If the custom model inherits from the [`ModelMixin`] class, it gets access to the same features as Diffusers model classes, like [regional compilation](../optimization/fp16#regional-compilation) and [group offloading](../optimization/memory#group-offloading).
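For reference, the `my_model.py` in the layout above can be minimal. The sketch below is hypothetical (the class name and dimensions are invented); any `ModelMixin` subclass with a registered config works.
```py
# my_model.py (hypothetical minimal custom model)
import torch
from diffusers import ModelMixin
from diffusers.configuration_utils import ConfigMixin, register_to_config


class MyCustomModel(ModelMixin, ConfigMixin):
    @register_to_config
    def __init__(self, hidden_dim: int = 64):
        super().__init__()
        self.proj = torch.nn.Linear(hidden_dim, hidden_dim)

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        return self.proj(hidden_states)
```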
> [!WARNING]
> As a precaution with `trust_remote_code=True`, pass a commit hash to the `revision` argument in [`AutoModel.from_pretrained`] to pin the exact code you're loading and make sure it hasn't been updated with malicious changes (unless you fully trust the model owners).
>
> ```py
> transformer = AutoModel.from_pretrained(
> "Overworld/Waypoint-1-Small", subfolder="transformer", trust_remote_code=True, revision="a3d8cb2"
> )
> ```
> [!NOTE]
> Learn more about implementing custom models in the [Community components](../using-diffusers/custom_pipeline_overview#community-components) guide.

View File

@@ -2321,6 +2321,14 @@ def _convert_non_diffusers_flux2_lora_to_diffusers(state_dict):
prefix = "diffusion_model."
original_state_dict = {k[len(prefix) :]: v for k, v in state_dict.items()}
has_lora_down_up = any("lora_down" in k or "lora_up" in k for k in original_state_dict.keys())
if has_lora_down_up:
temp_state_dict = {}
for k, v in original_state_dict.items():
new_key = k.replace("lora_down", "lora_A").replace("lora_up", "lora_B")
temp_state_dict[new_key] = v
original_state_dict = temp_state_dict
num_double_layers = 0
num_single_layers = 0
for key in original_state_dict.keys():
@@ -2337,13 +2345,15 @@ def _convert_non_diffusers_flux2_lora_to_diffusers(state_dict):
attn_prefix = f"single_transformer_blocks.{sl}.attn"
for lora_key in lora_keys:
converted_state_dict[f"{attn_prefix}.to_qkv_mlp_proj.{lora_key}.weight"] = original_state_dict.pop(
f"{single_block_prefix}.linear1.{lora_key}.weight"
)
linear1_key = f"{single_block_prefix}.linear1.{lora_key}.weight"
if linear1_key in original_state_dict:
converted_state_dict[f"{attn_prefix}.to_qkv_mlp_proj.{lora_key}.weight"] = original_state_dict.pop(
linear1_key
)
converted_state_dict[f"{attn_prefix}.to_out.{lora_key}.weight"] = original_state_dict.pop(
f"{single_block_prefix}.linear2.{lora_key}.weight"
)
linear2_key = f"{single_block_prefix}.linear2.{lora_key}.weight"
if linear2_key in original_state_dict:
converted_state_dict[f"{attn_prefix}.to_out.{lora_key}.weight"] = original_state_dict.pop(linear2_key)
for dl in range(num_double_layers):
transformer_block_prefix = f"transformer_blocks.{dl}"
@@ -2352,6 +2362,10 @@ def _convert_non_diffusers_flux2_lora_to_diffusers(state_dict):
for attn_type in attn_types:
attn_prefix = f"{transformer_block_prefix}.attn"
qkv_key = f"double_blocks.{dl}.{attn_type}.qkv.{lora_key}.weight"
if qkv_key not in original_state_dict:
continue
fused_qkv_weight = original_state_dict.pop(qkv_key)
if lora_key == "lora_A":
@@ -2383,8 +2397,9 @@ def _convert_non_diffusers_flux2_lora_to_diffusers(state_dict):
for org_proj, diff_proj in proj_mappings:
for lora_key in lora_keys:
original_key = f"double_blocks.{dl}.{org_proj}.{lora_key}.weight"
diffusers_key = f"{transformer_block_prefix}.{diff_proj}.{lora_key}.weight"
converted_state_dict[diffusers_key] = original_state_dict.pop(original_key)
if original_key in original_state_dict:
diffusers_key = f"{transformer_block_prefix}.{diff_proj}.{lora_key}.weight"
converted_state_dict[diffusers_key] = original_state_dict.pop(original_key)
mlp_mappings = [
("img_mlp.0", "ff.linear_in"),
@@ -2395,8 +2410,27 @@ def _convert_non_diffusers_flux2_lora_to_diffusers(state_dict):
for org_mlp, diff_mlp in mlp_mappings:
for lora_key in lora_keys:
original_key = f"double_blocks.{dl}.{org_mlp}.{lora_key}.weight"
diffusers_key = f"{transformer_block_prefix}.{diff_mlp}.{lora_key}.weight"
converted_state_dict[diffusers_key] = original_state_dict.pop(original_key)
if original_key in original_state_dict:
diffusers_key = f"{transformer_block_prefix}.{diff_mlp}.{lora_key}.weight"
converted_state_dict[diffusers_key] = original_state_dict.pop(original_key)
extra_mappings = {
"img_in": "x_embedder",
"txt_in": "context_embedder",
"time_in.in_layer": "time_guidance_embed.timestep_embedder.linear_1",
"time_in.out_layer": "time_guidance_embed.timestep_embedder.linear_2",
"final_layer.linear": "proj_out",
"final_layer.adaLN_modulation.1": "norm_out.linear",
"single_stream_modulation.lin": "single_stream_modulation.linear",
"double_stream_modulation_img.lin": "double_stream_modulation_img.linear",
"double_stream_modulation_txt.lin": "double_stream_modulation_txt.linear",
}
for org_key, diff_key in extra_mappings.items():
for lora_key in lora_keys:
original_key = f"{org_key}.{lora_key}.weight"
if original_key in original_state_dict:
converted_state_dict[f"{diff_key}.{lora_key}.weight"] = original_state_dict.pop(original_key)
if len(original_state_dict) > 0:
raise ValueError(f"`original_state_dict` should be empty at this point but has {original_state_dict.keys()=}.")

View File

@@ -43,7 +43,7 @@ def _get_qkv_projections(attn: "WanAttention", hidden_states: torch.Tensor, enco
encoder_hidden_states = hidden_states
if attn.fused_projections:
if attn.cross_attention_dim_head is None:
if not attn.is_cross_attention:
# In self-attention layers, we can fuse the entire QKV projection into a single linear
query, key, value = attn.to_qkv(hidden_states).chunk(3, dim=-1)
else:
@@ -219,7 +219,10 @@ class WanAttention(torch.nn.Module, AttentionModuleMixin):
self.add_v_proj = torch.nn.Linear(added_kv_proj_dim, self.inner_dim, bias=True)
self.norm_added_k = torch.nn.RMSNorm(dim_head * heads, eps=eps)
self.is_cross_attention = cross_attention_dim_head is not None
if is_cross_attention is not None:
self.is_cross_attention = is_cross_attention
else:
self.is_cross_attention = cross_attention_dim_head is not None
self.set_processor(processor)
@@ -227,7 +230,7 @@ class WanAttention(torch.nn.Module, AttentionModuleMixin):
if getattr(self, "fused_projections", False):
return
if self.cross_attention_dim_head is None:
if not self.is_cross_attention:
concatenated_weights = torch.cat([self.to_q.weight.data, self.to_k.weight.data, self.to_v.weight.data])
concatenated_bias = torch.cat([self.to_q.bias.data, self.to_k.bias.data, self.to_v.bias.data])
out_features, in_features = concatenated_weights.shape
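The same change repeats in the other Wan attention modules below. A minimal sketch of the new flag resolution, written as a standalone helper for illustration (not part of the diffusers API): the explicit `is_cross_attention` argument takes precedence over the old `cross_attention_dim_head is not None` heuristic, so cross-attention layers whose K/V head dim matches the query (like the VACE blocks further down, which now pass `is_cross_attention=True`) are no longer mistaken for fusable self-attention.
```py
def resolve_is_cross_attention(cross_attention_dim_head, is_cross_attention=None):
    # Explicit flag wins; otherwise fall back to the old heuristic.
    if is_cross_attention is not None:
        return is_cross_attention
    return cross_attention_dim_head is not None


assert resolve_is_cross_attention(None) is False                          # self-attention: QKV can be fused
assert resolve_is_cross_attention(64) is True                             # cross-attention with its own head dim
assert resolve_is_cross_attention(None, is_cross_attention=True) is True  # explicit override (e.g. VACE)
```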

View File

@@ -42,7 +42,7 @@ def _get_qkv_projections(attn: "WanAttention", hidden_states: torch.Tensor, enco
encoder_hidden_states = hidden_states
if attn.fused_projections:
if attn.cross_attention_dim_head is None:
if not attn.is_cross_attention:
# In self-attention layers, we can fuse the entire QKV projection into a single linear
query, key, value = attn.to_qkv(hidden_states).chunk(3, dim=-1)
else:
@@ -214,7 +214,10 @@ class WanAttention(torch.nn.Module, AttentionModuleMixin):
self.add_v_proj = torch.nn.Linear(added_kv_proj_dim, self.inner_dim, bias=True)
self.norm_added_k = torch.nn.RMSNorm(dim_head * heads, eps=eps)
self.is_cross_attention = cross_attention_dim_head is not None
if is_cross_attention is not None:
self.is_cross_attention = is_cross_attention
else:
self.is_cross_attention = cross_attention_dim_head is not None
self.set_processor(processor)
@@ -222,7 +225,7 @@ class WanAttention(torch.nn.Module, AttentionModuleMixin):
if getattr(self, "fused_projections", False):
return
if self.cross_attention_dim_head is None:
if not self.is_cross_attention:
concatenated_weights = torch.cat([self.to_q.weight.data, self.to_k.weight.data, self.to_v.weight.data])
concatenated_bias = torch.cat([self.to_q.bias.data, self.to_k.bias.data, self.to_v.bias.data])
out_features, in_features = concatenated_weights.shape

View File

@@ -54,7 +54,7 @@ def _get_qkv_projections(attn: "WanAttention", hidden_states: torch.Tensor, enco
encoder_hidden_states = hidden_states
if attn.fused_projections:
if attn.cross_attention_dim_head is None:
if not attn.is_cross_attention:
# In self-attention layers, we can fuse the entire QKV projection into a single linear
query, key, value = attn.to_qkv(hidden_states).chunk(3, dim=-1)
else:
@@ -502,13 +502,16 @@ class WanAnimateFaceBlockCrossAttention(nn.Module, AttentionModuleMixin):
dim_head: int = 64,
eps: float = 1e-6,
cross_attention_dim_head: Optional[int] = None,
bias: bool = True,
processor=None,
):
super().__init__()
self.inner_dim = dim_head * heads
self.heads = heads
self.cross_attention_head_dim = cross_attention_dim_head
self.cross_attention_dim_head = cross_attention_dim_head
self.kv_inner_dim = self.inner_dim if cross_attention_dim_head is None else cross_attention_dim_head * heads
self.use_bias = bias
self.is_cross_attention = cross_attention_dim_head is not None
# 1. Pre-Attention Norms for the hidden_states (video latents) and encoder_hidden_states (motion vector).
# NOTE: this is not used in "vanilla" WanAttention
@@ -516,10 +519,10 @@ class WanAnimateFaceBlockCrossAttention(nn.Module, AttentionModuleMixin):
self.pre_norm_kv = nn.LayerNorm(dim, eps, elementwise_affine=False)
# 2. QKV and Output Projections
self.to_q = torch.nn.Linear(dim, self.inner_dim, bias=True)
self.to_k = torch.nn.Linear(dim, self.kv_inner_dim, bias=True)
self.to_v = torch.nn.Linear(dim, self.kv_inner_dim, bias=True)
self.to_out = torch.nn.Linear(self.inner_dim, dim, bias=True)
self.to_q = torch.nn.Linear(dim, self.inner_dim, bias=bias)
self.to_k = torch.nn.Linear(dim, self.kv_inner_dim, bias=bias)
self.to_v = torch.nn.Linear(dim, self.kv_inner_dim, bias=bias)
self.to_out = torch.nn.Linear(self.inner_dim, dim, bias=bias)
# 3. QK Norm
# NOTE: this is applied after the reshape, so only over dim_head rather than dim_head * heads
@@ -682,7 +685,10 @@ class WanAttention(torch.nn.Module, AttentionModuleMixin):
self.add_v_proj = torch.nn.Linear(added_kv_proj_dim, self.inner_dim, bias=True)
self.norm_added_k = torch.nn.RMSNorm(dim_head * heads, eps=eps)
self.is_cross_attention = cross_attention_dim_head is not None
if is_cross_attention is not None:
self.is_cross_attention = is_cross_attention
else:
self.is_cross_attention = cross_attention_dim_head is not None
self.set_processor(processor)
@@ -690,7 +696,7 @@ class WanAttention(torch.nn.Module, AttentionModuleMixin):
if getattr(self, "fused_projections", False):
return
if self.cross_attention_dim_head is None:
if not self.is_cross_attention:
concatenated_weights = torch.cat([self.to_q.weight.data, self.to_k.weight.data, self.to_v.weight.data])
concatenated_bias = torch.cat([self.to_q.bias.data, self.to_k.bias.data, self.to_v.bias.data])
out_features, in_features = concatenated_weights.shape

View File

@@ -76,6 +76,7 @@ class WanVACETransformerBlock(nn.Module):
eps=eps,
added_kv_proj_dim=added_kv_proj_dim,
processor=WanAttnProcessor(),
is_cross_attention=True,
)
self.norm2 = FP32LayerNorm(dim, eps, elementwise_affine=True) if cross_attn_norm else nn.Identity()
@@ -178,6 +179,7 @@ class WanVACETransformer3DModel(
_no_split_modules = ["WanTransformerBlock", "WanVACETransformerBlock"]
_keep_in_fp32_modules = ["time_embedder", "scale_shift_table", "norm1", "norm2", "norm3"]
_keys_to_ignore_on_load_unexpected = ["norm_added_q"]
_repeated_blocks = ["WanTransformerBlock", "WanVACETransformerBlock"]
@register_to_config
def __init__(

View File

@@ -40,8 +40,11 @@ from .modular_pipeline_utils import (
InputParam,
InsertableDict,
OutputParam,
combine_inputs,
combine_outputs,
format_components,
format_configs,
format_workflow,
generate_modular_model_card_content,
make_doc_string,
)
@@ -287,6 +290,7 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin):
config_name = "modular_config.json"
model_name = None
_workflow_map = None
@classmethod
def _get_signature_keys(cls, obj):
@@ -342,6 +346,35 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin):
def outputs(self) -> List[OutputParam]:
return self._get_outputs()
# currently only ConditionalPipelineBlocks and SequentialPipelineBlocks support `get_execution_blocks`
def get_execution_blocks(self, **kwargs):
"""
Get the block(s) that would execute given the inputs. Must be implemented by subclasses that support
conditional block selection.
Args:
**kwargs: Input names and values. Only trigger inputs affect block selection.
"""
raise NotImplementedError(f"`get_execution_blocks` is not implemented for {self.__class__.__name__}")
# currently only SequentialPipelineBlocks supports workflows
@property
def workflow_names(self):
"""
Returns a list of available workflow names. Must be implemented by subclasses that define `_workflow_map`.
"""
raise NotImplementedError(f"`workflow_names` is not implemented for {self.__class__.__name__}")
def get_workflow(self, workflow_name: str):
"""
Get the execution blocks for a specific workflow. Must be implemented by subclasses that define
`_workflow_map`.
Args:
workflow_name: Name of the workflow to retrieve.
"""
raise NotImplementedError(f"`get_workflow` is not implemented for {self.__class__.__name__}")
@classmethod
def from_pretrained(
cls,
@@ -480,72 +513,6 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin):
if current_value is not param: # Using identity comparison to check if object was modified
state.set(param_name, param, input_param.kwargs_type)
@staticmethod
def combine_inputs(*named_input_lists: List[Tuple[str, List[InputParam]]]) -> List[InputParam]:
"""
Combines multiple lists of InputParam objects from different blocks. For duplicate inputs, updates only if
current default value is None and new default value is not None. Warns if multiple non-None default values
exist for the same input.
Args:
named_input_lists: List of tuples containing (block_name, input_param_list) pairs
Returns:
List[InputParam]: Combined list of unique InputParam objects
"""
combined_dict = {} # name -> InputParam
value_sources = {} # name -> block_name
for block_name, inputs in named_input_lists:
for input_param in inputs:
if input_param.name is None and input_param.kwargs_type is not None:
input_name = "*_" + input_param.kwargs_type
else:
input_name = input_param.name
if input_name in combined_dict:
current_param = combined_dict[input_name]
if (
current_param.default is not None
and input_param.default is not None
and current_param.default != input_param.default
):
warnings.warn(
f"Multiple different default values found for input '{input_name}': "
f"{current_param.default} (from block '{value_sources[input_name]}') and "
f"{input_param.default} (from block '{block_name}'). Using {current_param.default}."
)
if current_param.default is None and input_param.default is not None:
combined_dict[input_name] = input_param
value_sources[input_name] = block_name
else:
combined_dict[input_name] = input_param
value_sources[input_name] = block_name
return list(combined_dict.values())
@staticmethod
def combine_outputs(*named_output_lists: List[Tuple[str, List[OutputParam]]]) -> List[OutputParam]:
"""
Combines multiple lists of OutputParam objects from different blocks. For duplicate outputs, keeps the first
occurrence of each output name.
Args:
named_output_lists: List of tuples containing (block_name, output_param_list) pairs
Returns:
List[OutputParam]: Combined list of unique OutputParam objects
"""
combined_dict = {} # name -> OutputParam
for block_name, outputs in named_output_lists:
for output_param in outputs:
if (output_param.name not in combined_dict) or (
combined_dict[output_param.name].kwargs_type is None and output_param.kwargs_type is not None
):
combined_dict[output_param.name] = output_param
return list(combined_dict.values())
@property
def input_names(self) -> List[str]:
return [input_param.name for input_param in self.inputs if input_param.name is not None]
@@ -577,7 +544,8 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin):
class ConditionalPipelineBlocks(ModularPipelineBlocks):
"""
A Pipeline Blocks that conditionally selects a block to run based on the inputs. Subclasses must implement the
`select_block` method to define the logic for selecting the block.
`select_block` method to define the logic for selecting the block. Currently, only selection logic based on the
presence or absence of inputs (i.e., whether they are `None` or not) is supported.
This class inherits from [`ModularPipelineBlocks`]. Check the superclass documentation for the generic methods the
library implements for all the pipeline blocks (such as loading or saving etc.)
@@ -585,15 +553,20 @@ class ConditionalPipelineBlocks(ModularPipelineBlocks):
> [!WARNING]
> This is an experimental feature and is likely to change in the future.
Attributes:
block_classes: List of block classes to be used
block_names: List of prefixes for each block
block_trigger_inputs: List of input names that select_block() uses to determine which block to run
block_classes: List of block classes to be used. Must have the same length as `block_names`.
block_names: List of names for each block. Must have the same length as `block_classes`.
block_trigger_inputs: List of input names that `select_block()` uses to determine which block to run.
For `ConditionalPipelineBlocks`, this does not need to correspond to `block_names` and `block_classes`. For
`AutoPipelineBlocks`, this must have the same length as `block_names` and `block_classes`, where each
element specifies the trigger input for the corresponding block.
default_block_name: Name of the default block to run when no trigger inputs match.
If None, this block can be skipped entirely when no trigger inputs are provided.
"""
block_classes = []
block_names = []
block_trigger_inputs = []
default_block_name = None # name of the default block if no trigger inputs are provided, if None, this block can be skipped if no trigger inputs are provided
default_block_name = None
def __init__(self):
sub_blocks = InsertableDict()
@@ -657,7 +630,7 @@ class ConditionalPipelineBlocks(ModularPipelineBlocks):
@property
def inputs(self) -> List[Tuple[str, Any]]:
named_inputs = [(name, block.inputs) for name, block in self.sub_blocks.items()]
combined_inputs = self.combine_inputs(*named_inputs)
combined_inputs = combine_inputs(*named_inputs)
# mark Required inputs only if that input is required by all the blocks
for input_param in combined_inputs:
if input_param.name in self.required_inputs:
@@ -669,15 +642,16 @@ class ConditionalPipelineBlocks(ModularPipelineBlocks):
@property
def intermediate_outputs(self) -> List[str]:
named_outputs = [(name, block.intermediate_outputs) for name, block in self.sub_blocks.items()]
combined_outputs = self.combine_outputs(*named_outputs)
combined_outputs = combine_outputs(*named_outputs)
return combined_outputs
@property
def outputs(self) -> List[str]:
named_outputs = [(name, block.outputs) for name, block in self.sub_blocks.items()]
combined_outputs = self.combine_outputs(*named_outputs)
combined_outputs = combine_outputs(*named_outputs)
return combined_outputs
# used for `__repr__`
def _get_trigger_inputs(self) -> set:
"""
Returns a set of all unique trigger input values found in this block and nested blocks.
@@ -706,11 +680,6 @@ class ConditionalPipelineBlocks(ModularPipelineBlocks):
return all_triggers
@property
def trigger_inputs(self):
"""All trigger inputs including from nested blocks."""
return self._get_trigger_inputs()
def select_block(self, **kwargs) -> Optional[str]:
"""
Select the block to run based on the trigger inputs. Subclasses must implement this method to define the logic
@@ -750,6 +719,39 @@ class ConditionalPipelineBlocks(ModularPipelineBlocks):
logger.error(error_msg)
raise
def get_execution_blocks(self, **kwargs) -> Optional["ModularPipelineBlocks"]:
"""
Get the block(s) that would execute given the inputs.
Recursively resolves nested ConditionalPipelineBlocks until reaching either:
- A leaf block (no sub_blocks) → returns single `ModularPipelineBlocks`
- A `SequentialPipelineBlocks` → delegates to its `get_execution_blocks()` which returns
a `SequentialPipelineBlocks` containing the resolved execution blocks
Args:
**kwargs: Input names and values. Only trigger inputs affect block selection.
Returns:
- `ModularPipelineBlocks`: A leaf block or resolved `SequentialPipelineBlocks`
- `None`: If this block would be skipped (no trigger matched and no default)
"""
trigger_kwargs = {name: kwargs.get(name) for name in self.block_trigger_inputs if name is not None}
block_name = self.select_block(**trigger_kwargs)
if block_name is None:
block_name = self.default_block_name
if block_name is None:
return None
block = self.sub_blocks[block_name]
# Recursively resolve until we hit a leaf block or a SequentialPipelineBlocks
if block.sub_blocks and not isinstance(block, LoopSequentialPipelineBlocks):
return block.get_execution_blocks(**kwargs)
return block
def __repr__(self):
class_name = self.__class__.__name__
base_class = self.__class__.__bases__[0].__name__
@@ -757,11 +759,11 @@ class ConditionalPipelineBlocks(ModularPipelineBlocks):
f"{class_name}(\n Class: {base_class}\n" if base_class and base_class != "object" else f"{class_name}(\n"
)
if self.trigger_inputs:
if self._get_trigger_inputs():
header += "\n"
header += " " + "=" * 100 + "\n"
header += " This pipeline contains blocks that are selected at runtime based on inputs.\n"
header += f" Trigger Inputs: {sorted(self.trigger_inputs)}\n"
header += f" Trigger Inputs: {sorted(self._get_trigger_inputs())}\n"
header += " " + "=" * 100 + "\n\n"
# Format description with proper indentation
@@ -828,24 +830,56 @@ class ConditionalPipelineBlocks(ModularPipelineBlocks):
class AutoPipelineBlocks(ConditionalPipelineBlocks):
"""
A Pipeline Blocks that automatically selects a block to run based on the presence of trigger inputs.
This is a specialized version of `ConditionalPipelineBlocks` where:
- Each block has one corresponding trigger input (1:1 mapping)
- Block selection is automatic: the first block whose trigger input is present gets selected
- `block_trigger_inputs` must have the same length as `block_names` and `block_classes`
- Use `None` in `block_trigger_inputs` to specify the default block, i.e. the block that runs when no trigger
inputs are present
Attributes:
block_classes:
List of block classes to be used. Must have the same length as `block_names` and
`block_trigger_inputs`.
block_names:
List of names for each block. Must have the same length as `block_classes` and `block_trigger_inputs`.
block_trigger_inputs:
List of input names where each element specifies the trigger input for the corresponding block. Use
`None` to mark the default block.
Example:
```python
class MyAutoBlock(AutoPipelineBlocks):
block_classes = [InpaintEncoderBlock, ImageEncoderBlock, TextEncoderBlock]
block_names = ["inpaint", "img2img", "text2img"]
block_trigger_inputs = ["mask_image", "image", None] # text2img is the default
```
With this definition:
- As long as `mask_image` is provided, the "inpaint" block runs (regardless of whether `image` is also provided)
- If `mask_image` is not provided but `image` is, the "img2img" block runs
- Otherwise, the "text2img" block runs (the default, with trigger `None`)
"""
def __init__(self):
super().__init__()
if self.default_block_name is not None:
raise ValueError(
f"In {self.__class__.__name__}, do not set `default_block_name` for AutoPipelineBlocks. "
f"Use `None` in `block_trigger_inputs` to specify the default block."
)
if not (len(self.block_classes) == len(self.block_names) == len(self.block_trigger_inputs)):
raise ValueError(
f"In {self.__class__.__name__}, the number of block_classes, block_names, and block_trigger_inputs must be the same."
)
@property
def default_block_name(self) -> Optional[str]:
"""Derive default_block_name from block_trigger_inputs (None entry)."""
if None in self.block_trigger_inputs:
idx = self.block_trigger_inputs.index(None)
return self.block_names[idx]
return None
self.default_block_name = self.block_names[idx]
def select_block(self, **kwargs) -> Optional[str]:
"""Select block based on which trigger input is present (not None)."""
@@ -899,6 +933,29 @@ class SequentialPipelineBlocks(ModularPipelineBlocks):
expected_configs.append(config)
return expected_configs
@property
def workflow_names(self):
if self._workflow_map is None:
raise NotImplementedError(
f"workflows is not supported because _workflow_map is not set for {self.__class__.__name__}"
)
return list(self._workflow_map.keys())
def get_workflow(self, workflow_name: str):
if self._workflow_map is None:
raise NotImplementedError(
f"workflows is not supported because _workflow_map is not set for {self.__class__.__name__}"
)
if workflow_name not in self._workflow_map:
raise ValueError(f"Workflow {workflow_name} not found in {self.__class__.__name__}")
trigger_inputs = self._workflow_map[workflow_name]
workflow_blocks = self.get_execution_blocks(**trigger_inputs)
return workflow_blocks
@classmethod
def from_blocks_dict(
cls, blocks_dict: Dict[str, Any], description: Optional[str] = None
@@ -994,7 +1051,7 @@ class SequentialPipelineBlocks(ModularPipelineBlocks):
# filter out them here so they do not end up as intermediate_outputs
if name not in inp_names:
named_outputs.append((name, block.intermediate_outputs))
combined_outputs = self.combine_outputs(*named_outputs)
combined_outputs = combine_outputs(*named_outputs)
return combined_outputs
# YiYi TODO: I think we can remove the outputs property
@@ -1018,6 +1075,7 @@ class SequentialPipelineBlocks(ModularPipelineBlocks):
raise
return pipeline, state
# used for `trigger_inputs` property
def _get_trigger_inputs(self):
"""
Returns a set of all unique trigger input values found in the blocks.
@@ -1041,89 +1099,50 @@ class SequentialPipelineBlocks(ModularPipelineBlocks):
return fn_recursive_get_trigger(self.sub_blocks)
@property
def trigger_inputs(self):
return self._get_trigger_inputs()
def _traverse_trigger_blocks(self, active_inputs):
def get_execution_blocks(self, **kwargs) -> "SequentialPipelineBlocks":
"""
Traverse blocks and select which ones would run given the active inputs.
Get the blocks that would execute given the specified inputs.
Args:
active_inputs: Dict of input names to values that are "present"
**kwargs: Input names and values. Only trigger inputs affect block selection.
Returns:
OrderedDict of block_name -> block that would execute
SequentialPipelineBlocks containing only the blocks that would execute
"""
# Copy kwargs so we can add outputs as we traverse
active_inputs = dict(kwargs)
def fn_recursive_traverse(block, block_name, active_inputs):
result_blocks = OrderedDict()
# ConditionalPipelineBlocks (includes AutoPipelineBlocks)
if isinstance(block, ConditionalPipelineBlocks):
trigger_kwargs = {name: active_inputs.get(name) for name in block.block_trigger_inputs}
selected_block_name = block.select_block(**trigger_kwargs)
if selected_block_name is None:
selected_block_name = block.default_block_name
if selected_block_name is None:
block = block.get_execution_blocks(**active_inputs)
if block is None:
return result_blocks
selected_block = block.sub_blocks[selected_block_name]
if selected_block.sub_blocks:
result_blocks.update(fn_recursive_traverse(selected_block, block_name, active_inputs))
else:
result_blocks[block_name] = selected_block
if hasattr(selected_block, "outputs"):
for out in selected_block.outputs:
active_inputs[out.name] = True
return result_blocks
# SequentialPipelineBlocks or LoopSequentialPipelineBlocks
if block.sub_blocks:
# Has sub_blocks (SequentialPipelineBlocks/ConditionalPipelineBlocks)
if block.sub_blocks and not isinstance(block, LoopSequentialPipelineBlocks):
for sub_block_name, sub_block in block.sub_blocks.items():
blocks_to_update = fn_recursive_traverse(sub_block, sub_block_name, active_inputs)
blocks_to_update = {f"{block_name}.{k}": v for k, v in blocks_to_update.items()}
result_blocks.update(blocks_to_update)
nested_blocks = fn_recursive_traverse(sub_block, sub_block_name, active_inputs)
nested_blocks = {f"{block_name}.{k}": v for k, v in nested_blocks.items()}
result_blocks.update(nested_blocks)
else:
# Leaf block: single ModularPipelineBlocks or LoopSequentialPipelineBlocks
result_blocks[block_name] = block
if hasattr(block, "outputs"):
for out in block.outputs:
# Add outputs to active_inputs so subsequent blocks can use them as triggers
if hasattr(block, "intermediate_outputs"):
for out in block.intermediate_outputs:
active_inputs[out.name] = True
return result_blocks
all_blocks = OrderedDict()
for block_name, block in self.sub_blocks.items():
blocks_to_update = fn_recursive_traverse(block, block_name, active_inputs)
all_blocks.update(blocks_to_update)
return all_blocks
nested_blocks = fn_recursive_traverse(block, block_name, active_inputs)
all_blocks.update(nested_blocks)
def get_execution_blocks(self, **kwargs):
"""
Get the blocks that would execute given the specified inputs.
Args:
**kwargs: Input names and values. Only trigger inputs affect block selection.
Pass any inputs that would be non-None at runtime.
Returns:
SequentialPipelineBlocks containing only the blocks that would execute
Example:
# Get blocks for inpainting workflow
blocks = pipeline.get_execution_blocks(prompt="a cat", mask=mask, image=image)

# Get blocks for text2image workflow
blocks = pipeline.get_execution_blocks(prompt="a cat")
"""
# Filter out None values
active_inputs = {k: v for k, v in kwargs.items() if v is not None}
blocks_triggered = self._traverse_trigger_blocks(active_inputs)
return SequentialPipelineBlocks.from_blocks_dict(blocks_triggered)
return SequentialPipelineBlocks.from_blocks_dict(all_blocks)
def __repr__(self):
class_name = self.__class__.__name__
@@ -1132,18 +1151,23 @@ class SequentialPipelineBlocks(ModularPipelineBlocks):
f"{class_name}(\n Class: {base_class}\n" if base_class and base_class != "object" else f"{class_name}(\n"
)
if self.trigger_inputs:
if self._workflow_map is None and self._get_trigger_inputs():
header += "\n"
header += " " + "=" * 100 + "\n"
header += " This pipeline contains blocks that are selected at runtime based on inputs.\n"
header += f" Trigger Inputs: {[inp for inp in self.trigger_inputs if inp is not None]}\n"
header += f" Trigger Inputs: {[inp for inp in self._get_trigger_inputs() if inp is not None]}\n"
# Get first trigger input as example
example_input = next(t for t in self.trigger_inputs if t is not None)
example_input = next(t for t in self._get_trigger_inputs() if t is not None)
header += f" Use `get_execution_blocks()` to see selected blocks (e.g. `get_execution_blocks({example_input}=...)`).\n"
header += " " + "=" * 100 + "\n\n"
description = self.description
if self._workflow_map is not None:
workflow_str = format_workflow(self._workflow_map)
description = f"{self.description}\n\n{workflow_str}"
# Format description with proper indentation
desc_lines = self.description.split("\n")
desc_lines = description.split("\n")
desc = []
# First line with "Description:" label
desc.append(f" Description: {desc_lines[0]}")
@@ -1191,10 +1215,15 @@ class SequentialPipelineBlocks(ModularPipelineBlocks):
@property
def doc(self):
description = self.description
if self._workflow_map is not None:
workflow_str = format_workflow(self._workflow_map)
description = f"{self.description}\n\n{workflow_str}"
return make_doc_string(
self.inputs,
self.outputs,
self.description,
description=description,
class_name=self.__class__.__name__,
expected_components=self.expected_components,
expected_configs=self.expected_configs,
@@ -1327,7 +1356,7 @@ class LoopSequentialPipelineBlocks(ModularPipelineBlocks):
@property
def intermediate_outputs(self) -> List[str]:
named_outputs = [(name, block.intermediate_outputs) for name, block in self.sub_blocks.items()]
combined_outputs = self.combine_outputs(*named_outputs)
combined_outputs = combine_outputs(*named_outputs)
for output in self.loop_intermediate_outputs:
if output.name not in {output.name for output in combined_outputs}:
combined_outputs.append(output)
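A hedged usage sketch of the new workflow API (the import path and block class are assumed from the QwenImage section below; trigger values are placeholders because only their presence matters for block selection):
```py
from diffusers.modular_pipelines import QwenImageAutoBlocks  # import path assumed

blocks = QwenImageAutoBlocks()
print(blocks.workflow_names)
# ['text2image', 'image2image', 'inpainting', 'controlnet_text2image', ...]

# get_workflow() simply calls get_execution_blocks() with the mapped trigger inputs
inpaint_blocks = blocks.get_workflow("inpainting")
same_blocks = blocks.get_execution_blocks(prompt=True, mask_image=True, image=True)
```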

View File

@@ -14,9 +14,10 @@
import inspect
import re
import warnings
from collections import OrderedDict
from dataclasses import dataclass, field
from typing import Any, Dict, List, Literal, Optional, Type, Union
from dataclasses import dataclass, field, fields
from typing import Any, Dict, List, Literal, Optional, Tuple, Type, Union
import PIL.Image
import torch
@@ -886,6 +887,30 @@ def format_configs(configs, indent_level=4, max_line_length=115, add_empty_lines
return "\n".join(formatted_configs)
def format_workflow(workflow_map):
"""Format a workflow map into a readable string representation.
Args:
workflow_map: Dictionary mapping workflow names to trigger inputs
Returns:
A formatted string representing all workflows
"""
if workflow_map is None:
return ""
lines = ["Supported workflows:"]
for workflow_name, trigger_inputs in workflow_map.items():
required_inputs = [k for k, v in trigger_inputs.items() if v]
if required_inputs:
inputs_str = ", ".join(f"`{t}`" for t in required_inputs)
lines.append(f" - `{workflow_name}`: requires {inputs_str}")
else:
lines.append(f" - `{workflow_name}`: default (no additional inputs required)")
return "\n".join(lines)
def make_doc_string(
inputs,
outputs,
@@ -942,6 +967,70 @@ def make_doc_string(
return output
def combine_inputs(*named_input_lists: List[Tuple[str, List[InputParam]]]) -> List[InputParam]:
"""
Combines multiple lists of InputParam objects from different blocks. For duplicate inputs, updates only if current
default value is None and new default value is not None. Warns if multiple non-None default values exist for the
same input.
Args:
named_input_lists: List of tuples containing (block_name, input_param_list) pairs
Returns:
List[InputParam]: Combined list of unique InputParam objects
"""
combined_dict = {} # name -> InputParam
value_sources = {} # name -> block_name
for block_name, inputs in named_input_lists:
for input_param in inputs:
if input_param.name is None and input_param.kwargs_type is not None:
input_name = "*_" + input_param.kwargs_type
else:
input_name = input_param.name
if input_name in combined_dict:
current_param = combined_dict[input_name]
if (
current_param.default is not None
and input_param.default is not None
and current_param.default != input_param.default
):
warnings.warn(
f"Multiple different default values found for input '{input_name}': "
f"{current_param.default} (from block '{value_sources[input_name]}') and "
f"{input_param.default} (from block '{block_name}'). Using {current_param.default}."
)
if current_param.default is None and input_param.default is not None:
combined_dict[input_name] = input_param
value_sources[input_name] = block_name
else:
combined_dict[input_name] = input_param
value_sources[input_name] = block_name
return list(combined_dict.values())
def combine_outputs(*named_output_lists: List[Tuple[str, List[OutputParam]]]) -> List[OutputParam]:
"""
Combines multiple lists of OutputParam objects from different blocks. For duplicate outputs, keeps the first
occurrence of each output name.
Args:
named_output_lists: List of tuples containing (block_name, output_param_list) pairs
Returns:
List[OutputParam]: Combined list of unique OutputParam objects
"""
combined_dict = {} # name -> OutputParam
for block_name, outputs in named_output_lists:
for output_param in outputs:
if (output_param.name not in combined_dict) or (
combined_dict[output_param.name].kwargs_type is None and output_param.kwargs_type is not None
):
combined_dict[output_param.name] = output_param
return list(combined_dict.values())
def generate_modular_model_card_content(blocks) -> Dict[str, Any]:
"""
Generate model card content for a modular pipeline.
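For example, running `format_workflow` on a map like the QwenImage one below would render as follows (a sketch based on the code above; the import path is assumed):
```py
from diffusers.modular_pipelines.modular_pipeline_utils import format_workflow  # path assumed

workflow_map = {
    "text2image": {"prompt": True},
    "image2image": {"prompt": True, "image": True},
}
print(format_workflow(workflow_map))
# Supported workflows:
#   - `text2image`: requires `prompt`
#   - `image2image`: requires `prompt`, `image`
```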

View File

@@ -1113,10 +1113,14 @@ AUTO_BLOCKS = InsertableDict(
class QwenImageAutoBlocks(SequentialPipelineBlocks):
"""
Auto Modular pipeline for text-to-image, image-to-image, inpainting, and controlnet tasks using QwenImage.
- for image-to-image generation, you need to provide `image`
- for inpainting, you need to provide `mask_image` and `image`, optionally you can provide `padding_mask_crop`.
- to run the controlnet workflow, you need to provide `control_image`
- for text-to-image generation, all you need to provide is `prompt`
Supported workflows:
- `text2image`: requires `prompt`
- `image2image`: requires `prompt`, `image`
- `inpainting`: requires `prompt`, `mask_image`, `image`
- `controlnet_text2image`: requires `prompt`, `control_image`
- `controlnet_image2image`: requires `prompt`, `image`, `control_image`
- `controlnet_inpainting`: requires `prompt`, `mask_image`, `image`, `control_image`
Components:
text_encoder (`Qwen2_5_VLForConditionalGeneration`): The text encoder to use
tokenizer (`Qwen2Tokenizer`):
@@ -1197,15 +1201,23 @@ class QwenImageAutoBlocks(SequentialPipelineBlocks):
block_classes = AUTO_BLOCKS.values()
block_names = AUTO_BLOCKS.keys()
# Workflow map defines the trigger conditions for each workflow.
# How to define:
# - Only include required inputs and trigger inputs (inputs that determine which blocks run)
# - Currently only `True` is supported, which means the workflow triggers when the input is not None
_workflow_map = {
"text2image": {"prompt": True},
"image2image": {"prompt": True, "image": True},
"inpainting": {"prompt": True, "mask_image": True, "image": True},
"controlnet_text2image": {"prompt": True, "control_image": True},
"controlnet_image2image": {"prompt": True, "image": True, "control_image": True},
"controlnet_inpainting": {"prompt": True, "mask_image": True, "image": True, "control_image": True},
}
@property
def description(self):
return (
"Auto Modular pipeline for text-to-image, image-to-image, inpainting, and controlnet tasks using QwenImage.\n"
+ "- for image-to-image generation, you need to provide `image`\n"
+ "- for inpainting, you need to provide `mask_image` and `image`, optionally you can provide `padding_mask_crop`.\n"
+ "- to run the controlnet workflow, you need to provide `control_image`\n"
+ "- for text-to-image generation, all you need to provide is `prompt`"
)
return "Auto Modular pipeline for text-to-image, image-to-image, inpainting, and controlnet tasks using QwenImage."
@property
def outputs(self):
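With `_workflow_map` set, the hand-written bullet list is gone from `description` and the `Supported workflows:` section is generated by `format_workflow` instead; a quick hedged check:
```py
from diffusers.modular_pipelines import QwenImageAutoBlocks  # import path assumed

blocks = QwenImageAutoBlocks()
# SequentialPipelineBlocks.doc appends the format_workflow() rendering when _workflow_map is set
assert "Supported workflows:" in blocks.doc
```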

View File

@@ -774,6 +774,10 @@ class QwenImageEditAutoBlocks(SequentialPipelineBlocks):
model_name = "qwenimage-edit"
block_classes = EDIT_AUTO_BLOCKS.values()
block_names = EDIT_AUTO_BLOCKS.keys()
_workflow_map = {
"edit": {"prompt": True, "image": True},
"edit_inpainting": {"prompt": True, "mask_image": True, "image": True},
}
@property
def description(self):

View File

@@ -23,18 +23,7 @@ except OptionalDependencyNotAvailable:
else:
_import_structure["encoders"] = ["StableDiffusionXLTextEncoderStep"]
_import_structure["modular_blocks"] = [
"ALL_BLOCKS",
"AUTO_BLOCKS",
"CONTROLNET_BLOCKS",
"IMAGE2IMAGE_BLOCKS",
"INPAINT_BLOCKS",
"IP_ADAPTER_BLOCKS",
"TEXT2IMAGE_BLOCKS",
"StableDiffusionXLAutoBlocks",
"StableDiffusionXLAutoControlnetStep",
"StableDiffusionXLAutoDecodeStep",
"StableDiffusionXLAutoIPAdapterStep",
"StableDiffusionXLAutoVaeEncoderStep",
]
_import_structure["modular_pipeline"] = ["StableDiffusionXLModularPipeline"]
@@ -49,18 +38,7 @@ if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
StableDiffusionXLTextEncoderStep,
)
from .modular_blocks import (
ALL_BLOCKS,
AUTO_BLOCKS,
CONTROLNET_BLOCKS,
IMAGE2IMAGE_BLOCKS,
INPAINT_BLOCKS,
IP_ADAPTER_BLOCKS,
TEXT2IMAGE_BLOCKS,
StableDiffusionXLAutoBlocks,
StableDiffusionXLAutoControlnetStep,
StableDiffusionXLAutoDecodeStep,
StableDiffusionXLAutoIPAdapterStep,
StableDiffusionXLAutoVaeEncoderStep,
)
from .modular_pipeline import StableDiffusionXLModularPipeline
else:

View File

@@ -277,6 +277,7 @@ class StableDiffusionXLCoreDenoiseStep(SequentialPipelineBlocks):
# ip-adapter, controlnet, text2img, img2img, inpainting
# auto_docstring
class StableDiffusionXLAutoBlocks(SequentialPipelineBlocks):
block_classes = [
StableDiffusionXLTextEncoderStep,
@@ -293,103 +294,29 @@ class StableDiffusionXLAutoBlocks(SequentialPipelineBlocks):
"decode",
]
@property
def description(self):
return (
"Auto Modular pipeline for text-to-image, image-to-image, inpainting, and controlnet tasks using Stable Diffusion XL.\n"
+ "- for image-to-image generation, you need to provide either `image` or `image_latents`\n"
+ "- for inpainting, you need to provide `mask_image` and `image`, optionally you can provide `padding_mask_crop` \n"
+ "- to run the controlnet workflow, you need to provide `control_image`\n"
+ "- to run the controlnet_union workflow, you need to provide `control_image` and `control_mode`\n"
+ "- to run the ip_adapter workflow, you need to provide `ip_adapter_image`\n"
+ "- for text-to-image generation, all you need to provide is `prompt`"
)
# controlnet (input + denoise step)
class StableDiffusionXLAutoControlnetStep(SequentialPipelineBlocks):
block_classes = [
StableDiffusionXLAutoControlNetInputStep,
StableDiffusionXLAutoControlNetDenoiseStep,
]
block_names = ["controlnet_input", "controlnet_denoise"]
_workflow_map = {
"text2image": {"prompt": True},
"image2image": {"image": True, "prompt": True},
"inpainting": {"mask_image": True, "image": True, "prompt": True},
"controlnet_text2image": {"control_image": True, "prompt": True},
"controlnet_image2image": {"control_image": True, "image": True, "prompt": True},
"controlnet_inpainting": {"control_image": True, "mask_image": True, "image": True, "prompt": True},
"controlnet_union_text2image": {"control_image": True, "control_mode": True, "prompt": True},
"controlnet_union_image2image": {"control_image": True, "control_mode": True, "image": True, "prompt": True},
"controlnet_union_inpainting": {"control_image": True, "control_mode": True, "mask_image": True, "image": True, "prompt": True},
"ip_adapter_text2image": {"ip_adapter_image": True, "prompt": True},
"ip_adapter_image2image": {"ip_adapter_image": True, "image": True, "prompt": True},
"ip_adapter_inpainting": {"ip_adapter_image": True, "mask_image": True, "image": True, "prompt": True},
"ip_adapter_controlnet_text2image": {"ip_adapter_image": True, "control_image": True, "prompt": True},
"ip_adapter_controlnet_image2image": {"ip_adapter_image": True, "control_image": True, "image": True, "prompt": True},
"ip_adapter_controlnet_inpainting": {"ip_adapter_image": True, "control_image": True, "mask_image": True, "image": True, "prompt": True},
"ip_adapter_controlnet_union_text2image": {"ip_adapter_image": True, "control_image": True, "control_mode": True, "prompt": True},
"ip_adapter_controlnet_union_image2image": {"ip_adapter_image": True, "control_image": True, "control_mode": True, "image": True, "prompt": True},
"ip_adapter_controlnet_union_inpainting": {"ip_adapter_image": True, "control_image": True, "control_mode": True, "mask_image": True, "image": True, "prompt": True},
}
@property
def description(self):
return (
"Controlnet auto step that prepare the controlnet input and denoise the latents. "
+ "It works for both controlnet and controlnet_union and supports text2img, img2img and inpainting tasks."
+ " (it should be replace at 'denoise' step)"
"Auto Modular pipeline for text-to-image, image-to-image, inpainting, and controlnet tasks using Stable Diffusion XL."
)
TEXT2IMAGE_BLOCKS = InsertableDict(
[
("text_encoder", StableDiffusionXLTextEncoderStep),
("input", StableDiffusionXLInputStep),
("set_timesteps", StableDiffusionXLSetTimestepsStep),
("prepare_latents", StableDiffusionXLPrepareLatentsStep),
("prepare_add_cond", StableDiffusionXLPrepareAdditionalConditioningStep),
("denoise", StableDiffusionXLDenoiseStep),
("decode", StableDiffusionXLDecodeStep),
]
)
IMAGE2IMAGE_BLOCKS = InsertableDict(
[
("text_encoder", StableDiffusionXLTextEncoderStep),
("vae_encoder", StableDiffusionXLVaeEncoderStep),
("input", StableDiffusionXLInputStep),
("set_timesteps", StableDiffusionXLImg2ImgSetTimestepsStep),
("prepare_latents", StableDiffusionXLImg2ImgPrepareLatentsStep),
("prepare_add_cond", StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep),
("denoise", StableDiffusionXLDenoiseStep),
("decode", StableDiffusionXLDecodeStep),
]
)
INPAINT_BLOCKS = InsertableDict(
[
("text_encoder", StableDiffusionXLTextEncoderStep),
("vae_encoder", StableDiffusionXLInpaintVaeEncoderStep),
("input", StableDiffusionXLInputStep),
("set_timesteps", StableDiffusionXLImg2ImgSetTimestepsStep),
("prepare_latents", StableDiffusionXLInpaintPrepareLatentsStep),
("prepare_add_cond", StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep),
("denoise", StableDiffusionXLInpaintDenoiseStep),
("decode", StableDiffusionXLInpaintDecodeStep),
]
)
CONTROLNET_BLOCKS = InsertableDict(
[
("denoise", StableDiffusionXLAutoControlnetStep),
]
)
IP_ADAPTER_BLOCKS = InsertableDict(
[
("ip_adapter", StableDiffusionXLAutoIPAdapterStep),
]
)
AUTO_BLOCKS = InsertableDict(
[
("text_encoder", StableDiffusionXLTextEncoderStep),
("ip_adapter", StableDiffusionXLAutoIPAdapterStep),
("vae_encoder", StableDiffusionXLAutoVaeEncoderStep),
("denoise", StableDiffusionXLCoreDenoiseStep),
("decode", StableDiffusionXLAutoDecodeStep),
]
)
ALL_BLOCKS = {
"text2img": TEXT2IMAGE_BLOCKS,
"img2img": IMAGE2IMAGE_BLOCKS,
"inpaint": INPAINT_BLOCKS,
"controlnet": CONTROLNET_BLOCKS,
"ip_adapter": IP_ADAPTER_BLOCKS,
"auto": AUTO_BLOCKS,
}

View File

@@ -658,12 +658,7 @@ class GlmImagePipeline(DiffusionPipeline):
)
elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
if prompt is not None and prior_token_ids is not None:
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `prior_token_ids`: {prior_token_ids}. Please make sure to"
" only forward one of the two."
)
elif prompt is None and prior_token_ids is None:
if prompt is None and prior_token_ids is None:
raise ValueError(
"Provide either `prompt` or `prior_token_ids`. Cannot leave both `prompt` and `prior_token_ids` undefined."
)
@@ -694,8 +689,8 @@ class GlmImagePipeline(DiffusionPipeline):
"for i2i mode, as the images are needed for VAE encoding to build the KV cache."
)
if prior_token_ids is not None and prompt_embeds is None:
raise ValueError("`prompt_embeds` must also be provided with `prior_token_ids`.")
if prior_token_ids is not None and prompt_embeds is None and prompt is None:
raise ValueError("`prompt_embeds` or `prompt` must also be provided with `prior_token_ids`.")
@property
def guidance_scale(self):
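A condensed sketch of the relaxed validation (a hypothetical standalone function mirroring the diff, not the pipeline's actual helper):
```py
def check_glm_inputs(prompt=None, prior_token_ids=None, prompt_embeds=None):
    # prompt and prior_token_ids may now be passed together
    if prompt is None and prior_token_ids is None:
        raise ValueError("Provide either `prompt` or `prior_token_ids`.")
    if prior_token_ids is not None and prompt_embeds is None and prompt is None:
        raise ValueError("`prompt_embeds` or `prompt` must also be provided with `prior_token_ids`.")


check_glm_inputs(prompt="a cat", prior_token_ids=[1, 2, 3])  # previously raised, now allowed
```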

View File

@@ -18,7 +18,6 @@ import re
from copy import deepcopy
from typing import Any, Callable, Dict, List, Optional, Union
import ftfy
import torch
from transformers import AutoTokenizer, UMT5EncoderModel

View File

@@ -18,7 +18,6 @@ import re
from copy import deepcopy
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import ftfy
import PIL
import torch
from transformers import AutoTokenizer, UMT5EncoderModel

View File

@@ -19,7 +19,6 @@ import re
from copy import deepcopy
from typing import Any, Callable, Dict, List, Optional, Union
import ftfy
import torch
from PIL import Image
from transformers import AutoTokenizer, UMT5EncoderModel

View File

@@ -41,7 +41,7 @@ class GGUFQuantizer(DiffusersQuantizer):
self.compute_dtype = quantization_config.compute_dtype
self.pre_quantized = quantization_config.pre_quantized
self.modules_to_not_convert = quantization_config.modules_to_not_convert
self.modules_to_not_convert = quantization_config.modules_to_not_convert or []
if not isinstance(self.modules_to_not_convert, list):
self.modules_to_not_convert = [self.modules_to_not_convert]

View File

@@ -15,7 +15,7 @@
# DISCLAIMER: This file is strongly influenced by https://github.com/LuChengTHU/dpm-solver and https://github.com/NVlabs/edm
import math
from typing import List, Optional, Tuple, Union
from typing import List, Literal, Optional, Tuple, Union
import numpy as np
import torch
@@ -51,13 +51,15 @@ class EDMDPMSolverMultistepScheduler(SchedulerMixin, ConfigMixin):
schedule was incorporated in this model: https://huggingface.co/stabilityai/cosxl.
num_train_timesteps (`int`, defaults to 1000):
The number of diffusion steps to train the model.
solver_order (`int`, defaults to 2):
The DPMSolver order which can be `1` or `2` or `3`. It is recommended to use `solver_order=2` for guided
sampling, and `solver_order=3` for unconditional sampling.
prediction_type (`str`, defaults to `epsilon`, *optional*):
Prediction type of the scheduler function; can be `epsilon` (predicts the noise of the diffusion process),
`sample` (directly predicts the noisy sample) or `v_prediction` (see section 2.4 of [Imagen
Video](https://huggingface.co/papers/2210.02303) paper).
rho (`float`, *optional*, defaults to 7.0):
The rho parameter in the Karras sigma schedule. This was set to 7.0 in the EDM paper [1].
solver_order (`int`, defaults to 2):
The DPMSolver order which can be `1` or `2` or `3`. It is recommended to use `solver_order=2` for guided
sampling, and `solver_order=3` for unconditional sampling.
thresholding (`bool`, defaults to `False`):
Whether to use the "dynamic thresholding" method. This is unsuitable for latent-space diffusion models such
as Stable Diffusion.
@@ -94,19 +96,19 @@ class EDMDPMSolverMultistepScheduler(SchedulerMixin, ConfigMixin):
sigma_min: float = 0.002,
sigma_max: float = 80.0,
sigma_data: float = 0.5,
sigma_schedule: str = "karras",
sigma_schedule: Literal["karras", "exponential"] = "karras",
num_train_timesteps: int = 1000,
prediction_type: str = "epsilon",
prediction_type: Literal["epsilon", "sample", "v_prediction"] = "epsilon",
rho: float = 7.0,
solver_order: int = 2,
thresholding: bool = False,
dynamic_thresholding_ratio: float = 0.995,
sample_max_value: float = 1.0,
algorithm_type: str = "dpmsolver++",
solver_type: str = "midpoint",
algorithm_type: Literal["dpmsolver++", "sde-dpmsolver++"] = "dpmsolver++",
solver_type: Literal["midpoint", "heun"] = "midpoint",
lower_order_final: bool = True,
euler_at_final: bool = False,
final_sigmas_type: Optional[str] = "zero", # "zero", "sigma_min"
final_sigmas_type: Optional[Literal["zero", "sigma_min"]] = "zero", # "zero", "sigma_min"
):
# settings for DPM-Solver
if algorithm_type not in ["dpmsolver++", "sde-dpmsolver++"]:
@@ -145,19 +147,19 @@ class EDMDPMSolverMultistepScheduler(SchedulerMixin, ConfigMixin):
self.sigmas = self.sigmas.to("cpu") # to avoid too much CPU/GPU communication
@property
def init_noise_sigma(self):
def init_noise_sigma(self) -> float:
# standard deviation of the initial noise distribution
return (self.config.sigma_max**2 + 1) ** 0.5
@property
def step_index(self):
def step_index(self) -> int:
"""
The index counter for current timestep. It will increase 1 after each scheduler step.
"""
return self._step_index
@property
def begin_index(self):
def begin_index(self) -> int:
"""
The index for the first timestep. It should be set from pipeline with `set_begin_index` method.
"""
@@ -274,7 +276,11 @@ class EDMDPMSolverMultistepScheduler(SchedulerMixin, ConfigMixin):
self.is_scale_input_called = True
return sample
def set_timesteps(self, num_inference_steps: int = None, device: Union[str, torch.device] = None):
def set_timesteps(
self,
num_inference_steps: int = None,
device: Optional[Union[str, torch.device]] = None,
):
"""
Sets the discrete timesteps used for the diffusion chain (to be run before inference).
@@ -460,13 +466,12 @@ class EDMDPMSolverMultistepScheduler(SchedulerMixin, ConfigMixin):
def _sigma_to_alpha_sigma_t(self, sigma):
alpha_t = torch.tensor(1) # Inputs are pre-scaled before going into unet, so alpha_t = 1
sigma_t = sigma
return alpha_t, sigma_t
def convert_model_output(
self,
model_output: torch.Tensor,
sample: torch.Tensor = None,
sample: torch.Tensor,
) -> torch.Tensor:
"""
Convert the model output to the corresponding type the DPMSolver/DPMSolver++ algorithm needs. DPM-Solver is
@@ -497,7 +502,7 @@ class EDMDPMSolverMultistepScheduler(SchedulerMixin, ConfigMixin):
def dpm_solver_first_order_update(
self,
model_output: torch.Tensor,
sample: torch.Tensor = None,
sample: torch.Tensor,
noise: Optional[torch.Tensor] = None,
) -> torch.Tensor:
"""
@@ -508,6 +513,8 @@ class EDMDPMSolverMultistepScheduler(SchedulerMixin, ConfigMixin):
The direct output from the learned diffusion model.
sample (`torch.Tensor`):
A current instance of a sample created by the diffusion process.
noise (`torch.Tensor`, *optional*):
The noise tensor to add to the original samples.
Returns:
`torch.Tensor`:
@@ -538,7 +545,7 @@ class EDMDPMSolverMultistepScheduler(SchedulerMixin, ConfigMixin):
def multistep_dpm_solver_second_order_update(
self,
model_output_list: List[torch.Tensor],
sample: torch.Tensor = None,
sample: torch.Tensor,
noise: Optional[torch.Tensor] = None,
) -> torch.Tensor:
"""
@@ -549,6 +556,8 @@ class EDMDPMSolverMultistepScheduler(SchedulerMixin, ConfigMixin):
The direct outputs from learned diffusion model at current and latter timesteps.
sample (`torch.Tensor`):
A current instance of a sample created by the diffusion process.
noise (`torch.Tensor`, *optional*):
The noise tensor to add to the original samples.
Returns:
`torch.Tensor`:
@@ -609,7 +618,7 @@ class EDMDPMSolverMultistepScheduler(SchedulerMixin, ConfigMixin):
def multistep_dpm_solver_third_order_update(
self,
model_output_list: List[torch.Tensor],
sample: torch.Tensor = None,
sample: torch.Tensor,
) -> torch.Tensor:
"""
One step for the third-order multistep DPMSolver.
@@ -698,7 +707,7 @@ class EDMDPMSolverMultistepScheduler(SchedulerMixin, ConfigMixin):
return step_index
# Copied from diffusers.schedulers.scheduling_dpmsolver_multistep.DPMSolverMultistepScheduler._init_step_index
def _init_step_index(self, timestep):
def _init_step_index(self, timestep: Union[int, torch.Tensor]) -> None:
"""
Initialize the step_index counter for the scheduler.
@@ -719,7 +728,7 @@ class EDMDPMSolverMultistepScheduler(SchedulerMixin, ConfigMixin):
model_output: torch.Tensor,
timestep: Union[int, torch.Tensor],
sample: torch.Tensor,
generator=None,
generator: Optional[torch.Generator] = None,
return_dict: bool = True,
) -> Union[SchedulerOutput, Tuple]:
"""
@@ -860,5 +869,5 @@ class EDMDPMSolverMultistepScheduler(SchedulerMixin, ConfigMixin):
c_in = 1 / ((sigma**2 + self.config.sigma_data**2) ** 0.5)
return c_in
def __len__(self):
def __len__(self) -> int:
return self.config.num_train_timesteps
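A minimal denoising-loop sketch tying together the methods annotated above (`model` is a stand-in for any noise-prediction network and is not part of this diff):

import torch
from diffusers import EDMDPMSolverMultistepScheduler

scheduler = EDMDPMSolverMultistepScheduler()
scheduler.set_timesteps(num_inference_steps=25, device="cpu")

sample = torch.randn(1, 4, 64, 64) * scheduler.init_noise_sigma
for t in scheduler.timesteps:
    model_input = scheduler.scale_model_input(sample, t)
    noise_pred = model(model_input, t)  # hypothetical denoiser call
    sample = scheduler.step(noise_pred, t, sample).prev_sample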

View File

@@ -446,16 +446,17 @@ class ModelTesterMixin:
torch_device not in ["cuda", "xpu"],
reason="float16 and bfloat16 can only be used with an accelerator",
)
def test_keep_in_fp32_modules(self):
def test_keep_in_fp32_modules(self, tmp_path):
model = self.model_class(**self.get_init_dict())
fp32_modules = model._keep_in_fp32_modules
if fp32_modules is None or len(fp32_modules) == 0:
pytest.skip("Model does not have _keep_in_fp32_modules defined.")
# Test with float16
model.to(torch_device)
model.to(torch.float16)
# Save the model and reload with float16 dtype
# _keep_in_fp32_modules is only enforced during from_pretrained loading
model.save_pretrained(tmp_path)
model = self.model_class.from_pretrained(tmp_path, torch_dtype=torch.float16).to(torch_device)
for name, param in model.named_parameters():
if any(module_to_keep_in_fp32 in name.split(".") for module_to_keep_in_fp32 in fp32_modules):
@@ -470,7 +471,7 @@ class ModelTesterMixin:
)
@pytest.mark.parametrize("dtype", [torch.float16, torch.bfloat16], ids=["fp16", "bf16"])
@torch.no_grad()
def test_from_save_pretrained_dtype_inference(self, tmp_path, dtype):
def test_from_save_pretrained_dtype_inference(self, tmp_path, dtype, atol=1e-4, rtol=0):
model = self.model_class(**self.get_init_dict())
model.to(torch_device)
fp32_modules = model._keep_in_fp32_modules or []
@@ -490,10 +491,6 @@ class ModelTesterMixin:
output = model(**inputs, return_dict=False)[0]
output_loaded = model_loaded(**inputs, return_dict=False)[0]
self._check_dtype_inference_output(output, output_loaded, dtype)
def _check_dtype_inference_output(self, output, output_loaded, dtype, atol=1e-4, rtol=0):
"""Check dtype inference output with configurable tolerance."""
assert_tensors_close(
output, output_loaded, atol=atol, rtol=rtol, msg=f"Loaded model output differs for {dtype}"
)
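Since the helper takes `atol`/`rtol` keywords, a model-specific test class can widen the tolerance by overriding it (a sketch only; the class names are illustrative):

class TestSomeModel(SomeModelTesterConfig, ModelTesterMixin):
    def _check_dtype_inference_output(self, output, output_loaded, dtype, atol=1e-2, rtol=0):
        # Wider atol for models whose fp16/bf16 outputs drift more from fp32.
        super()._check_dtype_inference_output(output, output_loaded, dtype, atol=atol, rtol=rtol)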

View File

@@ -176,15 +176,7 @@ class QuantizationTesterMixin:
model_quantized = self._create_quantized_model(config_kwargs)
model_quantized.to(torch_device)
# Get model dtype from first parameter
model_dtype = next(model_quantized.parameters()).dtype
inputs = self.get_dummy_inputs()
# Cast inputs to model dtype
inputs = {
k: v.to(model_dtype) if isinstance(v, torch.Tensor) and v.is_floating_point() else v
for k, v in inputs.items()
}
output = model_quantized(**inputs, return_dict=False)[0]
assert output is not None, "Model output is None"
@@ -229,6 +221,8 @@ class QuantizationTesterMixin:
init_lora_weights=False,
)
model.add_adapter(lora_config)
# Move LoRA adapter weights to device (they default to CPU)
model.to(torch_device)
inputs = self.get_dummy_inputs()
output = model(**inputs, return_dict=False)[0]
@@ -1021,9 +1015,6 @@ class GGUFTesterMixin(GGUFConfigMixin, QuantizationTesterMixin):
"""Test that dequantize() works correctly."""
self._test_dequantize({"compute_dtype": torch.bfloat16})
def test_gguf_quantized_layers(self):
self._test_quantized_layers({"compute_dtype": torch.bfloat16})
@is_quantization
@is_modelopt

View File

@@ -12,57 +12,57 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import pytest
import torch
from diffusers import WanTransformer3DModel
from diffusers.utils.torch_utils import randn_tensor
from ...testing_utils import (
enable_full_determinism,
torch_device,
from ...testing_utils import enable_full_determinism, torch_device
from ..testing_utils import (
AttentionTesterMixin,
BaseModelTesterConfig,
BitsAndBytesTesterMixin,
GGUFCompileTesterMixin,
GGUFTesterMixin,
MemoryTesterMixin,
ModelTesterMixin,
TorchAoTesterMixin,
TorchCompileTesterMixin,
TrainingTesterMixin,
)
from ..test_modeling_common import ModelTesterMixin, TorchCompileTesterMixin
enable_full_determinism()
class WanTransformer3DTests(ModelTesterMixin, unittest.TestCase):
model_class = WanTransformer3DModel
main_input_name = "hidden_states"
uses_custom_attn_processor = True
class WanTransformer3DTesterConfig(BaseModelTesterConfig):
@property
def model_class(self):
return WanTransformer3DModel
@property
def dummy_input(self):
batch_size = 1
num_channels = 4
num_frames = 2
height = 16
width = 16
text_encoder_embedding_dim = 16
sequence_length = 12
def pretrained_model_name_or_path(self):
return "hf-internal-testing/tiny-wan22-transformer"
hidden_states = torch.randn((batch_size, num_channels, num_frames, height, width)).to(torch_device)
timestep = torch.randint(0, 1000, size=(batch_size,)).to(torch_device)
encoder_hidden_states = torch.randn((batch_size, sequence_length, text_encoder_embedding_dim)).to(torch_device)
@property
def output_shape(self) -> tuple[int, ...]:
return (4, 2, 16, 16)
@property
def input_shape(self) -> tuple[int, ...]:
return (4, 2, 16, 16)
@property
def main_input_name(self) -> str:
return "hidden_states"
@property
def generator(self):
return torch.Generator("cpu").manual_seed(0)
def get_init_dict(self) -> dict[str, int | list[int] | tuple | str | bool]:
return {
"hidden_states": hidden_states,
"encoder_hidden_states": encoder_hidden_states,
"timestep": timestep,
}
@property
def input_shape(self):
return (4, 1, 16, 16)
@property
def output_shape(self):
return (4, 1, 16, 16)
def prepare_init_args_and_inputs_for_common(self):
init_dict = {
"patch_size": (1, 2, 2),
"num_attention_heads": 2,
"attention_head_dim": 12,
@@ -76,16 +76,160 @@ class WanTransformer3DTests(ModelTesterMixin, unittest.TestCase):
"qk_norm": "rms_norm_across_heads",
"rope_max_seq_len": 32,
}
inputs_dict = self.dummy_input
return init_dict, inputs_dict
def get_dummy_inputs(self) -> dict[str, torch.Tensor]:
batch_size = 1
num_channels = 4
num_frames = 2
height = 16
width = 16
text_encoder_embedding_dim = 16
sequence_length = 12
return {
"hidden_states": randn_tensor(
(batch_size, num_channels, num_frames, height, width),
generator=self.generator,
device=torch_device,
),
"encoder_hidden_states": randn_tensor(
(batch_size, sequence_length, text_encoder_embedding_dim),
generator=self.generator,
device=torch_device,
),
"timestep": torch.randint(0, 1000, size=(batch_size,), generator=self.generator).to(torch_device),
}
class TestWanTransformer3D(WanTransformer3DTesterConfig, ModelTesterMixin):
"""Core model tests for Wan Transformer 3D."""
@pytest.mark.parametrize("dtype", [torch.float16, torch.bfloat16], ids=["fp16", "bf16"])
def test_from_save_pretrained_dtype_inference(self, tmp_path, dtype):
# Skip: fp16/bf16 require very high atol to pass, providing little signal.
# Dtype preservation is already tested by test_from_save_pretrained_dtype and test_keep_in_fp32_modules.
pytest.skip("Tolerance requirements too high for meaningful test")
class TestWanTransformer3DMemory(WanTransformer3DTesterConfig, MemoryTesterMixin):
"""Memory optimization tests for Wan Transformer 3D."""
class TestWanTransformer3DTraining(WanTransformer3DTesterConfig, TrainingTesterMixin):
"""Training tests for Wan Transformer 3D."""
def test_gradient_checkpointing_is_applied(self):
expected_set = {"WanTransformer3DModel"}
super().test_gradient_checkpointing_is_applied(expected_set=expected_set)
class WanTransformerCompileTests(TorchCompileTesterMixin, unittest.TestCase):
model_class = WanTransformer3DModel
class TestWanTransformer3DAttention(WanTransformer3DTesterConfig, AttentionTesterMixin):
"""Attention processor tests for Wan Transformer 3D."""
def prepare_init_args_and_inputs_for_common(self):
return WanTransformer3DTests().prepare_init_args_and_inputs_for_common()
class TestWanTransformer3DCompile(WanTransformer3DTesterConfig, TorchCompileTesterMixin):
"""Torch compile tests for Wan Transformer 3D."""
class TestWanTransformer3DBitsAndBytes(WanTransformer3DTesterConfig, BitsAndBytesTesterMixin):
"""BitsAndBytes quantization tests for Wan Transformer 3D."""
@property
def torch_dtype(self):
return torch.float16
def get_dummy_inputs(self):
"""Override to provide inputs matching the tiny Wan model dimensions."""
return {
"hidden_states": randn_tensor(
(1, 36, 2, 64, 64), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"encoder_hidden_states": randn_tensor(
(1, 512, 4096), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"timestep": torch.tensor([1.0]).to(torch_device, self.torch_dtype),
}
class TestWanTransformer3DTorchAo(WanTransformer3DTesterConfig, TorchAoTesterMixin):
"""TorchAO quantization tests for Wan Transformer 3D."""
@property
def torch_dtype(self):
return torch.bfloat16
def get_dummy_inputs(self):
"""Override to provide inputs matching the tiny Wan model dimensions."""
return {
"hidden_states": randn_tensor(
(1, 36, 2, 64, 64), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"encoder_hidden_states": randn_tensor(
(1, 512, 4096), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"timestep": torch.tensor([1.0]).to(torch_device, self.torch_dtype),
}
class TestWanTransformer3DGGUF(WanTransformer3DTesterConfig, GGUFTesterMixin):
"""GGUF quantization tests for Wan Transformer 3D."""
@property
def gguf_filename(self):
return "https://huggingface.co/QuantStack/Wan2.2-I2V-A14B-GGUF/blob/main/LowNoise/Wan2.2-I2V-A14B-LowNoise-Q2_K.gguf"
@property
def torch_dtype(self):
return torch.bfloat16
def _create_quantized_model(self, config_kwargs=None, **extra_kwargs):
return super()._create_quantized_model(
config_kwargs, config="Wan-AI/Wan2.2-I2V-A14B-Diffusers", subfolder="transformer", **extra_kwargs
)
def get_dummy_inputs(self):
"""Override to provide inputs matching the real Wan I2V model dimensions.
Wan 2.2 I2V: in_channels=36, text_dim=4096
"""
return {
"hidden_states": randn_tensor(
(1, 36, 2, 64, 64), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"encoder_hidden_states": randn_tensor(
(1, 512, 4096), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"timestep": torch.tensor([1.0]).to(torch_device, self.torch_dtype),
}
class TestWanTransformer3DGGUFCompile(WanTransformer3DTesterConfig, GGUFCompileTesterMixin):
"""GGUF + compile tests for Wan Transformer 3D."""
@property
def gguf_filename(self):
return "https://huggingface.co/QuantStack/Wan2.2-I2V-A14B-GGUF/blob/main/LowNoise/Wan2.2-I2V-A14B-LowNoise-Q2_K.gguf"
@property
def torch_dtype(self):
return torch.bfloat16
def _create_quantized_model(self, config_kwargs=None, **extra_kwargs):
return super()._create_quantized_model(
config_kwargs, config="Wan-AI/Wan2.2-I2V-A14B-Diffusers", subfolder="transformer", **extra_kwargs
)
def get_dummy_inputs(self):
"""Override to provide inputs matching the real Wan I2V model dimensions.
Wan 2.2 I2V: in_channels=36, text_dim=4096
"""
return {
"hidden_states": randn_tensor(
(1, 36, 2, 64, 64), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"encoder_hidden_states": randn_tensor(
(1, 512, 4096), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"timestep": torch.tensor([1.0]).to(torch_device, self.torch_dtype),
}

View File

@@ -12,76 +12,62 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import pytest
import torch
from diffusers import WanAnimateTransformer3DModel
from diffusers.utils.torch_utils import randn_tensor
from ...testing_utils import (
enable_full_determinism,
torch_device,
from ...testing_utils import enable_full_determinism, torch_device
from ..testing_utils import (
AttentionTesterMixin,
BaseModelTesterConfig,
BitsAndBytesTesterMixin,
GGUFCompileTesterMixin,
GGUFTesterMixin,
MemoryTesterMixin,
ModelTesterMixin,
TorchAoTesterMixin,
TorchCompileTesterMixin,
TrainingTesterMixin,
)
from ..test_modeling_common import ModelTesterMixin, TorchCompileTesterMixin
enable_full_determinism()
class WanAnimateTransformer3DTests(ModelTesterMixin, unittest.TestCase):
model_class = WanAnimateTransformer3DModel
main_input_name = "hidden_states"
uses_custom_attn_processor = True
class WanAnimateTransformer3DTesterConfig(BaseModelTesterConfig):
@property
def model_class(self):
return WanAnimateTransformer3DModel
@property
def dummy_input(self):
batch_size = 1
num_channels = 4
num_frames = 20 # To make the shapes work out; for complicated reasons we want 21 to divide num_frames + 1
height = 16
width = 16
text_encoder_embedding_dim = 16
sequence_length = 12
clip_seq_len = 12
clip_dim = 16
inference_segment_length = 77 # The inference segment length in the full Wan2.2-Animate-14B model
face_height = 16 # Should be square and match `motion_encoder_size` below
face_width = 16
hidden_states = torch.randn((batch_size, 2 * num_channels + 4, num_frames + 1, height, width)).to(torch_device)
timestep = torch.randint(0, 1000, size=(batch_size,)).to(torch_device)
encoder_hidden_states = torch.randn((batch_size, sequence_length, text_encoder_embedding_dim)).to(torch_device)
clip_ref_features = torch.randn((batch_size, clip_seq_len, clip_dim)).to(torch_device)
pose_latents = torch.randn((batch_size, num_channels, num_frames, height, width)).to(torch_device)
face_pixel_values = torch.randn((batch_size, 3, inference_segment_length, face_height, face_width)).to(
torch_device
)
return {
"hidden_states": hidden_states,
"timestep": timestep,
"encoder_hidden_states": encoder_hidden_states,
"encoder_hidden_states_image": clip_ref_features,
"pose_hidden_states": pose_latents,
"face_pixel_values": face_pixel_values,
}
def pretrained_model_name_or_path(self):
return "hf-internal-testing/tiny-wan-animate-transformer"
@property
def input_shape(self):
return (12, 1, 16, 16)
def output_shape(self) -> tuple[int, ...]:
# Output has fewer channels than input (4 vs 12)
return (4, 21, 16, 16)
@property
def output_shape(self):
return (4, 1, 16, 16)
def input_shape(self) -> tuple[int, ...]:
return (12, 21, 16, 16)
def prepare_init_args_and_inputs_for_common(self):
@property
def main_input_name(self) -> str:
return "hidden_states"
@property
def generator(self):
return torch.Generator("cpu").manual_seed(0)
def get_init_dict(self) -> dict[str, int | list[int] | tuple | str | bool | float | dict]:
# Use custom channel sizes since the default Wan Animate channel sizes will cause the motion encoder to
# contain the vast majority of the parameters in the test model
channel_sizes = {"4": 16, "8": 16, "16": 16}
init_dict = {
return {
"patch_size": (1, 2, 2),
"num_attention_heads": 2,
"attention_head_dim": 12,
@@ -105,22 +91,219 @@ class WanAnimateTransformer3DTests(ModelTesterMixin, unittest.TestCase):
"face_encoder_num_heads": 2,
"inject_face_latents_blocks": 2,
}
inputs_dict = self.dummy_input
return init_dict, inputs_dict
def get_dummy_inputs(self) -> dict[str, torch.Tensor]:
batch_size = 1
num_channels = 4
num_frames = 20 # To make the shapes work out; for complicated reasons we want 21 to divide num_frames + 1
height = 16
width = 16
text_encoder_embedding_dim = 16
sequence_length = 12
clip_seq_len = 12
clip_dim = 16
inference_segment_length = 77 # The inference segment length in the full Wan2.2-Animate-14B model
face_height = 16 # Should be square and match `motion_encoder_size`
face_width = 16
return {
"hidden_states": randn_tensor(
(batch_size, 2 * num_channels + 4, num_frames + 1, height, width),
generator=self.generator,
device=torch_device,
),
"timestep": torch.randint(0, 1000, size=(batch_size,), generator=self.generator).to(torch_device),
"encoder_hidden_states": randn_tensor(
(batch_size, sequence_length, text_encoder_embedding_dim),
generator=self.generator,
device=torch_device,
),
"encoder_hidden_states_image": randn_tensor(
(batch_size, clip_seq_len, clip_dim),
generator=self.generator,
device=torch_device,
),
"pose_hidden_states": randn_tensor(
(batch_size, num_channels, num_frames, height, width),
generator=self.generator,
device=torch_device,
),
"face_pixel_values": randn_tensor(
(batch_size, 3, inference_segment_length, face_height, face_width),
generator=self.generator,
device=torch_device,
),
}
class TestWanAnimateTransformer3D(WanAnimateTransformer3DTesterConfig, ModelTesterMixin):
"""Core model tests for Wan Animate Transformer 3D."""
def test_output(self):
# Override test_output because the transformer output is expected to have less channels
# than the main transformer input.
expected_output_shape = (1, 4, 21, 16, 16)
super().test_output(expected_output_shape=expected_output_shape)
@pytest.mark.parametrize("dtype", [torch.float16, torch.bfloat16], ids=["fp16", "bf16"])
def test_from_save_pretrained_dtype_inference(self, tmp_path, dtype):
# Skip: fp16/bf16 require very high atol (~1e-2) to pass, providing little signal.
# Dtype preservation is already tested by test_from_save_pretrained_dtype and test_keep_in_fp32_modules.
pytest.skip("Tolerance requirements too high for meaningful test")
class TestWanAnimateTransformer3DMemory(WanAnimateTransformer3DTesterConfig, MemoryTesterMixin):
"""Memory optimization tests for Wan Animate Transformer 3D."""
class TestWanAnimateTransformer3DTraining(WanAnimateTransformer3DTesterConfig, TrainingTesterMixin):
"""Training tests for Wan Animate Transformer 3D."""
def test_gradient_checkpointing_is_applied(self):
expected_set = {"WanAnimateTransformer3DModel"}
super().test_gradient_checkpointing_is_applied(expected_set=expected_set)
# Override test_output because the transformer output is expected to have less channels than the main transformer
# input.
def test_output(self):
expected_output_shape = (1, 4, 21, 16, 16)
super().test_output(expected_output_shape=expected_output_shape)
class TestWanAnimateTransformer3DAttention(WanAnimateTransformer3DTesterConfig, AttentionTesterMixin):
"""Attention processor tests for Wan Animate Transformer 3D."""
class WanAnimateTransformerCompileTests(TorchCompileTesterMixin, unittest.TestCase):
model_class = WanAnimateTransformer3DModel
class TestWanAnimateTransformer3DCompile(WanAnimateTransformer3DTesterConfig, TorchCompileTesterMixin):
"""Torch compile tests for Wan Animate Transformer 3D."""
def prepare_init_args_and_inputs_for_common(self):
return WanAnimateTransformer3DTests().prepare_init_args_and_inputs_for_common()
def test_torch_compile_recompilation_and_graph_break(self):
# Skip: F.pad with mode="replicate" in WanAnimateFaceEncoder triggers importlib.import_module
# internally, which dynamo doesn't support tracing through.
pytest.skip("F.pad with replicate mode triggers unsupported import in torch.compile")
class TestWanAnimateTransformer3DBitsAndBytes(WanAnimateTransformer3DTesterConfig, BitsAndBytesTesterMixin):
"""BitsAndBytes quantization tests for Wan Animate Transformer 3D."""
@property
def torch_dtype(self):
return torch.float16
def get_dummy_inputs(self):
"""Override to provide inputs matching the tiny Wan Animate model dimensions."""
return {
"hidden_states": randn_tensor(
(1, 36, 21, 64, 64), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"encoder_hidden_states": randn_tensor(
(1, 512, 4096), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"encoder_hidden_states_image": randn_tensor(
(1, 257, 1280), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"pose_hidden_states": randn_tensor(
(1, 16, 20, 64, 64), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"face_pixel_values": randn_tensor(
(1, 3, 77, 512, 512), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"timestep": torch.tensor([1.0]).to(torch_device, self.torch_dtype),
}
class TestWanAnimateTransformer3DTorchAo(WanAnimateTransformer3DTesterConfig, TorchAoTesterMixin):
"""TorchAO quantization tests for Wan Animate Transformer 3D."""
@property
def torch_dtype(self):
return torch.bfloat16
def get_dummy_inputs(self):
"""Override to provide inputs matching the tiny Wan Animate model dimensions."""
return {
"hidden_states": randn_tensor(
(1, 36, 21, 64, 64), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"encoder_hidden_states": randn_tensor(
(1, 512, 4096), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"encoder_hidden_states_image": randn_tensor(
(1, 257, 1280), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"pose_hidden_states": randn_tensor(
(1, 16, 20, 64, 64), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"face_pixel_values": randn_tensor(
(1, 3, 77, 512, 512), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"timestep": torch.tensor([1.0]).to(torch_device, self.torch_dtype),
}
class TestWanAnimateTransformer3DGGUF(WanAnimateTransformer3DTesterConfig, GGUFTesterMixin):
"""GGUF quantization tests for Wan Animate Transformer 3D."""
@property
def gguf_filename(self):
return "https://huggingface.co/QuantStack/Wan2.2-Animate-14B-GGUF/blob/main/Wan2.2-Animate-14B-Q2_K.gguf"
@property
def torch_dtype(self):
return torch.bfloat16
def get_dummy_inputs(self):
"""Override to provide inputs matching the real Wan Animate model dimensions.
Wan 2.2 Animate: in_channels=36 (2*16+4), text_dim=4096, image_dim=1280
"""
return {
"hidden_states": randn_tensor(
(1, 36, 21, 64, 64), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"encoder_hidden_states": randn_tensor(
(1, 512, 4096), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"encoder_hidden_states_image": randn_tensor(
(1, 257, 1280), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"pose_hidden_states": randn_tensor(
(1, 16, 20, 64, 64), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"face_pixel_values": randn_tensor(
(1, 3, 77, 512, 512), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"timestep": torch.tensor([1.0]).to(torch_device, self.torch_dtype),
}
class TestWanAnimateTransformer3DGGUFCompile(WanAnimateTransformer3DTesterConfig, GGUFCompileTesterMixin):
"""GGUF + compile tests for Wan Animate Transformer 3D."""
@property
def gguf_filename(self):
return "https://huggingface.co/QuantStack/Wan2.2-Animate-14B-GGUF/blob/main/Wan2.2-Animate-14B-Q2_K.gguf"
@property
def torch_dtype(self):
return torch.bfloat16
def get_dummy_inputs(self):
"""Override to provide inputs matching the real Wan Animate model dimensions.
Wan 2.2 Animate: in_channels=36 (2*16+4), text_dim=4096, image_dim=1280
"""
return {
"hidden_states": randn_tensor(
(1, 36, 21, 64, 64), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"encoder_hidden_states": randn_tensor(
(1, 512, 4096), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"encoder_hidden_states_image": randn_tensor(
(1, 257, 1280), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"pose_hidden_states": randn_tensor(
(1, 16, 20, 64, 64), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"face_pixel_values": randn_tensor(
(1, 3, 77, 512, 512), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"timestep": torch.tensor([1.0]).to(torch_device, self.torch_dtype),
}

View File

@@ -0,0 +1,271 @@
# Copyright 2025 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
import torch
from diffusers import WanVACETransformer3DModel
from diffusers.utils.torch_utils import randn_tensor
from ...testing_utils import enable_full_determinism, torch_device
from ..testing_utils import (
AttentionTesterMixin,
BaseModelTesterConfig,
BitsAndBytesTesterMixin,
GGUFCompileTesterMixin,
GGUFTesterMixin,
MemoryTesterMixin,
ModelTesterMixin,
TorchAoTesterMixin,
TorchCompileTesterMixin,
TrainingTesterMixin,
)
enable_full_determinism()
class WanVACETransformer3DTesterConfig(BaseModelTesterConfig):
@property
def model_class(self):
return WanVACETransformer3DModel
@property
def pretrained_model_name_or_path(self):
return "hf-internal-testing/tiny-wan-vace-transformer"
@property
def output_shape(self) -> tuple[int, ...]:
return (16, 2, 16, 16)
@property
def input_shape(self) -> tuple[int, ...]:
return (16, 2, 16, 16)
@property
def main_input_name(self) -> str:
return "hidden_states"
@property
def generator(self):
return torch.Generator("cpu").manual_seed(0)
def get_init_dict(self) -> dict[str, int | list[int] | tuple | str | bool | None]:
return {
"patch_size": (1, 2, 2),
"num_attention_heads": 2,
"attention_head_dim": 12,
"in_channels": 16,
"out_channels": 16,
"text_dim": 32,
"freq_dim": 256,
"ffn_dim": 32,
"num_layers": 4,
"cross_attn_norm": True,
"qk_norm": "rms_norm_across_heads",
"rope_max_seq_len": 32,
"vace_layers": [0, 2],
"vace_in_channels": 48, # 3 * in_channels = 3 * 16 = 48
}
def get_dummy_inputs(self) -> dict[str, torch.Tensor]:
batch_size = 1
num_channels = 16
num_frames = 2
height = 16
width = 16
text_encoder_embedding_dim = 32
sequence_length = 12
# VACE requires control_hidden_states with vace_in_channels (3 * in_channels)
vace_in_channels = 48
return {
"hidden_states": randn_tensor(
(batch_size, num_channels, num_frames, height, width),
generator=self.generator,
device=torch_device,
),
"encoder_hidden_states": randn_tensor(
(batch_size, sequence_length, text_encoder_embedding_dim),
generator=self.generator,
device=torch_device,
),
"control_hidden_states": randn_tensor(
(batch_size, vace_in_channels, num_frames, height, width),
generator=self.generator,
device=torch_device,
),
"timestep": torch.randint(0, 1000, size=(batch_size,), generator=self.generator).to(torch_device),
}
class TestWanVACETransformer3D(WanVACETransformer3DTesterConfig, ModelTesterMixin):
"""Core model tests for Wan VACE Transformer 3D."""
@pytest.mark.parametrize("dtype", [torch.float16, torch.bfloat16], ids=["fp16", "bf16"])
def test_from_save_pretrained_dtype_inference(self, tmp_path, dtype):
# Skip: fp16/bf16 require very high atol to pass, providing little signal.
# Dtype preservation is already tested by test_from_save_pretrained_dtype and test_keep_in_fp32_modules.
pytest.skip("Tolerance requirements too high for meaningful test")
def test_model_parallelism(self, tmp_path):
# Skip: Device mismatch between cuda:0 and cuda:1 in VACE control flow
pytest.skip("Model parallelism not yet supported for WanVACE")
class TestWanVACETransformer3DMemory(WanVACETransformer3DTesterConfig, MemoryTesterMixin):
"""Memory optimization tests for Wan VACE Transformer 3D."""
class TestWanVACETransformer3DTraining(WanVACETransformer3DTesterConfig, TrainingTesterMixin):
"""Training tests for Wan VACE Transformer 3D."""
def test_gradient_checkpointing_is_applied(self):
expected_set = {"WanVACETransformer3DModel"}
super().test_gradient_checkpointing_is_applied(expected_set=expected_set)
class TestWanVACETransformer3DAttention(WanVACETransformer3DTesterConfig, AttentionTesterMixin):
"""Attention processor tests for Wan VACE Transformer 3D."""
class TestWanVACETransformer3DCompile(WanVACETransformer3DTesterConfig, TorchCompileTesterMixin):
"""Torch compile tests for Wan VACE Transformer 3D."""
def test_torch_compile_repeated_blocks(self):
# WanVACE has two block types (WanTransformerBlock and WanVACETransformerBlock),
# so we need recompile_limit=2 instead of the default 1.
import torch._dynamo
import torch._inductor.utils
init_dict = self.get_init_dict()
inputs_dict = self.get_dummy_inputs()
model = self.model_class(**init_dict).to(torch_device)
model.eval()
model.compile_repeated_blocks(fullgraph=True)
with (
torch._inductor.utils.fresh_inductor_cache(),
torch._dynamo.config.patch(recompile_limit=2),
):
_ = model(**inputs_dict)
_ = model(**inputs_dict)
class TestWanVACETransformer3DBitsAndBytes(WanVACETransformer3DTesterConfig, BitsAndBytesTesterMixin):
"""BitsAndBytes quantization tests for Wan VACE Transformer 3D."""
@property
def torch_dtype(self):
return torch.float16
def get_dummy_inputs(self):
"""Override to provide inputs matching the tiny Wan VACE model dimensions."""
return {
"hidden_states": randn_tensor(
(1, 16, 2, 64, 64), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"encoder_hidden_states": randn_tensor(
(1, 512, 4096), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"control_hidden_states": randn_tensor(
(1, 96, 2, 64, 64), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"timestep": torch.tensor([1.0]).to(torch_device, self.torch_dtype),
}
class TestWanVACETransformer3DTorchAo(WanVACETransformer3DTesterConfig, TorchAoTesterMixin):
"""TorchAO quantization tests for Wan VACE Transformer 3D."""
@property
def torch_dtype(self):
return torch.bfloat16
def get_dummy_inputs(self):
"""Override to provide inputs matching the tiny Wan VACE model dimensions."""
return {
"hidden_states": randn_tensor(
(1, 16, 2, 64, 64), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"encoder_hidden_states": randn_tensor(
(1, 512, 4096), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"control_hidden_states": randn_tensor(
(1, 96, 2, 64, 64), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"timestep": torch.tensor([1.0]).to(torch_device, self.torch_dtype),
}
class TestWanVACETransformer3DGGUF(WanVACETransformer3DTesterConfig, GGUFTesterMixin):
"""GGUF quantization tests for Wan VACE Transformer 3D."""
@property
def gguf_filename(self):
return "https://huggingface.co/QuantStack/Wan2.1_14B_VACE-GGUF/blob/main/Wan2.1_14B_VACE-Q3_K_S.gguf"
@property
def torch_dtype(self):
return torch.bfloat16
def get_dummy_inputs(self):
"""Override to provide inputs matching the real Wan VACE model dimensions.
Wan 2.1 VACE: in_channels=16, text_dim=4096, vace_in_channels=96
"""
return {
"hidden_states": randn_tensor(
(1, 16, 2, 64, 64), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"encoder_hidden_states": randn_tensor(
(1, 512, 4096), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"control_hidden_states": randn_tensor(
(1, 96, 2, 64, 64), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"timestep": torch.tensor([1.0]).to(torch_device, self.torch_dtype),
}
class TestWanVACETransformer3DGGUFCompile(WanVACETransformer3DTesterConfig, GGUFCompileTesterMixin):
"""GGUF + compile tests for Wan VACE Transformer 3D."""
@property
def gguf_filename(self):
return "https://huggingface.co/QuantStack/Wan2.1_14B_VACE-GGUF/blob/main/Wan2.1_14B_VACE-Q3_K_S.gguf"
@property
def torch_dtype(self):
return torch.bfloat16
def get_dummy_inputs(self):
"""Override to provide inputs matching the real Wan VACE model dimensions.
Wan 2.1 VACE: in_channels=16, text_dim=4096, vace_in_channels=96
"""
return {
"hidden_states": randn_tensor(
(1, 16, 2, 64, 64), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"encoder_hidden_states": randn_tensor(
(1, 512, 4096), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"control_hidden_states": randn_tensor(
(1, 96, 2, 64, 64), generator=self.generator, device=torch_device, dtype=self.torch_dtype
),
"timestep": torch.tensor([1.0]).to(torch_device, self.torch_dtype),
}

View File

@@ -30,6 +30,102 @@ from ...testing_utils import torch_device
from ..test_modular_pipelines_common import ModularGuiderTesterMixin, ModularPipelineTesterMixin
QWEN_IMAGE_TEXT2IMAGE_WORKFLOWS = {
"text2image": [
("text_encoder", "QwenImageTextEncoderStep"),
("denoise.input", "QwenImageTextInputsStep"),
("denoise.prepare_latents", "QwenImagePrepareLatentsStep"),
("denoise.set_timesteps", "QwenImageSetTimestepsStep"),
("denoise.prepare_rope_inputs", "QwenImageRoPEInputsStep"),
("denoise.denoise", "QwenImageDenoiseStep"),
("denoise.after_denoise", "QwenImageAfterDenoiseStep"),
("decode.decode", "QwenImageDecoderStep"),
("decode.postprocess", "QwenImageProcessImagesOutputStep"),
],
"image2image": [
("text_encoder", "QwenImageTextEncoderStep"),
("vae_encoder.preprocess", "QwenImageProcessImagesInputStep"),
("vae_encoder.encode", "QwenImageVaeEncoderStep"),
("denoise.input.text_inputs", "QwenImageTextInputsStep"),
("denoise.input.additional_inputs", "QwenImageAdditionalInputsStep"),
("denoise.prepare_latents", "QwenImagePrepareLatentsStep"),
("denoise.set_timesteps", "QwenImageSetTimestepsWithStrengthStep"),
("denoise.prepare_img2img_latents", "QwenImagePrepareLatentsWithStrengthStep"),
("denoise.prepare_rope_inputs", "QwenImageRoPEInputsStep"),
("denoise.denoise", "QwenImageDenoiseStep"),
("denoise.after_denoise", "QwenImageAfterDenoiseStep"),
("decode.decode", "QwenImageDecoderStep"),
("decode.postprocess", "QwenImageProcessImagesOutputStep"),
],
"inpainting": [
("text_encoder", "QwenImageTextEncoderStep"),
("vae_encoder.preprocess", "QwenImageInpaintProcessImagesInputStep"),
("vae_encoder.encode", "QwenImageVaeEncoderStep"),
("denoise.input.text_inputs", "QwenImageTextInputsStep"),
("denoise.input.additional_inputs", "QwenImageAdditionalInputsStep"),
("denoise.prepare_latents", "QwenImagePrepareLatentsStep"),
("denoise.set_timesteps", "QwenImageSetTimestepsWithStrengthStep"),
("denoise.prepare_inpaint_latents.add_noise_to_latents", "QwenImagePrepareLatentsWithStrengthStep"),
("denoise.prepare_inpaint_latents.create_mask_latents", "QwenImageCreateMaskLatentsStep"),
("denoise.prepare_rope_inputs", "QwenImageRoPEInputsStep"),
("denoise.denoise", "QwenImageInpaintDenoiseStep"),
("denoise.after_denoise", "QwenImageAfterDenoiseStep"),
("decode.decode", "QwenImageDecoderStep"),
("decode.postprocess", "QwenImageInpaintProcessImagesOutputStep"),
],
"controlnet_text2image": [
("text_encoder", "QwenImageTextEncoderStep"),
("controlnet_vae_encoder", "QwenImageControlNetVaeEncoderStep"),
("denoise.input", "QwenImageTextInputsStep"),
("denoise.controlnet_input", "QwenImageControlNetInputsStep"),
("denoise.prepare_latents", "QwenImagePrepareLatentsStep"),
("denoise.set_timesteps", "QwenImageSetTimestepsStep"),
("denoise.prepare_rope_inputs", "QwenImageRoPEInputsStep"),
("denoise.controlnet_before_denoise", "QwenImageControlNetBeforeDenoiserStep"),
("denoise.controlnet_denoise", "QwenImageControlNetDenoiseStep"),
("denoise.after_denoise", "QwenImageAfterDenoiseStep"),
("decode.decode", "QwenImageDecoderStep"),
("decode.postprocess", "QwenImageProcessImagesOutputStep"),
],
"controlnet_image2image": [
("text_encoder", "QwenImageTextEncoderStep"),
("vae_encoder.preprocess", "QwenImageProcessImagesInputStep"),
("vae_encoder.encode", "QwenImageVaeEncoderStep"),
("controlnet_vae_encoder", "QwenImageControlNetVaeEncoderStep"),
("denoise.input.text_inputs", "QwenImageTextInputsStep"),
("denoise.input.additional_inputs", "QwenImageAdditionalInputsStep"),
("denoise.controlnet_input", "QwenImageControlNetInputsStep"),
("denoise.prepare_latents", "QwenImagePrepareLatentsStep"),
("denoise.set_timesteps", "QwenImageSetTimestepsWithStrengthStep"),
("denoise.prepare_img2img_latents", "QwenImagePrepareLatentsWithStrengthStep"),
("denoise.prepare_rope_inputs", "QwenImageRoPEInputsStep"),
("denoise.controlnet_before_denoise", "QwenImageControlNetBeforeDenoiserStep"),
("denoise.controlnet_denoise", "QwenImageControlNetDenoiseStep"),
("denoise.after_denoise", "QwenImageAfterDenoiseStep"),
("decode.decode", "QwenImageDecoderStep"),
("decode.postprocess", "QwenImageProcessImagesOutputStep"),
],
"controlnet_inpainting": [
("text_encoder", "QwenImageTextEncoderStep"),
("vae_encoder.preprocess", "QwenImageInpaintProcessImagesInputStep"),
("vae_encoder.encode", "QwenImageVaeEncoderStep"),
("controlnet_vae_encoder", "QwenImageControlNetVaeEncoderStep"),
("denoise.input.text_inputs", "QwenImageTextInputsStep"),
("denoise.input.additional_inputs", "QwenImageAdditionalInputsStep"),
("denoise.controlnet_input", "QwenImageControlNetInputsStep"),
("denoise.prepare_latents", "QwenImagePrepareLatentsStep"),
("denoise.set_timesteps", "QwenImageSetTimestepsWithStrengthStep"),
("denoise.prepare_inpaint_latents.add_noise_to_latents", "QwenImagePrepareLatentsWithStrengthStep"),
("denoise.prepare_inpaint_latents.create_mask_latents", "QwenImageCreateMaskLatentsStep"),
("denoise.prepare_rope_inputs", "QwenImageRoPEInputsStep"),
("denoise.controlnet_before_denoise", "QwenImageControlNetBeforeDenoiserStep"),
("denoise.controlnet_denoise", "QwenImageInpaintControlNetDenoiseStep"),
("denoise.after_denoise", "QwenImageAfterDenoiseStep"),
("decode.decode", "QwenImageDecoderStep"),
("decode.postprocess", "QwenImageInpaintProcessImagesOutputStep"),
],
}
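These mappings are consumed by the shared `test_workflow_map` check (defined in the tester mixin later in this diff); roughly:

blocks = QwenImageAutoBlocks()
workflow = blocks.get_workflow("text2image")
for (name, block), (expected_name, expected_cls) in zip(
    workflow.sub_blocks.items(), QWEN_IMAGE_TEXT2IMAGE_WORKFLOWS["text2image"]
):
    assert block.__class__.__name__ == expected_cls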
class TestQwenImageModularPipelineFast(ModularPipelineTesterMixin, ModularGuiderTesterMixin):
pipeline_class = QwenImageModularPipeline
pipeline_blocks_class = QwenImageAutoBlocks
@@ -37,6 +133,7 @@ class TestQwenImageModularPipelineFast(ModularPipelineTesterMixin, ModularGuider
params = frozenset(["prompt", "height", "width", "negative_prompt", "attention_kwargs", "image", "mask_image"])
batch_params = frozenset(["prompt", "negative_prompt", "image", "mask_image"])
expected_workflow_blocks = QWEN_IMAGE_TEXT2IMAGE_WORKFLOWS
def get_dummy_inputs(self):
generator = self.get_generator()
@@ -55,6 +152,42 @@ class TestQwenImageModularPipelineFast(ModularPipelineTesterMixin, ModularGuider
def test_inference_batch_single_identical(self):
super().test_inference_batch_single_identical(expected_max_diff=5e-4)
QWEN_IMAGE_EDIT_WORKFLOWS = {
"edit": [
("text_encoder.resize", "QwenImageEditResizeStep"),
("text_encoder.encode", "QwenImageEditTextEncoderStep"),
("vae_encoder.resize", "QwenImageEditResizeStep"),
("vae_encoder.preprocess", "QwenImageEditProcessImagesInputStep"),
("vae_encoder.encode", "QwenImageVaeEncoderStep"),
("denoise.input.text_inputs", "QwenImageTextInputsStep"),
("denoise.input.additional_inputs", "QwenImageAdditionalInputsStep"),
("denoise.prepare_latents", "QwenImagePrepareLatentsStep"),
("denoise.set_timesteps", "QwenImageSetTimestepsStep"),
("denoise.prepare_rope_inputs", "QwenImageEditRoPEInputsStep"),
("denoise.denoise", "QwenImageEditDenoiseStep"),
("denoise.after_denoise", "QwenImageAfterDenoiseStep"),
("decode.decode", "QwenImageDecoderStep"),
("decode.postprocess", "QwenImageProcessImagesOutputStep"),
],
"edit_inpainting": [
("text_encoder.resize", "QwenImageEditResizeStep"),
("text_encoder.encode", "QwenImageEditTextEncoderStep"),
("vae_encoder.resize", "QwenImageEditResizeStep"),
("vae_encoder.preprocess", "QwenImageEditInpaintProcessImagesInputStep"),
("vae_encoder.encode", "QwenImageVaeEncoderStep"),
("denoise.input.text_inputs", "QwenImageTextInputsStep"),
("denoise.input.additional_inputs", "QwenImageAdditionalInputsStep"),
("denoise.prepare_latents", "QwenImagePrepareLatentsStep"),
("denoise.set_timesteps", "QwenImageSetTimestepsWithStrengthStep"),
("denoise.prepare_inpaint_latents.add_noise_to_latents", "QwenImagePrepareLatentsWithStrengthStep"),
("denoise.prepare_inpaint_latents.create_mask_latents", "QwenImageCreateMaskLatentsStep"),
("denoise.prepare_rope_inputs", "QwenImageEditRoPEInputsStep"),
("denoise.denoise", "QwenImageEditInpaintDenoiseStep"),
("denoise.after_denoise", "QwenImageAfterDenoiseStep"),
("decode.decode", "QwenImageDecoderStep"),
("decode.postprocess", "QwenImageInpaintProcessImagesOutputStep"),
],
}
class TestQwenImageEditModularPipelineFast(ModularPipelineTesterMixin, ModularGuiderTesterMixin):
pipeline_class = QwenImageEditModularPipeline
@@ -63,6 +196,7 @@ class TestQwenImageEditModularPipelineFast(ModularPipelineTesterMixin, ModularGu
params = frozenset(["prompt", "height", "width", "negative_prompt", "attention_kwargs", "image", "mask_image"])
batch_params = frozenset(["prompt", "negative_prompt", "image", "mask_image"])
expected_workflow_blocks = QWEN_IMAGE_EDIT_WORKFLOWS
def get_dummy_inputs(self):
generator = self.get_generator()

View File

@@ -267,6 +267,60 @@ class SDXLModularControlNetTesterMixin:
assert max_diff > 1e-2, "Output with CFG must be different from normal inference"
TEXT2IMAGE_WORKFLOWS = {
"text2image": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLPrepareAdditionalConditioningStep"),
("denoise", "StableDiffusionXLDenoiseStep"),
("decode", "StableDiffusionXLDecodeStep"),
],
"controlnet_text2image": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLPrepareAdditionalConditioningStep"),
("controlnet_input", "StableDiffusionXLControlNetInputStep"),
("denoise", "StableDiffusionXLControlNetDenoiseStep"),
("decode", "StableDiffusionXLDecodeStep"),
],
"controlnet_union_text2image": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLPrepareAdditionalConditioningStep"),
("controlnet_input", "StableDiffusionXLControlNetUnionInputStep"),
("denoise", "StableDiffusionXLControlNetDenoiseStep"),
("decode", "StableDiffusionXLDecodeStep"),
],
"ip_adapter_text2image": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("ip_adapter", "StableDiffusionXLIPAdapterStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLPrepareAdditionalConditioningStep"),
("denoise", "StableDiffusionXLDenoiseStep"),
("decode", "StableDiffusionXLDecodeStep"),
],
"ip_adapter_controlnet_text2image": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("ip_adapter", "StableDiffusionXLIPAdapterStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLPrepareAdditionalConditioningStep"),
("controlnet_input", "StableDiffusionXLControlNetInputStep"),
("denoise", "StableDiffusionXLControlNetDenoiseStep"),
("decode", "StableDiffusionXLDecodeStep"),
],
}
class TestSDXLModularPipelineFast(
SDXLModularTesterMixin,
SDXLModularIPAdapterTesterMixin,
@@ -291,6 +345,9 @@ class TestSDXLModularPipelineFast(
batch_params = frozenset(["prompt", "negative_prompt"])
expected_image_output_shape = (1, 3, 64, 64)
expected_workflow_blocks = TEXT2IMAGE_WORKFLOWS
def get_dummy_inputs(self, seed=0):
generator = self.get_generator(seed)
inputs = {
@@ -313,6 +370,63 @@ class TestSDXLModularPipelineFast(
def test_inference_batch_single_identical(self):
super().test_inference_batch_single_identical(expected_max_diff=3e-3)
IMAGE2IMAGE_WORKFLOWS = {
"image2image": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("vae_encoder", "StableDiffusionXLVaeEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLImg2ImgSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLImg2ImgPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep"),
("denoise", "StableDiffusionXLDenoiseStep"),
("decode", "StableDiffusionXLDecodeStep"),
],
"controlnet_image2image": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("vae_encoder", "StableDiffusionXLVaeEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLImg2ImgSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLImg2ImgPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep"),
("controlnet_input", "StableDiffusionXLControlNetInputStep"),
("denoise", "StableDiffusionXLControlNetDenoiseStep"),
("decode", "StableDiffusionXLDecodeStep"),
],
"controlnet_union_image2image": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("vae_encoder", "StableDiffusionXLVaeEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLImg2ImgSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLImg2ImgPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep"),
("controlnet_input", "StableDiffusionXLControlNetUnionInputStep"),
("denoise", "StableDiffusionXLControlNetDenoiseStep"),
("decode", "StableDiffusionXLDecodeStep"),
],
"ip_adapter_image2image": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("ip_adapter", "StableDiffusionXLIPAdapterStep"),
("vae_encoder", "StableDiffusionXLVaeEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLImg2ImgSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLImg2ImgPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep"),
("denoise", "StableDiffusionXLDenoiseStep"),
("decode", "StableDiffusionXLDecodeStep"),
],
"ip_adapter_controlnet_image2image": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("ip_adapter", "StableDiffusionXLIPAdapterStep"),
("vae_encoder", "StableDiffusionXLVaeEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLImg2ImgSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLImg2ImgPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep"),
("controlnet_input", "StableDiffusionXLControlNetInputStep"),
("denoise", "StableDiffusionXLControlNetDenoiseStep"),
("decode", "StableDiffusionXLDecodeStep"),
],
}
class TestSDXLImg2ImgModularPipelineFast(
SDXLModularTesterMixin,
@@ -338,6 +452,7 @@ class TestSDXLImg2ImgModularPipelineFast(
)
batch_params = frozenset(["prompt", "negative_prompt", "image"])
expected_image_output_shape = (1, 3, 64, 64)
expected_workflow_blocks = IMAGE2IMAGE_WORKFLOWS
def get_dummy_inputs(self, seed=0):
generator = self.get_generator(seed)
@@ -366,6 +481,63 @@ class TestSDXLImg2ImgModularPipelineFast(
def test_inference_batch_single_identical(self):
super().test_inference_batch_single_identical(expected_max_diff=3e-3)
INPAINTING_WORKFLOWS = {
"inpainting": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("vae_encoder", "StableDiffusionXLInpaintVaeEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLImg2ImgSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLInpaintPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep"),
("denoise", "StableDiffusionXLInpaintDenoiseStep"),
("decode", "StableDiffusionXLInpaintDecodeStep"),
],
"controlnet_inpainting": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("vae_encoder", "StableDiffusionXLInpaintVaeEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLImg2ImgSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLInpaintPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep"),
("controlnet_input", "StableDiffusionXLControlNetInputStep"),
("denoise", "StableDiffusionXLInpaintControlNetDenoiseStep"),
("decode", "StableDiffusionXLInpaintDecodeStep"),
],
"controlnet_union_inpainting": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("vae_encoder", "StableDiffusionXLInpaintVaeEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLImg2ImgSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLInpaintPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep"),
("controlnet_input", "StableDiffusionXLControlNetUnionInputStep"),
("denoise", "StableDiffusionXLInpaintControlNetDenoiseStep"),
("decode", "StableDiffusionXLInpaintDecodeStep"),
],
"ip_adapter_inpainting": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("ip_adapter", "StableDiffusionXLIPAdapterStep"),
("vae_encoder", "StableDiffusionXLInpaintVaeEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLImg2ImgSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLInpaintPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep"),
("denoise", "StableDiffusionXLInpaintDenoiseStep"),
("decode", "StableDiffusionXLInpaintDecodeStep"),
],
"ip_adapter_controlnet_inpainting": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("ip_adapter", "StableDiffusionXLIPAdapterStep"),
("vae_encoder", "StableDiffusionXLInpaintVaeEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLImg2ImgSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLInpaintPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep"),
("controlnet_input", "StableDiffusionXLControlNetInputStep"),
("denoise", "StableDiffusionXLInpaintControlNetDenoiseStep"),
("decode", "StableDiffusionXLInpaintDecodeStep"),
],
}
class SDXLInpaintingModularPipelineFastTests(
SDXLModularTesterMixin,
@@ -392,6 +564,7 @@ class SDXLInpaintingModularPipelineFastTests(
)
batch_params = frozenset(["prompt", "negative_prompt", "image", "mask_image"])
expected_image_output_shape = (1, 3, 64, 64)
expected_workflow_blocks = INPAINTING_WORKFLOWS
def get_dummy_inputs(self, device, seed=0):
generator = self.get_generator(seed)

View File

@@ -100,6 +100,14 @@ class ModularPipelineTesterMixin:
"See existing pipeline tests for reference."
)
@property
def expected_workflow_blocks(self) -> dict:
raise NotImplementedError(
"You need to set the attribute `expected_workflow_blocks` in the child test class. "
"`expected_workflow_blocks` is a dictionary that maps workflow names to a list of (block name, block class name) tuples. "
"See existing pipeline tests for reference."
)
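A child test class satisfies this property by assigning the mapping as a class attribute, e.g. (pipeline and block names illustrative):

class TestMyPipelineFast(ModularPipelineTesterMixin):
    pipeline_class = MyModularPipeline
    pipeline_blocks_class = MyAutoBlocks
    expected_workflow_blocks = {
        "text2image": [
            ("text_encoder", "MyTextEncoderStep"),
            ("denoise", "MyDenoiseStep"),
        ],
    }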
def setup_method(self):
# clean up the VRAM before each test
torch.compiler.reset()
@@ -341,6 +349,33 @@ class ModularPipelineTesterMixin:
assert torch.abs(image_slices[0] - image_slices[1]).max() < 1e-3
def test_workflow_map(self):
blocks = self.pipeline_blocks_class()
if blocks._workflow_map is None:
pytest.skip("Skipping test as _workflow_map is not set")
assert hasattr(self, "expected_workflow_blocks") and self.expected_workflow_blocks, (
"expected_workflow_blocks must be defined in the test class"
)
for workflow_name, expected_blocks in self.expected_workflow_blocks.items():
workflow_blocks = blocks.get_workflow(workflow_name)
actual_blocks = list(workflow_blocks.sub_blocks.items())
# Check that the number of blocks matches
assert len(actual_blocks) == len(expected_blocks), (
f"Workflow '{workflow_name}' has {len(actual_blocks)} blocks, "
f"expected {len(expected_blocks)}"
)
# Check that each block's class name matches the expected one
for i, ((actual_name, actual_block), (expected_name, expected_class_name)) in enumerate(
zip(actual_blocks, expected_blocks)
):
assert actual_block.__class__.__name__ == expected_class_name, (
f"Workflow '{workflow_name}': block '{actual_name}' has type "
f"{actual_block.__class__.__name__}, expected {expected_class_name}"
)
class ModularGuiderTesterMixin:
def test_guider_cfg(self, expected_max_diff=1e-2):

View File

@@ -281,6 +281,86 @@ class GlmImagePipelineFastTests(PipelineTesterMixin, unittest.TestCase):
# Should return 4 images (2 prompts × 2 images per prompt)
self.assertEqual(len(images), 4)
def test_prompt_with_prior_token_ids(self):
"""Test that prompt and prior_token_ids can be provided together.
When both are given, the AR generation step is skipped (prior_token_ids is used
directly) and prompt is used to generate prompt_embeds via the glyph encoder.
"""
device = "cpu"
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe.to(device)
pipe.set_progress_bar_config(disable=None)
height, width = 32, 32
# Step 1: Run with prompt only to get prior_token_ids from AR model
generator = torch.Generator(device=device).manual_seed(0)
prior_token_ids, _, _ = pipe.generate_prior_tokens(
prompt="A photo of a cat",
height=height,
width=width,
device=torch.device(device),
generator=generator,
)
# Step 2: Run with both prompt and prior_token_ids — should not raise
generator = torch.Generator(device=device).manual_seed(0)
inputs_both = {
"prompt": "A photo of a cat",
"prior_token_ids": prior_token_ids,
"generator": generator,
"num_inference_steps": 2,
"guidance_scale": 1.5,
"height": height,
"width": width,
"max_sequence_length": 16,
"output_type": "pt",
}
images = pipe(**inputs_both).images
self.assertEqual(len(images), 1)
self.assertEqual(images[0].shape, (3, 32, 32))
def test_check_inputs_rejects_invalid_combinations(self):
"""Test that check_inputs correctly rejects invalid input combinations."""
device = "cpu"
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe.to(device)
height, width = 32, 32
# Neither prompt nor prior_token_ids → error
with self.assertRaises(ValueError):
pipe.check_inputs(
prompt=None,
height=height,
width=width,
callback_on_step_end_tensor_inputs=None,
prompt_embeds=torch.randn(1, 16, 32),
)
# prior_token_ids alone without prompt or prompt_embeds → error
with self.assertRaises(ValueError):
pipe.check_inputs(
prompt=None,
height=height,
width=width,
callback_on_step_end_tensor_inputs=None,
prior_token_ids=torch.randint(0, 100, (1, 64)),
)
# prompt + prompt_embeds together → error
with self.assertRaises(ValueError):
pipe.check_inputs(
prompt="A cat",
height=height,
width=width,
callback_on_step_end_tensor_inputs=None,
prompt_embeds=torch.randn(1, 16, 32),
)
@unittest.skip("Needs to be revisited.")
def test_encode_prompt_works_in_isolation(self):
pass

View File

@@ -168,7 +168,7 @@ def assert_tensors_close(
max_diff = abs_diff.max().item()
flat_idx = abs_diff.argmax().item()
max_idx = tuple(torch.unravel_index(torch.tensor(flat_idx), actual.shape).tolist())
max_idx = tuple(idx.item() for idx in torch.unravel_index(torch.tensor(flat_idx), actual.shape))
threshold = atol + rtol * expected.abs()
mismatched = (abs_diff > threshold).sum().item()
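The `max_idx` change reflects that `torch.unravel_index` returns a tuple of zero-dim tensors rather than a single tensor, so calling `.tolist()` on the result raises `AttributeError`; each coordinate has to be unwrapped with `.item()`:

import torch

coords = torch.unravel_index(torch.tensor(5), (2, 4))
print(coords)                           # (tensor(1), tensor(1))
print(tuple(c.item() for c in coords))  # (1, 1)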