Compare commits

..

4 Commits

Author SHA1 Message Date
Sayak Paul
1e6578bbe3 Merge branch 'main' into refactor-caching-tests 2026-03-10 09:25:51 +05:30
Sayak Paul
81aa43271b Merge branch 'main' into refactor-caching-tests 2026-03-10 08:57:11 +05:30
sayakpaul
9239908f5d include taylorseer in the caching mixin. 2026-03-10 08:56:42 +05:30
sayakpaul
9cd3e6ba88 refactor magcache tests. 2026-03-09 19:26:42 +05:30
5 changed files with 310 additions and 479 deletions

View File

@@ -1,244 +0,0 @@
# Copyright 2025 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import torch
from diffusers import MagCacheConfig, apply_mag_cache
from diffusers.hooks._helpers import TransformerBlockMetadata, TransformerBlockRegistry
from diffusers.models import ModelMixin
from diffusers.utils import logging
logger = logging.get_logger(__name__)
class DummyBlock(torch.nn.Module):
    """Minimal transformer-style block whose output is exactly twice its input.

    Doubling makes the cached residual equal to the block input
    (2 * x - x == x), so every expected value in the tests is hand-derivable.
    """

    def forward(self, hidden_states, encoder_hidden_states=None, **kwargs):
        # Output is double the input; extra kwargs are accepted and ignored.
        return 2.0 * hidden_states
class DummyTransformer(ModelMixin):
    """Dummy model with two doubling blocks: forward multiplies the input by 4."""

    def __init__(self):
        super().__init__()
        # Two identical blocks so skipping logic has a multi-block path to cover.
        self.transformer_blocks = torch.nn.ModuleList(DummyBlock() for _ in range(2))

    def forward(self, hidden_states, encoder_hidden_states=None):
        out = hidden_states
        for blk in self.transformer_blocks:
            out = blk(out, encoder_hidden_states=encoder_hidden_states)
        return out
class TupleOutputBlock(torch.nn.Module):
    """Block mimicking Flux-style blocks: doubles the hidden states and returns
    a (hidden_states, encoder_hidden_states) tuple."""

    def forward(self, hidden_states, encoder_hidden_states=None, **kwargs):
        doubled = 2.0 * hidden_states
        # Encoder states are passed through untouched.
        return doubled, encoder_hidden_states
class TupleTransformer(ModelMixin):
    """Single-block dummy model emulating Flux-like propagation of a
    (hidden_states, encoder_hidden_states) pair through each block."""

    def __init__(self):
        super().__init__()
        self.transformer_blocks = torch.nn.ModuleList([TupleOutputBlock()])

    def forward(self, hidden_states, encoder_hidden_states=None):
        for blk in self.transformer_blocks:
            # Emulate Flux-like behavior: each block returns a 2-tuple.
            hidden_states, encoder_hidden_states = blk(
                hidden_states, encoder_hidden_states=encoder_hidden_states
            )
        return hidden_states, encoder_hidden_states
