mirror of
https://github.com/huggingface/diffusers.git
synced 2026-03-10 10:41:53 +08:00
Compare commits
1 Commits
refactor-c
...
group-offl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d7e73dc237 |
@@ -14,16 +14,15 @@
|
||||
|
||||
import contextlib
|
||||
import gc
|
||||
import unittest
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
import torch
|
||||
from parameterized import parameterized
|
||||
|
||||
from diffusers import AutoencoderKL
|
||||
from diffusers.hooks import HookRegistry, ModelHook
|
||||
from diffusers.models import ModelMixin
|
||||
from diffusers.pipelines.pipeline_utils import DiffusionPipeline
|
||||
from diffusers.utils import get_logger
|
||||
from diffusers.utils.import_utils import compare_versions
|
||||
|
||||
from ..testing_utils import (
|
||||
@@ -219,20 +218,18 @@ class NestedContainer(torch.nn.Module):
|
||||
|
||||
|
||||
@require_torch_accelerator
|
||||
class GroupOffloadTests(unittest.TestCase):
|
||||
class TestGroupOffload:
|
||||
in_features = 64
|
||||
hidden_features = 256
|
||||
out_features = 64
|
||||
num_layers = 4
|
||||
|
||||
def setUp(self):
|
||||
def setup_method(self):
|
||||
with torch.no_grad():
|
||||
self.model = self.get_model()
|
||||
self.input = torch.randn((4, self.in_features)).to(torch_device)
|
||||
|
||||
def tearDown(self):
|
||||
super().tearDown()
|
||||
|
||||
def teardown_method(self):
|
||||
del self.model
|
||||
del self.input
|
||||
gc.collect()
|
||||
@@ -248,18 +245,20 @@ class GroupOffloadTests(unittest.TestCase):
|
||||
num_layers=self.num_layers,
|
||||
)
|
||||
|
||||
@pytest.mark.skipif(
|
||||
torch.device(torch_device).type not in ["cuda", "xpu"],
|
||||
reason="Test requires a CUDA or XPU device.",
|
||||
)
|
||||
def test_offloading_forward_pass(self):
|
||||
@torch.no_grad()
|
||||
def run_forward(model):
|
||||
gc.collect()
|
||||
backend_empty_cache(torch_device)
|
||||
backend_reset_peak_memory_stats(torch_device)
|
||||
self.assertTrue(
|
||||
all(
|
||||
module._diffusers_hook.get_hook("group_offloading") is not None
|
||||
for module in model.modules()
|
||||
if hasattr(module, "_diffusers_hook")
|
||||
)
|
||||
assert all(
|
||||
module._diffusers_hook.get_hook("group_offloading") is not None
|
||||
for module in model.modules()
|
||||
if hasattr(module, "_diffusers_hook")
|
||||
)
|
||||
model.eval()
|
||||
output = model(self.input)[0].cpu()
|
||||
@@ -291,41 +290,37 @@ class GroupOffloadTests(unittest.TestCase):
|
||||
output_with_group_offloading5, mem5 = run_forward(model)
|
||||
|
||||
# Precision assertions - offloading should not impact the output
|
||||
self.assertTrue(torch.allclose(output_without_group_offloading, output_with_group_offloading1, atol=1e-5))
|
||||
self.assertTrue(torch.allclose(output_without_group_offloading, output_with_group_offloading2, atol=1e-5))
|
||||
self.assertTrue(torch.allclose(output_without_group_offloading, output_with_group_offloading3, atol=1e-5))
|
||||
self.assertTrue(torch.allclose(output_without_group_offloading, output_with_group_offloading4, atol=1e-5))
|
||||
self.assertTrue(torch.allclose(output_without_group_offloading, output_with_group_offloading5, atol=1e-5))
|
||||
assert torch.allclose(output_without_group_offloading, output_with_group_offloading1, atol=1e-5)
|
||||
assert torch.allclose(output_without_group_offloading, output_with_group_offloading2, atol=1e-5)
|
||||
assert torch.allclose(output_without_group_offloading, output_with_group_offloading3, atol=1e-5)
|
||||
assert torch.allclose(output_without_group_offloading, output_with_group_offloading4, atol=1e-5)
|
||||
assert torch.allclose(output_without_group_offloading, output_with_group_offloading5, atol=1e-5)
|
||||
|
||||
# Memory assertions - offloading should reduce memory usage
|
||||
self.assertTrue(mem4 <= mem5 < mem2 <= mem3 < mem1 < mem_baseline)
|
||||
assert mem4 <= mem5 < mem2 <= mem3 < mem1 < mem_baseline
|
||||
|
||||
def test_warning_logged_if_group_offloaded_module_moved_to_accelerator(self):
|
||||
def test_warning_logged_if_group_offloaded_module_moved_to_accelerator(self, caplog):
|
||||
if torch.device(torch_device).type not in ["cuda", "xpu"]:
|
||||
return
|
||||
self.model.enable_group_offload(torch_device, offload_type="block_level", num_blocks_per_group=3)
|
||||
logger = get_logger("diffusers.models.modeling_utils")
|
||||
logger.setLevel("INFO")
|
||||
with self.assertLogs(logger, level="WARNING") as cm:
|
||||
with caplog.at_level(logging.WARNING, logger="diffusers.models.modeling_utils"):
|
||||
self.model.to(torch_device)
|
||||
self.assertIn(f"The module '{self.model.__class__.__name__}' is group offloaded", cm.output[0])
|
||||
assert f"The module '{self.model.__class__.__name__}' is group offloaded" in caplog.text
|
||||
|
||||
def test_warning_logged_if_group_offloaded_pipe_moved_to_accelerator(self):
|
||||
def test_warning_logged_if_group_offloaded_pipe_moved_to_accelerator(self, caplog):
|
||||
if torch.device(torch_device).type not in ["cuda", "xpu"]:
|
||||
return
|
||||
pipe = DummyPipeline(self.model)
|
||||
self.model.enable_group_offload(torch_device, offload_type="block_level", num_blocks_per_group=3)
|
||||
logger = get_logger("diffusers.pipelines.pipeline_utils")
|
||||
logger.setLevel("INFO")
|
||||
with self.assertLogs(logger, level="WARNING") as cm:
|
||||
with caplog.at_level(logging.WARNING, logger="diffusers.pipelines.pipeline_utils"):
|
||||
pipe.to(torch_device)
|
||||
self.assertIn(f"The module '{self.model.__class__.__name__}' is group offloaded", cm.output[0])
|
||||
assert f"The module '{self.model.__class__.__name__}' is group offloaded" in caplog.text
|
||||
|
||||
def test_error_raised_if_streams_used_and_no_accelerator_device(self):
|
||||
torch_accelerator_module = getattr(torch, torch_device, torch.cuda)
|
||||
original_is_available = torch_accelerator_module.is_available
|
||||
torch_accelerator_module.is_available = lambda: False
|
||||
with self.assertRaises(ValueError):
|
||||
with pytest.raises(ValueError):
|
||||
self.model.enable_group_offload(
|
||||
onload_device=torch.device(torch_device), offload_type="leaf_level", use_stream=True
|
||||
)
|
||||
@@ -333,31 +328,31 @@ class GroupOffloadTests(unittest.TestCase):
|
||||
|
||||
def test_error_raised_if_supports_group_offloading_false(self):
|
||||
self.model._supports_group_offloading = False
|
||||
with self.assertRaisesRegex(ValueError, "does not support group offloading"):
|
||||
with pytest.raises(ValueError, match="does not support group offloading"):
|
||||
self.model.enable_group_offload(onload_device=torch.device(torch_device))
|
||||
|
||||
def test_error_raised_if_model_offloading_applied_on_group_offloaded_module(self):
|
||||
pipe = DummyPipeline(self.model)
|
||||
pipe.model.enable_group_offload(torch_device, offload_type="block_level", num_blocks_per_group=3)
|
||||
with self.assertRaisesRegex(ValueError, "You are trying to apply model/sequential CPU offloading"):
|
||||
with pytest.raises(ValueError, match="You are trying to apply model/sequential CPU offloading"):
|
||||
pipe.enable_model_cpu_offload()
|
||||
|
||||
def test_error_raised_if_sequential_offloading_applied_on_group_offloaded_module(self):
|
||||
pipe = DummyPipeline(self.model)
|
||||
pipe.model.enable_group_offload(torch_device, offload_type="block_level", num_blocks_per_group=3)
|
||||
with self.assertRaisesRegex(ValueError, "You are trying to apply model/sequential CPU offloading"):
|
||||
with pytest.raises(ValueError, match="You are trying to apply model/sequential CPU offloading"):
|
||||
pipe.enable_sequential_cpu_offload()
|
||||
|
||||
def test_error_raised_if_group_offloading_applied_on_model_offloaded_module(self):
|
||||
pipe = DummyPipeline(self.model)
|
||||
pipe.enable_model_cpu_offload()
|
||||
with self.assertRaisesRegex(ValueError, "Cannot apply group offloading"):
|
||||
with pytest.raises(ValueError, match="Cannot apply group offloading"):
|
||||
pipe.model.enable_group_offload(torch_device, offload_type="block_level", num_blocks_per_group=3)
|
||||
|
||||
def test_error_raised_if_group_offloading_applied_on_sequential_offloaded_module(self):
|
||||
pipe = DummyPipeline(self.model)
|
||||
pipe.enable_sequential_cpu_offload()
|
||||
with self.assertRaisesRegex(ValueError, "Cannot apply group offloading"):
|
||||
with pytest.raises(ValueError, match="Cannot apply group offloading"):
|
||||
pipe.model.enable_group_offload(torch_device, offload_type="block_level", num_blocks_per_group=3)
|
||||
|
||||
def test_block_level_stream_with_invocation_order_different_from_initialization_order(self):
|
||||
@@ -376,12 +371,12 @@ class GroupOffloadTests(unittest.TestCase):
|
||||
context = contextlib.nullcontext()
|
||||
if compare_versions("diffusers", "<=", "0.33.0"):
|
||||
# Will raise a device mismatch RuntimeError mentioning weights are on CPU but input is on device
|
||||
context = self.assertRaisesRegex(RuntimeError, "Expected all tensors to be on the same device")
|
||||
context = pytest.raises(RuntimeError, match="Expected all tensors to be on the same device")
|
||||
|
||||
with context:
|
||||
model(self.input)
|
||||
|
||||
@parameterized.expand([("block_level",), ("leaf_level",)])
|
||||
@pytest.mark.parametrize("offload_type", ["block_level", "leaf_level"])
|
||||
def test_block_level_offloading_with_parameter_only_module_group(self, offload_type: str):
|
||||
if torch.device(torch_device).type not in ["cuda", "xpu"]:
|
||||
return
|
||||
@@ -407,14 +402,14 @@ class GroupOffloadTests(unittest.TestCase):
|
||||
|
||||
out_ref = model_ref(x)
|
||||
out = model(x)
|
||||
self.assertTrue(torch.allclose(out_ref, out, atol=1e-5), "Outputs do not match.")
|
||||
assert torch.allclose(out_ref, out, atol=1e-5), "Outputs do not match."
|
||||
|
||||
num_repeats = 2
|
||||
for i in range(num_repeats):
|
||||
out_ref = model_ref(x)
|
||||
out = model(x)
|
||||
|
||||
self.assertTrue(torch.allclose(out_ref, out, atol=1e-5), "Outputs do not match after multiple invocations.")
|
||||
assert torch.allclose(out_ref, out, atol=1e-5), "Outputs do not match after multiple invocations."
|
||||
|
||||
for (ref_name, ref_module), (name, module) in zip(model_ref.named_modules(), model.named_modules()):
|
||||
assert ref_name == name
|
||||
@@ -428,9 +423,7 @@ class GroupOffloadTests(unittest.TestCase):
|
||||
absdiff = diff.abs()
|
||||
absmax = absdiff.max().item()
|
||||
cumulated_absmax += absmax
|
||||
self.assertLess(
|
||||
cumulated_absmax, 1e-5, f"Output differences for {name} exceeded threshold: {cumulated_absmax:.5f}"
|
||||
)
|
||||
assert cumulated_absmax < 1e-5, f"Output differences for {name} exceeded threshold: {cumulated_absmax:.5f}"
|
||||
|
||||
def test_vae_like_model_without_streams(self):
|
||||
"""Test VAE-like model with block-level offloading but without streams."""
|
||||
@@ -452,9 +445,7 @@ class GroupOffloadTests(unittest.TestCase):
|
||||
out_ref = model_ref(x).sample
|
||||
out = model(x).sample
|
||||
|
||||
self.assertTrue(
|
||||
torch.allclose(out_ref, out, atol=1e-5), "Outputs do not match for VAE-like model without streams."
|
||||
)
|
||||
assert torch.allclose(out_ref, out, atol=1e-5), "Outputs do not match for VAE-like model without streams."
|
||||
|
||||
def test_model_with_only_standalone_layers(self):
|
||||
"""Test that models with only standalone layers (no ModuleList/Sequential) work with block-level offloading."""
|
||||
@@ -475,12 +466,11 @@ class GroupOffloadTests(unittest.TestCase):
|
||||
for i in range(2):
|
||||
out_ref = model_ref(x)
|
||||
out = model(x)
|
||||
self.assertTrue(
|
||||
torch.allclose(out_ref, out, atol=1e-5),
|
||||
f"Outputs do not match at iteration {i} for model with standalone layers.",
|
||||
assert torch.allclose(out_ref, out, atol=1e-5), (
|
||||
f"Outputs do not match at iteration {i} for model with standalone layers."
|
||||
)
|
||||
|
||||
@parameterized.expand([("block_level",), ("leaf_level",)])
|
||||
@pytest.mark.parametrize("offload_type", ["block_level", "leaf_level"])
|
||||
def test_standalone_conv_layers_with_both_offload_types(self, offload_type: str):
|
||||
"""Test that standalone Conv2d layers work correctly with both block-level and leaf-level offloading."""
|
||||
if torch.device(torch_device).type not in ["cuda", "xpu"]:
|
||||
@@ -501,9 +491,8 @@ class GroupOffloadTests(unittest.TestCase):
|
||||
out_ref = model_ref(x).sample
|
||||
out = model(x).sample
|
||||
|
||||
self.assertTrue(
|
||||
torch.allclose(out_ref, out, atol=1e-5),
|
||||
f"Outputs do not match for standalone Conv layers with {offload_type}.",
|
||||
assert torch.allclose(out_ref, out, atol=1e-5), (
|
||||
f"Outputs do not match for standalone Conv layers with {offload_type}."
|
||||
)
|
||||
|
||||
def test_multiple_invocations_with_vae_like_model(self):
|
||||
@@ -526,7 +515,7 @@ class GroupOffloadTests(unittest.TestCase):
|
||||
for i in range(2):
|
||||
out_ref = model_ref(x).sample
|
||||
out = model(x).sample
|
||||
self.assertTrue(torch.allclose(out_ref, out, atol=1e-5), f"Outputs do not match at iteration {i}.")
|
||||
assert torch.allclose(out_ref, out, atol=1e-5), f"Outputs do not match at iteration {i}."
|
||||
|
||||
def test_nested_container_parameters_offloading(self):
|
||||
"""Test that parameters from non-computational layers in nested containers are handled correctly."""
|
||||
@@ -547,9 +536,8 @@ class GroupOffloadTests(unittest.TestCase):
|
||||
for i in range(2):
|
||||
out_ref = model_ref(x)
|
||||
out = model(x)
|
||||
self.assertTrue(
|
||||
torch.allclose(out_ref, out, atol=1e-5),
|
||||
f"Outputs do not match at iteration {i} for nested parameters.",
|
||||
assert torch.allclose(out_ref, out, atol=1e-5), (
|
||||
f"Outputs do not match at iteration {i} for nested parameters."
|
||||
)
|
||||
|
||||
def get_autoencoder_kl_config(self, block_out_channels=None, norm_num_groups=None):
|
||||
@@ -602,7 +590,7 @@ class DummyModelWithConditionalModules(ModelMixin):
|
||||
return x
|
||||
|
||||
|
||||
class ConditionalModuleGroupOffloadTests(GroupOffloadTests):
|
||||
class TestConditionalModuleGroupOffload(TestGroupOffload):
|
||||
"""Tests for conditionally-executed modules under group offloading with streams.
|
||||
|
||||
Regression tests for the case where a module is not executed during the first forward pass
|
||||
@@ -620,10 +608,10 @@ class ConditionalModuleGroupOffloadTests(GroupOffloadTests):
|
||||
num_layers=self.num_layers,
|
||||
)
|
||||
|
||||
@parameterized.expand([("leaf_level",), ("block_level",)])
|
||||
@unittest.skipIf(
|
||||
@pytest.mark.parametrize("offload_type", ["leaf_level", "block_level"])
|
||||
@pytest.mark.skipif(
|
||||
torch.device(torch_device).type not in ["cuda", "xpu"],
|
||||
"Test requires a CUDA or XPU device.",
|
||||
reason="Test requires a CUDA or XPU device.",
|
||||
)
|
||||
def test_conditional_modules_with_stream(self, offload_type: str):
|
||||
"""Regression test: conditionally-executed modules must not cause device mismatch when using streams.
|
||||
@@ -670,23 +658,20 @@ class ConditionalModuleGroupOffloadTests(GroupOffloadTests):
|
||||
# execution order is traced. optional_proj_1/2 are NOT in the traced order.
|
||||
out_ref_no_opt = model_ref(x, optional_input=None)
|
||||
out_no_opt = model(x, optional_input=None)
|
||||
self.assertTrue(
|
||||
torch.allclose(out_ref_no_opt, out_no_opt, atol=1e-5),
|
||||
f"[{offload_type}] Outputs do not match on first pass (no optional_input).",
|
||||
assert torch.allclose(out_ref_no_opt, out_no_opt, atol=1e-5), (
|
||||
f"[{offload_type}] Outputs do not match on first pass (no optional_input)."
|
||||
)
|
||||
|
||||
# Second forward pass WITH optional_input — optional_proj_1/2 ARE now called.
|
||||
out_ref_with_opt = model_ref(x, optional_input=optional_input)
|
||||
out_with_opt = model(x, optional_input=optional_input)
|
||||
self.assertTrue(
|
||||
torch.allclose(out_ref_with_opt, out_with_opt, atol=1e-5),
|
||||
f"[{offload_type}] Outputs do not match on second pass (with optional_input).",
|
||||
assert torch.allclose(out_ref_with_opt, out_with_opt, atol=1e-5), (
|
||||
f"[{offload_type}] Outputs do not match on second pass (with optional_input)."
|
||||
)
|
||||
|
||||
# Third pass again without optional_input — verify stable behavior.
|
||||
out_ref_no_opt2 = model_ref(x, optional_input=None)
|
||||
out_no_opt2 = model(x, optional_input=None)
|
||||
self.assertTrue(
|
||||
torch.allclose(out_ref_no_opt2, out_no_opt2, atol=1e-5),
|
||||
f"[{offload_type}] Outputs do not match on third pass (back to no optional_input).",
|
||||
assert torch.allclose(out_ref_no_opt2, out_no_opt2, atol=1e-5), (
|
||||
f"[{offload_type}] Outputs do not match on third pass (back to no optional_input)."
|
||||
)
|
||||
|
||||
244
tests/hooks/test_mag_cache.py
Normal file
244
tests/hooks/test_mag_cache.py
Normal file
@@ -0,0 +1,244 @@
|
||||
# Copyright 2025 HuggingFace Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import unittest
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
|
||||
from diffusers import MagCacheConfig, apply_mag_cache
|
||||
from diffusers.hooks._helpers import TransformerBlockMetadata, TransformerBlockRegistry
|
||||
from diffusers.models import ModelMixin
|
||||
from diffusers.utils import logging
|
||||
|
||||
|
||||
logger = logging.get_logger(__name__)
|
||||
|
||||
|
||||
class DummyBlock(torch.nn.Module):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def forward(self, hidden_states, encoder_hidden_states=None, **kwargs):
|
||||
# Output is double input
|
||||
# This ensures Residual = 2*Input - Input = Input
|
||||
return hidden_states * 2.0
|
||||
|
||||
|
||||
class DummyTransformer(ModelMixin):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.transformer_blocks = torch.nn.ModuleList([DummyBlock(), DummyBlock()])
|
||||
|
||||
def forward(self, hidden_states, encoder_hidden_states=None):
|
||||
for block in self.transformer_blocks:
|
||||
hidden_states = block(hidden_states, encoder_hidden_states=encoder_hidden_states)
|
||||
return hidden_states
|
||||
|
||||
|
||||
class TupleOutputBlock(torch.nn.Module):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def forward(self, hidden_states, encoder_hidden_states=None, **kwargs):
|
||||
# Returns a tuple
|
||||
return hidden_states * 2.0, encoder_hidden_states
|
||||
|
||||
|
||||
class TupleTransformer(ModelMixin):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.transformer_blocks = torch.nn.ModuleList([TupleOutputBlock()])
|
||||
|
||||
def forward(self, hidden_states, encoder_hidden_states=None):
|
||||
for block in self.transformer_blocks:
|
||||
# Emulate Flux-like behavior
|
||||
output = block(hidden_states, encoder_hidden_states=encoder_hidden_states)
|
||||
hidden_states = output[0]
|
||||
encoder_hidden_states = output[1]
|
||||
return hidden_states, encoder_hidden_states
|
||||
|
||||
|
||||
class MagCacheTests(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# Register standard dummy block
|
||||
TransformerBlockRegistry.register(
|
||||
DummyBlock,
|
||||
TransformerBlockMetadata(return_hidden_states_index=None, return_encoder_hidden_states_index=None),
|
||||
)
|
||||
# Register tuple block (Flux style)
|
||||
TransformerBlockRegistry.register(
|
||||
TupleOutputBlock,
|
||||
TransformerBlockMetadata(return_hidden_states_index=0, return_encoder_hidden_states_index=1),
|
||||
)
|
||||
|
||||
def _set_context(self, model, context_name):
|
||||
"""Helper to set context on all hooks in the model."""
|
||||
for module in model.modules():
|
||||
if hasattr(module, "_diffusers_hook"):
|
||||
module._diffusers_hook._set_context(context_name)
|
||||
|
||||
def _get_calibration_data(self, model):
|
||||
for module in model.modules():
|
||||
if hasattr(module, "_diffusers_hook"):
|
||||
hook = module._diffusers_hook.get_hook("mag_cache_block_hook")
|
||||
if hook:
|
||||
return hook.state_manager.get_state().calibration_ratios
|
||||
return []
|
||||
|
||||
def test_mag_cache_validation(self):
|
||||
"""Test that missing mag_ratios raises ValueError."""
|
||||
with self.assertRaises(ValueError):
|
||||
MagCacheConfig(num_inference_steps=10, calibrate=False)
|
||||
|
||||
def test_mag_cache_skipping_logic(self):
|
||||
"""
|
||||
Tests that MagCache correctly calculates residuals and skips blocks when conditions are met.
|
||||
"""
|
||||
model = DummyTransformer()
|
||||
|
||||
# Dummy ratios: [1.0, 1.0] implies 0 accumulated error if we skip
|
||||
ratios = np.array([1.0, 1.0])
|
||||
|
||||
config = MagCacheConfig(
|
||||
threshold=100.0,
|
||||
num_inference_steps=2,
|
||||
retention_ratio=0.0, # Enable immediate skipping
|
||||
max_skip_steps=5,
|
||||
mag_ratios=ratios,
|
||||
)
|
||||
|
||||
apply_mag_cache(model, config)
|
||||
self._set_context(model, "test_context")
|
||||
|
||||
# Step 0: Input 10.0 -> Output 40.0 (2 blocks * 2x each)
|
||||
# HeadInput=10. Output=40. Residual=30.
|
||||
input_t0 = torch.tensor([[[10.0]]])
|
||||
output_t0 = model(input_t0)
|
||||
self.assertTrue(torch.allclose(output_t0, torch.tensor([[[40.0]]])), "Step 0 failed")
|
||||
|
||||
# Step 1: Input 11.0.
|
||||
# If Skipped: Output = Input(11) + Residual(30) = 41.0
|
||||
# If Computed: Output = 11 * 4 = 44.0
|
||||
input_t1 = torch.tensor([[[11.0]]])
|
||||
output_t1 = model(input_t1)
|
||||
|
||||
self.assertTrue(
|
||||
torch.allclose(output_t1, torch.tensor([[[41.0]]])), f"Expected Skip (41.0), got {output_t1.item()}"
|
||||
)
|
||||
|
||||
def test_mag_cache_retention(self):
|
||||
"""Test that retention_ratio prevents skipping even if error is low."""
|
||||
model = DummyTransformer()
|
||||
# Ratios that imply 0 error, so it *would* skip if retention allowed it
|
||||
ratios = np.array([1.0, 1.0])
|
||||
|
||||
config = MagCacheConfig(
|
||||
threshold=100.0,
|
||||
num_inference_steps=2,
|
||||
retention_ratio=1.0, # Force retention for ALL steps
|
||||
mag_ratios=ratios,
|
||||
)
|
||||
|
||||
apply_mag_cache(model, config)
|
||||
self._set_context(model, "test_context")
|
||||
|
||||
# Step 0
|
||||
model(torch.tensor([[[10.0]]]))
|
||||
|
||||
# Step 1: Should COMPUTE (44.0) not SKIP (41.0) because of retention
|
||||
input_t1 = torch.tensor([[[11.0]]])
|
||||
output_t1 = model(input_t1)
|
||||
|
||||
self.assertTrue(
|
||||
torch.allclose(output_t1, torch.tensor([[[44.0]]])),
|
||||
f"Expected Compute (44.0) due to retention, got {output_t1.item()}",
|
||||
)
|
||||
|
||||
def test_mag_cache_tuple_outputs(self):
|
||||
"""Test compatibility with models returning (hidden, encoder_hidden) like Flux."""
|
||||
model = TupleTransformer()
|
||||
ratios = np.array([1.0, 1.0])
|
||||
|
||||
config = MagCacheConfig(threshold=100.0, num_inference_steps=2, retention_ratio=0.0, mag_ratios=ratios)
|
||||
|
||||
apply_mag_cache(model, config)
|
||||
self._set_context(model, "test_context")
|
||||
|
||||
# Step 0: Compute. Input 10.0 -> Output 20.0 (1 block * 2x)
|
||||
# Residual = 10.0
|
||||
input_t0 = torch.tensor([[[10.0]]])
|
||||
enc_t0 = torch.tensor([[[1.0]]])
|
||||
out_0, _ = model(input_t0, encoder_hidden_states=enc_t0)
|
||||
self.assertTrue(torch.allclose(out_0, torch.tensor([[[20.0]]])))
|
||||
|
||||
# Step 1: Skip. Input 11.0.
|
||||
# Skipped Output = 11 + 10 = 21.0
|
||||
input_t1 = torch.tensor([[[11.0]]])
|
||||
out_1, _ = model(input_t1, encoder_hidden_states=enc_t0)
|
||||
|
||||
self.assertTrue(
|
||||
torch.allclose(out_1, torch.tensor([[[21.0]]])), f"Tuple skip failed. Expected 21.0, got {out_1.item()}"
|
||||
)
|
||||
|
||||
def test_mag_cache_reset(self):
|
||||
"""Test that state resets correctly after num_inference_steps."""
|
||||
model = DummyTransformer()
|
||||
config = MagCacheConfig(
|
||||
threshold=100.0, num_inference_steps=2, retention_ratio=0.0, mag_ratios=np.array([1.0, 1.0])
|
||||
)
|
||||
apply_mag_cache(model, config)
|
||||
self._set_context(model, "test_context")
|
||||
|
||||
input_t = torch.ones(1, 1, 1)
|
||||
|
||||
model(input_t) # Step 0
|
||||
model(input_t) # Step 1 (Skipped)
|
||||
|
||||
# Step 2 (Reset -> Step 0) -> Should Compute
|
||||
# Input 2.0 -> Output 8.0
|
||||
input_t2 = torch.tensor([[[2.0]]])
|
||||
output_t2 = model(input_t2)
|
||||
|
||||
self.assertTrue(torch.allclose(output_t2, torch.tensor([[[8.0]]])), "State did not reset correctly")
|
||||
|
||||
def test_mag_cache_calibration(self):
|
||||
"""Test that calibration mode records ratios."""
|
||||
model = DummyTransformer()
|
||||
config = MagCacheConfig(num_inference_steps=2, calibrate=True)
|
||||
apply_mag_cache(model, config)
|
||||
self._set_context(model, "test_context")
|
||||
|
||||
# Step 0
|
||||
# HeadInput = 10. Output = 40. Residual = 30.
|
||||
# Ratio 0 is placeholder 1.0
|
||||
model(torch.tensor([[[10.0]]]))
|
||||
|
||||
# Check intermediate state
|
||||
ratios = self._get_calibration_data(model)
|
||||
self.assertEqual(len(ratios), 1)
|
||||
self.assertEqual(ratios[0], 1.0)
|
||||
|
||||
# Step 1
|
||||
# HeadInput = 10. Output = 40. Residual = 30.
|
||||
# PrevResidual = 30. CurrResidual = 30.
|
||||
# Ratio = 30/30 = 1.0
|
||||
model(torch.tensor([[[10.0]]]))
|
||||
|
||||
# Verify it computes fully (no skip)
|
||||
# If it skipped, output would be 41.0. It should be 40.0
|
||||
# Actually in test setup, input is same (10.0) so output 40.0.
|
||||
# Let's ensure list is empty after reset (end of step 1)
|
||||
ratios_after = self._get_calibration_data(model)
|
||||
self.assertEqual(ratios_after, [])
|
||||
@@ -5,8 +5,6 @@ from .cache import (
|
||||
FasterCacheTesterMixin,
|
||||
FirstBlockCacheConfigMixin,
|
||||
FirstBlockCacheTesterMixin,
|
||||
MagCacheConfigMixin,
|
||||
MagCacheTesterMixin,
|
||||
PyramidAttentionBroadcastConfigMixin,
|
||||
PyramidAttentionBroadcastTesterMixin,
|
||||
)
|
||||
@@ -52,8 +50,6 @@ __all__ = [
|
||||
"FasterCacheTesterMixin",
|
||||
"FirstBlockCacheConfigMixin",
|
||||
"FirstBlockCacheTesterMixin",
|
||||
"MagCacheConfigMixin",
|
||||
"MagCacheTesterMixin",
|
||||
"GGUFCompileTesterMixin",
|
||||
"GGUFConfigMixin",
|
||||
"GGUFTesterMixin",
|
||||
|
||||
@@ -18,10 +18,9 @@ import gc
|
||||
import pytest
|
||||
import torch
|
||||
|
||||
from diffusers.hooks import FasterCacheConfig, FirstBlockCacheConfig, MagCacheConfig, PyramidAttentionBroadcastConfig
|
||||
from diffusers.hooks import FasterCacheConfig, FirstBlockCacheConfig, PyramidAttentionBroadcastConfig
|
||||
from diffusers.hooks.faster_cache import _FASTER_CACHE_BLOCK_HOOK, _FASTER_CACHE_DENOISER_HOOK
|
||||
from diffusers.hooks.first_block_cache import _FBC_BLOCK_HOOK, _FBC_LEADER_BLOCK_HOOK
|
||||
from diffusers.hooks.mag_cache import _MAG_CACHE_BLOCK_HOOK, _MAG_CACHE_LEADER_BLOCK_HOOK
|
||||
from diffusers.hooks.pyramid_attention_broadcast import _PYRAMID_ATTENTION_BROADCAST_HOOK
|
||||
from diffusers.models.cache_utils import CacheMixin
|
||||
|
||||
@@ -555,70 +554,3 @@ class FasterCacheTesterMixin(FasterCacheConfigMixin, CacheTesterMixin):
|
||||
@require_cache_mixin
|
||||
def test_faster_cache_reset_stateful_cache(self):
|
||||
self._test_reset_stateful_cache()
|
||||
|
||||
|
||||
@is_cache
|
||||
class MagCacheConfigMixin:
|
||||
"""
|
||||
Base mixin providing MagCache config.
|
||||
|
||||
Expected class attributes:
|
||||
- model_class: The model class to test (must use CacheMixin)
|
||||
"""
|
||||
|
||||
# Default MagCache config - can be overridden by subclasses.
|
||||
# Uses neutral ratios [1.0, 1.0] and a high threshold so the second
|
||||
# inference step is always skipped, which is required by _test_cache_inference.
|
||||
MAG_CACHE_CONFIG = {
|
||||
"num_inference_steps": 2,
|
||||
"retention_ratio": 0.0,
|
||||
"threshold": 100.0,
|
||||
"mag_ratios": [1.0, 1.0],
|
||||
}
|
||||
|
||||
def _get_cache_config(self):
|
||||
return MagCacheConfig(**self.MAG_CACHE_CONFIG)
|
||||
|
||||
def _get_hook_names(self):
|
||||
return [_MAG_CACHE_LEADER_BLOCK_HOOK, _MAG_CACHE_BLOCK_HOOK]
|
||||
|
||||
|
||||
@is_cache
|
||||
class MagCacheTesterMixin(MagCacheConfigMixin, CacheTesterMixin):
|
||||
"""
|
||||
Mixin class for testing MagCache on models.
|
||||
|
||||
Expected class attributes:
|
||||
- model_class: The model class to test (must use CacheMixin)
|
||||
|
||||
Expected methods to be implemented by subclasses:
|
||||
- get_init_dict(): Returns dict of arguments to initialize the model
|
||||
- get_dummy_inputs(): Returns dict of inputs to pass to the model forward pass
|
||||
|
||||
Pytest mark: cache
|
||||
Use `pytest -m "not cache"` to skip these tests
|
||||
"""
|
||||
|
||||
@require_cache_mixin
|
||||
def test_mag_cache_enable_disable_state(self):
|
||||
self._test_cache_enable_disable_state()
|
||||
|
||||
@require_cache_mixin
|
||||
def test_mag_cache_double_enable_raises_error(self):
|
||||
self._test_cache_double_enable_raises_error()
|
||||
|
||||
@require_cache_mixin
|
||||
def test_mag_cache_hooks_registered(self):
|
||||
self._test_cache_hooks_registered()
|
||||
|
||||
@require_cache_mixin
|
||||
def test_mag_cache_inference(self):
|
||||
self._test_cache_inference()
|
||||
|
||||
@require_cache_mixin
|
||||
def test_mag_cache_context_manager(self):
|
||||
self._test_cache_context_manager()
|
||||
|
||||
@require_cache_mixin
|
||||
def test_mag_cache_reset_stateful_cache(self):
|
||||
self._test_reset_stateful_cache()
|
||||
|
||||
@@ -37,7 +37,6 @@ from ..testing_utils import (
|
||||
IPAdapterTesterMixin,
|
||||
LoraHotSwappingForModelTesterMixin,
|
||||
LoraTesterMixin,
|
||||
MagCacheTesterMixin,
|
||||
MemoryTesterMixin,
|
||||
ModelOptCompileTesterMixin,
|
||||
ModelOptTesterMixin,
|
||||
@@ -431,7 +430,3 @@ class TestFluxTransformerFasterCache(FluxTransformerTesterConfig, FasterCacheTes
|
||||
"tensor_format": "BCHW",
|
||||
"is_guidance_distilled": True,
|
||||
}
|
||||
|
||||
|
||||
class TestFluxTransformerMagCache(FluxTransformerTesterConfig, MagCacheTesterMixin):
|
||||
"""MagCache tests for Flux Transformer."""
|
||||
|
||||
Reference in New Issue
Block a user