Compare commits

..

2 Commits

Author SHA1 Message Date
DN6
b1df740aac update 2026-02-28 12:02:38 +05:30
DN6
8d20369792 update 2026-02-28 11:19:49 +05:30
15 changed files with 350 additions and 897 deletions

View File

@@ -62,6 +62,20 @@ jobs:
with:
name: benchmark_test_reports
path: benchmarks/${{ env.BASE_PATH }}
# TODO: enable this once the connection problem has been resolved.
- name: Update benchmarking results to DB
env:
PGDATABASE: metrics
PGHOST: ${{ secrets.DIFFUSERS_BENCHMARKS_PGHOST }}
PGUSER: transformers_benchmarks
PGPASSWORD: ${{ secrets.DIFFUSERS_BENCHMARKS_PGPASSWORD }}
BRANCH_NAME: ${{ github.head_ref || github.ref_name }}
run: |
git config --global --add safe.directory /__w/diffusers/diffusers
commit_id=$GITHUB_SHA
commit_msg=$(git show -s --format=%s "$commit_id" | cut -c1-70)
cd benchmarks && python populate_into_db.py "$BRANCH_NAME" "$commit_id" "$commit_msg"
- name: Report success status
if: ${{ success() }}

View File

@@ -54,6 +54,7 @@ jobs:
python -m pip install --upgrade pip
pip install -U setuptools wheel twine
pip install -U torch --index-url https://download.pytorch.org/whl/cpu
pip install -U transformers
- name: Build the dist files
run: python setup.py bdist_wheel && python setup.py sdist
@@ -68,8 +69,6 @@ jobs:
run: |
pip install diffusers && pip uninstall diffusers -y
pip install -i https://test.pypi.org/simple/ diffusers
pip install -U transformers
python utils/print_env.py
python -c "from diffusers import __version__; print(__version__)"
python -c "from diffusers import DiffusionPipeline; pipe = DiffusionPipeline.from_pretrained('fusing/unet-ldm-dummy-update'); pipe()"
python -c "from diffusers import DiffusionPipeline; pipe = DiffusionPipeline.from_pretrained('hf-internal-testing/tiny-stable-diffusion-pipe', safety_checker=None); pipe('ah suh du')"

View File

@@ -0,0 +1,166 @@
import argparse
import os
import sys
import gpustat
import pandas as pd
import psycopg2
import psycopg2.extras
from psycopg2.extensions import register_adapter
from psycopg2.extras import Json
register_adapter(dict, Json)
FINAL_CSV_FILENAME = "collated_results.csv"
# https://github.com/huggingface/transformers/blob/593e29c5e2a9b17baec010e8dc7c1431fed6e841/benchmark/init_db.sql#L27
BENCHMARKS_TABLE_NAME = "benchmarks"
MEASUREMENTS_TABLE_NAME = "model_measurements"
def _init_benchmark(conn, branch, commit_id, commit_msg):
gpu_stats = gpustat.GPUStatCollection.new_query()
metadata = {"gpu_name": gpu_stats[0]["name"]}
repository = "huggingface/diffusers"
with conn.cursor() as cur:
cur.execute(
f"INSERT INTO {BENCHMARKS_TABLE_NAME} (repository, branch, commit_id, commit_message, metadata) VALUES (%s, %s, %s, %s, %s) RETURNING benchmark_id",
(repository, branch, commit_id, commit_msg, metadata),
)
benchmark_id = cur.fetchone()[0]
print(f"Initialised benchmark #{benchmark_id}")
return benchmark_id
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument(
"branch",
type=str,
help="The branch name on which the benchmarking is performed.",
)
parser.add_argument(
"commit_id",
type=str,
help="The commit hash on which the benchmarking is performed.",
)
parser.add_argument(
"commit_msg",
type=str,
help="The commit message associated with the commit, truncated to 70 characters.",
)
args = parser.parse_args()
return args
if __name__ == "__main__":
args = parse_args()
try:
conn = psycopg2.connect(
host=os.getenv("PGHOST"),
database=os.getenv("PGDATABASE"),
user=os.getenv("PGUSER"),
password=os.getenv("PGPASSWORD"),
)
print("DB connection established successfully.")
except Exception as e:
print(f"Problem during DB init: {e}")
sys.exit(1)
try:
benchmark_id = _init_benchmark(
conn=conn,
branch=args.branch,
commit_id=args.commit_id,
commit_msg=args.commit_msg,
)
except Exception as e:
print(f"Problem during initializing benchmark: {e}")
sys.exit(1)
cur = conn.cursor()
df = pd.read_csv(FINAL_CSV_FILENAME)
# Helper to cast values (or None) given a dtype
def _cast_value(val, dtype: str):
if pd.isna(val):
return None
if dtype == "text":
return str(val).strip()
if dtype == "float":
try:
return float(val)
except ValueError:
return None
if dtype == "bool":
s = str(val).strip().lower()
if s in ("true", "t", "yes", "1"):
return True
if s in ("false", "f", "no", "0"):
return False
if val in (1, 1.0):
return True
if val in (0, 0.0):
return False
return None
return val
try:
rows_to_insert = []
for _, row in df.iterrows():
scenario = _cast_value(row.get("scenario"), "text")
model_cls = _cast_value(row.get("model_cls"), "text")
num_params_B = _cast_value(row.get("num_params_B"), "float")
flops_G = _cast_value(row.get("flops_G"), "float")
time_plain_s = _cast_value(row.get("time_plain_s"), "float")
mem_plain_GB = _cast_value(row.get("mem_plain_GB"), "float")
time_compile_s = _cast_value(row.get("time_compile_s"), "float")
mem_compile_GB = _cast_value(row.get("mem_compile_GB"), "float")
fullgraph = _cast_value(row.get("fullgraph"), "bool")
mode = _cast_value(row.get("mode"), "text")
# If "github_sha" column exists in the CSV, cast it; else default to None
if "github_sha" in df.columns:
github_sha = _cast_value(row.get("github_sha"), "text")
else:
github_sha = None
measurements = {
"scenario": scenario,
"model_cls": model_cls,
"num_params_B": num_params_B,
"flops_G": flops_G,
"time_plain_s": time_plain_s,
"mem_plain_GB": mem_plain_GB,
"time_compile_s": time_compile_s,
"mem_compile_GB": mem_compile_GB,
"fullgraph": fullgraph,
"mode": mode,
"github_sha": github_sha,
}
rows_to_insert.append((benchmark_id, measurements))
# Batch-insert all rows
insert_sql = f"""
INSERT INTO {MEASUREMENTS_TABLE_NAME} (
benchmark_id,
measurements
)
VALUES (%s, %s);
"""
psycopg2.extras.execute_batch(cur, insert_sql, rows_to_insert)
conn.commit()
cur.close()
conn.close()
except Exception as e:
print(f"Exception: {e}")
sys.exit(1)

View File

@@ -14,8 +14,4 @@
## AutoPipelineBlocks
[[autodoc]] diffusers.modular_pipelines.modular_pipeline.AutoPipelineBlocks
## ConditionalPipelineBlocks
[[autodoc]] diffusers.modular_pipelines.modular_pipeline.ConditionalPipelineBlocks
[[autodoc]] diffusers.modular_pipelines.modular_pipeline.AutoPipelineBlocks

View File