class MagCacheTests(unittest.TestCase):
    """End-to-end tests for MagCache hooks using tiny dummy transformers.

    Every dummy block doubles its input, so the per-step residual
    (output - head input) equals the head input itself and all expected
    values below can be derived by hand.
    """

    def setUp(self):
        # Register standard dummy block (single-tensor return).
        # NOTE(review): registration runs before every test; assumes the
        # registry tolerates re-registering the same class — confirm.
        TransformerBlockRegistry.register(
            DummyBlock,
            TransformerBlockMetadata(return_hidden_states_index=None, return_encoder_hidden_states_index=None),
        )
        # Register tuple block (Flux style: returns (hidden, encoder_hidden)).
        TransformerBlockRegistry.register(
            TupleOutputBlock,
            TransformerBlockMetadata(return_hidden_states_index=0, return_encoder_hidden_states_index=1),
        )

    def _set_context(self, model, context_name):
        """Helper to set context on all hooks in the model."""
        for module in model.modules():
            if hasattr(module, "_diffusers_hook"):
                module._diffusers_hook._set_context(context_name)

    def _get_calibration_data(self, model):
        """Return recorded calibration ratios from the first module carrying a
        mag-cache block hook; empty list if no such hook is found."""
        for module in model.modules():
            if hasattr(module, "_diffusers_hook"):
                hook = module._diffusers_hook.get_hook("mag_cache_block_hook")
                if hook:
                    return hook.state_manager.get_state().calibration_ratios
        return []

    def test_mag_cache_validation(self):
        """Test that missing mag_ratios raises ValueError."""
        # calibrate=False with no mag_ratios has nothing to drive skipping.
        with self.assertRaises(ValueError):
            MagCacheConfig(num_inference_steps=10, calibrate=False)

    def test_mag_cache_skipping_logic(self):
        """
        Tests that MagCache correctly calculates residuals and skips blocks when conditions are met.
        """
        model = DummyTransformer()
        # Dummy ratios: [1.0, 1.0] implies 0 accumulated error if we skip
        ratios = np.array([1.0, 1.0])
        config = MagCacheConfig(
            threshold=100.0,
            num_inference_steps=2,
            retention_ratio=0.0,  # Enable immediate skipping
            max_skip_steps=5,
            mag_ratios=ratios,
        )
        apply_mag_cache(model, config)
        self._set_context(model, "test_context")
        # Step 0: Input 10.0 -> Output 40.0 (2 blocks * 2x each)
        # HeadInput=10. Output=40. Residual=30.
        input_t0 = torch.tensor([[[10.0]]])
        output_t0 = model(input_t0)
        self.assertTrue(torch.allclose(output_t0, torch.tensor([[[40.0]]])), "Step 0 failed")
        # Step 1: Input 11.0.
        # If Skipped: Output = Input(11) + Residual(30) = 41.0
        # If Computed: Output = 11 * 4 = 44.0
        input_t1 = torch.tensor([[[11.0]]])
        output_t1 = model(input_t1)
        self.assertTrue(
            torch.allclose(output_t1, torch.tensor([[[41.0]]])), f"Expected Skip (41.0), got {output_t1.item()}"
        )

    def test_mag_cache_retention(self):
        """Test that retention_ratio prevents skipping even if error is low."""
        model = DummyTransformer()
        # Ratios that imply 0 error, so it *would* skip if retention allowed it
        ratios = np.array([1.0, 1.0])
        config = MagCacheConfig(
            threshold=100.0,
            num_inference_steps=2,
            retention_ratio=1.0,  # Force retention for ALL steps
            mag_ratios=ratios,
        )
        apply_mag_cache(model, config)
        self._set_context(model, "test_context")
        # Step 0
        model(torch.tensor([[[10.0]]]))
        # Step 1: Should COMPUTE (44.0) not SKIP (41.0) because of retention
        input_t1 = torch.tensor([[[11.0]]])
        output_t1 = model(input_t1)
        self.assertTrue(
            torch.allclose(output_t1, torch.tensor([[[44.0]]])),
            f"Expected Compute (44.0) due to retention, got {output_t1.item()}",
        )

    def test_mag_cache_tuple_outputs(self):
        """Test compatibility with models returning (hidden, encoder_hidden) like Flux."""
        model = TupleTransformer()
        ratios = np.array([1.0, 1.0])
        config = MagCacheConfig(threshold=100.0, num_inference_steps=2, retention_ratio=0.0, mag_ratios=ratios)
        apply_mag_cache(model, config)
        self._set_context(model, "test_context")
        # Step 0: Compute. Input 10.0 -> Output 20.0 (1 block * 2x)
        # Residual = 10.0
        input_t0 = torch.tensor([[[10.0]]])
        enc_t0 = torch.tensor([[[1.0]]])
        out_0, _ = model(input_t0, encoder_hidden_states=enc_t0)
        self.assertTrue(torch.allclose(out_0, torch.tensor([[[20.0]]])))
        # Step 1: Skip. Input 11.0.
        # Skipped Output = 11 + 10 = 21.0
        input_t1 = torch.tensor([[[11.0]]])
        out_1, _ = model(input_t1, encoder_hidden_states=enc_t0)
        self.assertTrue(
            torch.allclose(out_1, torch.tensor([[[21.0]]])), f"Tuple skip failed. Expected 21.0, got {out_1.item()}"
        )

    def test_mag_cache_reset(self):
        """Test that state resets correctly after num_inference_steps."""
        model = DummyTransformer()
        config = MagCacheConfig(
            threshold=100.0, num_inference_steps=2, retention_ratio=0.0, mag_ratios=np.array([1.0, 1.0])
        )
        apply_mag_cache(model, config)
        self._set_context(model, "test_context")
        input_t = torch.ones(1, 1, 1)
        model(input_t)  # Step 0
        model(input_t)  # Step 1 (Skipped)
        # Step 2 (Reset -> Step 0) -> Should Compute
        # Input 2.0 -> Output 8.0
        input_t2 = torch.tensor([[[2.0]]])
        output_t2 = model(input_t2)
        self.assertTrue(torch.allclose(output_t2, torch.tensor([[[8.0]]])), "State did not reset correctly")

    def test_mag_cache_calibration(self):
        """Test that calibration mode records ratios."""
        model = DummyTransformer()
        config = MagCacheConfig(num_inference_steps=2, calibrate=True)
        apply_mag_cache(model, config)
        self._set_context(model, "test_context")
        # Step 0
        # HeadInput = 10. Output = 40. Residual = 30.
        # Ratio 0 is placeholder 1.0
        model(torch.tensor([[[10.0]]]))
        # Check intermediate state
        ratios = self._get_calibration_data(model)
        self.assertEqual(len(ratios), 1)
        self.assertEqual(ratios[0], 1.0)
        # Step 1
        # HeadInput = 10. Output = 40. Residual = 30.
        # PrevResidual = 30. CurrResidual = 30.
        # Ratio = 30/30 = 1.0
        model(torch.tensor([[[10.0]]]))
        # Same input (10.0) at both steps, so output stays 40.0 either way;
        # the reset behavior is what we verify here: the calibration list
        # must be empty after the final step completes.
        ratios_after = self._get_calibration_data(model)
        self.assertEqual(ratios_after, [])

