Compare commits


4 Commits

Author  SHA1        Message  Date
DN6     73b23dc92e  update   2026-03-11 14:02:31 +05:30
DN6     c15472d2c4  update   2026-03-11 11:32:28 +05:30
DN6     d15761686a  update   2026-03-11 11:23:01 +05:30
DN6     5352999e14  update   2026-03-11 10:39:09 +05:30
2 changed files with 264 additions and 121 deletions

View File

@@ -13,8 +13,8 @@
# limitations under the License.
import gc
import unittest
import pytest
import torch
from diffusers.hooks import HookRegistry, ModelHook
@@ -134,18 +134,20 @@ class SkipLayerHook(ModelHook):
return output
class TestHooks:
class HookTests(unittest.TestCase):
in_features = 4
hidden_features = 8
out_features = 4
num_layers = 2
def setup_method(self):
def setUp(self):
params = self.get_module_parameters()
self.model = DummyModel(**params)
self.model.to(torch_device)
def teardown_method(self):
def tearDown(self):
super().tearDown()
del self.model
gc.collect()
free_memory()
@@ -169,20 +171,20 @@ class TestHooks:
registry_repr = repr(registry)
expected_repr = "HookRegistry(\n (0) add_hook - AddHook\n (1) multiply_hook - MultiplyHook(value=2)\n)"
assert len(registry.hooks) == 2
assert registry._hook_order == ["add_hook", "multiply_hook"]
assert registry_repr == expected_repr
self.assertEqual(len(registry.hooks), 2)
self.assertEqual(registry._hook_order, ["add_hook", "multiply_hook"])
self.assertEqual(registry_repr, expected_repr)
registry.remove_hook("add_hook")
assert len(registry.hooks) == 1
assert registry._hook_order == ["multiply_hook"]
self.assertEqual(len(registry.hooks), 1)
self.assertEqual(registry._hook_order, ["multiply_hook"])
def test_stateful_hook(self):
registry = HookRegistry.check_if_exists_or_initialize(self.model)
registry.register_hook(StatefulAddHook(1), "stateful_add_hook")
assert registry.hooks["stateful_add_hook"].increment == 0
self.assertEqual(registry.hooks["stateful_add_hook"].increment, 0)
input = torch.randn(1, 4, device=torch_device, generator=self.get_generator())
num_repeats = 3
@@ -192,13 +194,13 @@ class TestHooks:
if i == 0:
output1 = result
assert registry.get_hook("stateful_add_hook").increment == num_repeats
self.assertEqual(registry.get_hook("stateful_add_hook").increment, num_repeats)
registry.reset_stateful_hooks()
output2 = self.model(input)
assert registry.get_hook("stateful_add_hook").increment == 1
assert torch.allclose(output1, output2)
self.assertEqual(registry.get_hook("stateful_add_hook").increment, 1)
self.assertTrue(torch.allclose(output1, output2))
def test_inference(self):
registry = HookRegistry.check_if_exists_or_initialize(self.model)
@@ -216,9 +218,9 @@ class TestHooks:
new_input = input * 2 + 1
output3 = self.model(new_input).mean().detach().cpu().item()
assert output1 == pytest.approx(output2, abs=5e-6)
assert output1 == pytest.approx(output3, abs=5e-6)
assert output2 == pytest.approx(output3, abs=5e-6)
self.assertAlmostEqual(output1, output2, places=5)
self.assertAlmostEqual(output1, output3, places=5)
self.assertAlmostEqual(output2, output3, places=5)
def test_skip_layer_hook(self):
registry = HookRegistry.check_if_exists_or_initialize(self.model)
@@ -226,29 +228,30 @@ class TestHooks:
input = torch.zeros(1, 4, device=torch_device)
output = self.model(input).mean().detach().cpu().item()
assert output == 0.0
self.assertEqual(output, 0.0)
registry.remove_hook("skip_layer_hook")
registry.register_hook(SkipLayerHook(skip_layer=False), "skip_layer_hook")
output = self.model(input).mean().detach().cpu().item()
assert output != 0.0
self.assertNotEqual(output, 0.0)
def test_skip_layer_internal_block(self):
registry = HookRegistry.check_if_exists_or_initialize(self.model.linear_1)
input = torch.zeros(1, 4, device=torch_device)
registry.register_hook(SkipLayerHook(skip_layer=True), "skip_layer_hook")
with pytest.raises(RuntimeError, match="mat1 and mat2 shapes cannot be multiplied"):
with self.assertRaises(RuntimeError) as cm:
self.model(input).mean().detach().cpu().item()
self.assertIn("mat1 and mat2 shapes cannot be multiplied", str(cm.exception))
registry.remove_hook("skip_layer_hook")
output = self.model(input).mean().detach().cpu().item()
assert output != 0.0
self.assertNotEqual(output, 0.0)
registry = HookRegistry.check_if_exists_or_initialize(self.model.blocks[1])
registry.register_hook(SkipLayerHook(skip_layer=True), "skip_layer_hook")
output = self.model(input).mean().detach().cpu().item()
assert output != 0.0
self.assertNotEqual(output, 0.0)
def test_invocation_order_stateful_first(self):
registry = HookRegistry.check_if_exists_or_initialize(self.model)
@@ -275,7 +278,7 @@ class TestHooks:
.replace(" ", "")
.replace("\n", "")
)
assert output == expected_invocation_order_log
self.assertEqual(output, expected_invocation_order_log)
registry.remove_hook("add_hook")
with CaptureLogger(logger) as cap_logger:
@@ -286,7 +289,7 @@ class TestHooks:
.replace(" ", "")
.replace("\n", "")
)
assert output == expected_invocation_order_log
self.assertEqual(output, expected_invocation_order_log)
def test_invocation_order_stateful_middle(self):
registry = HookRegistry.check_if_exists_or_initialize(self.model)
@@ -313,7 +316,7 @@ class TestHooks:
.replace(" ", "")
.replace("\n", "")
)
assert output == expected_invocation_order_log
self.assertEqual(output, expected_invocation_order_log)
registry.remove_hook("add_hook")
with CaptureLogger(logger) as cap_logger:
@@ -324,7 +327,7 @@ class TestHooks:
.replace(" ", "")
.replace("\n", "")
)
assert output == expected_invocation_order_log
self.assertEqual(output, expected_invocation_order_log)
registry.remove_hook("add_hook_2")
with CaptureLogger(logger) as cap_logger:
@@ -333,7 +336,7 @@ class TestHooks:
expected_invocation_order_log = (
("MultiplyHook pre_forward\nMultiplyHook post_forward\n").replace(" ", "").replace("\n", "")
)
assert output == expected_invocation_order_log
self.assertEqual(output, expected_invocation_order_log)
def test_invocation_order_stateful_last(self):
registry = HookRegistry.check_if_exists_or_initialize(self.model)
@@ -360,7 +363,7 @@ class TestHooks:
.replace(" ", "")
.replace("\n", "")
)
assert output == expected_invocation_order_log
self.assertEqual(output, expected_invocation_order_log)
registry.remove_hook("add_hook")
with CaptureLogger(logger) as cap_logger:
@@ -371,4 +374,4 @@ class TestHooks:
.replace(" ", "")
.replace("\n", "")
)
assert output == expected_invocation_order_log
self.assertEqual(output, expected_invocation_order_log)
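
Both assertion styles appear in the hunks above: pytest-style bare assert statements on one side and unittest.TestCase helper methods on the other. For reference, a minimal, self-contained sketch of how the two styles correspond (illustrative only, not taken from the diffusers test suite):

import unittest

import torch


class ExampleAssertionStyles(unittest.TestCase):
    def setUp(self):
        # pytest-style classes use setup_method(self) / teardown_method(self) instead
        self.hook_order = ["add_hook", "multiply_hook"]

    def test_equality(self):
        # pytest style:   assert len(self.hook_order) == 2
        self.assertEqual(len(self.hook_order), 2)

    def test_approx(self):
        # pytest style:   assert 0.30000001 == pytest.approx(0.3, abs=5e-6)
        # places=5 rounds the difference to 5 decimal places before comparing
        self.assertAlmostEqual(0.30000001, 0.3, places=5)

    def test_raises(self):
        # pytest style:
        #   with pytest.raises(RuntimeError, match="shapes cannot be multiplied"):
        #       torch.mm(torch.ones(1, 2), torch.ones(1, 2))
        with self.assertRaises(RuntimeError) as cm:
            torch.mm(torch.ones(1, 2), torch.ones(1, 2))
        self.assertIn("cannot be multiplied", str(cm.exception))


if __name__ == "__main__":
    unittest.main()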

View File

@@ -1,4 +1,3 @@
# coding=utf-8
# Copyright 2025 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,16 +12,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import gc
import os
import unittest
import pytest
import torch
from diffusers import ZImageTransformer2DModel
from diffusers.utils.torch_utils import randn_tensor
from ...testing_utils import IS_GITHUB_ACTIONS, torch_device
from ..test_modeling_common import ModelTesterMixin, TorchCompileTesterMixin
from ...testing_utils import assert_tensors_close, torch_device
from ..testing_utils import (
BaseModelTesterConfig,
LoraTesterMixin,
MemoryTesterMixin,
ModelTesterMixin,
TorchCompileTesterMixin,
TrainingTesterMixin,
)
# Z-Image requires torch.use_deterministic_algorithms(False) due to complex64 RoPE operations
@@ -36,44 +42,38 @@ if hasattr(torch.backends, "cuda"):
torch.backends.cuda.matmul.allow_tf32 = False
@unittest.skipIf(
IS_GITHUB_ACTIONS,
reason="Skipping test-suite inside the CI because the model has `torch.empty()` inside of it during init and we don't have a clear way to override it in the modeling tests.",
)
class ZImageTransformerTests(ModelTesterMixin, unittest.TestCase):
model_class = ZImageTransformer2DModel
main_input_name = "x"
# We override the items here because the transformer under consideration is small.
model_split_percents = [0.9, 0.9, 0.9]
def _concat_list_output(output):
"""Model output `sample` is a list of tensors. Concatenate them for comparison."""
return torch.cat([t.flatten() for t in output])
def prepare_dummy_input(self, height=16, width=16):
batch_size = 1
num_channels = 16
embedding_dim = 16
sequence_length = 16
hidden_states = [torch.randn((num_channels, 1, height, width)).to(torch_device) for _ in range(batch_size)]
encoder_hidden_states = [
torch.randn((sequence_length, embedding_dim)).to(torch_device) for _ in range(batch_size)
]
timestep = torch.tensor([0.0]).to(torch_device)
return {"x": hidden_states, "cap_feats": encoder_hidden_states, "t": timestep}
class ZImageTransformerTesterConfig(BaseModelTesterConfig):
@property
def model_class(self):
return ZImageTransformer2DModel
@property
def dummy_input(self):
return self.prepare_dummy_input()
@property
def input_shape(self):
def output_shape(self) -> tuple[int, ...]:
return (4, 32, 32)
@property
def output_shape(self):
def input_shape(self) -> tuple[int, ...]:
return (4, 32, 32)
def prepare_init_args_and_inputs_for_common(self):
init_dict = {
@property
def model_split_percents(self) -> list:
return [0.9, 0.9, 0.9]
@property
def main_input_name(self) -> str:
return "x"
@property
def generator(self):
return torch.Generator("cpu").manual_seed(0)
def get_init_dict(self):
return {
"all_patch_size": (2,),
"all_f_patch_size": (1,),
"in_channels": 16,
@@ -89,83 +89,223 @@ class ZImageTransformerTests(ModelTesterMixin, unittest.TestCase):
"axes_dims": [8, 4, 4],
"axes_lens": [256, 32, 32],
}
inputs_dict = self.dummy_input
return init_dict, inputs_dict
def setUp(self):
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.synchronize()
def get_dummy_inputs(self) -> dict[str, torch.Tensor | list]:
batch_size = 1
num_channels = 16
embedding_dim = 16
sequence_length = 16
height = 16
width = 16
hidden_states = [
randn_tensor((num_channels, 1, height, width), generator=self.generator, device=torch_device)
for _ in range(batch_size)
]
encoder_hidden_states = [
randn_tensor((sequence_length, embedding_dim), generator=self.generator, device=torch_device)
for _ in range(batch_size)
]
timestep = torch.tensor([0.0]).to(torch_device)
return {"x": hidden_states, "cap_feats": encoder_hidden_states, "t": timestep}
class TestZImageTransformer(ZImageTransformerTesterConfig, ModelTesterMixin):
"""Core model tests for Z-Image Transformer."""
@torch.no_grad()
def test_determinism(self, atol=1e-5, rtol=0):
model = self.model_class(**self.get_init_dict())
model.to(torch_device)
model.eval()
inputs_dict = self.get_dummy_inputs()
first = _concat_list_output(model(**inputs_dict, return_dict=False)[0])
second = _concat_list_output(model(**inputs_dict, return_dict=False)[0])
mask = ~(torch.isnan(first) | torch.isnan(second))
assert_tensors_close(
first[mask], second[mask], atol=atol, rtol=rtol, msg="Model outputs are not deterministic"
)
def test_from_save_pretrained(self, tmp_path, atol=5e-5, rtol=5e-5):
torch.manual_seed(0)
if torch.cuda.is_available():
torch.cuda.manual_seed_all(0)
model = self.model_class(**self.get_init_dict())
model.to(torch_device)
model.eval()
model.save_pretrained(tmp_path)
new_model = self.model_class.from_pretrained(tmp_path)
new_model.to(torch_device)
for param_name in model.state_dict().keys():
param_1 = model.state_dict()[param_name]
param_2 = new_model.state_dict()[param_name]
assert param_1.shape == param_2.shape
inputs_dict = self.get_dummy_inputs()
image = _concat_list_output(model(**inputs_dict, return_dict=False)[0])
new_image = _concat_list_output(new_model(**inputs_dict, return_dict=False)[0])
assert_tensors_close(image, new_image, atol=atol, rtol=rtol, msg="Models give different forward passes.")
@torch.no_grad()
def test_from_save_pretrained_variant(self, tmp_path, atol=5e-5, rtol=0):
model = self.model_class(**self.get_init_dict())
model.to(torch_device)
model.eval()
model.save_pretrained(tmp_path, variant="fp16")
new_model = self.model_class.from_pretrained(tmp_path, variant="fp16")
with pytest.raises(OSError) as exc_info:
self.model_class.from_pretrained(tmp_path)
assert "Error no file named diffusion_pytorch_model.bin found in directory" in str(exc_info.value)
new_model.to(torch_device)
inputs_dict = self.get_dummy_inputs()
image = _concat_list_output(model(**inputs_dict, return_dict=False)[0])
new_image = _concat_list_output(new_model(**inputs_dict, return_dict=False)[0])
assert_tensors_close(image, new_image, atol=atol, rtol=rtol, msg="Models give different forward passes.")
@pytest.mark.skip("Model output `sample` is a list of tensors, not a single tensor.")
def test_outputs_equivalence(self, atol=1e-5, rtol=0):
pass
def test_sharded_checkpoints_with_parallel_loading(self, tmp_path, atol=1e-5, rtol=0):
from diffusers.utils import SAFE_WEIGHTS_INDEX_NAME, constants
from ..testing_utils.common import calculate_expected_num_shards, compute_module_persistent_sizes
def tearDown(self):
super().tearDown()
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.synchronize()
torch.manual_seed(0)
if torch.cuda.is_available():
torch.cuda.manual_seed_all(0)
config = self.get_init_dict()
inputs_dict = self.get_dummy_inputs()
model = self.model_class(**config).eval()
model = model.to(torch_device)
def test_gradient_checkpointing_is_applied(self):
expected_set = {"ZImageTransformer2DModel"}
super().test_gradient_checkpointing_is_applied(expected_set=expected_set)
base_output = _concat_list_output(model(**inputs_dict, return_dict=False)[0])
@unittest.skip("Test is not supported for handling main inputs that are lists.")
def test_training(self):
super().test_training()
model_size = compute_module_persistent_sizes(model)[""]
max_shard_size = int((model_size * 0.75) / (2**10))
@unittest.skip("Test is not supported for handling main inputs that are lists.")
def test_ema_training(self):
super().test_ema_training()
original_parallel_loading = constants.HF_ENABLE_PARALLEL_LOADING
original_parallel_workers = getattr(constants, "HF_PARALLEL_WORKERS", None)
@unittest.skip("Test is not supported for handling main inputs that are lists.")
def test_effective_gradient_checkpointing(self):
super().test_effective_gradient_checkpointing()
try:
model.cpu().save_pretrained(tmp_path, max_shard_size=f"{max_shard_size}KB")
assert os.path.exists(os.path.join(tmp_path, SAFE_WEIGHTS_INDEX_NAME))
@unittest.skip(
"Test needs to be revisited. But we need to ensure `x_pad_token` and `cap_pad_token` are cast to the same dtype as the destination tensor before they are assigned to the padding indices."
expected_num_shards = calculate_expected_num_shards(os.path.join(tmp_path, SAFE_WEIGHTS_INDEX_NAME))
actual_num_shards = len([file for file in os.listdir(tmp_path) if file.endswith(".safetensors")])
assert actual_num_shards == expected_num_shards
constants.HF_ENABLE_PARALLEL_LOADING = False
self.model_class.from_pretrained(tmp_path).eval().to(torch_device)
constants.HF_ENABLE_PARALLEL_LOADING = True
constants.DEFAULT_HF_PARALLEL_LOADING_WORKERS = 2
torch.manual_seed(0)
model_parallel = self.model_class.from_pretrained(tmp_path).eval()
model_parallel = model_parallel.to(torch_device)
output_parallel = _concat_list_output(model_parallel(**inputs_dict, return_dict=False)[0])
assert_tensors_close(
base_output, output_parallel, atol=atol, rtol=rtol, msg="Output should match with parallel loading"
)
finally:
constants.HF_ENABLE_PARALLEL_LOADING = original_parallel_loading
if original_parallel_workers is not None:
constants.HF_PARALLEL_WORKERS = original_parallel_workers
class TestZImageTransformerMemory(ZImageTransformerTesterConfig, MemoryTesterMixin):
"""Memory optimization tests for Z-Image Transformer."""
@pytest.mark.skip(
"Ensure `x_pad_token` and `cap_pad_token` are cast to the same dtype as the destination tensor before they are assigned to the padding indices."
)
def test_layerwise_casting_training(self):
super().test_layerwise_casting_training()
@unittest.skip("Test is not supported for handling main inputs that are lists.")
def test_outputs_equivalence(self):
super().test_outputs_equivalence()
@unittest.skip("Test will pass if we change to deterministic values instead of empty in the DiT.")
def test_group_offloading(self):
super().test_group_offloading()
@unittest.skip("Test will pass if we change to deterministic values instead of empty in the DiT.")
def test_group_offloading_with_disk(self):
super().test_group_offloading_with_disk()
pass
class ZImageTransformerCompileTests(TorchCompileTesterMixin, unittest.TestCase):
model_class = ZImageTransformer2DModel
different_shapes_for_compilation = [(4, 4), (4, 8), (8, 8)]
class TestZImageTransformerTraining(ZImageTransformerTesterConfig, TrainingTesterMixin):
"""Training tests for Z-Image Transformer."""
def prepare_init_args_and_inputs_for_common(self):
return ZImageTransformerTests().prepare_init_args_and_inputs_for_common()
def test_gradient_checkpointing_is_applied(self):
super().test_gradient_checkpointing_is_applied(expected_set={"ZImageTransformer2DModel"})
def prepare_dummy_input(self, height, width):
return ZImageTransformerTests().prepare_dummy_input(height=height, width=width)
@pytest.mark.skip("Test is not supported for handling main inputs that are lists.")
def test_training(self):
pass
@unittest.skip(
"The repeated block in this model is ZImageTransformerBlock, which is used for noise_refiner, context_refiner, and layers. As a consequence of this, the inputs recorded for the block would vary during compilation and full compilation with fullgraph=True would trigger recompilation at least thrice."
@pytest.mark.skip("Test is not supported for handling main inputs that are lists.")
def test_training_with_ema(self):
pass
@pytest.mark.skip("Test is not supported for handling main inputs that are lists.")
def test_gradient_checkpointing_equivalence(self, loss_tolerance=1e-5, param_grad_tol=5e-5, skip=None):
pass
class TestZImageTransformerLoRA(ZImageTransformerTesterConfig, LoraTesterMixin):
"""LoRA adapter tests for Z-Image Transformer."""
@pytest.mark.skip("Model output `sample` is a list of tensors, not a single tensor.")
def test_save_load_lora_adapter(self, tmp_path, rank=4, lora_alpha=4, use_dora=False, atol=1e-4, rtol=1e-4):
pass
# TODO: Add pretrained_model_name_or_path once a tiny Z-Image model is available on the Hub
# class TestZImageTransformerBitsAndBytes(ZImageTransformerTesterConfig, BitsAndBytesTesterMixin):
# """BitsAndBytes quantization tests for Z-Image Transformer."""
# TODO: Add pretrained_model_name_or_path once a tiny Z-Image model is available on the Hub
# class TestZImageTransformerTorchAo(ZImageTransformerTesterConfig, TorchAoTesterMixin):
# """TorchAo quantization tests for Z-Image Transformer."""
class TestZImageTransformerCompile(ZImageTransformerTesterConfig, TorchCompileTesterMixin):
"""Torch compile tests for Z-Image Transformer."""
@property
def different_shapes_for_compilation(self):
return [(4, 4), (4, 8), (8, 8)]
def get_dummy_inputs(self, height: int = 16, width: int = 16) -> dict[str, torch.Tensor | list]:
batch_size = 1
num_channels = 16
embedding_dim = 16
sequence_length = 16
hidden_states = [
randn_tensor((num_channels, 1, height, width), generator=self.generator, device=torch_device)
for _ in range(batch_size)
]
encoder_hidden_states = [
randn_tensor((sequence_length, embedding_dim), generator=self.generator, device=torch_device)
for _ in range(batch_size)
]
timestep = torch.tensor([0.0]).to(torch_device)
return {"x": hidden_states, "cap_feats": encoder_hidden_states, "t": timestep}
@pytest.mark.skip(
"The repeated block in this model is ZImageTransformerBlock, which is used for noise_refiner, context_refiner, and layers. The inputs recorded for the block would vary during compilation and full compilation with fullgraph=True would trigger recompilation at least thrice."
)
def test_torch_compile_recompilation_and_graph_break(self):
super().test_torch_compile_recompilation_and_graph_break()
pass
@unittest.skip("Fullgraph AoT is broken")
def test_compile_works_with_aot(self):
super().test_compile_works_with_aot()
@pytest.mark.skip("Fullgraph AoT is broken")
def test_compile_works_with_aot(self, tmp_path):
pass
@unittest.skip("Fullgraph is broken")
@pytest.mark.skip("Fullgraph is broken")
def test_compile_on_different_shapes(self):
super().test_compile_on_different_shapes()
pass
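
The Z-Image changes above split one monolithic unittest.TestCase into a shared tester-config class (ZImageTransformerTesterConfig) that is composed with focused mixins for core model behaviour, memory, training, LoRA, and torch.compile. A minimal, self-contained sketch of that composition pattern, using stand-in names rather than the real diffusers mixins imported in the diff:

# Illustration of the config-plus-mixins layout used above.
# All names here (ToyModel, ToyTesterConfig, ForwardTesterMixin) are stand-ins.
import torch


class ToyModel(torch.nn.Module):
    def __init__(self, hidden: int = 8):
        super().__init__()
        self.linear = torch.nn.Linear(hidden, hidden)

    def forward(self, x):
        return self.linear(x)


class ToyTesterConfig:
    # Shared configuration: which class to test and how to build init args / inputs.
    @property
    def model_class(self):
        return ToyModel

    def get_init_dict(self):
        return {"hidden": 8}

    def get_dummy_inputs(self):
        return {"x": torch.randn(1, 8)}


class ForwardTesterMixin:
    # One focused concern per mixin; tests pull everything they need from the config.
    def test_forward_shape(self):
        model = self.model_class(**self.get_init_dict())
        output = model(**self.get_dummy_inputs())
        assert output.shape == (1, 8)


class TestToyModelForward(ToyTesterConfig, ForwardTesterMixin):
    # Concrete pytest-collected class: config first, then the mixins to run.
    pass

Under pytest, TestToyModelForward collects test_forward_shape from the mixin while reading the model class, init args, and dummy inputs from the config, which is the same division of labour the new TestZImageTransformer* classes rely on.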