Compare commits

..

29 Commits

Author SHA1 Message Date
Sayak Paul
ba73d8252a Merge branch 'main' into requirements-custom-blocks 2026-02-26 08:32:02 +05:30
Sayak Paul
5316fe2a2f Merge branch 'main' into requirements-custom-blocks 2026-02-26 01:26:09 +05:30
Steven Liu
f274df4fef [docs] validation for custom blocks (#13156)
validation
2026-02-18 10:54:13 -08:00
Sayak Paul
2504341a20 Merge branch 'main' into requirements-custom-blocks 2026-02-18 23:55:27 +05:30
sayakpaul
e8d4612a25 reviewer feedback. 2026-02-17 23:37:04 +05:30
Sayak Paul
29273538d1 Apply suggestions from code review
Co-authored-by: Dhruv Nair <dhruv.nair@gmail.com>
2026-02-17 23:27:52 +05:30
sayakpaul
445c42eb82 Merge branch 'main' into requirements-custom-blocks 2026-02-17 23:26:22 +05:30
sayakpaul
79fa0e2bd5 resolve merge conflicts. 2026-02-16 11:06:09 +05:30
Sayak Paul
60e3284003 Merge branch 'main' into requirements-custom-blocks 2026-01-20 19:10:24 +05:30
sayakpaul
7b43d0e409 add tests 2026-01-20 09:29:32 +05:30
Sayak Paul
3879e32254 Merge branch 'main' into requirements-custom-blocks 2026-01-20 08:20:38 +05:30
sayakpaul
a88d11bc90 resolve conflicts. 2025-11-06 10:29:24 +05:30
Sayak Paul
a9165eb749 Merge branch 'main' into requirements-custom-blocks 2025-11-03 12:12:08 +05:30
Sayak Paul
eeb3445444 Merge branch 'main' into requirements-custom-blocks 2025-11-01 08:36:16 +05:30
Sayak Paul
5b7d0dfab6 Merge branch 'main' into requirements-custom-blocks 2025-10-29 16:30:46 +05:30
sayakpaul
1de4402c26 up 2025-10-27 13:55:17 +05:30
sayakpaul
024c2b9839 Merge branch 'main' into requirements-custom-blocks 2025-10-27 11:56:00 +05:30
Sayak Paul
35d8d97c02 Merge branch 'main' into requirements-custom-blocks 2025-10-22 21:57:45 +05:30
Sayak Paul
e52cabeff2 Merge branch 'main' into requirements-custom-blocks 2025-10-22 06:23:40 +05:30
Sayak Paul
2c4d73d72d Merge branch 'main' into requirements-custom-blocks 2025-10-21 01:54:38 +05:30
sayakpaul
046be83946 up 2025-10-02 15:43:44 +05:30
Sayak Paul
b7fba892f5 Merge branch 'main' into requirements-custom-blocks 2025-09-23 13:35:49 +05:30
Sayak Paul
ecbd907e76 Merge branch 'main' into requirements-custom-blocks 2025-09-12 15:47:22 +05:30
Sayak Paul
d159ae025d Merge branch 'main' into requirements-custom-blocks 2025-09-02 10:04:22 +05:30
Sayak Paul
756a1567f5 Merge branch 'main' into requirements-custom-blocks 2025-08-29 08:03:00 +02:00
Sayak Paul
d2731ababa Merge branch 'main' into requirements-custom-blocks 2025-08-21 07:59:54 +05:30
sayakpaul
37d3887194 unify. 2025-08-20 12:09:33 +05:30
sayakpaul
127e9a39d8 up 2025-08-20 11:51:15 +05:30
sayakpaul
12ceecf077 feat: implement requirements validation for custom blocks. 2025-08-20 11:04:28 +05:30
13 changed files with 221 additions and 345 deletions

View File

@@ -14,8 +14,4 @@
## AutoPipelineBlocks
[[autodoc]] diffusers.modular_pipelines.modular_pipeline.AutoPipelineBlocks
## ConditionalPipelineBlocks
[[autodoc]] diffusers.modular_pipelines.modular_pipeline.ConditionalPipelineBlocks
[[autodoc]] diffusers.modular_pipelines.modular_pipeline.AutoPipelineBlocks

View File

@@ -121,7 +121,7 @@ from diffusers.modular_pipelines import AutoPipelineBlocks
class AutoImageBlocks(AutoPipelineBlocks):
# List of sub-block classes to choose from
block_classes = [InpaintBlock, ImageToImageBlock, TextToImageBlock]
block_classes = [block_inpaint_cls, block_i2i_cls, block_t2i_cls]
# Names for each block in the same order
block_names = ["inpaint", "img2img", "text2img"]
# Trigger inputs that determine which block to run
@@ -129,8 +129,8 @@ class AutoImageBlocks(AutoPipelineBlocks):
# - "image" triggers img2img workflow (but only if mask is not provided)
# - if none of above, runs the text2img workflow (default)
block_trigger_inputs = ["mask", "image", None]
# Description is extremely important for AutoPipelineBlocks
@property
def description(self):
return (
"Pipeline generates images given different types of conditions!\n"
@@ -141,7 +141,7 @@ class AutoImageBlocks(AutoPipelineBlocks):
)
```
It is **very** important to include a `description` to avoid any confusion over how to run a block and what inputs are required. While [`~modular_pipelines.AutoPipelineBlocks`] are convenient, its conditional logic may be difficult to figure out if it isn't properly explained.
It is **very** important to include a `description` to avoid any confusion over how to run a block and what inputs are required. While [`~modular_pipelines.AutoPipelineBlocks`] are convenient, it's conditional logic may be difficult to figure out if it isn't properly explained.
Create an instance of `AutoImageBlocks`.
@@ -152,74 +152,5 @@ auto_blocks = AutoImageBlocks()
For more complex compositions, such as nested [`~modular_pipelines.AutoPipelineBlocks`] blocks when they're used as sub-blocks in larger pipelines, use the [`~modular_pipelines.SequentialPipelineBlocks.get_execution_blocks`] method to extract the a block that is actually run based on your input.
```py
auto_blocks.get_execution_blocks(mask=True)
```
## ConditionalPipelineBlocks
[`~modular_pipelines.AutoPipelineBlocks`] is a special case of [`~modular_pipelines.ConditionalPipelineBlocks`]. While [`~modular_pipelines.AutoPipelineBlocks`] selects blocks based on whether a trigger input is provided or not, [`~modular_pipelines.ConditionalPipelineBlocks`] is able to select a block based on custom selection logic provided in the `select_block` method.
Here is the same example written using [`~modular_pipelines.ConditionalPipelineBlocks`] directly:
```py
from diffusers.modular_pipelines import ConditionalPipelineBlocks
class AutoImageBlocks(ConditionalPipelineBlocks):
block_classes = [InpaintBlock, ImageToImageBlock, TextToImageBlock]
block_names = ["inpaint", "img2img", "text2img"]
block_trigger_inputs = ["mask", "image"]
default_block_name = "text2img"
@property
def description(self):
return (
"Pipeline generates images given different types of conditions!\n"
+ "This is an auto pipeline block that works for text2img, img2img and inpainting tasks.\n"
+ " - inpaint workflow is run when `mask` is provided.\n"
+ " - img2img workflow is run when `image` is provided (but only when `mask` is not provided).\n"
+ " - text2img workflow is run when neither `image` nor `mask` is provided.\n"
)
def select_block(self, mask=None, image=None) -> str | None:
if mask is not None:
return "inpaint"
if image is not None:
return "img2img"
return None # falls back to default_block_name ("text2img")
```
The inputs listed in `block_trigger_inputs` are passed as keyword arguments to `select_block()`. When `select_block` returns `None`, it falls back to `default_block_name`. If `default_block_name` is also `None`, the entire conditional block is skipped — this is useful for optional processing steps that should only run when specific inputs are provided.
## Workflows
Pipelines that contain conditional blocks ([`~modular_pipelines.AutoPipelineBlocks`] or [`~modular_pipelines.ConditionalPipelineBlocks]`) can support multiple workflows — for example, our SDXL modular pipeline supports a dozen workflows all in one pipeline. But this also means it can be confusing for users to know what workflows are supported and how to run them. For pipeline builders, it's useful to be able to extract only the blocks relevant to a specific workflow.
We recommend defining a `_workflow_map` to give each workflow a name and explicitly list the inputs it requires.
```py
from diffusers.modular_pipelines import SequentialPipelineBlocks
class MyPipelineBlocks(SequentialPipelineBlocks):
block_classes = [TextEncoderBlock, AutoImageBlocks, DecodeBlock]
block_names = ["text_encoder", "auto_image", "decode"]
_workflow_map = {
"text2image": {"prompt": True},
"image2image": {"image": True, "prompt": True},
"inpaint": {"mask": True, "image": True, "prompt": True},
}
```
All of our built-in modular pipelines come with pre-defined workflows. The `available_workflows` property lists all supported workflows:
```py
pipeline_blocks = MyPipelineBlocks()
pipeline_blocks.available_workflows
# ['text2image', 'image2image', 'inpaint']
```
Retrieve a specific workflow with `get_workflow` to inspect and debug a specific block that executes the workflow.
```py
pipeline_blocks.get_workflow("inpaint")
auto_blocks.get_execution_blocks("mask")
```

View File

@@ -332,4 +332,49 @@ Make your custom block work with Mellon's visual interface. See the [Mellon Cust
Browse the [Modular Diffusers Custom Blocks](https://huggingface.co/collections/diffusers/modular-diffusers-custom-blocks) collection for inspiration and ready-to-use blocks.
</hfoption>
</hfoptions>
</hfoptions>
## Dependencies
Declaring package dependencies in custom blocks prevents runtime import errors later on. Diffusers validates the dependencies and returns a warning if a package is missing or incompatible.
Set a `_requirements` attribute in your block class, mapping package names to version specifiers.
```py
from diffusers.modular_pipelines import PipelineBlock
class MyCustomBlock(PipelineBlock):
_requirements = {
"transformers": ">=4.44.0",
"sentencepiece": ">=0.2.0"
}
```
When there are blocks with different requirements, Diffusers merges their requirements.
```py
from diffusers.modular_pipelines import SequentialPipelineBlocks
class BlockA(PipelineBlock):
_requirements = {"transformers": ">=4.44.0"}
# ...
class BlockB(PipelineBlock):
_requirements = {"sentencepiece": ">=0.2.0"}
# ...
pipe = SequentialPipelineBlocks.from_blocks_dict({
"block_a": BlockA,
"block_b": BlockB,
})
```
When this block is saved with [`~ModularPipeline.save_pretrained`], the requirements are saved to the `modular_config.json` file. When this block is loaded, Diffusers checks each requirement against the current environment. If there is a mismatch or a package isn't found, Diffusers returns the following warning.
```md
# missing package
xyz-package was specified in the requirements but wasn't found in the current environment.
# version mismatch
xyz requirement 'specific-version' is not satisfied by the installed version 'actual-version'. Things might work unexpected.
```

View File

@@ -89,8 +89,6 @@ class CustomBlocksCommand(BaseDiffusersCLICommand):
# automap = self._create_automap(parent_class=parent_class, child_class=child_class)
# with open(CONFIG, "w") as f:
# json.dump(automap, f)
with open("requirements.txt", "w") as f:
f.write("")
def _choose_block(self, candidates, chosen=None):
for cls, base in candidates:

View File

@@ -648,28 +648,6 @@ class ConfigMixin:
)
return config_file
@classmethod
def _get_dataclass_from_config(cls, config_dict: dict[str, Any]):
sig = inspect.signature(cls.__init__)
fields = []
for name, param in sig.parameters.items():
if name == "self" or name == "kwargs" or name in cls.ignore_for_config:
continue
annotation = param.annotation if param.annotation is not inspect.Parameter.empty else Any
if param.default is not inspect.Parameter.empty:
fields.append((name, annotation, dataclasses.field(default=param.default)))
else:
fields.append((name, annotation))
dc_cls = dataclasses.make_dataclass(
f"{cls.__name__}Config",
fields,
frozen=True,
)
valid_fields = {f.name for f in dataclasses.fields(dc_cls)}
init_kwargs = {k: v for k, v in config_dict.items() if k in valid_fields}
return dc_cls(**init_kwargs)
def register_to_config(init):
r"""

View File

@@ -856,7 +856,7 @@ def _convert_kohya_flux_lora_to_diffusers(state_dict):
)
state_dict = {k: v for k, v in state_dict.items() if not k.startswith("text_encoders.t5xxl.transformer.")}
has_diffb = any("diff_b" in k and k.startswith(("lora_unet_", "lora_te_", "lora_te1_")) for k in state_dict)
has_diffb = any("diff_b" in k and k.startswith(("lora_unet_", "lora_te_")) for k in state_dict)
if has_diffb:
zero_status_diff_b = state_dict_all_zero(state_dict, ".diff_b")
if zero_status_diff_b:
@@ -895,7 +895,7 @@ def _convert_kohya_flux_lora_to_diffusers(state_dict):
state_dict = {
_custom_replace(k, limit_substrings): v
for k, v in state_dict.items()
if k.startswith(("lora_unet_", "lora_te_", "lora_te1_"))
if k.startswith(("lora_unet_", "lora_te_"))
}
if any("text_projection" in k for k in state_dict):

View File

@@ -733,7 +733,7 @@ def _wrapped_flash_attn_3(
) -> tuple[torch.Tensor, torch.Tensor]:
# Hardcoded for now because pytorch does not support tuple/int type hints
window_size = (-1, -1)
result = flash_attn_3_func(
out, lse, *_ = flash_attn_3_func(
q=q,
k=k,
v=v,
@@ -750,9 +750,7 @@ def _wrapped_flash_attn_3(
pack_gqa=pack_gqa,
deterministic=deterministic,
sm_margin=sm_margin,
return_attn_probs=True,
)
out, lse, *_ = result
lse = lse.permute(0, 2, 1)
return out, lse
@@ -2703,7 +2701,7 @@ def _flash_varlen_attention_3(
key_packed = torch.cat(key_valid, dim=0)
value_packed = torch.cat(value_valid, dim=0)
result = flash_attn_3_varlen_func(
out, lse, *_ = flash_attn_3_varlen_func(
q=query_packed,
k=key_packed,
v=value_packed,
@@ -2713,13 +2711,7 @@ def _flash_varlen_attention_3(
max_seqlen_k=max_seqlen_k,
softmax_scale=scale,
causal=is_causal,
return_attn_probs=return_lse,
)
if isinstance(result, tuple):
out, lse, *_ = result
else:
out = result
lse = None
out = out.unflatten(0, (batch_size, -1))
return (out, lse) if return_lse else out

View File

@@ -40,6 +40,7 @@ from .modular_pipeline_utils import (
InputParam,
InsertableDict,
OutputParam,
_validate_requirements,
combine_inputs,
combine_outputs,
format_components,
@@ -290,6 +291,7 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin):
config_name = "modular_config.json"
model_name = None
_requirements: dict[str, str] | None = None
_workflow_map = None
@classmethod
@@ -404,6 +406,9 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin):
"Selected model repository does not happear to have any custom code or does not have a valid `config.json` file."
)
if "requirements" in config and config["requirements"] is not None:
_ = _validate_requirements(config["requirements"])
class_ref = config["auto_map"][cls.__name__]
module_file, class_name = class_ref.split(".")
module_file = module_file + ".py"
@@ -428,8 +433,13 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin):
module = full_mod.rsplit(".", 1)[-1].replace("__dynamic__", "")
parent_module = self.save_pretrained.__func__.__qualname__.split(".", 1)[0]
auto_map = {f"{parent_module}": f"{module}.{cls_name}"}
self.register_to_config(auto_map=auto_map)
# resolve requirements
requirements = _validate_requirements(getattr(self, "_requirements", None))
if requirements:
self.register_to_config(requirements=requirements)
self.save_config(save_directory=save_directory, push_to_hub=push_to_hub, **kwargs)
config = dict(self.config)
self._internal_dict = FrozenDict(config)
@@ -1240,6 +1250,14 @@ class SequentialPipelineBlocks(ModularPipelineBlocks):
expected_configs=self.expected_configs,
)
@property
def _requirements(self) -> dict[str, str]:
requirements = {}
for block_name, block in self.sub_blocks.items():
if getattr(block, "_requirements", None):
requirements[block_name] = block._requirements
return requirements
class LoopSequentialPipelineBlocks(ModularPipelineBlocks):
"""
@@ -1633,14 +1651,7 @@ class ModularPipeline(ConfigMixin, PushToHubMixin):
blocks_class_name = self.default_blocks_name
if blocks_class_name is not None:
diffusers_module = importlib.import_module("diffusers")
blocks_class = getattr(diffusers_module, blocks_class_name, None)
# If the blocks_class is not found or is a base class (e.g. SequentialPipelineBlocks saved by from_blocks_dict) with empty block_classes
# fall back to default_blocks_name
if blocks_class is None or not blocks_class.block_classes:
blocks_class_name = self.default_blocks_name
blocks_class = getattr(diffusers_module, blocks_class_name)
if blocks_class is not None:
blocks_class = getattr(diffusers_module, blocks_class_name)
blocks = blocks_class()
else:
logger.warning(f"`blocks` is `None`, no default blocks class found for {self.__class__.__name__}")

View File

@@ -22,10 +22,12 @@ from typing import Any, Literal, Type, Union, get_args, get_origin
import PIL.Image
import torch
from packaging.specifiers import InvalidSpecifier, SpecifierSet
from ..configuration_utils import ConfigMixin, FrozenDict
from ..loaders.single_file_utils import _is_single_file_path_or_url
from ..utils import DIFFUSERS_LOAD_ID_FIELDS, is_torch_available, logging
from ..utils.import_utils import _is_package_available
if is_torch_available():
@@ -972,6 +974,89 @@ def make_doc_string(
return output
def _validate_requirements(reqs):
if reqs is None:
normalized_reqs = {}
else:
if not isinstance(reqs, dict):
raise ValueError(
"Requirements must be provided as a dictionary mapping package names to version specifiers."
)
normalized_reqs = _normalize_requirements(reqs)
if not normalized_reqs:
return {}
final: dict[str, str] = {}
for req, specified_ver in normalized_reqs.items():
req_available, req_actual_ver = _is_package_available(req)
if not req_available:
logger.warning(f"{req} was specified in the requirements but wasn't found in the current environment.")
if specified_ver:
try:
specifier = SpecifierSet(specified_ver)
except InvalidSpecifier as err:
raise ValueError(f"Requirement specifier '{specified_ver}' for {req} is invalid.") from err
if req_actual_ver == "N/A":
logger.warning(
f"Version of {req} could not be determined to validate requirement '{specified_ver}'. Things might work unexpected."
)
elif not specifier.contains(req_actual_ver, prereleases=True):
logger.warning(
f"{req} requirement '{specified_ver}' is not satisfied by the installed version {req_actual_ver}. Things might work unexpected."
)
final[req] = specified_ver
return final
def _normalize_requirements(reqs):
if not reqs:
return {}
normalized: "OrderedDict[str, str]" = OrderedDict()
def _accumulate(mapping: dict[str, Any]):
for pkg, spec in mapping.items():
if isinstance(spec, dict):
# This is recursive because blocks are composable. This way, we can merge requirements
# from multiple blocks.
_accumulate(spec)
continue
pkg_name = str(pkg).strip()
if not pkg_name:
raise ValueError("Requirement package name cannot be empty.")
spec_str = "" if spec is None else str(spec).strip()
if spec_str and not spec_str.startswith(("<", ">", "=", "!", "~")):
spec_str = f"=={spec_str}"
existing_spec = normalized.get(pkg_name)
if existing_spec is not None:
if not existing_spec and spec_str:
normalized[pkg_name] = spec_str
elif existing_spec and spec_str and existing_spec != spec_str:
try:
combined_spec = SpecifierSet(",".join(filter(None, [existing_spec, spec_str])))
except InvalidSpecifier:
logger.warning(
f"Conflicting requirements for '{pkg_name}' detected: '{existing_spec}' vs '{spec_str}'. Keeping '{existing_spec}'."
)
else:
normalized[pkg_name] = str(combined_spec)
continue
normalized[pkg_name] = spec_str
_accumulate(reqs)
return normalized
def combine_inputs(*named_input_lists: list[tuple[str, list[InputParam]]]) -> list[InputParam]:
"""
Combines multiple lists of InputParam objects from different blocks. For duplicate inputs, updates only if current

View File

@@ -699,13 +699,9 @@ class LTX2ImageToVideoPipeline(DiffusionPipeline, FromSingleFileMixin, LTX2LoraL
mask_shape = (batch_size, 1, num_frames, height, width)
if latents is not None:
conditioning_mask = latents.new_zeros(mask_shape)
conditioning_mask[:, :, 0] = 1.0
if latents.ndim == 5:
# conditioning_mask needs to the same shape as latents in two stages generation.
batch_size, _, num_frames, height, width = latents.shape
mask_shape = (batch_size, 1, num_frames, height, width)
conditioning_mask = latents.new_zeros(mask_shape)
conditioning_mask[:, :, 0] = 1.0
latents = self._normalize_latents(
latents, self.vae.latents_mean, self.vae.latents_std, self.vae.config.scaling_factor
)
@@ -714,9 +710,6 @@ class LTX2ImageToVideoPipeline(DiffusionPipeline, FromSingleFileMixin, LTX2LoraL
latents = self._pack_latents(
latents, self.transformer_spatial_patch_size, self.transformer_temporal_patch_size
)
else:
conditioning_mask = latents.new_zeros(mask_shape)
conditioning_mask[:, :, 0] = 1.0
conditioning_mask = self._pack_latents(
conditioning_mask, self.transformer_spatial_patch_size, self.transformer_temporal_patch_size
).squeeze(-1)

View File

@@ -10,6 +10,7 @@ import torch
import diffusers
from diffusers import AutoModel, ComponentsManager, ModularPipeline, ModularPipelineBlocks
from diffusers.guiders import ClassifierFreeGuidance
from diffusers.modular_pipelines import SequentialPipelineBlocks
from diffusers.modular_pipelines.modular_pipeline_utils import (
ComponentSpec,
ConfigSpec,
@@ -19,7 +20,13 @@ from diffusers.modular_pipelines.modular_pipeline_utils import (
)
from diffusers.utils import logging
from ..testing_utils import backend_empty_cache, numpy_cosine_similarity_distance, require_accelerator, torch_device
from ..testing_utils import (
CaptureLogger,
backend_empty_cache,
numpy_cosine_similarity_distance,
require_accelerator,
torch_device,
)
class ModularPipelineTesterMixin:
@@ -351,33 +358,6 @@ class ModularPipelineTesterMixin:
assert torch.abs(image_slices[0] - image_slices[1]).max() < 1e-3
def test_modular_index_consistency(self):
pipe = self.get_pipeline()
components_spec = pipe._component_specs
components = sorted(components_spec.keys())
with tempfile.TemporaryDirectory() as tmpdir:
pipe.save_pretrained(tmpdir)
index_file = os.path.join(tmpdir, "modular_model_index.json")
assert os.path.exists(index_file)
with open(index_file) as f:
index_contents = json.load(f)
compulsory_keys = {"_blocks_class_name", "_class_name", "_diffusers_version"}
for k in compulsory_keys:
assert k in index_contents
to_check_attrs = {"pretrained_model_name_or_path", "revision", "subfolder"}
for component in components:
spec = components_spec[component]
for attr in to_check_attrs:
if getattr(spec, "pretrained_model_name_or_path", None) is not None:
for attr in to_check_attrs:
assert component in index_contents, f"{component} should be present in index but isn't."
attr_value_from_index = index_contents[component][2][attr]
assert getattr(spec, attr) == attr_value_from_index
def test_workflow_map(self):
blocks = self.pipeline_blocks_class()
if blocks._workflow_map is None:
@@ -429,6 +409,56 @@ class ModularGuiderTesterMixin:
assert max_diff > expected_max_diff, "Output with CFG must be different from normal inference"
class TestCustomBlockRequirements:
def get_dummy_block_pipe(self):
class DummyBlockOne:
# keep two arbitrary deps so that we can test warnings.
_requirements = {"xyz": ">=0.8.0", "abc": ">=10.0.0"}
class DummyBlockTwo:
# keep two dependencies that will be available during testing.
_requirements = {"transformers": ">=4.44.0", "diffusers": ">=0.2.0"}
pipe = SequentialPipelineBlocks.from_blocks_dict(
{"dummy_block_one": DummyBlockOne, "dummy_block_two": DummyBlockTwo}
)
return pipe
def test_custom_requirements_save_load(self):
pipe = self.get_dummy_block_pipe()
with tempfile.TemporaryDirectory() as tmpdir:
pipe.save_pretrained(tmpdir)
config_path = os.path.join(tmpdir, "modular_config.json")
with open(config_path, "r") as f:
config = json.load(f)
assert "requirements" in config
requirements = config["requirements"]
expected_requirements = {
"xyz": ">=0.8.0",
"abc": ">=10.0.0",
"transformers": ">=4.44.0",
"diffusers": ">=0.2.0",
}
assert expected_requirements == requirements
def test_warnings(self):
pipe = self.get_dummy_block_pipe()
with tempfile.TemporaryDirectory() as tmpdir:
logger = logging.get_logger("diffusers.modular_pipelines.modular_pipeline_utils")
logger.setLevel(30)
with CaptureLogger(logger) as cap_logger:
pipe.save_pretrained(tmpdir)
template = "{req} was specified in the requirements but wasn't found in the current environment"
msg_xyz = template.format(req="xyz")
msg_abc = template.format(req="abc")
assert msg_xyz in str(cap_logger.out)
assert msg_abc in str(cap_logger.out)
class TestModularModelCardContent:
def create_mock_block(self, name="TestBlock", description="Test block description"):
class MockBlock:
@@ -728,27 +758,3 @@ class TestLoadComponentsSkipBehavior:
# Verify test_component was not loaded
assert not hasattr(pipe, "test_component") or pipe.test_component is None
class TestModularPipelineInitFallback:
"""Test that ModularPipeline.__init__ falls back to default_blocks_name when
_blocks_class_name is a base class (e.g. SequentialPipelineBlocks saved by from_blocks_dict)."""
def test_init_fallback_when_blocks_class_name_is_base_class(self, tmp_path):
# 1. Load pipeline and get a workflow (returns a base SequentialPipelineBlocks)
pipe = ModularPipeline.from_pretrained("hf-internal-testing/tiny-stable-diffusion-xl-pipe")
t2i_blocks = pipe.blocks.get_workflow("text2image")
assert t2i_blocks.__class__.__name__ == "SequentialPipelineBlocks"
# 2. Use init_pipeline to create a new pipeline from the workflow blocks
t2i_pipe = t2i_blocks.init_pipeline("hf-internal-testing/tiny-stable-diffusion-xl-pipe")
# 3. Save and reload — the saved config will have _blocks_class_name="SequentialPipelineBlocks"
save_dir = str(tmp_path / "pipeline")
t2i_pipe.save_pretrained(save_dir)
loaded_pipe = ModularPipeline.from_pretrained(save_dir)
# 4. Verify it fell back to default_blocks_name and has correct blocks
assert loaded_pipe.__class__.__name__ == pipe.__class__.__name__
assert loaded_pipe._blocks.__class__.__name__ == pipe._blocks.__class__.__name__
assert len(loaded_pipe._blocks.sub_blocks) == len(pipe._blocks.sub_blocks)

View File

@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import dataclasses
import json
import tempfile
import unittest
@@ -306,96 +305,3 @@ class ConfigTester(unittest.TestCase):
result = json.loads(json_string)
assert result["test_file_1"] == config.config.test_file_1.as_posix()
assert result["test_file_2"] == config.config.test_file_2.as_posix()
class SampleObjectTyped(ConfigMixin):
config_name = "config.json"
@register_to_config
def __init__(
self,
a: int = 2,
b: int = 5,
c: str = "hello",
):
pass
class SampleObjectWithIgnore(ConfigMixin):
config_name = "config.json"
ignore_for_config = ["secret"]
@register_to_config
def __init__(
self,
a: int = 2,
secret: str = "hidden",
):
pass
class DataclassFromConfigTester(unittest.TestCase):
def test_get_dataclass_from_config_returns_frozen_dataclass(self):
obj = SampleObject()
tc = SampleObject._get_dataclass_from_config(dict(obj.config))
assert dataclasses.is_dataclass(tc)
with self.assertRaises(dataclasses.FrozenInstanceError):
tc.a = 99
def test_get_dataclass_from_config_class_name(self):
obj = SampleObject()
tc = SampleObject._get_dataclass_from_config(dict(obj.config))
assert type(tc).__name__ == "SampleObjectConfig"
def test_get_dataclass_from_config_values_match_config(self):
obj = SampleObject(a=10, b=20)
tc = SampleObject._get_dataclass_from_config(dict(obj.config))
assert tc.a == 10
assert tc.b == 20
assert tc.c == (2, 5)
assert tc.d == "for diffusion"
assert tc.e == [1, 3]
def test_get_dataclass_from_config_from_raw_dict(self):
tc = SampleObjectTyped._get_dataclass_from_config({"a": 7, "b": 3, "c": "world"})
assert tc.a == 7
assert tc.b == 3
assert tc.c == "world"
def test_get_dataclass_from_config_annotations(self):
tc = SampleObjectTyped._get_dataclass_from_config({"a": 1, "b": 2, "c": "hi"})
fields = {f.name: f.type for f in dataclasses.fields(tc)}
assert fields["a"] is int
assert fields["b"] is int
assert fields["c"] is str
def test_get_dataclass_from_config_asdict_roundtrip(self):
tc = SampleObjectTyped._get_dataclass_from_config({"a": 7, "b": 3, "c": "world"})
d = dataclasses.asdict(tc)
assert d == {"a": 7, "b": 3, "c": "world"}
def test_get_dataclass_from_config_ignores_extra_keys(self):
tc = SampleObjectTyped._get_dataclass_from_config(
{"a": 1, "b": 2, "c": "hi", "_class_name": "Foo", "extra": 99}
)
assert tc.a == 1
assert not hasattr(tc, "_class_name")
assert not hasattr(tc, "extra")
def test_get_dataclass_from_config_respects_ignore_for_config(self):
tc = SampleObjectWithIgnore._get_dataclass_from_config({"a": 5})
assert not hasattr(tc, "secret")
assert tc.a == 5
def test_get_dataclass_from_config_works_for_scheduler(self):
scheduler = DDIMScheduler()
tc = DDIMScheduler._get_dataclass_from_config(dict(scheduler.config))
assert dataclasses.is_dataclass(tc)
assert type(tc).__name__ == "DDIMSchedulerConfig"
assert tc.num_train_timesteps == scheduler.config.num_train_timesteps
def test_get_dataclass_from_config_different_values(self):
tc1 = SampleObjectTyped._get_dataclass_from_config({"a": 1, "b": 2, "c": "x"})
tc2 = SampleObjectTyped._get_dataclass_from_config({"a": 9, "b": 8, "c": "y"})
assert tc1.a == 1
assert tc2.a == 9

View File

@@ -24,8 +24,7 @@ from diffusers import (
LTX2ImageToVideoPipeline,
LTX2VideoTransformer3DModel,
)
from diffusers.pipelines.ltx2 import LTX2LatentUpsamplePipeline, LTX2TextConnectors
from diffusers.pipelines.ltx2.latent_upsampler import LTX2LatentUpsamplerModel
from diffusers.pipelines.ltx2 import LTX2TextConnectors
from diffusers.pipelines.ltx2.vocoder import LTX2Vocoder
from ...testing_utils import enable_full_determinism
@@ -175,15 +174,6 @@ class LTX2ImageToVideoPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
return components
def get_dummy_upsample_component(self, in_channels=4, mid_channels=32, num_blocks_per_stage=1):
upsampler = LTX2LatentUpsamplerModel(
in_channels=in_channels,
mid_channels=mid_channels,
num_blocks_per_stage=num_blocks_per_stage,
)
return upsampler
def get_dummy_inputs(self, device, seed=0):
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
@@ -297,60 +287,5 @@ class LTX2ImageToVideoPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
assert torch.allclose(expected_video_slice, generated_video_slice, atol=1e-4, rtol=1e-4)
assert torch.allclose(expected_audio_slice, generated_audio_slice, atol=1e-4, rtol=1e-4)
def test_two_stages_inference_with_upsampler(self):
device = "cpu"
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe.to(device)
pipe.set_progress_bar_config(disable=None)
inputs = self.get_dummy_inputs(device)
inputs["output_type"] = "latent"
first_stage_output = pipe(**inputs)
video_latent = first_stage_output.frames
audio_latent = first_stage_output.audio
self.assertEqual(video_latent.shape, (1, 4, 3, 16, 16))
self.assertEqual(audio_latent.shape, (1, 2, 5, 2))
self.assertEqual(audio_latent.shape[1], components["vocoder"].config.out_channels)
upsampler = self.get_dummy_upsample_component(in_channels=video_latent.shape[1])
upsample_pipe = LTX2LatentUpsamplePipeline(vae=pipe.vae, latent_upsampler=upsampler)
upscaled_video_latent = upsample_pipe(latents=video_latent, output_type="latent", return_dict=False)[0]
self.assertEqual(upscaled_video_latent.shape, (1, 4, 3, 32, 32))
inputs["latents"] = upscaled_video_latent
inputs["audio_latents"] = audio_latent
inputs["output_type"] = "pt"
second_stage_output = pipe(**inputs)
video = second_stage_output.frames
audio = second_stage_output.audio
self.assertEqual(video.shape, (1, 5, 3, 64, 64))
self.assertEqual(audio.shape[0], 1)
self.assertEqual(audio.shape[1], components["vocoder"].config.out_channels)
# fmt: off
expected_video_slice = torch.tensor(
[
0.4497, 0.6757, 0.4219, 0.7686, 0.4525, 0.6483, 0.3969, 0.7404, 0.3541, 0.3039, 0.4592, 0.3521, 0.3665, 0.2785, 0.3336, 0.3079
]
)
expected_audio_slice = torch.tensor(
[
0.0271, 0.0492, 0.1249, 0.1126, 0.1661, 0.1060, 0.1717, 0.0944, 0.0672, -0.0069, 0.0688, 0.0097, 0.0808, 0.1231, 0.0986, 0.0739
]
)
# fmt: on
video = video.flatten()
audio = audio.flatten()
generated_video_slice = torch.cat([video[:8], video[-8:]])
generated_audio_slice = torch.cat([audio[:8], audio[-8:]])
assert torch.allclose(expected_video_slice, generated_video_slice, atol=1e-4, rtol=1e-4)
assert torch.allclose(expected_audio_slice, generated_audio_slice, atol=1e-4, rtol=1e-4)
def test_inference_batch_single_identical(self):
self._test_inference_batch_single_identical(batch_size=2, expected_max_diff=2e-2)