View File

@@ -5,8 +5,12 @@ from .cache import (
FasterCacheTesterMixin,
FirstBlockCacheConfigMixin,
FirstBlockCacheTesterMixin,
MagCacheConfigMixin,
MagCacheTesterMixin,
PyramidAttentionBroadcastConfigMixin,
PyramidAttentionBroadcastTesterMixin,
TaylorSeerCacheConfigMixin,
TaylorSeerCacheTesterMixin,
)
from .common import BaseModelTesterConfig, ModelTesterMixin
from .compile import TorchCompileTesterMixin
@@ -50,6 +54,8 @@ __all__ = [
"FasterCacheTesterMixin",
"FirstBlockCacheConfigMixin",
"FirstBlockCacheTesterMixin",
"MagCacheConfigMixin",
"MagCacheTesterMixin",
"GGUFCompileTesterMixin",
"GGUFConfigMixin",
"GGUFTesterMixin",
@@ -65,6 +71,8 @@ __all__ = [
"ModelTesterMixin",
"PyramidAttentionBroadcastConfigMixin",
"PyramidAttentionBroadcastTesterMixin",
"TaylorSeerCacheConfigMixin",
"TaylorSeerCacheTesterMixin",
"QuantizationCompileTesterMixin",
"QuantizationTesterMixin",
"QuantoCompileTesterMixin",

View File

@@ -18,10 +18,18 @@ import gc
import pytest
import torch
from diffusers.hooks import FasterCacheConfig, FirstBlockCacheConfig, PyramidAttentionBroadcastConfig
from diffusers.hooks import (
FasterCacheConfig,
FirstBlockCacheConfig,
MagCacheConfig,
PyramidAttentionBroadcastConfig,
TaylorSeerCacheConfig,
)
from diffusers.hooks.faster_cache import _FASTER_CACHE_BLOCK_HOOK, _FASTER_CACHE_DENOISER_HOOK
from diffusers.hooks.first_block_cache import _FBC_BLOCK_HOOK, _FBC_LEADER_BLOCK_HOOK
from diffusers.hooks.mag_cache import _MAG_CACHE_BLOCK_HOOK, _MAG_CACHE_LEADER_BLOCK_HOOK
from diffusers.hooks.pyramid_attention_broadcast import _PYRAMID_ATTENTION_BROADCAST_HOOK
from diffusers.hooks.taylorseer_cache import _TAYLORSEER_CACHE_HOOK
from diffusers.models.cache_utils import CacheMixin
from ...testing_utils import assert_tensors_close, backend_empty_cache, is_cache, torch_device
@@ -554,3 +562,192 @@ class FasterCacheTesterMixin(FasterCacheConfigMixin, CacheTesterMixin):
@require_cache_mixin
def test_faster_cache_reset_stateful_cache(self):
self._test_reset_stateful_cache()
@is_cache
class MagCacheConfigMixin:
    """
    Base mixin providing MagCache config.

    Expected class attributes:
    - model_class: The model class to test (must use CacheMixin)
    """

    # Default MagCache config - can be overridden by subclasses.
    # Uses neutral ratios [1.0, 1.0] and a high threshold so the second
    # inference step is always skipped, which is required by _test_cache_inference.
    MAG_CACHE_CONFIG = {
        "num_inference_steps": 2,
        "retention_ratio": 0.0,
        "threshold": 100.0,
        "mag_ratios": [1.0, 1.0],
    }

    def _get_cache_config(self):
        # Build a fresh config from the (possibly subclass-overridden) dict.
        return MagCacheConfig(**self.MAG_CACHE_CONFIG)

    def _get_hook_names(self):
        # Both hook names are checked by the shared cache test helpers.
        return [_MAG_CACHE_LEADER_BLOCK_HOOK, _MAG_CACHE_BLOCK_HOOK]
@is_cache
class MagCacheTesterMixin(MagCacheConfigMixin, CacheTesterMixin):
    """
    Mixin class for testing MagCache on models.

    Each test below is a thin named wrapper delegating to the shared
    CacheTesterMixin implementation, so MagCache failures are reported
    under MagCache-specific test names.

    Expected class attributes:
    - model_class: The model class to test (must use CacheMixin)

    Expected methods to be implemented by subclasses:
    - get_init_dict(): Returns dict of arguments to initialize the model
    - get_dummy_inputs(): Returns dict of inputs to pass to the model forward pass

    Pytest mark: cache
    Use `pytest -m "not cache"` to skip these tests
    """

    @require_cache_mixin
    def test_mag_cache_enable_disable_state(self):
        self._test_cache_enable_disable_state()

    @require_cache_mixin
    def test_mag_cache_double_enable_raises_error(self):
        self._test_cache_double_enable_raises_error()

    @require_cache_mixin
    def test_mag_cache_hooks_registered(self):
        self._test_cache_hooks_registered()

    @require_cache_mixin
    def test_mag_cache_inference(self):
        self._test_cache_inference()

    @require_cache_mixin
    def test_mag_cache_context_manager(self):
        self._test_cache_context_manager()

    @require_cache_mixin
    def test_mag_cache_reset_stateful_cache(self):
        self._test_reset_stateful_cache()
@is_cache
class TaylorSeerCacheConfigMixin:
    """
    Base mixin providing TaylorSeerCache config.

    Expected class attributes:
    - model_class: The model class to test (must use CacheMixin)
    """

    # Default TaylorSeerCache config - can be overridden by subclasses.
    # Uses a low cache_interval and disable_cache_before_step=1 so that
    # caching is only disabled for the very first step and the second
    # inference step is predicted, which is required by _test_cache_inference.
    TAYLORSEER_CACHE_CONFIG = {
        "cache_interval": 3,
        "disable_cache_before_step": 1,
        "max_order": 1,
    }

    def _get_cache_config(self):
        # Build a fresh config from the (possibly subclass-overridden) dict.
        return TaylorSeerCacheConfig(**self.TAYLORSEER_CACHE_CONFIG)

    def _get_hook_names(self):
        # TaylorSeer registers a single hook name.
        return [_TAYLORSEER_CACHE_HOOK]
@is_cache
class TaylorSeerCacheTesterMixin(TaylorSeerCacheConfigMixin, CacheTesterMixin):
    """
    Mixin class for testing TaylorSeerCache on models.

    Overrides _test_cache_inference and _test_reset_stateful_cache because
    TaylorSeer requires an active cache_context during the forward pass.

    Expected class attributes:
    - model_class: The model class to test (must use CacheMixin)

    Expected methods to be implemented by subclasses:
    - get_init_dict(): Returns dict of arguments to initialize the model
    - get_dummy_inputs(): Returns dict of inputs to pass to the model forward pass

    Pytest mark: cache
    Use `pytest -m "not cache"` to skip these tests
    """

    @torch.no_grad()
    def _test_cache_inference(self):
        """Test that model can run inference with TaylorSeer cache enabled (requires cache_context)."""
        init_dict = self.get_init_dict()
        inputs_dict = self.get_dummy_inputs()
        model = self.model_class(**init_dict).to(torch_device)
        model.eval()
        config = self._get_cache_config()
        model.enable_cache(config)
        # TaylorSeer requires cache_context to be set for inference
        with model.cache_context("taylorseer_test"):
            # First pass populates the cache
            _ = model(**inputs_dict, return_dict=False)[0]
            # Create modified inputs for second pass
            inputs_dict_step2 = inputs_dict.copy()
            if self.cache_input_key in inputs_dict_step2:
                inputs_dict_step2[self.cache_input_key] = inputs_dict_step2[self.cache_input_key] + torch.randn_like(
                    inputs_dict_step2[self.cache_input_key]
                )
            # Second pass - TaylorSeer should use cached Taylor series predictions
            output_with_cache = model(**inputs_dict_step2, return_dict=False)[0]
        assert output_with_cache is not None, "Model output should not be None with cache enabled."
        assert not torch.isnan(output_with_cache).any(), "Model output contains NaN with cache enabled."
        # Run same inputs without cache to compare
        model.disable_cache()
        output_without_cache = model(**inputs_dict_step2, return_dict=False)[0]
        # Cached output should be different from non-cached output (due to approximation)
        assert not torch.allclose(output_without_cache, output_with_cache, atol=1e-5), (
            "Cached output should be different from non-cached output due to cache approximation."
        )

    @torch.no_grad()
    def _test_reset_stateful_cache(self):
        """Test that _reset_stateful_cache resets the TaylorSeer cache state (requires cache_context)."""
        init_dict = self.get_init_dict()
        inputs_dict = self.get_dummy_inputs()
        model = self.model_class(**init_dict).to(torch_device)
        model.eval()
        config = self._get_cache_config()
        model.enable_cache(config)
        with model.cache_context("taylorseer_test"):
            _ = model(**inputs_dict, return_dict=False)[0]
            # Reset while the context is active; must not raise.
            model._reset_stateful_cache()
        model.disable_cache()

    @require_cache_mixin
    def test_taylorseer_cache_enable_disable_state(self):
        self._test_cache_enable_disable_state()

    @require_cache_mixin
    def test_taylorseer_cache_double_enable_raises_error(self):
        self._test_cache_double_enable_raises_error()

    @require_cache_mixin
    def test_taylorseer_cache_hooks_registered(self):
        self._test_cache_hooks_registered()

    @require_cache_mixin
    def test_taylorseer_cache_inference(self):
        self._test_cache_inference()

    @require_cache_mixin
    def test_taylorseer_cache_context_manager(self):
        self._test_cache_context_manager()

    @require_cache_mixin
    def test_taylorseer_cache_reset_stateful_cache(self):
        self._test_reset_stateful_cache()

View File

@@ -37,6 +37,7 @@ from ..testing_utils import (
IPAdapterTesterMixin,
LoraHotSwappingForModelTesterMixin,
LoraTesterMixin,
MagCacheTesterMixin,
MemoryTesterMixin,
ModelOptCompileTesterMixin,
ModelOptTesterMixin,
@@ -45,6 +46,7 @@ from ..testing_utils import (
QuantoCompileTesterMixin,
QuantoTesterMixin,
SingleFileTesterMixin,
TaylorSeerCacheTesterMixin,
TorchAoCompileTesterMixin,
TorchAoTesterMixin,
TorchCompileTesterMixin,
@@ -430,3 +432,11 @@ class TestFluxTransformerFasterCache(FluxTransformerTesterConfig, FasterCacheTes
"tensor_format": "BCHW",
"is_guidance_distilled": True,
}
class TestFluxTransformerMagCache(FluxTransformerTesterConfig, MagCacheTesterMixin):
"""MagCache tests for Flux Transformer."""
class TestFluxTransformerTaylorSeerCache(FluxTransformerTesterConfig, TaylorSeerCacheTesterMixin):
"""TaylorSeerCache tests for Flux Transformer."""

View File

@@ -1,3 +1,4 @@
# coding=utf-8
# Copyright 2025 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,23 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import gc
import os
import unittest
import pytest
import torch
from diffusers import ZImageTransformer2DModel
from diffusers.utils.torch_utils import randn_tensor
from ...testing_utils import assert_tensors_close, torch_device
from ..testing_utils import (
BaseModelTesterConfig,
LoraTesterMixin,
MemoryTesterMixin,
ModelTesterMixin,
TorchCompileTesterMixin,
TrainingTesterMixin,
)
from ...testing_utils import IS_GITHUB_ACTIONS, torch_device
from ..test_modeling_common import ModelTesterMixin, TorchCompileTesterMixin
# Z-Image requires torch.use_deterministic_algorithms(False) due to complex64 RoPE operations
@@ -42,38 +36,44 @@ if hasattr(torch.backends, "cuda"):
torch.backends.cuda.matmul.allow_tf32 = False
def _concat_list_output(output):
"""Model output `sample` is a list of tensors. Concatenate them for comparison."""
return torch.cat([t.flatten() for t in output])
@unittest.skipIf(
IS_GITHUB_ACTIONS,
reason="Skipping test-suite inside the CI because the model has `torch.empty()` inside of it during init and we don't have a clear way to override it in the modeling tests.",
)
class ZImageTransformerTests(ModelTesterMixin, unittest.TestCase):
model_class = ZImageTransformer2DModel
main_input_name = "x"
# We override the items here because the transformer under consideration is small.
model_split_percents = [0.9, 0.9, 0.9]
def prepare_dummy_input(self, height=16, width=16):
batch_size = 1
num_channels = 16
embedding_dim = 16
sequence_length = 16
class ZImageTransformerTesterConfig(BaseModelTesterConfig):
@property
def model_class(self):
return ZImageTransformer2DModel
hidden_states = [torch.randn((num_channels, 1, height, width)).to(torch_device) for _ in range(batch_size)]
encoder_hidden_states = [
torch.randn((sequence_length, embedding_dim)).to(torch_device) for _ in range(batch_size)
]
timestep = torch.tensor([0.0]).to(torch_device)
return {"x": hidden_states, "cap_feats": encoder_hidden_states, "t": timestep}
@property
def output_shape(self) -> tuple[int, ...]:
def dummy_input(self):
return self.prepare_dummy_input()
@property
def input_shape(self):
return (4, 32, 32)
@property
def input_shape(self) -> tuple[int, ...]:
def output_shape(self):
return (4, 32, 32)
@property
def model_split_percents(self) -> list:
return [0.9, 0.9, 0.9]
@property
def main_input_name(self) -> str:
return "x"
@property
def generator(self):
return torch.Generator("cpu").manual_seed(0)
def get_init_dict(self):
return {
def prepare_init_args_and_inputs_for_common(self):
init_dict = {
"all_patch_size": (2,),
"all_f_patch_size": (1,),
"in_channels": 16,
@@ -89,223 +89,83 @@ class ZImageTransformerTesterConfig(BaseModelTesterConfig):
"axes_dims": [8, 4, 4],
"axes_lens": [256, 32, 32],
}
inputs_dict = self.dummy_input
return init_dict, inputs_dict
def get_dummy_inputs(self) -> dict[str, torch.Tensor | list]:
batch_size = 1
num_channels = 16
embedding_dim = 16
sequence_length = 16
height = 16
width = 16
hidden_states = [
randn_tensor((num_channels, 1, height, width), generator=self.generator, device=torch_device)
for _ in range(batch_size)
]
encoder_hidden_states = [
randn_tensor((sequence_length, embedding_dim), generator=self.generator, device=torch_device)
for _ in range(batch_size)
]
timestep = torch.tensor([0.0]).to(torch_device)
return {"x": hidden_states, "cap_feats": encoder_hidden_states, "t": timestep}
class TestZImageTransformer(ZImageTransformerTesterConfig, ModelTesterMixin):
"""Core model tests for Z-Image Transformer."""
@torch.no_grad()
def test_determinism(self, atol=1e-5, rtol=0):
model = self.model_class(**self.get_init_dict())
model.to(torch_device)
model.eval()
inputs_dict = self.get_dummy_inputs()
first = _concat_list_output(model(**inputs_dict, return_dict=False)[0])
second = _concat_list_output(model(**inputs_dict, return_dict=False)[0])
mask = ~(torch.isnan(first) | torch.isnan(second))
assert_tensors_close(
first[mask], second[mask], atol=atol, rtol=rtol, msg="Model outputs are not deterministic"
)
def test_from_save_pretrained(self, tmp_path, atol=5e-5, rtol=5e-5):
def setUp(self):
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.synchronize()
torch.manual_seed(0)
model = self.model_class(**self.get_init_dict())
model.to(torch_device)
model.eval()
model.save_pretrained(tmp_path)
new_model = self.model_class.from_pretrained(tmp_path)
new_model.to(torch_device)
for param_name in model.state_dict().keys():
param_1 = model.state_dict()[param_name]
param_2 = new_model.state_dict()[param_name]
assert param_1.shape == param_2.shape
inputs_dict = self.get_dummy_inputs()
image = _concat_list_output(model(**inputs_dict, return_dict=False)[0])
new_image = _concat_list_output(new_model(**inputs_dict, return_dict=False)[0])
assert_tensors_close(image, new_image, atol=atol, rtol=rtol, msg="Models give different forward passes.")
@torch.no_grad()
def test_from_save_pretrained_variant(self, tmp_path, atol=5e-5, rtol=0):
model = self.model_class(**self.get_init_dict())
model.to(torch_device)
model.eval()
model.save_pretrained(tmp_path, variant="fp16")
new_model = self.model_class.from_pretrained(tmp_path, variant="fp16")
with pytest.raises(OSError) as exc_info:
self.model_class.from_pretrained(tmp_path)
assert "Error no file named diffusion_pytorch_model.bin found in directory" in str(exc_info.value)
new_model.to(torch_device)
inputs_dict = self.get_dummy_inputs()
image = _concat_list_output(model(**inputs_dict, return_dict=False)[0])
new_image = _concat_list_output(new_model(**inputs_dict, return_dict=False)[0])
assert_tensors_close(image, new_image, atol=atol, rtol=rtol, msg="Models give different forward passes.")
@pytest.mark.skip("Model output `sample` is a list of tensors, not a single tensor.")
def test_outputs_equivalence(self, atol=1e-5, rtol=0):
pass
def test_sharded_checkpoints_with_parallel_loading(self, tmp_path, atol=1e-5, rtol=0):
from diffusers.utils import SAFE_WEIGHTS_INDEX_NAME, constants
from ..testing_utils.common import calculate_expected_num_shards, compute_module_persistent_sizes
if torch.cuda.is_available():
torch.cuda.manual_seed_all(0)
def tearDown(self):
super().tearDown()
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.synchronize()
torch.manual_seed(0)
config = self.get_init_dict()
inputs_dict = self.get_dummy_inputs()
model = self.model_class(**config).eval()
model = model.to(torch_device)
base_output = _concat_list_output(model(**inputs_dict, return_dict=False)[0])
model_size = compute_module_persistent_sizes(model)[""]
max_shard_size = int((model_size * 0.75) / (2**10))
original_parallel_loading = constants.HF_ENABLE_PARALLEL_LOADING
original_parallel_workers = getattr(constants, "HF_PARALLEL_WORKERS", None)
try:
model.cpu().save_pretrained(tmp_path, max_shard_size=f"{max_shard_size}KB")
assert os.path.exists(os.path.join(tmp_path, SAFE_WEIGHTS_INDEX_NAME))
expected_num_shards = calculate_expected_num_shards(os.path.join(tmp_path, SAFE_WEIGHTS_INDEX_NAME))
actual_num_shards = len([file for file in os.listdir(tmp_path) if file.endswith(".safetensors")])
assert actual_num_shards == expected_num_shards
constants.HF_ENABLE_PARALLEL_LOADING = False
self.model_class.from_pretrained(tmp_path).eval().to(torch_device)
constants.HF_ENABLE_PARALLEL_LOADING = True
constants.DEFAULT_HF_PARALLEL_LOADING_WORKERS = 2
torch.manual_seed(0)
model_parallel = self.model_class.from_pretrained(tmp_path).eval()
model_parallel = model_parallel.to(torch_device)
output_parallel = _concat_list_output(model_parallel(**inputs_dict, return_dict=False)[0])
assert_tensors_close(
base_output, output_parallel, atol=atol, rtol=rtol, msg="Output should match with parallel loading"
)
finally:
constants.HF_ENABLE_PARALLEL_LOADING = original_parallel_loading
if original_parallel_workers is not None:
constants.HF_PARALLEL_WORKERS = original_parallel_workers
class TestZImageTransformerMemory(ZImageTransformerTesterConfig, MemoryTesterMixin):
"""Memory optimization tests for Z-Image Transformer."""
@pytest.mark.skip(
"Ensure `x_pad_token` and `cap_pad_token` are cast to the same dtype as the destination tensor before they are assigned to the padding indices."
)
def test_layerwise_casting_training(self):
pass
class TestZImageTransformerTraining(ZImageTransformerTesterConfig, TrainingTesterMixin):
"""Training tests for Z-Image Transformer."""
if torch.cuda.is_available():
torch.cuda.manual_seed_all(0)
def test_gradient_checkpointing_is_applied(self):
super().test_gradient_checkpointing_is_applied(expected_set={"ZImageTransformer2DModel"})
expected_set = {"ZImageTransformer2DModel"}
super().test_gradient_checkpointing_is_applied(expected_set=expected_set)
@pytest.mark.skip("Test is not supported for handling main inputs that are lists.")
@unittest.skip("Test is not supported for handling main inputs that are lists.")
def test_training(self):
pass
super().test_training()
@pytest.mark.skip("Test is not supported for handling main inputs that are lists.")
def test_training_with_ema(self):
pass
@unittest.skip("Test is not supported for handling main inputs that are lists.")
def test_ema_training(self):
super().test_ema_training()
@pytest.mark.skip("Test is not supported for handling main inputs that are lists.")
def test_gradient_checkpointing_equivalence(self, loss_tolerance=1e-5, param_grad_tol=5e-5, skip=None):
pass
@unittest.skip("Test is not supported for handling main inputs that are lists.")
def test_effective_gradient_checkpointing(self):
super().test_effective_gradient_checkpointing()
@unittest.skip(
"Test needs to be revisited. But we need to ensure `x_pad_token` and `cap_pad_token` are cast to the same dtype as the destination tensor before they are assigned to the padding indices."
)
def test_layerwise_casting_training(self):
super().test_layerwise_casting_training()
@unittest.skip("Test is not supported for handling main inputs that are lists.")
def test_outputs_equivalence(self):
super().test_outputs_equivalence()
@unittest.skip("Test will pass if we change to deterministic values instead of empty in the DiT.")
def test_group_offloading(self):
super().test_group_offloading()
@unittest.skip("Test will pass if we change to deterministic values instead of empty in the DiT.")
def test_group_offloading_with_disk(self):
super().test_group_offloading_with_disk()
class TestZImageTransformerLoRA(ZImageTransformerTesterConfig, LoraTesterMixin):
"""LoRA adapter tests for Z-Image Transformer."""
class ZImageTransformerCompileTests(TorchCompileTesterMixin, unittest.TestCase):
model_class = ZImageTransformer2DModel
different_shapes_for_compilation = [(4, 4), (4, 8), (8, 8)]
@pytest.mark.skip("Model output `sample` is a list of tensors, not a single tensor.")
def test_save_load_lora_adapter(self, tmp_path, rank=4, lora_alpha=4, use_dora=False, atol=1e-4, rtol=1e-4):
pass
def prepare_init_args_and_inputs_for_common(self):
return ZImageTransformerTests().prepare_init_args_and_inputs_for_common()
def prepare_dummy_input(self, height, width):
return ZImageTransformerTests().prepare_dummy_input(height=height, width=width)
# TODO: Add pretrained_model_name_or_path once a tiny Z-Image model is available on the Hub
# class TestZImageTransformerBitsAndBytes(ZImageTransformerTesterConfig, BitsAndBytesTesterMixin):
# """BitsAndBytes quantization tests for Z-Image Transformer."""
# TODO: Add pretrained_model_name_or_path once a tiny Z-Image model is available on the Hub
# class TestZImageTransformerTorchAo(ZImageTransformerTesterConfig, TorchAoTesterMixin):
# """TorchAo quantization tests for Z-Image Transformer."""
class TestZImageTransformerCompile(ZImageTransformerTesterConfig, TorchCompileTesterMixin):
"""Torch compile tests for Z-Image Transformer."""
@property
def different_shapes_for_compilation(self):
return [(4, 4), (4, 8), (8, 8)]
def get_dummy_inputs(self, height: int = 16, width: int = 16) -> dict[str, torch.Tensor | list]:
batch_size = 1
num_channels = 16
embedding_dim = 16
sequence_length = 16
hidden_states = [
randn_tensor((num_channels, 1, height, width), generator=self.generator, device=torch_device)
for _ in range(batch_size)
]
encoder_hidden_states = [
randn_tensor((sequence_length, embedding_dim), generator=self.generator, device=torch_device)
for _ in range(batch_size)
]
timestep = torch.tensor([0.0]).to(torch_device)
return {"x": hidden_states, "cap_feats": encoder_hidden_states, "t": timestep}
@pytest.mark.skip(
"The repeated block in this model is ZImageTransformerBlock, which is used for noise_refiner, context_refiner, and layers. The inputs recorded for the block would vary during compilation and full compilation with fullgraph=True would trigger recompilation at least thrice."
@unittest.skip(
"The repeated block in this model is ZImageTransformerBlock, which is used for noise_refiner, context_refiner, and layers. As a consequence of this, the inputs recorded for the block would vary during compilation and full compilation with fullgraph=True would trigger recompilation at least thrice."
)
def test_torch_compile_recompilation_and_graph_break(self):
pass
super().test_torch_compile_recompilation_and_graph_break()
@pytest.mark.skip("Fullgraph AoT is broken")
def test_compile_works_with_aot(self, tmp_path):
pass
@unittest.skip("Fullgraph AoT is broken")
def test_compile_works_with_aot(self):
super().test_compile_works_with_aot()
@pytest.mark.skip("Fullgraph is broken")
@unittest.skip("Fullgraph is broken")
def test_compile_on_different_shapes(self):
pass
super().test_compile_on_different_shapes()