mirror of https://github.com/huggingface/diffusers.git
synced 2026-03-16 13:37:55 +08:00

Compare commits: modular-da...refactor-c (4 commits)

| Author | SHA1 | Date |
|---|---|---|
| | 1e6578bbe3 | |
| | 81aa43271b | |
| | 9239908f5d | |
| | 9cd3e6ba88 | |
@@ -1,447 +0,0 @@
# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import ast
import re
from argparse import ArgumentParser, Namespace
from collections import OrderedDict
from dataclasses import dataclass, field

from ..utils import logging
from . import BaseDiffusersCLICommand


logger = logging.get_logger("diffusers-cli/daggr")

INTERNAL_TYPE_NAMES = {
    "Tensor",
    "Generator",
}

INTERNAL_TYPE_FULL_NAMES = {
    "torch.Tensor",
    "torch.Generator",
    "torch.dtype",
}

SLIDER_PARAMS = {
    "height": {"minimum": 256, "maximum": 2048, "step": 64},
    "width": {"minimum": 256, "maximum": 2048, "step": 64},
    "num_inference_steps": {"minimum": 1, "maximum": 100, "step": 1},
    "guidance_scale": {"minimum": 0, "maximum": 30, "step": 0.5},
    "strength": {"minimum": 0, "maximum": 1, "step": 0.05},
    "control_guidance_start": {"minimum": 0, "maximum": 1, "step": 0.05},
    "control_guidance_end": {"minimum": 0, "maximum": 1, "step": 0.05},
    "controlnet_conditioning_scale": {"minimum": 0, "maximum": 2, "step": 0.1},
}


@dataclass
class BlockInfo:
    name: str
    class_name: str
    description: str
    inputs: list
    outputs: list
    user_inputs: list = field(default_factory=list)
    port_connections: list = field(default_factory=list)
    fixed_inputs: list = field(default_factory=list)


def daggr_command_factory(args: Namespace):
    return DaggrCommand(
        repo_id=args.repo_id,
        output=args.output or "daggr_app.py",
        workflow=getattr(args, "workflow", None),
        trigger_inputs=getattr(args, "trigger_inputs", None),
    )


class DaggrCommand(BaseDiffusersCLICommand):
    @staticmethod
    def register_subcommand(parser: ArgumentParser):
        daggr_parser = parser.add_parser("daggr", help="Generate a daggr app from a modular pipeline repo.")
        daggr_parser.add_argument(
            "repo_id",
            type=str,
            help="HuggingFace Hub repo ID containing a modular pipeline (with modular_model_index.json).",
        )
        daggr_parser.add_argument(
            "--output",
            type=str,
            default="daggr_app.py",
            help="Output file path for the generated daggr app. Default: daggr_app.py",
        )
        daggr_parser.add_argument(
            "--workflow",
            type=str,
            default=None,
            help="Named workflow to resolve conditional blocks (e.g. 'text2image', 'image2image').",
        )
        daggr_parser.add_argument(
            "--trigger-inputs",
            nargs="*",
            default=None,
            help="Trigger input names for manual conditional resolution.",
        )
        daggr_parser.set_defaults(func=daggr_command_factory)

    def __init__(
        self,
        repo_id: str,
        output: str = "daggr_app.py",
        workflow: str | None = None,
        trigger_inputs: list | None = None,
    ):
        self.repo_id = repo_id
        self.output = output
        self.workflow = workflow
        self.trigger_inputs = trigger_inputs

    def run(self):
        from ..modular_pipelines.modular_pipeline import ModularPipelineBlocks

        logger.info(f"Loading blocks from {self.repo_id}...")
        blocks = ModularPipelineBlocks.from_pretrained(self.repo_id, trust_remote_code=True)
        blocks_class_name = blocks.__class__.__name__

        if self.workflow:
            logger.info(f"Resolving workflow: {self.workflow}")
            exec_blocks = blocks.get_workflow(self.workflow)
        elif self.trigger_inputs:
            trigger_kwargs = {name: True for name in self.trigger_inputs}
            logger.info(f"Resolving with trigger inputs: {self.trigger_inputs}")
            exec_blocks = blocks.get_execution_blocks(**trigger_kwargs)
        else:
            logger.info("Resolving default execution blocks...")
            exec_blocks = blocks.get_execution_blocks()

        block_infos = _analyze_blocks(exec_blocks)
        _classify_inputs(block_infos)

        workflow_label = self.workflow or "default"
        workflow_resolve_code = self._get_workflow_resolve_code()
        code = _generate_code(block_infos, self.repo_id, blocks_class_name, workflow_label, workflow_resolve_code)

        try:
            ast.parse(code)
        except SyntaxError as e:
            logger.warning(f"Generated code has syntax error: {e}")

        with open(self.output, "w") as f:
            f.write(code)

        logger.info(f"Daggr app written to {self.output}")
        print(f"Generated daggr app: {self.output}")
        print(f"  Pipeline: {blocks_class_name}")
        print(f"  Workflow: {workflow_label}")
        print(f"  Blocks: {len(block_infos)}")
        print(f"\nRun with: python {self.output}")

    def _get_workflow_resolve_code(self):
        if self.workflow:
            return f"_pipeline._blocks.get_workflow({self.workflow!r})"
        elif self.trigger_inputs:
            kwargs_str = ", ".join(f"{name!r}: True" for name in self.trigger_inputs)
            return f"_pipeline._blocks.get_execution_blocks(**{{{kwargs_str}}})"
        else:
            return "_pipeline._blocks.get_execution_blocks()"


def _analyze_blocks(exec_blocks):
    block_infos = []
    for name, block in exec_blocks.sub_blocks.items():
        info = BlockInfo(
            name=name,
            class_name=block.__class__.__name__,
            description=getattr(block, "description", "") or "",
            inputs=list(block.inputs) if hasattr(block, "inputs") else [],
            outputs=list(block.intermediate_outputs) if hasattr(block, "intermediate_outputs") else [],
        )
        block_infos.append(info)
    return block_infos


def _get_type_name(type_hint):
    if type_hint is None:
        return None
    if hasattr(type_hint, "__name__"):
        return type_hint.__name__
    if hasattr(type_hint, "__module__") and hasattr(type_hint, "__qualname__"):
        return f"{type_hint.__module__}.{type_hint.__qualname__}"
    return str(type_hint)


def _is_internal_type(type_hint):
    if type_hint is None:
        return True
    type_name = _get_type_name(type_hint)
    if type_name is None:
        return True
    if type_name in INTERNAL_TYPE_NAMES or type_name in INTERNAL_TYPE_FULL_NAMES:
        return True
    type_str = str(type_hint)
    for full_name in INTERNAL_TYPE_FULL_NAMES:
        if full_name in type_str:
            return True
    if type_str.startswith("dict[") or type_str == "dict":
        return True
    return False


def _type_hint_to_gradio(type_hint, param_name, default=None):
    if _is_internal_type(type_hint):
        return None

    if param_name in SLIDER_PARAMS:
        slider_opts = SLIDER_PARAMS[param_name]
        val = default if default is not None else slider_opts.get("minimum", 0)
        return (
            f'gr.Slider(label="{param_name}", value={val!r}, '
            f"minimum={slider_opts['minimum']}, maximum={slider_opts['maximum']}, "
            f"step={slider_opts['step']})"
        )

    type_name = _get_type_name(type_hint)
    type_str = str(type_hint)

    if type_name == "str" or type_hint is str:
        lines = 3 if "prompt" in param_name else 1
        default_repr = f", value={default!r}" if default is not None else ""
        return f'gr.Textbox(label="{param_name}", lines={lines}{default_repr})'

    if type_name == "int" or type_hint is int:
        val = f", value={default!r}" if default is not None else ""
        return f'gr.Number(label="{param_name}", precision=0{val})'

    if type_name == "float" or type_hint is float:
        val = f", value={default!r}" if default is not None else ""
        return f'gr.Number(label="{param_name}"{val})'

    if type_name == "bool" or type_hint is bool:
        val = default if default is not None else False
        return f'gr.Checkbox(label="{param_name}", value={val!r})'

    if "Image" in type_str:
        if "list" in type_str.lower():
            return f'gr.Gallery(label="{param_name}")'
        return f'gr.Image(label="{param_name}")'

    if default is not None:
        return f'gr.Textbox(label="{param_name}", value={default!r})'

    return f'gr.Textbox(label="{param_name}")'


def _output_type_to_gradio(type_hint, param_name):
    if _is_internal_type(type_hint):
        return None
    type_str = str(type_hint)
    if "Image" in type_str:
        if "list" in type_str.lower():
            return f'gr.Gallery(label="{param_name}")'
        return f'gr.Image(label="{param_name}")'
    if type_hint is str:
        return f'gr.Textbox(label="{param_name}")'
    if type_hint is int or type_hint is float:
        return f'gr.Number(label="{param_name}")'
    return None


def _classify_inputs(block_infos):
    all_prior_outputs = {}

    for info in block_infos:
        user_inputs = []
        port_connections = []
        fixed_inputs = []

        for inp in info.inputs:
            if inp.name is None:
                continue
            if inp.name in all_prior_outputs:
                port_connections.append((inp.name, all_prior_outputs[inp.name]))
            elif _is_internal_type(inp.type_hint):
                fixed_inputs.append(inp)
            else:
                user_inputs.append(inp)

        info.user_inputs = user_inputs
        info.port_connections = port_connections
        info.fixed_inputs = fixed_inputs

        for out in info.outputs:
            if out.name and out.name not in all_prior_outputs:
                all_prior_outputs[out.name] = info.name


def _sanitize_name(name):
    sanitized = re.sub(r"[^a-zA-Z0-9_]", "_", name)
    if sanitized and sanitized[0].isdigit():
        sanitized = f"_{sanitized}"
    return sanitized


def _generate_code(block_infos, repo_id, blocks_class_name, workflow_label, workflow_resolve_code):
    lines = []

    lines.append(f'"""Daggr app for {blocks_class_name} ({workflow_label} workflow)')
    lines.append("Generated by: diffusers-cli daggr")
    lines.append('"""')
    lines.append("")
    lines.append("import gradio as gr")
    lines.append("from daggr import FnNode, InputNode, Graph")
    lines.append("")
    lines.append("")

    # Pipeline and resolved blocks loader
    lines.append("_pipeline = None")
    lines.append("_exec_blocks = None")
    lines.append("")
    lines.append("")
    lines.append("def _get_pipeline():")
    lines.append("    global _pipeline, _exec_blocks")
    lines.append("    if _pipeline is None:")
    lines.append("        from diffusers import ModularPipeline")
    lines.append(f"        _pipeline = ModularPipeline.from_pretrained({repo_id!r}, trust_remote_code=True)")
    lines.append("        _pipeline.load_components()")
    lines.append(f"        _exec_blocks = {workflow_resolve_code}")
    lines.append("    return _pipeline, _exec_blocks")
    lines.append("")
    lines.append("")

    # Wrapper functions
    for info in block_infos:
        fn_name = f"run_{_sanitize_name(info.name)}"
        all_input_names = []
        for inp in info.inputs:
            if inp.name is not None:
                all_input_names.append(inp.name)

        params = ", ".join(all_input_names)
        lines.append(f"def {fn_name}({params}):")
        lines.append("    from diffusers.modular_pipelines.modular_pipeline import PipelineState")
        lines.append("")
        lines.append("    pipe, exec_blocks = _get_pipeline()")
        lines.append("    state = PipelineState()")
        for inp_name in all_input_names:
            lines.append(f'    state.set("{inp_name}", {inp_name})')
        lines.append(f'    block = exec_blocks.sub_blocks["{info.name}"]')
        lines.append("    _, state = block(pipe, state)")

        if len(info.outputs) == 0:
            lines.append("    return None")
        elif len(info.outputs) == 1:
            out = info.outputs[0]
            lines.append(f'    return state.get("{out.name}")')
        else:
            out_names = [out.name for out in info.outputs]
            out_dict = ", ".join(f'"{n}": state.get("{n}")' for n in out_names)
            lines.append(f"    return {{{out_dict}}}")
        lines.append("")
        lines.append("")

    # Collect all user-facing inputs across blocks
    all_user_inputs = OrderedDict()
    for info in block_infos:
        for inp in info.user_inputs:
            if inp.name not in all_user_inputs:
                all_user_inputs[inp.name] = inp

    # InputNode
    if all_user_inputs:
        lines.append("# -- User Inputs --")
        lines.append('user_inputs = InputNode("User Inputs", ports={')
        for inp_name, inp in all_user_inputs.items():
            gradio_comp = _type_hint_to_gradio(inp.type_hint, inp_name, inp.default)
            if gradio_comp:
                lines.append(f'    "{inp_name}": {gradio_comp},')
        lines.append("})")
        lines.append("")
        lines.append("")

    # FnNode definitions
    lines.append("# -- Pipeline Blocks --")
    node_var_names = {}

    for info in block_infos:
        var_name = f"{_sanitize_name(info.name)}_node"
        node_var_names[info.name] = var_name
        fn_name = f"run_{_sanitize_name(info.name)}"

        display_name = info.name.replace("_", " ").replace(".", " > ").title()

        # Build inputs dict
        input_entries = []
        for inp in info.inputs:
            if inp.name is None:
                continue

            connected = False
            for conn_name, source_block in info.port_connections:
                if conn_name == inp.name:
                    source_var = node_var_names[source_block]
                    input_entries.append(f'        "{inp.name}": {source_var}.{inp.name},')
                    connected = True
                    break

            if not connected:
                if inp.name in all_user_inputs:
                    input_entries.append(f'        "{inp.name}": user_inputs.{inp.name},')
                elif inp.default is not None:
                    input_entries.append(f'        "{inp.name}": {inp.default!r},')
                else:
                    input_entries.append(f'        "{inp.name}": None,')

        # Build outputs dict
        output_entries = []
        for out in info.outputs:
            gradio_out = _output_type_to_gradio(out.type_hint, out.name)
            if gradio_out:
                output_entries.append(f'        "{out.name}": {gradio_out},')
            else:
                output_entries.append(f'        "{out.name}": None,')

        lines.append(f"{var_name} = FnNode(")
        lines.append(f"    fn={fn_name},")
        lines.append(f'    name="{display_name}",')

        if input_entries:
            lines.append("    inputs={")
            lines.extend(input_entries)
            lines.append("    },")

        if output_entries:
            lines.append("    outputs={")
            lines.extend(output_entries)
            lines.append("    },")

        lines.append(")")
        lines.append("")

    # Graph
    lines.append("")
    lines.append("# -- Graph --")
    all_node_vars = []
    if all_user_inputs:
        all_node_vars.append("user_inputs")
    all_node_vars.extend(node_var_names[info.name] for info in block_infos)

    graph_name = f"{blocks_class_name} - {workflow_label}"
    nodes_str = ", ".join(all_node_vars)
    lines.append(f'graph = Graph("{graph_name}", nodes=[{nodes_str}])')
    lines.append("graph.launch()")
    lines.append("")

    return "\n".join(lines)
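For orientation: the module above wires a new `daggr` subcommand into `diffusers-cli`. `register_subcommand` defines the argparse options, `daggr_command_factory` turns the parsed arguments into a `DaggrCommand`, and `run()` resolves the modular pipeline blocks and writes the generated app. A minimal sketch of the programmatic equivalent, using only the constructor and `run()` shown above (the repo id below is a placeholder, not taken from this change):

# Sketch only: programmatic equivalent of `diffusers-cli daggr <repo_id> --workflow text2image`.
# "someorg/some-modular-pipeline" is a placeholder repo id, not part of this change.
from diffusers.commands.daggr_app import DaggrCommand

command = DaggrCommand(
    repo_id="someorg/some-modular-pipeline",
    output="daggr_app.py",
    workflow="text2image",  # optional; resolves conditional blocks, like --workflow on the CLI
)
command.run()  # writes daggr_app.py; launch it afterwards with: python daggr_app.py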
@@ -16,7 +16,6 @@
from argparse import ArgumentParser

from .custom_blocks import CustomBlocksCommand
from .daggr_app import DaggrCommand
from .env import EnvironmentCommand
from .fp16_safetensors import FP16SafetensorsCommand

@@ -29,7 +28,6 @@ def main():
    EnvironmentCommand.register_subcommand(commands_parser)
    FP16SafetensorsCommand.register_subcommand(commands_parser)
    CustomBlocksCommand.register_subcommand(commands_parser)
    DaggrCommand.register_subcommand(commands_parser)

    # Let's go
    args = parser.parse_args()
@@ -36,7 +36,7 @@ from typing import Any, Callable

from packaging import version

from ..utils import deprecate, is_torch_available, is_torchao_available, is_torchao_version, logging
from ..utils import is_torch_available, is_torchao_available, is_torchao_version, logging


if is_torch_available():
@@ -844,8 +844,6 @@ class QuantoConfig(QuantizationConfigMixin):
        modules_to_not_convert: list[str] | None = None,
        **kwargs,
    ):
        deprecation_message = "`QuantoConfig` is deprecated and will be removed in version 1.0.0."
        deprecate("QuantoConfig", "1.0.0", deprecation_message)
        self.quant_method = QuantizationMethod.QUANTO
        self.weights_dtype = weights_dtype
        self.modules_to_not_convert = modules_to_not_convert
@@ -3,7 +3,6 @@ from typing import TYPE_CHECKING, Any
from diffusers.utils.import_utils import is_optimum_quanto_version

from ...utils import (
    deprecate,
    get_module_from_name,
    is_accelerate_available,
    is_accelerate_version,
@@ -43,9 +42,6 @@ class QuantoQuantizer(DiffusersQuantizer):
        super().__init__(quantization_config, **kwargs)

    def validate_environment(self, *args, **kwargs):
        deprecation_message = "The Quanto quantizer is deprecated and will be removed in version 1.0.0."
        deprecate("QuantoQuantizer", "1.0.0", deprecation_message)

        if not is_optimum_quanto_available():
            raise ImportError(
                "Loading an optimum-quanto quantized model requires optimum-quanto library (`pip install optimum-quanto`)"
@@ -1,244 +0,0 @@
# Copyright 2025 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

import numpy as np
import torch

from diffusers import MagCacheConfig, apply_mag_cache
from diffusers.hooks._helpers import TransformerBlockMetadata, TransformerBlockRegistry
from diffusers.models import ModelMixin
from diffusers.utils import logging


logger = logging.get_logger(__name__)


class DummyBlock(torch.nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, hidden_states, encoder_hidden_states=None, **kwargs):
        # Output is double input
        # This ensures Residual = 2*Input - Input = Input
        return hidden_states * 2.0


class DummyTransformer(ModelMixin):
    def __init__(self):
        super().__init__()
        self.transformer_blocks = torch.nn.ModuleList([DummyBlock(), DummyBlock()])

    def forward(self, hidden_states, encoder_hidden_states=None):
        for block in self.transformer_blocks:
            hidden_states = block(hidden_states, encoder_hidden_states=encoder_hidden_states)
        return hidden_states


class TupleOutputBlock(torch.nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, hidden_states, encoder_hidden_states=None, **kwargs):
        # Returns a tuple
        return hidden_states * 2.0, encoder_hidden_states


class TupleTransformer(ModelMixin):
    def __init__(self):
        super().__init__()
        self.transformer_blocks = torch.nn.ModuleList([TupleOutputBlock()])

    def forward(self, hidden_states, encoder_hidden_states=None):
        for block in self.transformer_blocks:
            # Emulate Flux-like behavior
            output = block(hidden_states, encoder_hidden_states=encoder_hidden_states)
            hidden_states = output[0]
            encoder_hidden_states = output[1]
        return hidden_states, encoder_hidden_states


class MagCacheTests(unittest.TestCase):
    def setUp(self):
        # Register standard dummy block
        TransformerBlockRegistry.register(
            DummyBlock,
            TransformerBlockMetadata(return_hidden_states_index=None, return_encoder_hidden_states_index=None),
        )
        # Register tuple block (Flux style)
        TransformerBlockRegistry.register(
            TupleOutputBlock,
            TransformerBlockMetadata(return_hidden_states_index=0, return_encoder_hidden_states_index=1),
        )

    def _set_context(self, model, context_name):
        """Helper to set context on all hooks in the model."""
        for module in model.modules():
            if hasattr(module, "_diffusers_hook"):
                module._diffusers_hook._set_context(context_name)

    def _get_calibration_data(self, model):
        for module in model.modules():
            if hasattr(module, "_diffusers_hook"):
                hook = module._diffusers_hook.get_hook("mag_cache_block_hook")
                if hook:
                    return hook.state_manager.get_state().calibration_ratios
        return []

    def test_mag_cache_validation(self):
        """Test that missing mag_ratios raises ValueError."""
        with self.assertRaises(ValueError):
            MagCacheConfig(num_inference_steps=10, calibrate=False)

    def test_mag_cache_skipping_logic(self):
        """
        Tests that MagCache correctly calculates residuals and skips blocks when conditions are met.
        """
        model = DummyTransformer()

        # Dummy ratios: [1.0, 1.0] implies 0 accumulated error if we skip
        ratios = np.array([1.0, 1.0])

        config = MagCacheConfig(
            threshold=100.0,
            num_inference_steps=2,
            retention_ratio=0.0,  # Enable immediate skipping
            max_skip_steps=5,
            mag_ratios=ratios,
        )

        apply_mag_cache(model, config)
        self._set_context(model, "test_context")

        # Step 0: Input 10.0 -> Output 40.0 (2 blocks * 2x each)
        # HeadInput=10. Output=40. Residual=30.
        input_t0 = torch.tensor([[[10.0]]])
        output_t0 = model(input_t0)
        self.assertTrue(torch.allclose(output_t0, torch.tensor([[[40.0]]])), "Step 0 failed")

        # Step 1: Input 11.0.
        # If Skipped: Output = Input(11) + Residual(30) = 41.0
        # If Computed: Output = 11 * 4 = 44.0
        input_t1 = torch.tensor([[[11.0]]])
        output_t1 = model(input_t1)

        self.assertTrue(
            torch.allclose(output_t1, torch.tensor([[[41.0]]])), f"Expected Skip (41.0), got {output_t1.item()}"
        )

    def test_mag_cache_retention(self):
        """Test that retention_ratio prevents skipping even if error is low."""
        model = DummyTransformer()
        # Ratios that imply 0 error, so it *would* skip if retention allowed it
        ratios = np.array([1.0, 1.0])

        config = MagCacheConfig(
            threshold=100.0,
            num_inference_steps=2,
            retention_ratio=1.0,  # Force retention for ALL steps
            mag_ratios=ratios,
        )

        apply_mag_cache(model, config)
        self._set_context(model, "test_context")

        # Step 0
        model(torch.tensor([[[10.0]]]))

        # Step 1: Should COMPUTE (44.0) not SKIP (41.0) because of retention
        input_t1 = torch.tensor([[[11.0]]])
        output_t1 = model(input_t1)

        self.assertTrue(
            torch.allclose(output_t1, torch.tensor([[[44.0]]])),
            f"Expected Compute (44.0) due to retention, got {output_t1.item()}",
        )

    def test_mag_cache_tuple_outputs(self):
        """Test compatibility with models returning (hidden, encoder_hidden) like Flux."""
        model = TupleTransformer()
        ratios = np.array([1.0, 1.0])

        config = MagCacheConfig(threshold=100.0, num_inference_steps=2, retention_ratio=0.0, mag_ratios=ratios)

        apply_mag_cache(model, config)
        self._set_context(model, "test_context")

        # Step 0: Compute. Input 10.0 -> Output 20.0 (1 block * 2x)
        # Residual = 10.0
        input_t0 = torch.tensor([[[10.0]]])
        enc_t0 = torch.tensor([[[1.0]]])
        out_0, _ = model(input_t0, encoder_hidden_states=enc_t0)
        self.assertTrue(torch.allclose(out_0, torch.tensor([[[20.0]]])))

        # Step 1: Skip. Input 11.0.
        # Skipped Output = 11 + 10 = 21.0
        input_t1 = torch.tensor([[[11.0]]])
        out_1, _ = model(input_t1, encoder_hidden_states=enc_t0)

        self.assertTrue(
            torch.allclose(out_1, torch.tensor([[[21.0]]])), f"Tuple skip failed. Expected 21.0, got {out_1.item()}"
        )

    def test_mag_cache_reset(self):
        """Test that state resets correctly after num_inference_steps."""
        model = DummyTransformer()
        config = MagCacheConfig(
            threshold=100.0, num_inference_steps=2, retention_ratio=0.0, mag_ratios=np.array([1.0, 1.0])
        )
        apply_mag_cache(model, config)
        self._set_context(model, "test_context")

        input_t = torch.ones(1, 1, 1)

        model(input_t)  # Step 0
        model(input_t)  # Step 1 (Skipped)

        # Step 2 (Reset -> Step 0) -> Should Compute
        # Input 2.0 -> Output 8.0
        input_t2 = torch.tensor([[[2.0]]])
        output_t2 = model(input_t2)

        self.assertTrue(torch.allclose(output_t2, torch.tensor([[[8.0]]])), "State did not reset correctly")

    def test_mag_cache_calibration(self):
        """Test that calibration mode records ratios."""
        model = DummyTransformer()
        config = MagCacheConfig(num_inference_steps=2, calibrate=True)
        apply_mag_cache(model, config)
        self._set_context(model, "test_context")

        # Step 0
        # HeadInput = 10. Output = 40. Residual = 30.
        # Ratio 0 is placeholder 1.0
        model(torch.tensor([[[10.0]]]))

        # Check intermediate state
        ratios = self._get_calibration_data(model)
        self.assertEqual(len(ratios), 1)
        self.assertEqual(ratios[0], 1.0)

        # Step 1
        # HeadInput = 10. Output = 40. Residual = 30.
        # PrevResidual = 30. CurrResidual = 30.
        # Ratio = 30/30 = 1.0
        model(torch.tensor([[[10.0]]]))

        # Verify it computes fully (no skip)
        # If it skipped, output would be 41.0. It should be 40.0
        # Actually in test setup, input is same (10.0) so output 40.0.
        # Let's ensure list is empty after reset (end of step 1)
        ratios_after = self._get_calibration_data(model)
        self.assertEqual(ratios_after, [])
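The unit tests above drive MagCache directly through `apply_mag_cache` on tiny dummy transformers so the skip/compute arithmetic can be checked exactly. As a hedged sketch of the same feature on a real `CacheMixin` model, which is the path the `MagCacheTesterMixin` added further below exercises via `enable_cache`, usage might look roughly like this; the model id, threshold, and retention values are illustrative assumptions, not taken from this change:

# Hedged sketch: enabling MagCache through CacheMixin.enable_cache() on a real transformer.
# The model id and all numeric values below are illustrative assumptions.
import numpy as np
import torch

from diffusers import FluxTransformer2DModel, MagCacheConfig

transformer = FluxTransformer2DModel.from_pretrained(
    "black-forest-labs/FLUX.1-dev", subfolder="transformer", torch_dtype=torch.bfloat16
)

config = MagCacheConfig(
    num_inference_steps=28,  # should match the length of the denoising loop
    threshold=0.12,          # illustrative accumulated-error budget
    retention_ratio=0.2,     # never skip the earliest steps (cf. the retention test above)
    mag_ratios=np.ones(28),  # neutral placeholder; real ratios come from a calibration run
)
transformer.enable_cache(config)

with transformer.cache_context("denoise"):
    ...  # run the denoising loop; blocks are skipped when the estimated error stays under threshold

In practice `mag_ratios` would come from a calibration pass (`MagCacheConfig(..., calibrate=True)`, as in `test_mag_cache_calibration` above) rather than the neutral placeholder used here.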
@@ -5,8 +5,12 @@ from .cache import (
    FasterCacheTesterMixin,
    FirstBlockCacheConfigMixin,
    FirstBlockCacheTesterMixin,
    MagCacheConfigMixin,
    MagCacheTesterMixin,
    PyramidAttentionBroadcastConfigMixin,
    PyramidAttentionBroadcastTesterMixin,
    TaylorSeerCacheConfigMixin,
    TaylorSeerCacheTesterMixin,
)
from .common import BaseModelTesterConfig, ModelTesterMixin
from .compile import TorchCompileTesterMixin
@@ -50,6 +54,8 @@ __all__ = [
    "FasterCacheTesterMixin",
    "FirstBlockCacheConfigMixin",
    "FirstBlockCacheTesterMixin",
    "MagCacheConfigMixin",
    "MagCacheTesterMixin",
    "GGUFCompileTesterMixin",
    "GGUFConfigMixin",
    "GGUFTesterMixin",
@@ -65,6 +71,8 @@ __all__ = [
    "ModelTesterMixin",
    "PyramidAttentionBroadcastConfigMixin",
    "PyramidAttentionBroadcastTesterMixin",
    "TaylorSeerCacheConfigMixin",
    "TaylorSeerCacheTesterMixin",
    "QuantizationCompileTesterMixin",
    "QuantizationTesterMixin",
    "QuantoCompileTesterMixin",
@@ -18,10 +18,18 @@ import gc
import pytest
import torch

from diffusers.hooks import FasterCacheConfig, FirstBlockCacheConfig, PyramidAttentionBroadcastConfig
from diffusers.hooks import (
    FasterCacheConfig,
    FirstBlockCacheConfig,
    MagCacheConfig,
    PyramidAttentionBroadcastConfig,
    TaylorSeerCacheConfig,
)
from diffusers.hooks.faster_cache import _FASTER_CACHE_BLOCK_HOOK, _FASTER_CACHE_DENOISER_HOOK
from diffusers.hooks.first_block_cache import _FBC_BLOCK_HOOK, _FBC_LEADER_BLOCK_HOOK
from diffusers.hooks.mag_cache import _MAG_CACHE_BLOCK_HOOK, _MAG_CACHE_LEADER_BLOCK_HOOK
from diffusers.hooks.pyramid_attention_broadcast import _PYRAMID_ATTENTION_BROADCAST_HOOK
from diffusers.hooks.taylorseer_cache import _TAYLORSEER_CACHE_HOOK
from diffusers.models.cache_utils import CacheMixin

from ...testing_utils import assert_tensors_close, backend_empty_cache, is_cache, torch_device
@@ -554,3 +562,192 @@ class FasterCacheTesterMixin(FasterCacheConfigMixin, CacheTesterMixin):
    @require_cache_mixin
    def test_faster_cache_reset_stateful_cache(self):
        self._test_reset_stateful_cache()


@is_cache
class MagCacheConfigMixin:
    """
    Base mixin providing MagCache config.

    Expected class attributes:
    - model_class: The model class to test (must use CacheMixin)
    """

    # Default MagCache config - can be overridden by subclasses.
    # Uses neutral ratios [1.0, 1.0] and a high threshold so the second
    # inference step is always skipped, which is required by _test_cache_inference.
    MAG_CACHE_CONFIG = {
        "num_inference_steps": 2,
        "retention_ratio": 0.0,
        "threshold": 100.0,
        "mag_ratios": [1.0, 1.0],
    }

    def _get_cache_config(self):
        return MagCacheConfig(**self.MAG_CACHE_CONFIG)

    def _get_hook_names(self):
        return [_MAG_CACHE_LEADER_BLOCK_HOOK, _MAG_CACHE_BLOCK_HOOK]


@is_cache
class MagCacheTesterMixin(MagCacheConfigMixin, CacheTesterMixin):
    """
    Mixin class for testing MagCache on models.

    Expected class attributes:
    - model_class: The model class to test (must use CacheMixin)

    Expected methods to be implemented by subclasses:
    - get_init_dict(): Returns dict of arguments to initialize the model
    - get_dummy_inputs(): Returns dict of inputs to pass to the model forward pass

    Pytest mark: cache
    Use `pytest -m "not cache"` to skip these tests
    """

    @require_cache_mixin
    def test_mag_cache_enable_disable_state(self):
        self._test_cache_enable_disable_state()

    @require_cache_mixin
    def test_mag_cache_double_enable_raises_error(self):
        self._test_cache_double_enable_raises_error()

    @require_cache_mixin
    def test_mag_cache_hooks_registered(self):
        self._test_cache_hooks_registered()

    @require_cache_mixin
    def test_mag_cache_inference(self):
        self._test_cache_inference()

    @require_cache_mixin
    def test_mag_cache_context_manager(self):
        self._test_cache_context_manager()

    @require_cache_mixin
    def test_mag_cache_reset_stateful_cache(self):
        self._test_reset_stateful_cache()


@is_cache
class TaylorSeerCacheConfigMixin:
    """
    Base mixin providing TaylorSeerCache config.

    Expected class attributes:
    - model_class: The model class to test (must use CacheMixin)
    """

    # Default TaylorSeerCache config - can be overridden by subclasses.
    # Uses a low cache_interval and disable_cache_before_step=0 so the second
    # inference step is always predicted, which is required by _test_cache_inference.
    TAYLORSEER_CACHE_CONFIG = {
        "cache_interval": 3,
        "disable_cache_before_step": 1,
        "max_order": 1,
    }

    def _get_cache_config(self):
        return TaylorSeerCacheConfig(**self.TAYLORSEER_CACHE_CONFIG)

    def _get_hook_names(self):
        return [_TAYLORSEER_CACHE_HOOK]


@is_cache
class TaylorSeerCacheTesterMixin(TaylorSeerCacheConfigMixin, CacheTesterMixin):
    """
    Mixin class for testing TaylorSeerCache on models.

    Expected class attributes:
    - model_class: The model class to test (must use CacheMixin)

    Expected methods to be implemented by subclasses:
    - get_init_dict(): Returns dict of arguments to initialize the model
    - get_dummy_inputs(): Returns dict of inputs to pass to the model forward pass

    Pytest mark: cache
    Use `pytest -m "not cache"` to skip these tests
    """

    @torch.no_grad()
    def _test_cache_inference(self):
        """Test that model can run inference with TaylorSeer cache enabled (requires cache_context)."""
        init_dict = self.get_init_dict()
        inputs_dict = self.get_dummy_inputs()
        model = self.model_class(**init_dict).to(torch_device)
        model.eval()

        config = self._get_cache_config()
        model.enable_cache(config)

        # TaylorSeer requires cache_context to be set for inference
        with model.cache_context("taylorseer_test"):
            # First pass populates the cache
            _ = model(**inputs_dict, return_dict=False)[0]

            # Create modified inputs for second pass
            inputs_dict_step2 = inputs_dict.copy()
            if self.cache_input_key in inputs_dict_step2:
                inputs_dict_step2[self.cache_input_key] = inputs_dict_step2[self.cache_input_key] + torch.randn_like(
                    inputs_dict_step2[self.cache_input_key]
                )

            # Second pass - TaylorSeer should use cached Taylor series predictions
            output_with_cache = model(**inputs_dict_step2, return_dict=False)[0]

        assert output_with_cache is not None, "Model output should not be None with cache enabled."
        assert not torch.isnan(output_with_cache).any(), "Model output contains NaN with cache enabled."

        # Run same inputs without cache to compare
        model.disable_cache()
        output_without_cache = model(**inputs_dict_step2, return_dict=False)[0]

        # Cached output should be different from non-cached output (due to approximation)
        assert not torch.allclose(output_without_cache, output_with_cache, atol=1e-5), (
            "Cached output should be different from non-cached output due to cache approximation."
        )

    @torch.no_grad()
    def _test_reset_stateful_cache(self):
        """Test that _reset_stateful_cache resets the TaylorSeer cache state (requires cache_context)."""
        init_dict = self.get_init_dict()
        inputs_dict = self.get_dummy_inputs()
        model = self.model_class(**init_dict).to(torch_device)
        model.eval()

        config = self._get_cache_config()
        model.enable_cache(config)

        with model.cache_context("taylorseer_test"):
            _ = model(**inputs_dict, return_dict=False)[0]

        model._reset_stateful_cache()

        model.disable_cache()

    @require_cache_mixin
    def test_taylorseer_cache_enable_disable_state(self):
        self._test_cache_enable_disable_state()

    @require_cache_mixin
    def test_taylorseer_cache_double_enable_raises_error(self):
        self._test_cache_double_enable_raises_error()

    @require_cache_mixin
    def test_taylorseer_cache_hooks_registered(self):
        self._test_cache_hooks_registered()

    @require_cache_mixin
    def test_taylorseer_cache_inference(self):
        self._test_cache_inference()

    @require_cache_mixin
    def test_taylorseer_cache_context_manager(self):
        self._test_cache_context_manager()

    @require_cache_mixin
    def test_taylorseer_cache_reset_stateful_cache(self):
        self._test_reset_stateful_cache()
@@ -37,6 +37,7 @@ from ..testing_utils import (
    IPAdapterTesterMixin,
    LoraHotSwappingForModelTesterMixin,
    LoraTesterMixin,
    MagCacheTesterMixin,
    MemoryTesterMixin,
    ModelOptCompileTesterMixin,
    ModelOptTesterMixin,
@@ -45,6 +46,7 @@ from ..testing_utils import (
    QuantoCompileTesterMixin,
    QuantoTesterMixin,
    SingleFileTesterMixin,
    TaylorSeerCacheTesterMixin,
    TorchAoCompileTesterMixin,
    TorchAoTesterMixin,
    TorchCompileTesterMixin,
@@ -430,3 +432,11 @@ class TestFluxTransformerFasterCache(FluxTransformerTesterConfig, FasterCacheTesterMixin):
        "tensor_format": "BCHW",
        "is_guidance_distilled": True,
    }


class TestFluxTransformerMagCache(FluxTransformerTesterConfig, MagCacheTesterMixin):
    """MagCache tests for Flux Transformer."""


class TestFluxTransformerTaylorSeerCache(FluxTransformerTesterConfig, TaylorSeerCacheTesterMixin):
    """TaylorSeerCache tests for Flux Transformer."""