# Copyright 2025 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
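"""Tests for the MagCache hooks (`apply_mag_cache` / `MagCacheConfig`).

The dummy models below are chosen so that the caching arithmetic can be
verified by hand: a computed step runs every block, while a skipped step
returns input + cached residual without running the blocks.
"""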
import unittest

import numpy as np
import torch

from diffusers import MagCacheConfig, apply_mag_cache
from diffusers.hooks._helpers import TransformerBlockMetadata, TransformerBlockRegistry
from diffusers.models import ModelMixin
from diffusers.utils import logging


logger = logging.get_logger(__name__)

class DummyBlock(torch.nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, hidden_states, encoder_hidden_states=None, **kwargs):
        # Output is double the input.
        # This ensures: residual = 2 * input - input = input.
        return hidden_states * 2.0

class DummyTransformer(ModelMixin):
    def __init__(self):
        super().__init__()
        self.transformer_blocks = torch.nn.ModuleList([DummyBlock(), DummyBlock()])

    def forward(self, hidden_states, encoder_hidden_states=None):
        for block in self.transformer_blocks:
            hidden_states = block(hidden_states, encoder_hidden_states=encoder_hidden_states)
        return hidden_states

class TupleOutputBlock(torch.nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, hidden_states, encoder_hidden_states=None, **kwargs):
        # Returns a tuple.
        return hidden_states * 2.0, encoder_hidden_states

class TupleTransformer(ModelMixin):
    def __init__(self):
        super().__init__()
        self.transformer_blocks = torch.nn.ModuleList([TupleOutputBlock()])

    def forward(self, hidden_states, encoder_hidden_states=None):
        for block in self.transformer_blocks:
            # Emulate Flux-like behavior.
            output = block(hidden_states, encoder_hidden_states=encoder_hidden_states)
            hidden_states = output[0]
            encoder_hidden_states = output[1]
        return hidden_states, encoder_hidden_states

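# Hand arithmetic used in the tests below: a two-block DummyTransformer maps
# x -> 4x, so a computed step with input x caches a residual of 4x - x = 3x.
# A skipped step with input y then returns y + 3x instead of the computed 4y.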
class MagCacheTests(unittest.TestCase):
    def setUp(self):
        # Register standard dummy block.
        TransformerBlockRegistry.register(
            DummyBlock,
            TransformerBlockMetadata(return_hidden_states_index=None, return_encoder_hidden_states_index=None),
        )
        # Register tuple block (Flux style).
        TransformerBlockRegistry.register(
            TupleOutputBlock,
            TransformerBlockMetadata(return_hidden_states_index=0, return_encoder_hidden_states_index=1),
        )
    def _set_context(self, model, context_name):
        """Helper to set context on all hooks in the model."""
        for module in model.modules():
            if hasattr(module, "_diffusers_hook"):
                module._diffusers_hook._set_context(context_name)
    def _get_calibration_data(self, model):
        """Helper to fetch the recorded calibration ratios from the first MagCache hook found."""
        for module in model.modules():
            if hasattr(module, "_diffusers_hook"):
                hook = module._diffusers_hook.get_hook("mag_cache_block_hook")
                if hook:
                    return hook.state_manager.get_state().calibration_ratios
        return []
    def test_mag_cache_validation(self):
        """Test that missing mag_ratios raises ValueError."""
        with self.assertRaises(ValueError):
            MagCacheConfig(num_inference_steps=10, calibrate=False)
    def test_mag_cache_skipping_logic(self):
        """Tests that MagCache correctly computes residuals and skips blocks when conditions are met."""
        model = DummyTransformer()

        # Dummy ratios: [1.0, 1.0] implies zero accumulated error if we skip.
        ratios = np.array([1.0, 1.0])

        config = MagCacheConfig(
            threshold=100.0,
            num_inference_steps=2,
            retention_ratio=0.0,  # enable immediate skipping
            max_skip_steps=5,
            mag_ratios=ratios,
        )

        apply_mag_cache(model, config)
        self._set_context(model, "test_context")

        # Step 0: input 10.0 -> output 40.0 (2 blocks, 2x each).
        # Head input = 10, output = 40, so the cached residual is 30.
        input_t0 = torch.tensor([[[10.0]]])
        output_t0 = model(input_t0)
        self.assertTrue(torch.allclose(output_t0, torch.tensor([[[40.0]]])), "Step 0 failed")

        # Step 1: input 11.0.
        # If skipped: output = input (11) + residual (30) = 41.0.
        # If computed: output = 11 * 4 = 44.0.
        input_t1 = torch.tensor([[[11.0]]])
        output_t1 = model(input_t1)

        self.assertTrue(
            torch.allclose(output_t1, torch.tensor([[[41.0]]])), f"Expected skip (41.0), got {output_t1.item()}"
        )
    def test_mag_cache_retention(self):
        """Test that retention_ratio prevents skipping even when the estimated error is low."""
        model = DummyTransformer()
        # Ratios that imply zero error, so the step *would* be skipped if retention allowed it.
        ratios = np.array([1.0, 1.0])

        config = MagCacheConfig(
            threshold=100.0,
            num_inference_steps=2,
            retention_ratio=1.0,  # force retention (full computation) for ALL steps
            mag_ratios=ratios,
        )

        apply_mag_cache(model, config)
        self._set_context(model, "test_context")

        # Step 0
        model(torch.tensor([[[10.0]]]))

        # Step 1: should COMPUTE (44.0), not SKIP (41.0), because of retention.
        input_t1 = torch.tensor([[[11.0]]])
        output_t1 = model(input_t1)

        self.assertTrue(
            torch.allclose(output_t1, torch.tensor([[[44.0]]])),
            f"Expected compute (44.0) due to retention, got {output_t1.item()}",
        )
    def test_mag_cache_tuple_outputs(self):
        """Test compatibility with models returning (hidden_states, encoder_hidden_states), like Flux."""
        model = TupleTransformer()
        ratios = np.array([1.0, 1.0])

        config = MagCacheConfig(threshold=100.0, num_inference_steps=2, retention_ratio=0.0, mag_ratios=ratios)

        apply_mag_cache(model, config)
        self._set_context(model, "test_context")

        # Step 0: compute. Input 10.0 -> output 20.0 (1 block, 2x).
        # Residual = 10.0.
        input_t0 = torch.tensor([[[10.0]]])
        enc_t0 = torch.tensor([[[1.0]]])
        out_0, _ = model(input_t0, encoder_hidden_states=enc_t0)
        self.assertTrue(torch.allclose(out_0, torch.tensor([[[20.0]]])))

        # Step 1: skip. Input 11.0.
        # Skipped output = 11 + 10 = 21.0.
        input_t1 = torch.tensor([[[11.0]]])
        out_1, _ = model(input_t1, encoder_hidden_states=enc_t0)

        self.assertTrue(
            torch.allclose(out_1, torch.tensor([[[21.0]]])), f"Tuple skip failed. Expected 21.0, got {out_1.item()}"
        )
    def test_mag_cache_reset(self):
        """Test that state resets correctly after num_inference_steps."""
        model = DummyTransformer()
        config = MagCacheConfig(
            threshold=100.0, num_inference_steps=2, retention_ratio=0.0, mag_ratios=np.array([1.0, 1.0])
        )
        apply_mag_cache(model, config)
        self._set_context(model, "test_context")

        input_t = torch.ones(1, 1, 1)

        model(input_t)  # Step 0
        model(input_t)  # Step 1 (skipped)

        # Step 2 (reset -> step 0): should compute.
        # Input 2.0 -> output 8.0.
        input_t2 = torch.tensor([[[2.0]]])
        output_t2 = model(input_t2)

        self.assertTrue(torch.allclose(output_t2, torch.tensor([[[8.0]]])), "State did not reset correctly")
    def test_mag_cache_calibration(self):
        """Test that calibration mode records per-step magnitude ratios."""
        model = DummyTransformer()
        config = MagCacheConfig(num_inference_steps=2, calibrate=True)
        apply_mag_cache(model, config)
        self._set_context(model, "test_context")

        # Step 0:
        # Head input = 10, output = 40, residual = 30.
        # The ratio for step 0 is a placeholder of 1.0.
        model(torch.tensor([[[10.0]]]))

        # Check intermediate state.
        ratios = self._get_calibration_data(model)
        self.assertEqual(len(ratios), 1)
        self.assertEqual(ratios[0], 1.0)

        # Step 1:
        # Head input = 10, output = 40, residual = 30.
        # Previous residual = 30, current residual = 30, so ratio = 30 / 30 = 1.0.
        model(torch.tensor([[[10.0]]]))

        # With identical inputs, a skipped step (10 + 30) and a computed step
        # (10 * 4) both yield 40.0, so the output alone cannot distinguish them.
        # Instead, verify that the state reset at the end of step 1 cleared the
        # recorded ratios.
        ratios_after = self._get_calibration_data(model)
        self.assertEqual(ratios_after, [])
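if __name__ == "__main__":
    # A minimal standalone sketch (not part of the unittest run) that replays
    # the arithmetic from `test_mag_cache_skipping_logic` above, using only
    # APIs already exercised in this file. The context name is arbitrary.
    TransformerBlockRegistry.register(
        DummyBlock,
        TransformerBlockMetadata(return_hidden_states_index=None, return_encoder_hidden_states_index=None),
    )
    demo_model = DummyTransformer()
    demo_config = MagCacheConfig(
        threshold=100.0,
        num_inference_steps=2,
        retention_ratio=0.0,
        mag_ratios=np.array([1.0, 1.0]),
    )
    apply_mag_cache(demo_model, demo_config)
    for module in demo_model.modules():
        if hasattr(module, "_diffusers_hook"):
            module._diffusers_hook._set_context("demo_context")

    # Step 0 computes: 10 -> 40, caching residual 30.
    print(demo_model(torch.tensor([[[10.0]]])))
    # Step 1 skips: 11 + 30 = 41 (a full compute would give 44).
    print(demo_model(torch.tensor([[[11.0]]])))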