mirror of
https://github.com/huggingface/diffusers.git
synced 2026-03-11 11:12:04 +08:00
Compare commits
2 Commits
refactor-c
...
tests-cond
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
58c304595d | ||
|
|
55c563281a |
244
tests/hooks/test_mag_cache.py
Normal file
244
tests/hooks/test_mag_cache.py
Normal file
@@ -0,0 +1,244 @@
|
||||
# Copyright 2025 HuggingFace Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import unittest
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
|
||||
from diffusers import MagCacheConfig, apply_mag_cache
|
||||
from diffusers.hooks._helpers import TransformerBlockMetadata, TransformerBlockRegistry
|
||||
from diffusers.models import ModelMixin
|
||||
from diffusers.utils import logging
|
||||
|
||||
|
||||
logger = logging.get_logger(__name__)
|
||||
|
||||
|
||||
class DummyBlock(torch.nn.Module):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def forward(self, hidden_states, encoder_hidden_states=None, **kwargs):
|
||||
# Output is double input
|
||||
# This ensures Residual = 2*Input - Input = Input
|
||||
return hidden_states * 2.0
|
||||
|
||||
|
||||
class DummyTransformer(ModelMixin):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.transformer_blocks = torch.nn.ModuleList([DummyBlock(), DummyBlock()])
|
||||
|
||||
def forward(self, hidden_states, encoder_hidden_states=None):
|
||||
for block in self.transformer_blocks:
|
||||
hidden_states = block(hidden_states, encoder_hidden_states=encoder_hidden_states)
|
||||
return hidden_states
|
||||
|
||||
|
||||
class TupleOutputBlock(torch.nn.Module):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def forward(self, hidden_states, encoder_hidden_states=None, **kwargs):
|
||||
# Returns a tuple
|
||||
return hidden_states * 2.0, encoder_hidden_states
|
||||
|
||||
|
||||
class TupleTransformer(ModelMixin):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.transformer_blocks = torch.nn.ModuleList([TupleOutputBlock()])
|
||||
|
||||
def forward(self, hidden_states, encoder_hidden_states=None):
|
||||
for block in self.transformer_blocks:
|
||||
# Emulate Flux-like behavior
|
||||
output = block(hidden_states, encoder_hidden_states=encoder_hidden_states)
|
||||
hidden_states = output[0]
|
||||
encoder_hidden_states = output[1]
|
||||
return hidden_states, encoder_hidden_states
|
||||
|
||||
|
||||
class MagCacheTests(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# Register standard dummy block
|
||||
TransformerBlockRegistry.register(
|
||||
DummyBlock,
|
||||
TransformerBlockMetadata(return_hidden_states_index=None, return_encoder_hidden_states_index=None),
|
||||
)
|
||||
# Register tuple block (Flux style)
|
||||
TransformerBlockRegistry.register(
|
||||
TupleOutputBlock,
|
||||
TransformerBlockMetadata(return_hidden_states_index=0, return_encoder_hidden_states_index=1),
|
||||
)
|
||||
|
||||
def _set_context(self, model, context_name):
|
||||
"""Helper to set context on all hooks in the model."""
|
||||
for module in model.modules():
|
||||
if hasattr(module, "_diffusers_hook"):
|
||||
module._diffusers_hook._set_context(context_name)
|
||||
|
||||
def _get_calibration_data(self, model):
|
||||
for module in model.modules():
|
||||
if hasattr(module, "_diffusers_hook"):
|
||||
hook = module._diffusers_hook.get_hook("mag_cache_block_hook")
|
||||
if hook:
|
||||
return hook.state_manager.get_state().calibration_ratios
|
||||
return []
|
||||
|
||||
def test_mag_cache_validation(self):
|
||||
"""Test that missing mag_ratios raises ValueError."""
|
||||
with self.assertRaises(ValueError):
|
||||
MagCacheConfig(num_inference_steps=10, calibrate=False)
|
||||
|
||||
def test_mag_cache_skipping_logic(self):
|
||||
"""
|
||||
Tests that MagCache correctly calculates residuals and skips blocks when conditions are met.
|
||||
"""
|
||||
model = DummyTransformer()
|
||||
|
||||
# Dummy ratios: [1.0, 1.0] implies 0 accumulated error if we skip
|
||||
ratios = np.array([1.0, 1.0])
|
||||
|
||||
config = MagCacheConfig(
|
||||
threshold=100.0,
|
||||
num_inference_steps=2,
|
||||
retention_ratio=0.0, # Enable immediate skipping
|
||||
max_skip_steps=5,
|
||||
mag_ratios=ratios,
|
||||
)
|
||||
|
||||
apply_mag_cache(model, config)
|
||||
self._set_context(model, "test_context")
|
||||
|
||||
# Step 0: Input 10.0 -> Output 40.0 (2 blocks * 2x each)
|
||||
# HeadInput=10. Output=40. Residual=30.
|
||||
input_t0 = torch.tensor([[[10.0]]])
|
||||
output_t0 = model(input_t0)
|
||||
self.assertTrue(torch.allclose(output_t0, torch.tensor([[[40.0]]])), "Step 0 failed")
|
||||
|
||||
# Step 1: Input 11.0.
|
||||
# If Skipped: Output = Input(11) + Residual(30) = 41.0
|
||||
# If Computed: Output = 11 * 4 = 44.0
|
||||
input_t1 = torch.tensor([[[11.0]]])
|
||||
output_t1 = model(input_t1)
|
||||
|
||||
self.assertTrue(
|
||||
torch.allclose(output_t1, torch.tensor([[[41.0]]])), f"Expected Skip (41.0), got {output_t1.item()}"
|
||||
)
|
||||
|
||||
def test_mag_cache_retention(self):
|
||||
"""Test that retention_ratio prevents skipping even if error is low."""
|
||||
model = DummyTransformer()
|
||||
# Ratios that imply 0 error, so it *would* skip if retention allowed it
|
||||
ratios = np.array([1.0, 1.0])
|
||||
|
||||
config = MagCacheConfig(
|
||||
threshold=100.0,
|
||||
num_inference_steps=2,
|
||||
retention_ratio=1.0, # Force retention for ALL steps
|
||||
mag_ratios=ratios,
|
||||
)
|
||||
|
||||
apply_mag_cache(model, config)
|
||||
self._set_context(model, "test_context")
|
||||
|
||||
# Step 0
|
||||
model(torch.tensor([[[10.0]]]))
|
||||
|
||||
# Step 1: Should COMPUTE (44.0) not SKIP (41.0) because of retention
|
||||
input_t1 = torch.tensor([[[11.0]]])
|
||||
output_t1 = model(input_t1)
|
||||
|
||||
self.assertTrue(
|
||||
torch.allclose(output_t1, torch.tensor([[[44.0]]])),
|
||||
f"Expected Compute (44.0) due to retention, got {output_t1.item()}",
|
||||
)
|
||||
|
||||
def test_mag_cache_tuple_outputs(self):
|
||||
"""Test compatibility with models returning (hidden, encoder_hidden) like Flux."""
|
||||
model = TupleTransformer()
|
||||
ratios = np.array([1.0, 1.0])
|
||||
|
||||
config = MagCacheConfig(threshold=100.0, num_inference_steps=2, retention_ratio=0.0, mag_ratios=ratios)
|
||||
|
||||
apply_mag_cache(model, config)
|
||||
self._set_context(model, "test_context")
|
||||
|
||||
# Step 0: Compute. Input 10.0 -> Output 20.0 (1 block * 2x)
|
||||
# Residual = 10.0
|
||||
input_t0 = torch.tensor([[[10.0]]])
|
||||
enc_t0 = torch.tensor([[[1.0]]])
|
||||
out_0, _ = model(input_t0, encoder_hidden_states=enc_t0)
|
||||
self.assertTrue(torch.allclose(out_0, torch.tensor([[[20.0]]])))
|
||||
|
||||
# Step 1: Skip. Input 11.0.
|
||||
# Skipped Output = 11 + 10 = 21.0
|
||||
input_t1 = torch.tensor([[[11.0]]])
|
||||
out_1, _ = model(input_t1, encoder_hidden_states=enc_t0)
|
||||
|
||||
self.assertTrue(
|
||||
torch.allclose(out_1, torch.tensor([[[21.0]]])), f"Tuple skip failed. Expected 21.0, got {out_1.item()}"
|
||||
)
|
||||
|
||||
def test_mag_cache_reset(self):
|
||||
"""Test that state resets correctly after num_inference_steps."""
|
||||
model = DummyTransformer()
|
||||
config = MagCacheConfig(
|
||||
threshold=100.0, num_inference_steps=2, retention_ratio=0.0, mag_ratios=np.array([1.0, 1.0])
|
||||
)
|
||||
apply_mag_cache(model, config)
|
||||
self._set_context(model, "test_context")
|
||||
|
||||
input_t = torch.ones(1, 1, 1)
|
||||
|
||||
model(input_t) # Step 0
|
||||
model(input_t) # Step 1 (Skipped)
|
||||
|
||||
# Step 2 (Reset -> Step 0) -> Should Compute
|
||||
# Input 2.0 -> Output 8.0
|
||||
input_t2 = torch.tensor([[[2.0]]])
|
||||
output_t2 = model(input_t2)
|
||||
|
||||
self.assertTrue(torch.allclose(output_t2, torch.tensor([[[8.0]]])), "State did not reset correctly")
|
||||
|
||||
def test_mag_cache_calibration(self):
|
||||
"""Test that calibration mode records ratios."""
|
||||
model = DummyTransformer()
|
||||
config = MagCacheConfig(num_inference_steps=2, calibrate=True)
|
||||
apply_mag_cache(model, config)
|
||||
self._set_context(model, "test_context")
|
||||
|
||||
# Step 0
|
||||
# HeadInput = 10. Output = 40. Residual = 30.
|
||||
# Ratio 0 is placeholder 1.0
|
||||
model(torch.tensor([[[10.0]]]))
|
||||
|
||||
# Check intermediate state
|
||||
ratios = self._get_calibration_data(model)
|
||||
self.assertEqual(len(ratios), 1)
|
||||
self.assertEqual(ratios[0], 1.0)
|
||||
|
||||
# Step 1
|
||||
# HeadInput = 10. Output = 40. Residual = 30.
|
||||
# PrevResidual = 30. CurrResidual = 30.
|
||||
# Ratio = 30/30 = 1.0
|
||||
model(torch.tensor([[[10.0]]]))
|
||||
|
||||
# Verify it computes fully (no skip)
|
||||
# If it skipped, output would be 41.0. It should be 40.0
|
||||
# Actually in test setup, input is same (10.0) so output 40.0.
|
||||
# Let's ensure list is empty after reset (end of step 1)
|
||||
ratios_after = self._get_calibration_data(model)
|
||||
self.assertEqual(ratios_after, [])
|
||||
@@ -5,12 +5,8 @@ from .cache import (
|
||||
FasterCacheTesterMixin,
|
||||
FirstBlockCacheConfigMixin,
|
||||
FirstBlockCacheTesterMixin,
|
||||
MagCacheConfigMixin,
|
||||
MagCacheTesterMixin,
|
||||
PyramidAttentionBroadcastConfigMixin,
|
||||
PyramidAttentionBroadcastTesterMixin,
|
||||
TaylorSeerCacheConfigMixin,
|
||||
TaylorSeerCacheTesterMixin,
|
||||
)
|
||||
from .common import BaseModelTesterConfig, ModelTesterMixin
|
||||
from .compile import TorchCompileTesterMixin
|
||||
@@ -54,8 +50,6 @@ __all__ = [
|
||||
"FasterCacheTesterMixin",
|
||||
"FirstBlockCacheConfigMixin",
|
||||
"FirstBlockCacheTesterMixin",
|
||||
"MagCacheConfigMixin",
|
||||
"MagCacheTesterMixin",
|
||||
"GGUFCompileTesterMixin",
|
||||
"GGUFConfigMixin",
|
||||
"GGUFTesterMixin",
|
||||
@@ -71,8 +65,6 @@ __all__ = [
|
||||
"ModelTesterMixin",
|
||||
"PyramidAttentionBroadcastConfigMixin",
|
||||
"PyramidAttentionBroadcastTesterMixin",
|
||||
"TaylorSeerCacheConfigMixin",
|
||||
"TaylorSeerCacheTesterMixin",
|
||||
"QuantizationCompileTesterMixin",
|
||||
"QuantizationTesterMixin",
|
||||
"QuantoCompileTesterMixin",
|
||||
|
||||
@@ -18,18 +18,10 @@ import gc
|
||||
import pytest
|
||||
import torch
|
||||
|
||||
from diffusers.hooks import (
|
||||
FasterCacheConfig,
|
||||
FirstBlockCacheConfig,
|
||||
MagCacheConfig,
|
||||
PyramidAttentionBroadcastConfig,
|
||||
TaylorSeerCacheConfig,
|
||||
)
|
||||
from diffusers.hooks import FasterCacheConfig, FirstBlockCacheConfig, PyramidAttentionBroadcastConfig
|
||||
from diffusers.hooks.faster_cache import _FASTER_CACHE_BLOCK_HOOK, _FASTER_CACHE_DENOISER_HOOK
|
||||
from diffusers.hooks.first_block_cache import _FBC_BLOCK_HOOK, _FBC_LEADER_BLOCK_HOOK
|
||||
from diffusers.hooks.mag_cache import _MAG_CACHE_BLOCK_HOOK, _MAG_CACHE_LEADER_BLOCK_HOOK
|
||||
from diffusers.hooks.pyramid_attention_broadcast import _PYRAMID_ATTENTION_BROADCAST_HOOK
|
||||
from diffusers.hooks.taylorseer_cache import _TAYLORSEER_CACHE_HOOK
|
||||
from diffusers.models.cache_utils import CacheMixin
|
||||
|
||||
from ...testing_utils import assert_tensors_close, backend_empty_cache, is_cache, torch_device
|
||||
@@ -562,192 +554,3 @@ class FasterCacheTesterMixin(FasterCacheConfigMixin, CacheTesterMixin):
|
||||
@require_cache_mixin
|
||||
def test_faster_cache_reset_stateful_cache(self):
|
||||
self._test_reset_stateful_cache()
|
||||
|
||||
|
||||
@is_cache
|
||||
class MagCacheConfigMixin:
|
||||
"""
|
||||
Base mixin providing MagCache config.
|
||||
|
||||
Expected class attributes:
|
||||
- model_class: The model class to test (must use CacheMixin)
|
||||
"""
|
||||
|
||||
# Default MagCache config - can be overridden by subclasses.
|
||||
# Uses neutral ratios [1.0, 1.0] and a high threshold so the second
|
||||
# inference step is always skipped, which is required by _test_cache_inference.
|
||||
MAG_CACHE_CONFIG = {
|
||||
"num_inference_steps": 2,
|
||||
"retention_ratio": 0.0,
|
||||
"threshold": 100.0,
|
||||
"mag_ratios": [1.0, 1.0],
|
||||
}
|
||||
|
||||
def _get_cache_config(self):
|
||||
return MagCacheConfig(**self.MAG_CACHE_CONFIG)
|
||||
|
||||
def _get_hook_names(self):
|
||||
return [_MAG_CACHE_LEADER_BLOCK_HOOK, _MAG_CACHE_BLOCK_HOOK]
|
||||
|
||||
|
||||
@is_cache
|
||||
class MagCacheTesterMixin(MagCacheConfigMixin, CacheTesterMixin):
|
||||
"""
|
||||
Mixin class for testing MagCache on models.
|
||||
|
||||
Expected class attributes:
|
||||
- model_class: The model class to test (must use CacheMixin)
|
||||
|
||||
Expected methods to be implemented by subclasses:
|
||||
- get_init_dict(): Returns dict of arguments to initialize the model
|
||||
- get_dummy_inputs(): Returns dict of inputs to pass to the model forward pass
|
||||
|
||||
Pytest mark: cache
|
||||
Use `pytest -m "not cache"` to skip these tests
|
||||
"""
|
||||
|
||||
@require_cache_mixin
|
||||
def test_mag_cache_enable_disable_state(self):
|
||||
self._test_cache_enable_disable_state()
|
||||
|
||||
@require_cache_mixin
|
||||
def test_mag_cache_double_enable_raises_error(self):
|
||||
self._test_cache_double_enable_raises_error()
|
||||
|
||||
@require_cache_mixin
|
||||
def test_mag_cache_hooks_registered(self):
|
||||
self._test_cache_hooks_registered()
|
||||
|
||||
@require_cache_mixin
|
||||
def test_mag_cache_inference(self):
|
||||
self._test_cache_inference()
|
||||
|
||||
@require_cache_mixin
|
||||
def test_mag_cache_context_manager(self):
|
||||
self._test_cache_context_manager()
|
||||
|
||||
@require_cache_mixin
|
||||
def test_mag_cache_reset_stateful_cache(self):
|
||||
self._test_reset_stateful_cache()
|
||||
|
||||
|
||||
@is_cache
|
||||
class TaylorSeerCacheConfigMixin:
|
||||
"""
|
||||
Base mixin providing TaylorSeerCache config.
|
||||
|
||||
Expected class attributes:
|
||||
- model_class: The model class to test (must use CacheMixin)
|
||||
"""
|
||||
|
||||
# Default TaylorSeerCache config - can be overridden by subclasses.
|
||||
# Uses a low cache_interval and disable_cache_before_step=0 so the second
|
||||
# inference step is always predicted, which is required by _test_cache_inference.
|
||||
TAYLORSEER_CACHE_CONFIG = {
|
||||
"cache_interval": 3,
|
||||
"disable_cache_before_step": 1,
|
||||
"max_order": 1,
|
||||
}
|
||||
|
||||
def _get_cache_config(self):
|
||||
return TaylorSeerCacheConfig(**self.TAYLORSEER_CACHE_CONFIG)
|
||||
|
||||
def _get_hook_names(self):
|
||||
return [_TAYLORSEER_CACHE_HOOK]
|
||||
|
||||
|
||||
@is_cache
|
||||
class TaylorSeerCacheTesterMixin(TaylorSeerCacheConfigMixin, CacheTesterMixin):
|
||||
"""
|
||||
Mixin class for testing TaylorSeerCache on models.
|
||||
|
||||
Expected class attributes:
|
||||
- model_class: The model class to test (must use CacheMixin)
|
||||
|
||||
Expected methods to be implemented by subclasses:
|
||||
- get_init_dict(): Returns dict of arguments to initialize the model
|
||||
- get_dummy_inputs(): Returns dict of inputs to pass to the model forward pass
|
||||
|
||||
Pytest mark: cache
|
||||
Use `pytest -m "not cache"` to skip these tests
|
||||
"""
|
||||
|
||||
@torch.no_grad()
|
||||
def _test_cache_inference(self):
|
||||
"""Test that model can run inference with TaylorSeer cache enabled (requires cache_context)."""
|
||||
init_dict = self.get_init_dict()
|
||||
inputs_dict = self.get_dummy_inputs()
|
||||
model = self.model_class(**init_dict).to(torch_device)
|
||||
model.eval()
|
||||
|
||||
config = self._get_cache_config()
|
||||
model.enable_cache(config)
|
||||
|
||||
# TaylorSeer requires cache_context to be set for inference
|
||||
with model.cache_context("taylorseer_test"):
|
||||
# First pass populates the cache
|
||||
_ = model(**inputs_dict, return_dict=False)[0]
|
||||
|
||||
# Create modified inputs for second pass
|
||||
inputs_dict_step2 = inputs_dict.copy()
|
||||
if self.cache_input_key in inputs_dict_step2:
|
||||
inputs_dict_step2[self.cache_input_key] = inputs_dict_step2[self.cache_input_key] + torch.randn_like(
|
||||
inputs_dict_step2[self.cache_input_key]
|
||||
)
|
||||
|
||||
# Second pass - TaylorSeer should use cached Taylor series predictions
|
||||
output_with_cache = model(**inputs_dict_step2, return_dict=False)[0]
|
||||
|
||||
assert output_with_cache is not None, "Model output should not be None with cache enabled."
|
||||
assert not torch.isnan(output_with_cache).any(), "Model output contains NaN with cache enabled."
|
||||
|
||||
# Run same inputs without cache to compare
|
||||
model.disable_cache()
|
||||
output_without_cache = model(**inputs_dict_step2, return_dict=False)[0]
|
||||
|
||||
# Cached output should be different from non-cached output (due to approximation)
|
||||
assert not torch.allclose(output_without_cache, output_with_cache, atol=1e-5), (
|
||||
"Cached output should be different from non-cached output due to cache approximation."
|
||||
)
|
||||
|
||||
@torch.no_grad()
|
||||
def _test_reset_stateful_cache(self):
|
||||
"""Test that _reset_stateful_cache resets the TaylorSeer cache state (requires cache_context)."""
|
||||
init_dict = self.get_init_dict()
|
||||
inputs_dict = self.get_dummy_inputs()
|
||||
model = self.model_class(**init_dict).to(torch_device)
|
||||
model.eval()
|
||||
|
||||
config = self._get_cache_config()
|
||||
model.enable_cache(config)
|
||||
|
||||
with model.cache_context("taylorseer_test"):
|
||||
_ = model(**inputs_dict, return_dict=False)[0]
|
||||
|
||||
model._reset_stateful_cache()
|
||||
|
||||
model.disable_cache()
|
||||
|
||||
@require_cache_mixin
|
||||
def test_taylorseer_cache_enable_disable_state(self):
|
||||
self._test_cache_enable_disable_state()
|
||||
|
||||
@require_cache_mixin
|
||||
def test_taylorseer_cache_double_enable_raises_error(self):
|
||||
self._test_cache_double_enable_raises_error()
|
||||
|
||||
@require_cache_mixin
|
||||
def test_taylorseer_cache_hooks_registered(self):
|
||||
self._test_cache_hooks_registered()
|
||||
|
||||
@require_cache_mixin
|
||||
def test_taylorseer_cache_inference(self):
|
||||
self._test_cache_inference()
|
||||
|
||||
@require_cache_mixin
|
||||
def test_taylorseer_cache_context_manager(self):
|
||||
self._test_cache_context_manager()
|
||||
|
||||
@require_cache_mixin
|
||||
def test_taylorseer_cache_reset_stateful_cache(self):
|
||||
self._test_reset_stateful_cache()
|
||||
|
||||
@@ -37,7 +37,6 @@ from ..testing_utils import (
|
||||
IPAdapterTesterMixin,
|
||||
LoraHotSwappingForModelTesterMixin,
|
||||
LoraTesterMixin,
|
||||
MagCacheTesterMixin,
|
||||
MemoryTesterMixin,
|
||||
ModelOptCompileTesterMixin,
|
||||
ModelOptTesterMixin,
|
||||
@@ -46,7 +45,6 @@ from ..testing_utils import (
|
||||
QuantoCompileTesterMixin,
|
||||
QuantoTesterMixin,
|
||||
SingleFileTesterMixin,
|
||||
TaylorSeerCacheTesterMixin,
|
||||
TorchAoCompileTesterMixin,
|
||||
TorchAoTesterMixin,
|
||||
TorchCompileTesterMixin,
|
||||
@@ -432,11 +430,3 @@ class TestFluxTransformerFasterCache(FluxTransformerTesterConfig, FasterCacheTes
|
||||
"tensor_format": "BCHW",
|
||||
"is_guidance_distilled": True,
|
||||
}
|
||||
|
||||
|
||||
class TestFluxTransformerMagCache(FluxTransformerTesterConfig, MagCacheTesterMixin):
|
||||
"""MagCache tests for Flux Transformer."""
|
||||
|
||||
|
||||
class TestFluxTransformerTaylorSeerCache(FluxTransformerTesterConfig, TaylorSeerCacheTesterMixin):
|
||||
"""TaylorSeerCache tests for Flux Transformer."""
|
||||
|
||||
242
tests/modular_pipelines/test_conditional_pipeline_blocks.py
Normal file
242
tests/modular_pipelines/test_conditional_pipeline_blocks.py
Normal file
@@ -0,0 +1,242 @@
|
||||
# Copyright 2025 The HuggingFace Team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
from diffusers.modular_pipelines import (
|
||||
AutoPipelineBlocks,
|
||||
ConditionalPipelineBlocks,
|
||||
InputParam,
|
||||
ModularPipelineBlocks,
|
||||
)
|
||||
|
||||
|
||||
class TextToImageBlock(ModularPipelineBlocks):
|
||||
model_name = "text2img"
|
||||
|
||||
@property
|
||||
def inputs(self):
|
||||
return [InputParam(name="prompt")]
|
||||
|
||||
@property
|
||||
def intermediate_outputs(self):
|
||||
return []
|
||||
|
||||
@property
|
||||
def description(self):
|
||||
return "text-to-image workflow"
|
||||
|
||||
def __call__(self, components, state):
|
||||
block_state = self.get_block_state(state)
|
||||
block_state.workflow = "text2img"
|
||||
self.set_block_state(state, block_state)
|
||||
return components, state
|
||||
|
||||
|
||||
class ImageToImageBlock(ModularPipelineBlocks):
|
||||
model_name = "img2img"
|
||||
|
||||
@property
|
||||
def inputs(self):
|
||||
return [InputParam(name="prompt"), InputParam(name="image")]
|
||||
|
||||
@property
|
||||
def intermediate_outputs(self):
|
||||
return []
|
||||
|
||||
@property
|
||||
def description(self):
|
||||
return "image-to-image workflow"
|
||||
|
||||
def __call__(self, components, state):
|
||||
block_state = self.get_block_state(state)
|
||||
block_state.workflow = "img2img"
|
||||
self.set_block_state(state, block_state)
|
||||
return components, state
|
||||
|
||||
|
||||
class InpaintBlock(ModularPipelineBlocks):
|
||||
model_name = "inpaint"
|
||||
|
||||
@property
|
||||
def inputs(self):
|
||||
return [InputParam(name="prompt"), InputParam(name="image"), InputParam(name="mask")]
|
||||
|
||||
@property
|
||||
def intermediate_outputs(self):
|
||||
return []
|
||||
|
||||
@property
|
||||
def description(self):
|
||||
return "inpaint workflow"
|
||||
|
||||
def __call__(self, components, state):
|
||||
block_state = self.get_block_state(state)
|
||||
block_state.workflow = "inpaint"
|
||||
self.set_block_state(state, block_state)
|
||||
return components, state
|
||||
|
||||
|
||||
class ConditionalImageBlocks(ConditionalPipelineBlocks):
|
||||
block_classes = [InpaintBlock, ImageToImageBlock, TextToImageBlock]
|
||||
block_names = ["inpaint", "img2img", "text2img"]
|
||||
block_trigger_inputs = ["mask", "image"]
|
||||
default_block_name = "text2img"
|
||||
|
||||
@property
|
||||
def description(self):
|
||||
return "Conditional image blocks for testing"
|
||||
|
||||
def select_block(self, mask=None, image=None) -> str | None:
|
||||
if mask is not None:
|
||||
return "inpaint"
|
||||
if image is not None:
|
||||
return "img2img"
|
||||
return None # falls back to default_block_name
|
||||
|
||||
|
||||
class OptionalConditionalBlocks(ConditionalPipelineBlocks):
|
||||
block_classes = [InpaintBlock, ImageToImageBlock]
|
||||
block_names = ["inpaint", "img2img"]
|
||||
block_trigger_inputs = ["mask", "image"]
|
||||
default_block_name = None # no default; block can be skipped
|
||||
|
||||
@property
|
||||
def description(self):
|
||||
return "Optional conditional blocks (skippable)"
|
||||
|
||||
def select_block(self, mask=None, image=None) -> str | None:
|
||||
if mask is not None:
|
||||
return "inpaint"
|
||||
if image is not None:
|
||||
return "img2img"
|
||||
return None
|
||||
|
||||
|
||||
class AutoImageBlocks(AutoPipelineBlocks):
|
||||
block_classes = [InpaintBlock, ImageToImageBlock, TextToImageBlock]
|
||||
block_names = ["inpaint", "img2img", "text2img"]
|
||||
block_trigger_inputs = ["mask", "image", None]
|
||||
|
||||
@property
|
||||
def description(self):
|
||||
return "Auto image blocks for testing"
|
||||
|
||||
|
||||
class TestConditionalPipelineBlocksSelectBlock:
|
||||
def test_select_block_with_mask(self):
|
||||
blocks = ConditionalImageBlocks()
|
||||
assert blocks.select_block(mask="something") == "inpaint"
|
||||
|
||||
def test_select_block_with_image(self):
|
||||
blocks = ConditionalImageBlocks()
|
||||
assert blocks.select_block(image="something") == "img2img"
|
||||
|
||||
def test_select_block_with_mask_and_image(self):
|
||||
blocks = ConditionalImageBlocks()
|
||||
assert blocks.select_block(mask="m", image="i") == "inpaint"
|
||||
|
||||
def test_select_block_no_triggers_returns_none(self):
|
||||
blocks = ConditionalImageBlocks()
|
||||
assert blocks.select_block() is None
|
||||
|
||||
def test_select_block_explicit_none_values(self):
|
||||
blocks = ConditionalImageBlocks()
|
||||
assert blocks.select_block(mask=None, image=None) is None
|
||||
|
||||
|
||||
class TestConditionalPipelineBlocksWorkflowSelection:
|
||||
def test_default_workflow_when_no_triggers(self):
|
||||
blocks = ConditionalImageBlocks()
|
||||
execution = blocks.get_execution_blocks()
|
||||
assert execution is not None
|
||||
assert isinstance(execution, TextToImageBlock)
|
||||
|
||||
def test_mask_trigger_selects_inpaint(self):
|
||||
blocks = ConditionalImageBlocks()
|
||||
execution = blocks.get_execution_blocks(mask=True)
|
||||
assert isinstance(execution, InpaintBlock)
|
||||
|
||||
def test_image_trigger_selects_img2img(self):
|
||||
blocks = ConditionalImageBlocks()
|
||||
execution = blocks.get_execution_blocks(image=True)
|
||||
assert isinstance(execution, ImageToImageBlock)
|
||||
|
||||
def test_mask_and_image_selects_inpaint(self):
|
||||
blocks = ConditionalImageBlocks()
|
||||
execution = blocks.get_execution_blocks(mask=True, image=True)
|
||||
assert isinstance(execution, InpaintBlock)
|
||||
|
||||
def test_skippable_block_returns_none(self):
|
||||
blocks = OptionalConditionalBlocks()
|
||||
execution = blocks.get_execution_blocks()
|
||||
assert execution is None
|
||||
|
||||
def test_skippable_block_still_selects_when_triggered(self):
|
||||
blocks = OptionalConditionalBlocks()
|
||||
execution = blocks.get_execution_blocks(image=True)
|
||||
assert isinstance(execution, ImageToImageBlock)
|
||||
|
||||
|
||||
class TestAutoPipelineBlocksSelectBlock:
|
||||
def test_auto_select_mask(self):
|
||||
blocks = AutoImageBlocks()
|
||||
assert blocks.select_block(mask="m") == "inpaint"
|
||||
|
||||
def test_auto_select_image(self):
|
||||
blocks = AutoImageBlocks()
|
||||
assert blocks.select_block(image="i") == "img2img"
|
||||
|
||||
def test_auto_select_default(self):
|
||||
blocks = AutoImageBlocks()
|
||||
# No trigger -> returns None -> falls back to default (text2img)
|
||||
assert blocks.select_block() is None
|
||||
|
||||
def test_auto_select_priority_order(self):
|
||||
blocks = AutoImageBlocks()
|
||||
assert blocks.select_block(mask="m", image="i") == "inpaint"
|
||||
|
||||
|
||||
class TestAutoPipelineBlocksWorkflowSelection:
|
||||
def test_auto_default_workflow(self):
|
||||
blocks = AutoImageBlocks()
|
||||
execution = blocks.get_execution_blocks()
|
||||
assert isinstance(execution, TextToImageBlock)
|
||||
|
||||
def test_auto_mask_workflow(self):
|
||||
blocks = AutoImageBlocks()
|
||||
execution = blocks.get_execution_blocks(mask=True)
|
||||
assert isinstance(execution, InpaintBlock)
|
||||
|
||||
def test_auto_image_workflow(self):
|
||||
blocks = AutoImageBlocks()
|
||||
execution = blocks.get_execution_blocks(image=True)
|
||||
assert isinstance(execution, ImageToImageBlock)
|
||||
|
||||
|
||||
class TestConditionalPipelineBlocksStructure:
|
||||
def test_block_names_accessible(self):
|
||||
blocks = ConditionalImageBlocks()
|
||||
sub = dict(blocks.sub_blocks)
|
||||
assert set(sub.keys()) == {"inpaint", "img2img", "text2img"}
|
||||
|
||||
def test_sub_block_types(self):
|
||||
blocks = ConditionalImageBlocks()
|
||||
sub = dict(blocks.sub_blocks)
|
||||
assert isinstance(sub["inpaint"], InpaintBlock)
|
||||
assert isinstance(sub["img2img"], ImageToImageBlock)
|
||||
assert isinstance(sub["text2img"], TextToImageBlock)
|
||||
|
||||
def test_description(self):
|
||||
blocks = ConditionalImageBlocks()
|
||||
assert "Conditional" in blocks.description
|
||||
@@ -9,11 +9,6 @@ import torch
|
||||
import diffusers
|
||||
from diffusers import AutoModel, ComponentsManager, ModularPipeline, ModularPipelineBlocks
|
||||
from diffusers.guiders import ClassifierFreeGuidance
|
||||
from diffusers.modular_pipelines import (
|
||||
ConditionalPipelineBlocks,
|
||||
LoopSequentialPipelineBlocks,
|
||||
SequentialPipelineBlocks,
|
||||
)
|
||||
from diffusers.modular_pipelines.modular_pipeline_utils import (
|
||||
ComponentSpec,
|
||||
ConfigSpec,
|
||||
@@ -24,7 +19,6 @@ from diffusers.modular_pipelines.modular_pipeline_utils import (
|
||||
from diffusers.utils import logging
|
||||
|
||||
from ..testing_utils import (
|
||||
CaptureLogger,
|
||||
backend_empty_cache,
|
||||
numpy_cosine_similarity_distance,
|
||||
require_accelerator,
|
||||
@@ -437,117 +431,6 @@ class ModularGuiderTesterMixin:
|
||||
assert max_diff > expected_max_diff, "Output with CFG must be different from normal inference"
|
||||
|
||||
|
||||
class TestCustomBlockRequirements:
|
||||
def get_dummy_block_pipe(self):
|
||||
class DummyBlockOne:
|
||||
# keep two arbitrary deps so that we can test warnings.
|
||||
_requirements = {"xyz": ">=0.8.0", "abc": ">=10.0.0"}
|
||||
|
||||
class DummyBlockTwo:
|
||||
# keep two dependencies that will be available during testing.
|
||||
_requirements = {"transformers": ">=4.44.0", "diffusers": ">=0.2.0"}
|
||||
|
||||
pipe = SequentialPipelineBlocks.from_blocks_dict(
|
||||
{"dummy_block_one": DummyBlockOne, "dummy_block_two": DummyBlockTwo}
|
||||
)
|
||||
return pipe
|
||||
|
||||
def get_dummy_conditional_block_pipe(self):
|
||||
class DummyBlockOne:
|
||||
_requirements = {"xyz": ">=0.8.0", "abc": ">=10.0.0"}
|
||||
|
||||
class DummyBlockTwo:
|
||||
_requirements = {"transformers": ">=4.44.0", "diffusers": ">=0.2.0"}
|
||||
|
||||
class DummyConditionalBlocks(ConditionalPipelineBlocks):
|
||||
block_classes = [DummyBlockOne, DummyBlockTwo]
|
||||
block_names = ["block_one", "block_two"]
|
||||
block_trigger_inputs = []
|
||||
|
||||
def select_block(self, **kwargs):
|
||||
return "block_one"
|
||||
|
||||
return DummyConditionalBlocks()
|
||||
|
||||
def get_dummy_loop_block_pipe(self):
|
||||
class DummyBlockOne:
|
||||
_requirements = {"xyz": ">=0.8.0", "abc": ">=10.0.0"}
|
||||
|
||||
class DummyBlockTwo:
|
||||
_requirements = {"transformers": ">=4.44.0", "diffusers": ">=0.2.0"}
|
||||
|
||||
return LoopSequentialPipelineBlocks.from_blocks_dict({"block_one": DummyBlockOne, "block_two": DummyBlockTwo})
|
||||
|
||||
def test_sequential_block_requirements_save_load(self, tmp_path):
|
||||
pipe = self.get_dummy_block_pipe()
|
||||
pipe.save_pretrained(str(tmp_path))
|
||||
|
||||
config_path = tmp_path / "modular_config.json"
|
||||
|
||||
with open(config_path, "r") as f:
|
||||
config = json.load(f)
|
||||
|
||||
assert "requirements" in config
|
||||
requirements = config["requirements"]
|
||||
|
||||
expected_requirements = {
|
||||
"xyz": ">=0.8.0",
|
||||
"abc": ">=10.0.0",
|
||||
"transformers": ">=4.44.0",
|
||||
"diffusers": ">=0.2.0",
|
||||
}
|
||||
assert expected_requirements == requirements
|
||||
|
||||
def test_sequential_block_requirements_warnings(self, tmp_path):
|
||||
pipe = self.get_dummy_block_pipe()
|
||||
|
||||
logger = logging.get_logger("diffusers.modular_pipelines.modular_pipeline_utils")
|
||||
logger.setLevel(30)
|
||||
|
||||
with CaptureLogger(logger) as cap_logger:
|
||||
pipe.save_pretrained(str(tmp_path))
|
||||
|
||||
template = "{req} was specified in the requirements but wasn't found in the current environment"
|
||||
msg_xyz = template.format(req="xyz")
|
||||
msg_abc = template.format(req="abc")
|
||||
assert msg_xyz in str(cap_logger.out)
|
||||
assert msg_abc in str(cap_logger.out)
|
||||
|
||||
def test_conditional_block_requirements_save_load(self, tmp_path):
|
||||
pipe = self.get_dummy_conditional_block_pipe()
|
||||
pipe.save_pretrained(str(tmp_path))
|
||||
|
||||
config_path = tmp_path / "modular_config.json"
|
||||
with open(config_path, "r") as f:
|
||||
config = json.load(f)
|
||||
|
||||
assert "requirements" in config
|
||||
expected_requirements = {
|
||||
"xyz": ">=0.8.0",
|
||||
"abc": ">=10.0.0",
|
||||
"transformers": ">=4.44.0",
|
||||
"diffusers": ">=0.2.0",
|
||||
}
|
||||
assert expected_requirements == config["requirements"]
|
||||
|
||||
def test_loop_block_requirements_save_load(self, tmp_path):
|
||||
pipe = self.get_dummy_loop_block_pipe()
|
||||
pipe.save_pretrained(str(tmp_path))
|
||||
|
||||
config_path = tmp_path / "modular_config.json"
|
||||
with open(config_path, "r") as f:
|
||||
config = json.load(f)
|
||||
|
||||
assert "requirements" in config
|
||||
expected_requirements = {
|
||||
"xyz": ">=0.8.0",
|
||||
"abc": ">=10.0.0",
|
||||
"transformers": ">=4.44.0",
|
||||
"diffusers": ">=0.2.0",
|
||||
}
|
||||
assert expected_requirements == config["requirements"]
|
||||
|
||||
|
||||
class TestModularModelCardContent:
|
||||
def create_mock_block(self, name="TestBlock", description="Test block description"):
|
||||
class MockBlock:
|
||||
|
||||
@@ -24,14 +24,18 @@ import torch
|
||||
from diffusers import FluxTransformer2DModel
|
||||
from diffusers.modular_pipelines import (
|
||||
ComponentSpec,
|
||||
ConditionalPipelineBlocks,
|
||||
InputParam,
|
||||
LoopSequentialPipelineBlocks,
|
||||
ModularPipelineBlocks,
|
||||
OutputParam,
|
||||
PipelineState,
|
||||
SequentialPipelineBlocks,
|
||||
WanModularPipeline,
|
||||
)
|
||||
from diffusers.utils import logging
|
||||
|
||||
from ..testing_utils import nightly, require_torch, slow
|
||||
from ..testing_utils import CaptureLogger, nightly, require_torch, slow
|
||||
|
||||
|
||||
class DummyCustomBlockSimple(ModularPipelineBlocks):
|
||||
@@ -354,6 +358,117 @@ class TestModularCustomBlocks:
|
||||
assert output_prompt.startswith("Modular diffusers + ")
|
||||
|
||||
|
||||
class TestCustomBlockRequirements:
|
||||
def get_dummy_block_pipe(self):
|
||||
class DummyBlockOne:
|
||||
# keep two arbitrary deps so that we can test warnings.
|
||||
_requirements = {"xyz": ">=0.8.0", "abc": ">=10.0.0"}
|
||||
|
||||
class DummyBlockTwo:
|
||||
# keep two dependencies that will be available during testing.
|
||||
_requirements = {"transformers": ">=4.44.0", "diffusers": ">=0.2.0"}
|
||||
|
||||
pipe = SequentialPipelineBlocks.from_blocks_dict(
|
||||
{"dummy_block_one": DummyBlockOne, "dummy_block_two": DummyBlockTwo}
|
||||
)
|
||||
return pipe
|
||||
|
||||
def get_dummy_conditional_block_pipe(self):
|
||||
class DummyBlockOne:
|
||||
_requirements = {"xyz": ">=0.8.0", "abc": ">=10.0.0"}
|
||||
|
||||
class DummyBlockTwo:
|
||||
_requirements = {"transformers": ">=4.44.0", "diffusers": ">=0.2.0"}
|
||||
|
||||
class DummyConditionalBlocks(ConditionalPipelineBlocks):
|
||||
block_classes = [DummyBlockOne, DummyBlockTwo]
|
||||
block_names = ["block_one", "block_two"]
|
||||
block_trigger_inputs = []
|
||||
|
||||
def select_block(self, **kwargs):
|
||||
return "block_one"
|
||||
|
||||
return DummyConditionalBlocks()
|
||||
|
||||
def get_dummy_loop_block_pipe(self):
|
||||
class DummyBlockOne:
|
||||
_requirements = {"xyz": ">=0.8.0", "abc": ">=10.0.0"}
|
||||
|
||||
class DummyBlockTwo:
|
||||
_requirements = {"transformers": ">=4.44.0", "diffusers": ">=0.2.0"}
|
||||
|
||||
return LoopSequentialPipelineBlocks.from_blocks_dict({"block_one": DummyBlockOne, "block_two": DummyBlockTwo})
|
||||
|
||||
def test_sequential_block_requirements_save_load(self, tmp_path):
|
||||
pipe = self.get_dummy_block_pipe()
|
||||
pipe.save_pretrained(str(tmp_path))
|
||||
|
||||
config_path = tmp_path / "modular_config.json"
|
||||
|
||||
with open(config_path, "r") as f:
|
||||
config = json.load(f)
|
||||
|
||||
assert "requirements" in config
|
||||
requirements = config["requirements"]
|
||||
|
||||
expected_requirements = {
|
||||
"xyz": ">=0.8.0",
|
||||
"abc": ">=10.0.0",
|
||||
"transformers": ">=4.44.0",
|
||||
"diffusers": ">=0.2.0",
|
||||
}
|
||||
assert expected_requirements == requirements
|
||||
|
||||
def test_sequential_block_requirements_warnings(self, tmp_path):
|
||||
pipe = self.get_dummy_block_pipe()
|
||||
|
||||
logger = logging.get_logger("diffusers.modular_pipelines.modular_pipeline_utils")
|
||||
logger.setLevel(30)
|
||||
|
||||
with CaptureLogger(logger) as cap_logger:
|
||||
pipe.save_pretrained(str(tmp_path))
|
||||
|
||||
template = "{req} was specified in the requirements but wasn't found in the current environment"
|
||||
msg_xyz = template.format(req="xyz")
|
||||
msg_abc = template.format(req="abc")
|
||||
assert msg_xyz in str(cap_logger.out)
|
||||
assert msg_abc in str(cap_logger.out)
|
||||
|
||||
def test_conditional_block_requirements_save_load(self, tmp_path):
|
||||
pipe = self.get_dummy_conditional_block_pipe()
|
||||
pipe.save_pretrained(str(tmp_path))
|
||||
|
||||
config_path = tmp_path / "modular_config.json"
|
||||
with open(config_path, "r") as f:
|
||||
config = json.load(f)
|
||||
|
||||
assert "requirements" in config
|
||||
expected_requirements = {
|
||||
"xyz": ">=0.8.0",
|
||||
"abc": ">=10.0.0",
|
||||
"transformers": ">=4.44.0",
|
||||
"diffusers": ">=0.2.0",
|
||||
}
|
||||
assert expected_requirements == config["requirements"]
|
||||
|
||||
def test_loop_block_requirements_save_load(self, tmp_path):
|
||||
pipe = self.get_dummy_loop_block_pipe()
|
||||
pipe.save_pretrained(str(tmp_path))
|
||||
|
||||
config_path = tmp_path / "modular_config.json"
|
||||
with open(config_path, "r") as f:
|
||||
config = json.load(f)
|
||||
|
||||
assert "requirements" in config
|
||||
expected_requirements = {
|
||||
"xyz": ">=0.8.0",
|
||||
"abc": ">=10.0.0",
|
||||
"transformers": ">=4.44.0",
|
||||
"diffusers": ">=0.2.0",
|
||||
}
|
||||
assert expected_requirements == config["requirements"]
|
||||
|
||||
|
||||
@slow
|
||||
@nightly
|
||||
@require_torch
|
||||
|
||||
Reference in New Issue
Block a user