@@ -121,7 +121,7 @@ from diffusers.modular_pipelines import AutoPipelineBlocks
class AutoImageBlocks(AutoPipelineBlocks):
# List of sub-block classes to choose from
block_classes = [InpaintBlock, ImageToImageBlock, TextToImageBlock]
block_classes = [block_inpaint_cls, block_i2i_cls, block_t2i_cls]
# Names for each block in the same order
block_names = ["inpaint", "img2img", "text2img"]
# Trigger inputs that determine which block to run
@@ -129,8 +129,8 @@ class AutoImageBlocks(AutoPipelineBlocks):
# - "image" triggers img2img workflow (but only if mask is not provided)
# - if none of above, runs the text2img workflow (default)
block_trigger_inputs = ["mask", "image", None]
# Description is extremely important for AutoPipelineBlocks
@property
def description(self):
return (
"Pipeline generates images given different types of conditions!\n"
@@ -141,7 +141,7 @@ class AutoImageBlocks(AutoPipelineBlocks):
)
```
It is **very** important to include a `description` to avoid any confusion over how to run a block and what inputs are required. While [`~modular_pipelines.AutoPipelineBlocks`] are convenient, its conditional logic may be difficult to figure out if it isn't properly explained.
It is **very** important to include a `description` to avoid any confusion over how to run a block and what inputs are required. While [`~modular_pipelines.AutoPipelineBlocks`] are convenient, it's conditional logic may be difficult to figure out if it isn't properly explained.
Create an instance of `AutoImageBlocks`.
@@ -152,74 +152,5 @@ auto_blocks = AutoImageBlocks()
For more complex compositions, such as nested [`~modular_pipelines.AutoPipelineBlocks`] blocks when they're used as sub-blocks in larger pipelines, use the [`~modular_pipelines.SequentialPipelineBlocks.get_execution_blocks`] method to extract the a block that is actually run based on your input.
```py
auto_blocks.get_execution_blocks(mask=True)
```
## ConditionalPipelineBlocks
[`~modular_pipelines.AutoPipelineBlocks`] is a special case of [`~modular_pipelines.ConditionalPipelineBlocks`]. While [`~modular_pipelines.AutoPipelineBlocks`] selects blocks based on whether a trigger input is provided or not, [`~modular_pipelines.ConditionalPipelineBlocks`] is able to select a block based on custom selection logic provided in the `select_block` method.
Here is the same example written using [`~modular_pipelines.ConditionalPipelineBlocks`] directly:
```py
from diffusers.modular_pipelines import ConditionalPipelineBlocks
class AutoImageBlocks(ConditionalPipelineBlocks):
block_classes = [InpaintBlock, ImageToImageBlock, TextToImageBlock]
block_names = ["inpaint", "img2img", "text2img"]
block_trigger_inputs = ["mask", "image"]
default_block_name = "text2img"
@property
def description(self):
return (
"Pipeline generates images given different types of conditions!\n"
+ "This is an auto pipeline block that works for text2img, img2img and inpainting tasks.\n"
+ " - inpaint workflow is run when `mask` is provided.\n"
+ " - img2img workflow is run when `image` is provided (but only when `mask` is not provided).\n"
+ " - text2img workflow is run when neither `image` nor `mask` is provided.\n"
)
def select_block(self, mask=None, image=None) -> str | None:
if mask is not None:
return "inpaint"
if image is not None:
return "img2img"
return None # falls back to default_block_name ("text2img")
```
The inputs listed in `block_trigger_inputs` are passed as keyword arguments to `select_block()`. When `select_block` returns `None`, it falls back to `default_block_name`. If `default_block_name` is also `None`, the entire conditional block is skipped — this is useful for optional processing steps that should only run when specific inputs are provided.
## Workflows
Pipelines that contain conditional blocks ([`~modular_pipelines.AutoPipelineBlocks`] or [`~modular_pipelines.ConditionalPipelineBlocks]`) can support multiple workflows — for example, our SDXL modular pipeline supports a dozen workflows all in one pipeline. But this also means it can be confusing for users to know what workflows are supported and how to run them. For pipeline builders, it's useful to be able to extract only the blocks relevant to a specific workflow.
We recommend defining a `_workflow_map` to give each workflow a name and explicitly list the inputs it requires.
```py
from diffusers.modular_pipelines import SequentialPipelineBlocks
class MyPipelineBlocks(SequentialPipelineBlocks):
block_classes = [TextEncoderBlock, AutoImageBlocks, DecodeBlock]
block_names = ["text_encoder", "auto_image", "decode"]
_workflow_map = {
"text2image": {"prompt": True},
"image2image": {"image": True, "prompt": True},
"inpaint": {"mask": True, "image": True, "prompt": True},
}
```
All of our built-in modular pipelines come with pre-defined workflows. The `available_workflows` property lists all supported workflows:
```py
pipeline_blocks = MyPipelineBlocks()
pipeline_blocks.available_workflows
# ['text2image', 'image2image', 'inpaint']
```
Retrieve a specific workflow with `get_workflow` to inspect and debug a specific block that executes the workflow.
```py
pipeline_blocks.get_workflow("inpaint")
auto_blocks.get_execution_blocks("mask")
```

View File

@@ -97,32 +97,5 @@ If the custom model inherits from the [`ModelMixin`] class, it gets access to th
> )
> ```
### Saving custom models
Use [`~ConfigMixin.register_for_auto_class`] to add the `auto_map` entry to `config.json` automatically when saving. This avoids having to manually edit the config file.
```py
# my_model.py
from diffusers import ModelMixin, ConfigMixin
class MyCustomModel(ModelMixin, ConfigMixin):
...
MyCustomModel.register_for_auto_class("AutoModel")
model = MyCustomModel(...)
model.save_pretrained("./my_model")
```
The saved `config.json` will include the `auto_map` field.
```json
{
"auto_map": {
"AutoModel": "my_model.MyCustomModel"
}
}
```
> [!NOTE]
> Learn more about implementing custom models in the [Community components](../using-diffusers/custom_pipeline_overview#community-components) guide.

View File

@@ -107,38 +107,6 @@ class ConfigMixin:
has_compatibles = False
_deprecated_kwargs = []
_auto_class = None
@classmethod
def register_for_auto_class(cls, auto_class="AutoModel"):
"""
Register this class with the given auto class so that it can be loaded with `AutoModel.from_pretrained(...,
trust_remote_code=True)`.
When the config is saved, the resulting `config.json` will include an `auto_map` entry mapping the auto class
to this class's module and class name.
Args:
auto_class (`str` or type, *optional*, defaults to `"AutoModel"`):
The auto class to register this class with. Can be a string (e.g. `"AutoModel"`) or the class itself.
Currently only `"AutoModel"` is supported.
Example:
```python
from diffusers import ModelMixin, ConfigMixin
class MyCustomModel(ModelMixin, ConfigMixin): ...
MyCustomModel.register_for_auto_class("AutoModel")
```
"""
if auto_class != "AutoModel":
raise ValueError(f"Only 'AutoModel' is supported, got '{auto_class}'.")
cls._auto_class = auto_class
def register_to_config(self, **kwargs):
if self.config_name is None:
@@ -653,12 +621,6 @@ class ConfigMixin:
# pop the `_pre_quantization_dtype` as torch.dtypes are not serializable.
_ = config_dict.pop("_pre_quantization_dtype", None)
if getattr(self, "_auto_class", None) is not None:
module = self.__class__.__module__.split(".")[-1]
auto_map = config_dict.get("auto_map", {})
auto_map[self._auto_class] = f"{module}.{self.__class__.__name__}"
config_dict["auto_map"] = auto_map
return json.dumps(config_dict, indent=2, sort_keys=True) + "\n"
def to_json_file(self, json_file_path: str | os.PathLike):

View File

@@ -14,7 +14,6 @@
import importlib
import inspect
import os
import sys
import traceback
import warnings
from collections import OrderedDict
@@ -29,16 +28,10 @@ from tqdm.auto import tqdm
from typing_extensions import Self
from ..configuration_utils import ConfigMixin, FrozenDict
from ..pipelines.pipeline_loading_utils import (
LOADABLE_CLASSES,
_fetch_class_library_tuple,
_unwrap_model,
simple_get_class_obj,
)
from ..pipelines.pipeline_loading_utils import _fetch_class_library_tuple, simple_get_class_obj
from ..utils import PushToHubMixin, is_accelerate_available, logging
from ..utils.dynamic_modules_utils import get_class_from_dynamic_module, resolve_trust_remote_code
from ..utils.hub_utils import load_or_create_model_card, populate_model_card
from ..utils.torch_utils import is_compiled_module
from .components_manager import ComponentsManager
from .modular_pipeline_utils import (
MODULAR_MODEL_CARD_TEMPLATE,
@@ -1640,14 +1633,7 @@ class ModularPipeline(ConfigMixin, PushToHubMixin):
blocks_class_name = self.default_blocks_name
if blocks_class_name is not None:
diffusers_module = importlib.import_module("diffusers")
blocks_class = getattr(diffusers_module, blocks_class_name, None)
# If the blocks_class is not found or is a base class (e.g. SequentialPipelineBlocks saved by from_blocks_dict) with empty block_classes
# fall back to default_blocks_name
if blocks_class is None or not blocks_class.block_classes:
blocks_class_name = self.default_blocks_name
blocks_class = getattr(diffusers_module, blocks_class_name)
if blocks_class is not None:
blocks_class = getattr(diffusers_module, blocks_class_name)
blocks = blocks_class()
else:
logger.warning(f"`blocks` is `None`, no default blocks class found for {self.__class__.__name__}")
@@ -1707,8 +1693,6 @@ class ModularPipeline(ConfigMixin, PushToHubMixin):
_blocks_class_name=self._blocks.__class__.__name__ if self._blocks is not None else None
)
self._pretrained_model_name_or_path = pretrained_model_name_or_path
@property
def default_call_parameters(self) -> dict[str, Any]:
"""
@@ -1835,136 +1819,44 @@ class ModularPipeline(ConfigMixin, PushToHubMixin):
)
return pipeline
def save_pretrained(
self,
save_directory: str | os.PathLike,
safe_serialization: bool = True,
variant: str | None = None,
max_shard_size: int | str | None = None,
push_to_hub: bool = False,
**kwargs,
):
def save_pretrained(self, save_directory: str | os.PathLike, push_to_hub: bool = False, **kwargs):
"""
Save the pipeline and all its components to a directory, so that it can be re-loaded using the
[`~ModularPipeline.from_pretrained`] class method.
Save the pipeline to a directory. It does not save components, you need to save them separately.
Args:
save_directory (`str` or `os.PathLike`):
Directory to save the pipeline to. Will be created if it doesn't exist.
safe_serialization (`bool`, *optional*, defaults to `True`):
Whether to save the model using `safetensors` or the traditional PyTorch way with `pickle`.
variant (`str`, *optional*):
If specified, weights are saved in the format `pytorch_model.<variant>.bin`.
max_shard_size (`int` or `str`, defaults to `None`):
The maximum size for a checkpoint before being sharded. Checkpoints shard will then be each of size
lower than this size. If expressed as a string, needs to be digits followed by a unit (like `"5GB"`).
If expressed as an integer, the unit is bytes.
push_to_hub (`bool`, *optional*, defaults to `False`):
Whether to push the pipeline to the Hugging Face model hub after saving it.
**kwargs: Additional keyword arguments:
- `overwrite_modular_index` (`bool`, *optional*, defaults to `False`):
When saving a Modular Pipeline, its components in `modular_model_index.json` may reference repos
different from the destination repo. Setting this to `True` updates all component references in
`modular_model_index.json` so they point to the repo specified by `repo_id`.
- `repo_id` (`str`, *optional*):
The repository ID to push the pipeline to. Defaults to the last component of `save_directory`.
- `commit_message` (`str`, *optional*):
Commit message for the push to hub operation.
- `private` (`bool`, *optional*):
Whether the repository should be private.
- `create_pr` (`bool`, *optional*, defaults to `False`):
Whether to create a pull request instead of pushing directly.
- `token` (`str`, *optional*):
The Hugging Face token to use for authentication.
Path to the directory where the pipeline will be saved.
push_to_hub (`bool`, optional):
Whether to push the pipeline to the huggingface hub.
**kwargs: Additional arguments passed to `save_config()` method
"""
overwrite_modular_index = kwargs.pop("overwrite_modular_index", False)
repo_id = kwargs.pop("repo_id", save_directory.split(os.path.sep)[-1])
if push_to_hub:
commit_message = kwargs.pop("commit_message", None)
private = kwargs.pop("private", None)
create_pr = kwargs.pop("create_pr", False)
token = kwargs.pop("token", None)
update_model_card = kwargs.pop("update_model_card", False)
repo_id = kwargs.pop("repo_id", save_directory.split(os.path.sep)[-1])
repo_id = create_repo(repo_id, exist_ok=True, private=private, token=token).repo_id
for component_name, component_spec in self._component_specs.items():
if component_spec.default_creation_method != "from_pretrained":
continue
component = getattr(self, component_name, None)
if component is None:
continue
model_cls = component.__class__
if is_compiled_module(component):
component = _unwrap_model(component)
model_cls = component.__class__
save_method_name = None
for library_name, library_classes in LOADABLE_CLASSES.items():
if library_name in sys.modules:
library = importlib.import_module(library_name)
else:
logger.info(
f"{library_name} is not installed. Cannot save {component_name} as {library_classes} from {library_name}"
)
continue
for base_class, save_load_methods in library_classes.items():
class_candidate = getattr(library, base_class, None)
if class_candidate is not None and issubclass(model_cls, class_candidate):
save_method_name = save_load_methods[0]
break
if save_method_name is not None:
break
if save_method_name is None:
logger.warning(f"self.{component_name}={component} of type {type(component)} cannot be saved.")
continue
save_method = getattr(component, save_method_name)
save_method_signature = inspect.signature(save_method)
save_method_accept_safe = "safe_serialization" in save_method_signature.parameters
save_method_accept_variant = "variant" in save_method_signature.parameters
save_method_accept_max_shard_size = "max_shard_size" in save_method_signature.parameters
save_kwargs = {}
if save_method_accept_safe:
save_kwargs["safe_serialization"] = safe_serialization
if save_method_accept_variant:
save_kwargs["variant"] = variant
if save_method_accept_max_shard_size and max_shard_size is not None:
save_kwargs["max_shard_size"] = max_shard_size
component_save_path = os.path.join(save_directory, component_name)
save_method(component_save_path, **save_kwargs)
if component_name not in self.config:
continue
has_no_load_id = not hasattr(component, "_diffusers_load_id") or component._diffusers_load_id == "null"
if overwrite_modular_index or has_no_load_id:
library, class_name, component_spec_dict = self.config[component_name]
component_spec_dict["pretrained_model_name_or_path"] = repo_id if push_to_hub else save_directory
component_spec_dict["subfolder"] = component_name
self.register_to_config(**{component_name: (library, class_name, component_spec_dict)})
self.save_config(save_directory=save_directory)
if push_to_hub:
# Generate modular pipeline card content
card_content = generate_modular_model_card_content(self.blocks)
# Create a new empty model card and eventually tag it
model_card = load_or_create_model_card(
repo_id,
token=token,
is_pipeline=True,
model_description=MODULAR_MODEL_CARD_TEMPLATE.format(**card_content),
is_modular=True,
update_model_card=update_model_card,
)
model_card = populate_model_card(model_card, tags=card_content["tags"])
model_card.save(os.path.join(save_directory, "README.md"))
# YiYi TODO: maybe order the json file to make it more readable: configs first, then components
self.save_config(save_directory=save_directory)
if push_to_hub:
self._upload_folder(
save_directory,
repo_id,
@@ -2232,9 +2124,8 @@ class ModularPipeline(ConfigMixin, PushToHubMixin):
```
Notes:
- Components loaded with `AutoModel.from_pretrained()` or `ComponentSpec.load()` will have
loading specs preserved for serialization. Custom or locally loaded components without Hub references will
have their `modular_model_index.json` entries updated automatically during `save_pretrained()`.
- Components with trained weights should be loaded with `AutoModel.from_pretrained()` or
`ComponentSpec.load()` so that loading specs are preserved for serialization.
- ConfigMixin objects without weights (e.g., schedulers, guiders) can be passed directly.
"""
@@ -2256,10 +2147,13 @@ class ModularPipeline(ConfigMixin, PushToHubMixin):
new_component_spec = current_component_spec
if hasattr(self, name) and getattr(self, name) is not None:
logger.warning(f"ModularPipeline.update_components: setting {name} to None (spec unchanged)")
elif (
current_component_spec.default_creation_method == "from_pretrained"
and getattr(component, "_diffusers_load_id", None) is None
elif current_component_spec.default_creation_method == "from_pretrained" and not (
hasattr(component, "_diffusers_load_id") and component._diffusers_load_id is not None
):
logger.warning(
f"ModularPipeline.update_components: {name} has no valid _diffusers_load_id. "
f"This will result in empty loading spec, use ComponentSpec.load() for proper specs"
)
new_component_spec = ComponentSpec(name=name, type_hint=type(component))
else:
new_component_spec = ComponentSpec.from_component(name, component)
@@ -2332,49 +2226,17 @@ class ModularPipeline(ConfigMixin, PushToHubMixin):
elif "default" in value:
# check if the default is specified
component_load_kwargs[key] = value["default"]
# Only pass trust_remote_code to components from the same repo as the pipeline.
# When a user passes trust_remote_code=True, they intend to trust code from the
# pipeline's repo, not from external repos referenced in modular_model_index.json.
trust_remote_code_stripped = False
if (
"trust_remote_code" in component_load_kwargs
and self._pretrained_model_name_or_path is not None
and spec.pretrained_model_name_or_path != self._pretrained_model_name_or_path
):
component_load_kwargs.pop("trust_remote_code")
trust_remote_code_stripped = True
if not spec.pretrained_model_name_or_path:
logger.info(f"Skipping component `{name}`: no pretrained model path specified.")
continue
try:
components_to_register[name] = spec.load(**component_load_kwargs)
except Exception:
tb = traceback.format_exc()
if trust_remote_code_stripped and "trust_remote_code" in tb:
warning_msg = (
f"Failed to load component `{name}` from external repository "
f"`{spec.pretrained_model_name_or_path}`.\n\n"
f"`trust_remote_code=True` was not forwarded to `{name}` because it comes from "
f"a different repository than the pipeline (`{self._pretrained_model_name_or_path}`). "
f"For safety, `trust_remote_code` is only forwarded to components from the same "
f"repository as the pipeline.\n\n"
f"You need to load this component manually with `trust_remote_code=True` and pass it "
f"to the pipeline via `pipe.update_components()`. For example, if it is a custom model:\n\n"
f' {name} = AutoModel.from_pretrained("{spec.pretrained_model_name_or_path}", trust_remote_code=True)\n'
f" pipe.update_components({name}={name})\n"
)
else:
warning_msg = (
f"Failed to create component {name}:\n"
f"- Component spec: {spec}\n"
f"- load() called with kwargs: {component_load_kwargs}\n"
"If this component is not required for your workflow you can safely ignore this message.\n\n"
"Traceback:\n"
f"{tb}"
)
logger.warning(warning_msg)
logger.warning(
f"\nFailed to create component {name}:\n"
f"- Component spec: {spec}\n"
f"- load() called with kwargs: {component_load_kwargs}\n"
"If this component is not required for your workflow you can safely ignore this message.\n\n"
"Traceback:\n"
f"{traceback.format_exc()}"
)
# Register all components at once
self.register_components(**components_to_register)

View File

@@ -50,7 +50,11 @@ This modular pipeline is composed of the following blocks:
{components_description} {configs_section}
{io_specification_section}
## Input/Output Specification
### Inputs {inputs_description}
### Outputs {outputs_description}
"""
@@ -307,12 +311,6 @@ class ComponentSpec:
f"`type_hint` is required when loading a single file model but is missing for component: {self.name}"
)
# `torch_dtype` is not an accepted parameter for tokenizers and processors.
# As a result, it gets stored in `init_kwargs`, which are written to the config
# during save. This causes JSON serialization to fail when saving the component.
if self.type_hint is not None and not issubclass(self.type_hint, torch.nn.Module):
kwargs.pop("torch_dtype", None)
if self.type_hint is None:
try:
from diffusers import AutoModel
@@ -330,12 +328,6 @@ class ComponentSpec:
else getattr(self.type_hint, "from_pretrained")
)
# `torch_dtype` is not an accepted parameter for tokenizers and processors.
# As a result, it gets stored in `init_kwargs`, which are written to the config
# during save. This causes JSON serialization to fail when saving the component.
if not issubclass(self.type_hint, torch.nn.Module):
kwargs.pop("torch_dtype", None)
try:
component = load_method(pretrained_model_name_or_path, **load_kwargs, **kwargs)
except Exception as e:
@@ -807,46 +799,6 @@ def format_output_params(output_params, indent_level=4, max_line_length=115):
return format_params(output_params, "Outputs", indent_level, max_line_length)
def format_params_markdown(params, header="Inputs"):
"""Format a list of InputParam or OutputParam objects as a markdown bullet-point list.
Suitable for model cards rendered on Hugging Face Hub.
Args:
params: list of InputParam or OutputParam objects to format
header: Header text (e.g. "Inputs" or "Outputs")
Returns:
A formatted markdown string, or empty string if params is empty.
"""
if not params:
return ""
def get_type_str(type_hint):
if isinstance(type_hint, UnionType) or get_origin(type_hint) is Union:
type_strs = [t.__name__ if hasattr(t, "__name__") else str(t) for t in get_args(type_hint)]
return " | ".join(type_strs)
return type_hint.__name__ if hasattr(type_hint, "__name__") else str(type_hint)
lines = [f"**{header}:**\n"] if header else []
for param in params:
type_str = get_type_str(param.type_hint) if param.type_hint != Any else ""
name = f"**{param.kwargs_type}" if param.name is None and param.kwargs_type is not None else param.name
param_str = f"- `{name}` (`{type_str}`"
if hasattr(param, "required") and not param.required:
param_str += ", *optional*"
if param.default is not None:
param_str += f", defaults to `{param.default}`"
param_str += ")"
desc = param.description if param.description else "No description provided"
param_str += f": {desc}"
lines.append(param_str)
return "\n".join(lines)
def format_components(components, indent_level=4, max_line_length=115, add_empty_lines=True):
"""Format a list of ComponentSpec objects into a readable string representation.
@@ -1103,7 +1055,8 @@ def generate_modular_model_card_content(blocks) -> dict[str, Any]:
- blocks_description: Detailed architecture of blocks
- components_description: List of required components
- configs_section: Configuration parameters section
- io_specification_section: Input/Output specification (per-workflow or unified)
- inputs_description: Input parameters specification
- outputs_description: Output parameters specification
- trigger_inputs_section: Conditional execution information
- tags: List of relevant tags for the model card
"""
@@ -1122,6 +1075,15 @@ def generate_modular_model_card_content(blocks) -> dict[str, Any]:
if block_desc:
blocks_desc_parts.append(f" - {block_desc}")
# add sub-blocks if any
if hasattr(block, "sub_blocks") and block.sub_blocks:
for sub_name, sub_block in block.sub_blocks.items():
sub_class = sub_block.__class__.__name__
sub_desc = sub_block.description.split("\n")[0] if getattr(sub_block, "description", "") else ""
blocks_desc_parts.append(f" - *{sub_name}*: `{sub_class}`")
if sub_desc:
blocks_desc_parts.append(f" - {sub_desc}")
blocks_description = "\n".join(blocks_desc_parts) if blocks_desc_parts else "No blocks defined."
components = getattr(blocks, "expected_components", [])
@@ -1147,76 +1109,63 @@ def generate_modular_model_card_content(blocks) -> dict[str, Any]:
if configs_description:
configs_section = f"\n\n## Configuration Parameters\n\n{configs_description}"
# Branch on whether workflows are defined
has_workflows = getattr(blocks, "_workflow_map", None) is not None
inputs = blocks.inputs
outputs = blocks.outputs
if has_workflows:
workflow_map = blocks._workflow_map
parts = []
# format inputs as markdown list
inputs_parts = []
required_inputs = [inp for inp in inputs if inp.required]
optional_inputs = [inp for inp in inputs if not inp.required]
# If blocks overrides outputs (e.g. to return just "images" instead of all intermediates),
# use that as the shared output for all workflows
blocks_outputs = blocks.outputs
blocks_intermediate = getattr(blocks, "intermediate_outputs", None)
shared_outputs = (
blocks_outputs if blocks_intermediate is not None and blocks_outputs != blocks_intermediate else None
)
if required_inputs:
inputs_parts.append("**Required:**\n")
for inp in required_inputs:
if hasattr(inp.type_hint, "__name__"):
type_str = inp.type_hint.__name__
elif inp.type_hint is not None:
type_str = str(inp.type_hint).replace("typing.", "")
else:
type_str = "Any"
desc = inp.description or "No description provided"
inputs_parts.append(f"- `{inp.name}` (`{type_str}`): {desc}")
parts.append("## Workflow Input Specification\n")
if optional_inputs:
if required_inputs:
inputs_parts.append("")
inputs_parts.append("**Optional:**\n")
for inp in optional_inputs:
if hasattr(inp.type_hint, "__name__"):
type_str = inp.type_hint.__name__
elif inp.type_hint is not None:
type_str = str(inp.type_hint).replace("typing.", "")
else:
type_str = "Any"
desc = inp.description or "No description provided"
default_str = f", default: `{inp.default}`" if inp.default is not None else ""
inputs_parts.append(f"- `{inp.name}` (`{type_str}`){default_str}: {desc}")
# Per-workflow details: show trigger inputs with full param descriptions
for wf_name, trigger_inputs in workflow_map.items():
trigger_input_names = set(trigger_inputs.keys())
try:
workflow_blocks = blocks.get_workflow(wf_name)
except Exception:
parts.append(f"<details>\n<summary><strong>{wf_name}</strong></summary>\n")
parts.append("*Could not resolve workflow blocks.*\n")
parts.append("</details>\n")
continue
inputs_description = "\n".join(inputs_parts) if inputs_parts else "No specific inputs defined."
wf_inputs = workflow_blocks.inputs
# Show only trigger inputs with full parameter descriptions
trigger_params = [p for p in wf_inputs if p.name in trigger_input_names]
# format outputs as markdown list
outputs_parts = []
for out in outputs:
if hasattr(out.type_hint, "__name__"):
type_str = out.type_hint.__name__
elif out.type_hint is not None:
type_str = str(out.type_hint).replace("typing.", "")
else:
type_str = "Any"
desc = out.description or "No description provided"
outputs_parts.append(f"- `{out.name}` (`{type_str}`): {desc}")
parts.append(f"<details>\n<summary><strong>{wf_name}</strong></summary>\n")
outputs_description = "\n".join(outputs_parts) if outputs_parts else "Standard pipeline outputs."
inputs_str = format_params_markdown(trigger_params, header=None)
parts.append(inputs_str if inputs_str else "No additional inputs required.")
parts.append("")
parts.append("</details>\n")
# Common Inputs & Outputs section (like non-workflow pipelines)
all_inputs = blocks.inputs
all_outputs = shared_outputs if shared_outputs is not None else blocks.outputs
inputs_str = format_params_markdown(all_inputs, "Inputs")
outputs_str = format_params_markdown(all_outputs, "Outputs")
inputs_description = inputs_str if inputs_str else "No specific inputs defined."
outputs_description = outputs_str if outputs_str else "Standard pipeline outputs."
parts.append(f"\n## Input/Output Specification\n\n{inputs_description}\n\n{outputs_description}")
io_specification_section = "\n".join(parts)
# Suppress trigger_inputs_section when workflows are shown (it's redundant)
trigger_inputs_section = ""
else:
# Unified I/O section (original behavior)
inputs = blocks.inputs
outputs = blocks.outputs
inputs_str = format_params_markdown(inputs, "Inputs")
outputs_str = format_params_markdown(outputs, "Outputs")
inputs_description = inputs_str if inputs_str else "No specific inputs defined."
outputs_description = outputs_str if outputs_str else "Standard pipeline outputs."
io_specification_section = f"## Input/Output Specification\n\n{inputs_description}\n\n{outputs_description}"
trigger_inputs_section = ""
if hasattr(blocks, "trigger_inputs") and blocks.trigger_inputs:
trigger_inputs_list = sorted([t for t in blocks.trigger_inputs if t is not None])
if trigger_inputs_list:
trigger_inputs_str = ", ".join(f"`{t}`" for t in trigger_inputs_list)
trigger_inputs_section = f"""
trigger_inputs_section = ""
if hasattr(blocks, "trigger_inputs") and blocks.trigger_inputs:
trigger_inputs_list = sorted([t for t in blocks.trigger_inputs if t is not None])
if trigger_inputs_list:
trigger_inputs_str = ", ".join(f"`{t}`" for t in trigger_inputs_list)
trigger_inputs_section = f"""
### Conditional Execution
This pipeline contains blocks that are selected at runtime based on inputs:
@@ -1229,18 +1178,7 @@ This pipeline contains blocks that are selected at runtime based on inputs:
if hasattr(blocks, "model_name") and blocks.model_name:
tags.append(blocks.model_name)
if has_workflows:
# Derive tags from workflow names
workflow_names = set(blocks._workflow_map.keys())
if any("inpainting" in wf for wf in workflow_names):
tags.append("inpainting")
if any("image2image" in wf for wf in workflow_names):
tags.append("image-to-image")
if any("controlnet" in wf for wf in workflow_names):
tags.append("controlnet")
if any("text2image" in wf for wf in workflow_names):
tags.append("text-to-image")
elif hasattr(blocks, "trigger_inputs") and blocks.trigger_inputs:
if hasattr(blocks, "trigger_inputs") and blocks.trigger_inputs:
triggers = blocks.trigger_inputs
if any(t in triggers for t in ["mask", "mask_image"]):
tags.append("inpainting")
@@ -1268,7 +1206,8 @@ This pipeline uses a {block_count}-block architecture that can be customized and
"blocks_description": blocks_description,
"components_description": components_description,
"configs_section": configs_section,
"io_specification_section": io_specification_section,
"inputs_description": inputs_description,
"outputs_description": outputs_description,
"trigger_inputs_section": trigger_inputs_section,
"tags": tags,
}

View File

@@ -31,18 +31,14 @@ class IPNDMScheduler(SchedulerMixin, ConfigMixin):
Args:
num_train_timesteps (`int`, defaults to 1000):
The number of diffusion steps to train the model.
trained_betas (`np.ndarray` or `List[float]`, *optional*):
trained_betas (`np.ndarray`, *optional*):
Pass an array of betas directly to the constructor to bypass `beta_start` and `beta_end`.
"""
order = 1
@register_to_config
def __init__(
self,
num_train_timesteps: int = 1000,
trained_betas: np.ndarray | list[float] | None = None,
):
def __init__(self, num_train_timesteps: int = 1000, trained_betas: np.ndarray | list[float] | None = None):
# set `betas`, `alphas`, `timesteps`
self.set_timesteps(num_train_timesteps)
@@ -60,29 +56,21 @@ class IPNDMScheduler(SchedulerMixin, ConfigMixin):
self._begin_index = None
@property
def step_index(self) -> int | None:
def step_index(self):
"""
The index counter for current timestep. It will increase 1 after each scheduler step.
Returns:
`int` or `None`:
The index counter for current timestep.
"""
return self._step_index
@property
def begin_index(self) -> int | None:
def begin_index(self):
"""
The index for the first timestep. It should be set from pipeline with `set_begin_index` method.
Returns:
`int` or `None`:
The index for the first timestep.
"""
return self._begin_index
# Copied from diffusers.schedulers.scheduling_dpmsolver_multistep.DPMSolverMultistepScheduler.set_begin_index
def set_begin_index(self, begin_index: int = 0) -> None:
def set_begin_index(self, begin_index: int = 0):
"""
Sets the begin index for the scheduler. This function should be run from pipeline before the inference.
@@ -181,7 +169,7 @@ class IPNDMScheduler(SchedulerMixin, ConfigMixin):
Args:
model_output (`torch.Tensor`):
The direct output from learned diffusion model.
timestep (`int` or `torch.Tensor`):
timestep (`int`):
The current discrete timestep in the diffusion chain.
sample (`torch.Tensor`):
A current instance of a sample created by the diffusion process.
@@ -240,30 +228,7 @@ class IPNDMScheduler(SchedulerMixin, ConfigMixin):
"""
return sample
def _get_prev_sample(
self,
sample: torch.Tensor,
timestep_index: int,
prev_timestep_index: int,
ets: torch.Tensor,
) -> torch.Tensor:
"""
Predicts the previous sample based on the current sample, timestep indices, and running model outputs.
Args:
sample (`torch.Tensor`):
The current sample.
timestep_index (`int`):
Index of the current timestep in the schedule.
prev_timestep_index (`int`):
Index of the previous timestep in the schedule.
ets (`torch.Tensor`):
The running sequence of model outputs.
Returns:
`torch.Tensor`:
The predicted previous sample.
"""
def _get_prev_sample(self, sample, timestep_index, prev_timestep_index, ets):
alpha = self.alphas[timestep_index]
sigma = self.betas[timestep_index]
@@ -275,5 +240,5 @@ class IPNDMScheduler(SchedulerMixin, ConfigMixin):
return prev_sample
def __len__(self) -> int:
def __len__(self):
return self.config.num_train_timesteps

View File

@@ -107,7 +107,6 @@ def load_or_create_model_card(
widget: list[dict] | None = None,
inference: bool | None = None,
is_modular: bool = False,
update_model_card: bool = False,
) -> ModelCard:
"""
Loads or creates a model card.
@@ -134,9 +133,6 @@ def load_or_create_model_card(
`load_or_create_model_card` from a training script.
is_modular: (`bool`, optional): Boolean flag to denote if the model card is for a modular pipeline.
When True, uses model_description as-is without additional template formatting.
update_model_card: (`bool`, optional): When True, regenerates the model card content even if one
already exists on the remote repo. Existing card metadata (tags, license, etc.) is preserved. Only
supported for modular pipelines (i.e., `is_modular=True`).
"""
if not is_jinja_available():
raise ValueError(
@@ -145,17 +141,9 @@ def load_or_create_model_card(
" To install it, please run `pip install Jinja2`."
)
if update_model_card and not is_modular:
raise ValueError("`update_model_card=True` is only supported for modular pipelines (`is_modular=True`).")
try:
# Check if the model card is present on the remote repo
model_card = ModelCard.load(repo_id_or_path, token=token)
# For modular pipelines, regenerate card content when requested (preserve existing metadata)
if update_model_card and is_modular and model_description is not None:
existing_data = model_card.data
model_card = ModelCard(model_description)
model_card.data = existing_data
except (EntryNotFoundError, RepositoryNotFoundError):
# Otherwise create a model card from template
if from_training:

View File

@@ -7,9 +7,7 @@ from unittest.mock import MagicMock, patch
import torch
from transformers import CLIPTextModel, LongformerModel
from diffusers import ConfigMixin
from diffusers.models import AutoModel, UNet2DConditionModel
from diffusers.models.modeling_utils import ModelMixin
class TestAutoModel(unittest.TestCase):
@@ -145,51 +143,3 @@ class TestAutoModelFromConfig(unittest.TestCase):
def test_from_config_raises_on_none(self):
with self.assertRaises(ValueError, msg="Please provide a `pretrained_model_name_or_path_or_dict`"):
AutoModel.from_config(None)
class TestRegisterForAutoClass(unittest.TestCase):
def test_register_for_auto_class_sets_attribute(self):
class DummyModel(ModelMixin, ConfigMixin):
config_name = "config.json"
DummyModel.register_for_auto_class("AutoModel")
self.assertEqual(DummyModel._auto_class, "AutoModel")
def test_register_for_auto_class_rejects_unsupported(self):
class DummyModel(ModelMixin, ConfigMixin):
config_name = "config.json"
with self.assertRaises(ValueError, msg="Only 'AutoModel' is supported"):
DummyModel.register_for_auto_class("AutoPipeline")
def test_auto_map_in_saved_config(self):
class DummyModel(ModelMixin, ConfigMixin):
config_name = "config.json"
DummyModel.register_for_auto_class("AutoModel")
model = DummyModel()
with tempfile.TemporaryDirectory() as tmpdir:
model.save_config(tmpdir)
config_path = os.path.join(tmpdir, "config.json")
with open(config_path, "r") as f:
config = json.load(f)
self.assertIn("auto_map", config)
self.assertIn("AutoModel", config["auto_map"])
module_name = DummyModel.__module__.split(".")[-1]
self.assertEqual(config["auto_map"]["AutoModel"], f"{module_name}.DummyModel")
def test_no_auto_map_without_register(self):
class DummyModel(ModelMixin, ConfigMixin):
config_name = "config.json"
model = DummyModel()
with tempfile.TemporaryDirectory() as tmpdir:
model.save_config(tmpdir)
config_path = os.path.join(tmpdir, "config.json")
with open(config_path, "r") as f:
config = json.load(f)
self.assertNotIn("auto_map", config)

View File

@@ -1,6 +1,4 @@
import gc
import json
import os
import tempfile
from typing import Callable
@@ -351,33 +349,6 @@ class ModularPipelineTesterMixin:
assert torch.abs(image_slices[0] - image_slices[1]).max() < 1e-3
def test_modular_index_consistency(self):
pipe = self.get_pipeline()
components_spec = pipe._component_specs
components = sorted(components_spec.keys())
with tempfile.TemporaryDirectory() as tmpdir:
pipe.save_pretrained(tmpdir)
index_file = os.path.join(tmpdir, "modular_model_index.json")
assert os.path.exists(index_file)
with open(index_file) as f:
index_contents = json.load(f)
compulsory_keys = {"_blocks_class_name", "_class_name", "_diffusers_version"}
for k in compulsory_keys:
assert k in index_contents
to_check_attrs = {"pretrained_model_name_or_path", "revision", "subfolder"}
for component in components:
spec = components_spec[component]
for attr in to_check_attrs:
if getattr(spec, "pretrained_model_name_or_path", None) is not None:
for attr in to_check_attrs:
assert component in index_contents, f"{component} should be present in index but isn't."
attr_value_from_index = index_contents[component][2][attr]
assert getattr(spec, attr) == attr_value_from_index
def test_workflow_map(self):
blocks = self.pipeline_blocks_class()
if blocks._workflow_map is None:
@@ -483,7 +454,8 @@ class TestModularModelCardContent:
"blocks_description",
"components_description",
"configs_section",
"io_specification_section",
"inputs_description",
"outputs_description",
"trigger_inputs_section",
"tags",
]
@@ -580,19 +552,18 @@ class TestModularModelCardContent:
blocks = self.create_mock_blocks(inputs=inputs)
content = generate_modular_model_card_content(blocks)
io_section = content["io_specification_section"]
assert "**Inputs:**" in io_section
assert "prompt" in io_section
assert "num_steps" in io_section
assert "*optional*" in io_section
assert "defaults to `50`" in io_section
assert "**Required:**" in content["inputs_description"]
assert "**Optional:**" in content["inputs_description"]
assert "prompt" in content["inputs_description"]
assert "num_steps" in content["inputs_description"]
assert "default: `50`" in content["inputs_description"]
def test_inputs_description_empty(self):
"""Test handling of pipelines without specific inputs."""
blocks = self.create_mock_blocks(inputs=[])
content = generate_modular_model_card_content(blocks)
assert "No specific inputs defined" in content["io_specification_section"]
assert "No specific inputs defined" in content["inputs_description"]
def test_outputs_description_formatting(self):
"""Test that outputs are correctly formatted."""
@@ -602,16 +573,15 @@ class TestModularModelCardContent:
blocks = self.create_mock_blocks(outputs=outputs)
content = generate_modular_model_card_content(blocks)
io_section = content["io_specification_section"]
assert "images" in io_section
assert "Generated images" in io_section
assert "images" in content["outputs_description"]
assert "Generated images" in content["outputs_description"]
def test_outputs_description_empty(self):
"""Test handling of pipelines without specific outputs."""
blocks = self.create_mock_blocks(outputs=[])
content = generate_modular_model_card_content(blocks)
assert "Standard pipeline outputs" in content["io_specification_section"]
assert "Standard pipeline outputs" in content["outputs_description"]
def test_trigger_inputs_section_with_triggers(self):
"""Test that trigger inputs section is generated when present."""
@@ -629,6 +599,35 @@ class TestModularModelCardContent:
assert content["trigger_inputs_section"] == ""
def test_blocks_description_with_sub_blocks(self):
"""Test that blocks with sub-blocks are correctly described."""
class MockBlockWithSubBlocks:
def __init__(self):
self.__class__.__name__ = "ParentBlock"
self.description = "Parent block"
self.sub_blocks = {
"child1": self.create_child_block("ChildBlock1", "Child 1 description"),
"child2": self.create_child_block("ChildBlock2", "Child 2 description"),
}
def create_child_block(self, name, desc):
class ChildBlock:
def __init__(self):
self.__class__.__name__ = name
self.description = desc
return ChildBlock()
blocks = self.create_mock_blocks()
blocks.sub_blocks["parent"] = MockBlockWithSubBlocks()
content = generate_modular_model_card_content(blocks)
assert "parent" in content["blocks_description"]
assert "child1" in content["blocks_description"]
assert "child2" in content["blocks_description"]
def test_model_description_includes_block_count(self):
"""Test that model description includes the number of blocks."""
blocks = self.create_mock_blocks(num_blocks=5)
@@ -687,18 +686,6 @@ class TestLoadComponentsSkipBehavior:
assert pipe.unet is not None
assert getattr(pipe, "vae", None) is None
def test_load_components_selective_loading_incremental(self):
"""Loading a subset of components should not affect already-loaded components."""
pipe = ModularPipeline.from_pretrained("hf-internal-testing/tiny-stable-diffusion-xl-pipe")
pipe.load_components(names="unet", torch_dtype=torch.float32)
pipe.load_components(names="text_encoder", torch_dtype=torch.float32)
assert hasattr(pipe, "unet")
assert pipe.unet is not None
assert hasattr(pipe, "text_encoder")
assert pipe.text_encoder is not None
def test_load_components_skips_invalid_pretrained_path(self):
pipe = ModularPipeline.from_pretrained("hf-internal-testing/tiny-stable-diffusion-xl-pipe")
@@ -712,133 +699,3 @@ class TestLoadComponentsSkipBehavior:
# Verify test_component was not loaded
assert not hasattr(pipe, "test_component") or pipe.test_component is None
class TestCustomModelSavePretrained:
def test_save_pretrained_updates_index_for_local_model(self, tmp_path):
"""When a component without _diffusers_load_id (custom/local model) is saved,
modular_model_index.json should point to the save directory."""
import json
pipe = ModularPipeline.from_pretrained("hf-internal-testing/tiny-stable-diffusion-xl-pipe")
pipe.load_components(torch_dtype=torch.float32)
pipe.unet._diffusers_load_id = "null"
save_dir = str(tmp_path / "my-pipeline")
pipe.save_pretrained(save_dir)
with open(os.path.join(save_dir, "modular_model_index.json")) as f:
index = json.load(f)
_library, _cls, unet_spec = index["unet"]
assert unet_spec["pretrained_model_name_or_path"] == save_dir
assert unet_spec["subfolder"] == "unet"
_library, _cls, vae_spec = index["vae"]
assert vae_spec["pretrained_model_name_or_path"] == "hf-internal-testing/tiny-stable-diffusion-xl-pipe"
def test_save_pretrained_roundtrip_with_local_model(self, tmp_path):
"""A pipeline with a custom/local model should be saveable and re-loadable with identical outputs."""
pipe = ModularPipeline.from_pretrained("hf-internal-testing/tiny-stable-diffusion-xl-pipe")
pipe.load_components(torch_dtype=torch.float32)
pipe.unet._diffusers_load_id = "null"
original_state_dict = pipe.unet.state_dict()
save_dir = str(tmp_path / "my-pipeline")
pipe.save_pretrained(save_dir)
loaded_pipe = ModularPipeline.from_pretrained(save_dir)
loaded_pipe.load_components(torch_dtype=torch.float32)
assert loaded_pipe.unet is not None
assert loaded_pipe.unet.__class__.__name__ == pipe.unet.__class__.__name__
loaded_state_dict = loaded_pipe.unet.state_dict()
assert set(original_state_dict.keys()) == set(loaded_state_dict.keys())
for key in original_state_dict:
assert torch.equal(original_state_dict[key], loaded_state_dict[key]), f"Mismatch in {key}"
def test_save_pretrained_updates_index_for_model_with_no_load_id(self, tmp_path):
"""testing the workflow of update the pipeline with a custom model and save the pipeline,
the modular_model_index.json should point to the save directory."""
import json
from diffusers import UNet2DConditionModel
pipe = ModularPipeline.from_pretrained("hf-internal-testing/tiny-stable-diffusion-xl-pipe")
pipe.load_components(torch_dtype=torch.float32)
unet = UNet2DConditionModel.from_pretrained(
"hf-internal-testing/tiny-stable-diffusion-xl-pipe", subfolder="unet"
)
assert not hasattr(unet, "_diffusers_load_id")
pipe.update_components(unet=unet)
save_dir = str(tmp_path / "my-pipeline")
pipe.save_pretrained(save_dir)
with open(os.path.join(save_dir, "modular_model_index.json")) as f:
index = json.load(f)
_library, _cls, unet_spec = index["unet"]
assert unet_spec["pretrained_model_name_or_path"] == save_dir
assert unet_spec["subfolder"] == "unet"
_library, _cls, vae_spec = index["vae"]
assert vae_spec["pretrained_model_name_or_path"] == "hf-internal-testing/tiny-stable-diffusion-xl-pipe"
def test_save_pretrained_overwrite_modular_index(self, tmp_path):
"""With overwrite_modular_index=True, all component references should point to the save directory."""
import json
pipe = ModularPipeline.from_pretrained("hf-internal-testing/tiny-stable-diffusion-xl-pipe")
pipe.load_components(torch_dtype=torch.float32)
save_dir = str(tmp_path / "my-pipeline")
pipe.save_pretrained(save_dir, overwrite_modular_index=True)
with open(os.path.join(save_dir, "modular_model_index.json")) as f:
index = json.load(f)
for component_name in ["unet", "vae", "text_encoder", "text_encoder_2"]:
if component_name not in index:
continue
_library, _cls, spec = index[component_name]
assert spec["pretrained_model_name_or_path"] == save_dir, (
f"{component_name} should point to save dir but got {spec['pretrained_model_name_or_path']}"
)
assert spec["subfolder"] == component_name
loaded_pipe = ModularPipeline.from_pretrained(save_dir)
loaded_pipe.load_components(torch_dtype=torch.float32)
assert loaded_pipe.unet is not None
assert loaded_pipe.vae is not None
class TestModularPipelineInitFallback:
"""Test that ModularPipeline.__init__ falls back to default_blocks_name when
_blocks_class_name is a base class (e.g. SequentialPipelineBlocks saved by from_blocks_dict)."""
def test_init_fallback_when_blocks_class_name_is_base_class(self, tmp_path):
# 1. Load pipeline and get a workflow (returns a base SequentialPipelineBlocks)
pipe = ModularPipeline.from_pretrained("hf-internal-testing/tiny-stable-diffusion-xl-pipe")
t2i_blocks = pipe.blocks.get_workflow("text2image")
assert t2i_blocks.__class__.__name__ == "SequentialPipelineBlocks"
# 2. Use init_pipeline to create a new pipeline from the workflow blocks
t2i_pipe = t2i_blocks.init_pipeline("hf-internal-testing/tiny-stable-diffusion-xl-pipe")
# 3. Save and reload — the saved config will have _blocks_class_name="SequentialPipelineBlocks"
save_dir = str(tmp_path / "pipeline")
t2i_pipe.save_pretrained(save_dir)
loaded_pipe = ModularPipeline.from_pretrained(save_dir)
# 4. Verify it fell back to default_blocks_name and has correct blocks
assert loaded_pipe.__class__.__name__ == pipe.__class__.__name__
assert loaded_pipe._blocks.__class__.__name__ == pipe._blocks.__class__.__name__
assert len(loaded_pipe._blocks.sub_blocks) == len(pipe._blocks.sub_blocks)

View File

@@ -192,156 +192,6 @@ class TestModularCustomBlocks:
assert len(pipe.components) == 1
assert pipe.component_names[0] == "transformer"
def test_trust_remote_code_not_propagated_to_external_repo(self):
"""When a modular pipeline repo references a component from an external repo that has custom
code (auto_map in config), calling load_components(trust_remote_code=True) should NOT
propagate trust_remote_code to that external component. The external component should fail
to load."""
from diffusers import ModularPipeline
CUSTOM_MODEL_CODE = (
"import torch\n"
"from diffusers import ModelMixin, ConfigMixin\n"
"from diffusers.configuration_utils import register_to_config\n"
"\n"
"class CustomModel(ModelMixin, ConfigMixin):\n"
" @register_to_config\n"
" def __init__(self, hidden_size=8):\n"
" super().__init__()\n"
" self.linear = torch.nn.Linear(hidden_size, hidden_size)\n"
"\n"
" def forward(self, x):\n"
" return self.linear(x)\n"
)
with tempfile.TemporaryDirectory() as external_repo_dir, tempfile.TemporaryDirectory() as pipeline_repo_dir:
# Step 1: Create an external model repo with custom code (requires trust_remote_code)
with open(os.path.join(external_repo_dir, "modeling.py"), "w") as f:
f.write(CUSTOM_MODEL_CODE)
config = {
"_class_name": "CustomModel",
"_diffusers_version": "0.0.0",
"auto_map": {"AutoModel": "modeling.CustomModel"},
"hidden_size": 8,
}
with open(os.path.join(external_repo_dir, "config.json"), "w") as f:
json.dump(config, f)
torch.save({}, os.path.join(external_repo_dir, "diffusion_pytorch_model.bin"))
# Step 2: Create a custom block that references the external repo.
# Define both the class (for direct use) and its code string (for block.py).
class ExternalRefBlock(ModularPipelineBlocks):
@property
def expected_components(self):
return [
ComponentSpec(
"custom_model",
AutoModel,
pretrained_model_name_or_path=external_repo_dir,
)
]
@property
def inputs(self) -> List[InputParam]:
return [InputParam("prompt", type_hint=str, required=True)]
@property
def intermediate_inputs(self) -> List[InputParam]:
return []
@property
def intermediate_outputs(self) -> List[OutputParam]:
return [OutputParam("output", type_hint=str)]
def __call__(self, components, state: PipelineState) -> PipelineState:
block_state = self.get_block_state(state)
block_state.output = "test"
self.set_block_state(state, block_state)
return components, state
EXTERNAL_REF_BLOCK_CODE_STR = (
"from typing import List\n"
"from diffusers import AutoModel\n"
"from diffusers.modular_pipelines import (\n"
" ComponentSpec,\n"
" InputParam,\n"
" ModularPipelineBlocks,\n"
" OutputParam,\n"
" PipelineState,\n"
")\n"
"\n"
"class ExternalRefBlock(ModularPipelineBlocks):\n"
" @property\n"
" def expected_components(self):\n"
" return [\n"
" ComponentSpec(\n"
' "custom_model",\n'
" AutoModel,\n"
f' pretrained_model_name_or_path="{external_repo_dir}",\n'
" )\n"
" ]\n"
"\n"
" @property\n"
" def inputs(self) -> List[InputParam]:\n"
' return [InputParam("prompt", type_hint=str, required=True)]\n'
"\n"
" @property\n"
" def intermediate_inputs(self) -> List[InputParam]:\n"
" return []\n"
"\n"
" @property\n"
" def intermediate_outputs(self) -> List[OutputParam]:\n"
' return [OutputParam("output", type_hint=str)]\n'
"\n"
" def __call__(self, components, state: PipelineState) -> PipelineState:\n"
" block_state = self.get_block_state(state)\n"
' block_state.output = "test"\n'
" self.set_block_state(state, block_state)\n"
" return components, state\n"
)
# Save the block config, write block.py, then load back via from_pretrained
block = ExternalRefBlock()
block.save_pretrained(pipeline_repo_dir)
# auto_map will reference the module name derived from ExternalRefBlock.__module__,
# which is "test_modular_pipelines_custom_blocks". Write the code file with that name.
code_path = os.path.join(pipeline_repo_dir, "test_modular_pipelines_custom_blocks.py")
with open(code_path, "w") as f:
f.write(EXTERNAL_REF_BLOCK_CODE_STR)
block = ModularPipelineBlocks.from_pretrained(pipeline_repo_dir, trust_remote_code=True)
pipe = block.init_pipeline()
pipe.save_pretrained(pipeline_repo_dir)
# Step 3: Load the pipeline from the saved directory.
loaded_pipe = ModularPipeline.from_pretrained(pipeline_repo_dir, trust_remote_code=True)
assert loaded_pipe._pretrained_model_name_or_path == pipeline_repo_dir
assert loaded_pipe._component_specs["custom_model"].pretrained_model_name_or_path == external_repo_dir
assert getattr(loaded_pipe, "custom_model", None) is None
# Step 4a: load_components WITHOUT trust_remote_code.
# It should still fail
loaded_pipe.load_components()
assert getattr(loaded_pipe, "custom_model", None) is None
# Step 4b: load_components with trust_remote_code=True.
# trust_remote_code should be stripped for the external component, so it fails.
# The warning should contain guidance about manually loading with trust_remote_code.
loaded_pipe.load_components(trust_remote_code=True)
assert getattr(loaded_pipe, "custom_model", None) is None
# Step 4c: Manually load with AutoModel and update_components — this should work.
from diffusers import AutoModel
custom_model = AutoModel.from_pretrained(external_repo_dir, trust_remote_code=True)
loaded_pipe.update_components(custom_model=custom_model)
assert getattr(loaded_pipe, "custom_model", None) is not None
def test_custom_block_loads_from_hub(self):
repo_id = "hf-internal-testing/tiny-modular-diffusers-block"
block = ModularPipelineBlocks.from_pretrained(repo_id, trust_remote_code=True)

View File

@@ -74,7 +74,7 @@ if is_torchao_available():
@require_torch
@require_torch_accelerator
@require_torchao_version_greater_or_equal("0.14.0")
@require_torchao_version_greater_or_equal("0.7.0")
class TorchAoConfigTest(unittest.TestCase):
def test_to_dict(self):
"""
@@ -132,7 +132,7 @@ class TorchAoConfigTest(unittest.TestCase):
# Slices for these tests have been obtained on our aws-g6e-xlarge-plus runners
@require_torch
@require_torch_accelerator
@require_torchao_version_greater_or_equal("0.14.0")
@require_torchao_version_greater_or_equal("0.7.0")
class TorchAoTest(unittest.TestCase):
def tearDown(self):
gc.collect()
@@ -587,7 +587,7 @@ class TorchAoTest(unittest.TestCase):
# Slices for these tests have been obtained on our aws-g6e-xlarge-plus runners
@require_torch
@require_torch_accelerator
@require_torchao_version_greater_or_equal("0.14.0")
@require_torchao_version_greater_or_equal("0.7.0")
class TorchAoSerializationTest(unittest.TestCase):
model_name = "hf-internal-testing/tiny-flux-pipe"
@@ -698,22 +698,23 @@ class TorchAoSerializationTest(unittest.TestCase):
self._check_serialization_expected_slice(quant_method, quant_method_kwargs, expected_slice, device)
@require_torchao_version_greater_or_equal("0.14.0")
@require_torchao_version_greater_or_equal("0.7.0")
class TorchAoCompileTest(QuantCompileTests, unittest.TestCase):
@property
def quantization_config(self):
return PipelineQuantizationConfig(
quant_mapping={"transformer": TorchAoConfig(Int8WeightOnlyConfig())},
quant_mapping={
"transformer": TorchAoConfig(quant_type="int8_weight_only"),
},
)
@unittest.skip(
"Changing the device of AQT tensor with module._apply (called from doing module.to() in accelerate) does not work "
"when compiling."
)
def test_torch_compile_with_cpu_offload(self):
pipe = self._init_pipeline(self.quantization_config, torch.bfloat16)
pipe.enable_model_cpu_offload()
# No compilation because it fails with:
# RuntimeError: _apply(): Couldn't swap Linear.weight
# small resolutions to ensure speedy execution.
pipe("a dog", num_inference_steps=2, max_sequence_length=16, height=256, width=256)
super().test_torch_compile_with_cpu_offload()
@parameterized.expand([False, True])
@unittest.skip(
@@ -744,7 +745,7 @@ class TorchAoCompileTest(QuantCompileTests, unittest.TestCase):
# Slices for these tests have been obtained on our aws-g6e-xlarge-plus runners
@require_torch
@require_torch_accelerator
@require_torchao_version_greater_or_equal("0.14.0")
@require_torchao_version_greater_or_equal("0.7.0")
@slow
@nightly
class SlowTorchAoTests(unittest.TestCase):
@@ -906,7 +907,7 @@ class SlowTorchAoTests(unittest.TestCase):
@require_torch
@require_torch_accelerator
@require_torchao_version_greater_or_equal("0.14.0")
@require_torchao_version_greater_or_equal("0.7.0")
@slow
@nightly
class SlowTorchAoPreserializedModelTests(unittest.TestCase):