Compare commits

..

1 Commits

Author SHA1 Message Date
DN6
4f82a6f9a2 update 2026-03-26 09:36:10 +05:30
2 changed files with 218 additions and 215 deletions

View File

@@ -12,60 +12,46 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import torch
from diffusers import CosmosTransformer3DModel
from diffusers.utils.torch_utils import randn_tensor
from ...testing_utils import enable_full_determinism, torch_device
from ..test_modeling_common import ModelTesterMixin
from ..testing_utils import (
BaseModelTesterConfig,
MemoryTesterMixin,
ModelTesterMixin,
TrainingTesterMixin,
)
enable_full_determinism()
class CosmosTransformer3DModelTests(ModelTesterMixin, unittest.TestCase):
model_class = CosmosTransformer3DModel
main_input_name = "hidden_states"
uses_custom_attn_processor = True
class CosmosTransformerTesterConfig(BaseModelTesterConfig):
@property
def model_class(self):
return CosmosTransformer3DModel
@property
def dummy_input(self):
batch_size = 1
num_channels = 4
num_frames = 1
height = 16
width = 16
text_embed_dim = 16
sequence_length = 12
fps = 30
def output_shape(self) -> tuple[int, ...]:
return (4, 1, 16, 16)
hidden_states = torch.randn((batch_size, num_channels, num_frames, height, width)).to(torch_device)
timestep = torch.randint(0, 1000, size=(batch_size,)).to(torch_device)
encoder_hidden_states = torch.randn((batch_size, sequence_length, text_embed_dim)).to(torch_device)
attention_mask = torch.ones((batch_size, sequence_length)).to(torch_device)
padding_mask = torch.zeros(batch_size, 1, height, width).to(torch_device)
@property
def input_shape(self) -> tuple[int, ...]:
return (4, 1, 16, 16)
@property
def main_input_name(self) -> str:
return "hidden_states"
@property
def generator(self):
return torch.Generator("cpu").manual_seed(0)
def get_init_dict(self) -> dict[str, int | list | tuple | float | bool | str]:
return {
"hidden_states": hidden_states,
"timestep": timestep,
"encoder_hidden_states": encoder_hidden_states,
"attention_mask": attention_mask,
"fps": fps,
"padding_mask": padding_mask,
}
@property
def input_shape(self):
return (4, 1, 16, 16)
@property
def output_shape(self):
return (4, 1, 16, 16)
def prepare_init_args_and_inputs_for_common(self):
init_dict = {
"in_channels": 4,
"out_channels": 4,
"num_attention_heads": 2,
@@ -80,57 +66,68 @@ class CosmosTransformer3DModelTests(ModelTesterMixin, unittest.TestCase):
"concat_padding_mask": True,
"extra_pos_embed_type": "learnable",
}
inputs_dict = self.dummy_input
return init_dict, inputs_dict
def test_gradient_checkpointing_is_applied(self):
expected_set = {"CosmosTransformer3DModel"}
super().test_gradient_checkpointing_is_applied(expected_set=expected_set)
class CosmosTransformer3DModelVideoToWorldTests(ModelTesterMixin, unittest.TestCase):
model_class = CosmosTransformer3DModel
main_input_name = "hidden_states"
uses_custom_attn_processor = True
@property
def dummy_input(self):
batch_size = 1
def get_dummy_inputs(self, batch_size: int = 1) -> dict[str, torch.Tensor]:
num_channels = 4
num_frames = 1
height = 16
width = 16
text_embed_dim = 16
sequence_length = 12
fps = 30
hidden_states = torch.randn((batch_size, num_channels, num_frames, height, width)).to(torch_device)
timestep = torch.randint(0, 1000, size=(batch_size,)).to(torch_device)
encoder_hidden_states = torch.randn((batch_size, sequence_length, text_embed_dim)).to(torch_device)
attention_mask = torch.ones((batch_size, sequence_length)).to(torch_device)
condition_mask = torch.ones(batch_size, 1, num_frames, height, width).to(torch_device)
padding_mask = torch.zeros(batch_size, 1, height, width).to(torch_device)
return {
"hidden_states": hidden_states,
"timestep": timestep,
"encoder_hidden_states": encoder_hidden_states,
"attention_mask": attention_mask,
"fps": fps,
"condition_mask": condition_mask,
"padding_mask": padding_mask,
"hidden_states": randn_tensor(
(batch_size, num_channels, num_frames, height, width), generator=self.generator, device=torch_device
),
"timestep": torch.randint(0, 1000, size=(batch_size,), generator=self.generator).to(torch_device),
"encoder_hidden_states": randn_tensor(
(batch_size, sequence_length, text_embed_dim), generator=self.generator, device=torch_device
),
"attention_mask": torch.ones((batch_size, sequence_length)).to(torch_device),
"fps": 30,
"padding_mask": torch.zeros(batch_size, 1, height, width).to(torch_device),
}
class TestCosmosTransformer(CosmosTransformerTesterConfig, ModelTesterMixin):
"""Core model tests for Cosmos Transformer."""
class TestCosmosTransformerMemory(CosmosTransformerTesterConfig, MemoryTesterMixin):
"""Memory optimization tests for Cosmos Transformer."""
class TestCosmosTransformerTraining(CosmosTransformerTesterConfig, TrainingTesterMixin):
"""Training tests for Cosmos Transformer."""
def test_gradient_checkpointing_is_applied(self):
expected_set = {"CosmosTransformer3DModel"}
super().test_gradient_checkpointing_is_applied(expected_set=expected_set)
class CosmosTransformerVideoToWorldTesterConfig(BaseModelTesterConfig):
@property
def input_shape(self):
def model_class(self):
return CosmosTransformer3DModel
@property
def output_shape(self) -> tuple[int, ...]:
return (4, 1, 16, 16)
@property
def output_shape(self):
def input_shape(self) -> tuple[int, ...]:
return (4, 1, 16, 16)
def prepare_init_args_and_inputs_for_common(self):
init_dict = {
@property
def main_input_name(self) -> str:
return "hidden_states"
@property
def generator(self):
return torch.Generator("cpu").manual_seed(0)
def get_init_dict(self) -> dict[str, int | list | tuple | float | bool | str]:
return {
"in_channels": 4 + 1,
"out_channels": 4,
"num_attention_heads": 2,
@@ -145,8 +142,40 @@ class CosmosTransformer3DModelVideoToWorldTests(ModelTesterMixin, unittest.TestC
"concat_padding_mask": True,
"extra_pos_embed_type": "learnable",
}
inputs_dict = self.dummy_input
return init_dict, inputs_dict
def get_dummy_inputs(self, batch_size: int = 1) -> dict[str, torch.Tensor]:
num_channels = 4
num_frames = 1
height = 16
width = 16
text_embed_dim = 16
sequence_length = 12
return {
"hidden_states": randn_tensor(
(batch_size, num_channels, num_frames, height, width), generator=self.generator, device=torch_device
),
"timestep": torch.randint(0, 1000, size=(batch_size,), generator=self.generator).to(torch_device),
"encoder_hidden_states": randn_tensor(
(batch_size, sequence_length, text_embed_dim), generator=self.generator, device=torch_device
),
"attention_mask": torch.ones((batch_size, sequence_length)).to(torch_device),
"fps": 30,
"condition_mask": torch.ones(batch_size, 1, num_frames, height, width).to(torch_device),
"padding_mask": torch.zeros(batch_size, 1, height, width).to(torch_device),
}
class TestCosmosTransformerVideoToWorld(CosmosTransformerVideoToWorldTesterConfig, ModelTesterMixin):
"""Core model tests for Cosmos Transformer (Video-to-World)."""
class TestCosmosTransformerVideoToWorldMemory(CosmosTransformerVideoToWorldTesterConfig, MemoryTesterMixin):
"""Memory optimization tests for Cosmos Transformer (Video-to-World)."""
class TestCosmosTransformerVideoToWorldTraining(CosmosTransformerVideoToWorldTesterConfig, TrainingTesterMixin):
"""Training tests for Cosmos Transformer (Video-to-World)."""
def test_gradient_checkpointing_is_applied(self):
expected_set = {"CosmosTransformer3DModel"}

View File

@@ -13,63 +13,58 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import torch
from diffusers import SD3Transformer2DModel
from diffusers.utils.torch_utils import randn_tensor
from diffusers.utils.import_utils import is_xformers_available
from ...testing_utils import enable_full_determinism, torch_device
from ..testing_utils import (
BaseModelTesterConfig,
BitsAndBytesTesterMixin,
ModelTesterMixin,
TorchAoTesterMixin,
TorchCompileTesterMixin,
TrainingTesterMixin,
from ...testing_utils import (
enable_full_determinism,
torch_device,
)
from ..test_modeling_common import ModelTesterMixin
enable_full_determinism()
# ======================== SD3 Transformer ========================
class SD3TransformerTesterConfig(BaseModelTesterConfig):
@property
def model_class(self):
return SD3Transformer2DModel
class SD3TransformerTests(ModelTesterMixin, unittest.TestCase):
model_class = SD3Transformer2DModel
main_input_name = "hidden_states"
model_split_percents = [0.8, 0.8, 0.9]
@property
def pretrained_model_name_or_path(self):
return "hf-internal-testing/tiny-sd3-pipe"
def dummy_input(self):
batch_size = 2
num_channels = 4
height = width = embedding_dim = 32
pooled_embedding_dim = embedding_dim * 2
sequence_length = 154
@property
def pretrained_model_kwargs(self):
return {"subfolder": "transformer"}
hidden_states = torch.randn((batch_size, num_channels, height, width)).to(torch_device)
encoder_hidden_states = torch.randn((batch_size, sequence_length, embedding_dim)).to(torch_device)
pooled_prompt_embeds = torch.randn((batch_size, pooled_embedding_dim)).to(torch_device)
timestep = torch.randint(0, 1000, size=(batch_size,)).to(torch_device)
@property
def main_input_name(self) -> str:
return "hidden_states"
@property
def model_split_percents(self) -> list:
return [0.8, 0.8, 0.9]
@property
def output_shape(self) -> tuple:
return (4, 32, 32)
@property
def input_shape(self) -> tuple:
return (4, 32, 32)
@property
def generator(self):
return torch.Generator("cpu").manual_seed(0)
def get_init_dict(self) -> dict:
return {
"hidden_states": hidden_states,
"encoder_hidden_states": encoder_hidden_states,
"pooled_projections": pooled_prompt_embeds,
"timestep": timestep,
}
@property
def input_shape(self):
return (4, 32, 32)
@property
def output_shape(self):
return (4, 32, 32)
def prepare_init_args_and_inputs_for_common(self):
init_dict = {
"sample_size": 32,
"patch_size": 1,
"in_channels": 4,
@@ -84,79 +79,67 @@ class SD3TransformerTesterConfig(BaseModelTesterConfig):
"dual_attention_layers": (),
"qk_norm": None,
}
inputs_dict = self.dummy_input
return init_dict, inputs_dict
def get_dummy_inputs(self, batch_size: int = 2) -> dict[str, torch.Tensor]:
num_channels = 4
height = width = embedding_dim = 32
pooled_embedding_dim = embedding_dim * 2
sequence_length = 154
@unittest.skipIf(
torch_device != "cuda" or not is_xformers_available(),
reason="XFormers attention is only available with CUDA and `xformers` installed",
)
def test_xformers_enable_works(self):
init_dict, inputs_dict = self.prepare_init_args_and_inputs_for_common()
model = self.model_class(**init_dict)
return {
"hidden_states": randn_tensor(
(batch_size, num_channels, height, width), generator=self.generator, device=torch_device
),
"encoder_hidden_states": randn_tensor(
(batch_size, sequence_length, embedding_dim), generator=self.generator, device=torch_device
),
"pooled_projections": randn_tensor(
(batch_size, pooled_embedding_dim), generator=self.generator, device=torch_device
),
"timestep": torch.randint(0, 1000, size=(batch_size,), generator=self.generator).to(torch_device),
}
model.enable_xformers_memory_efficient_attention()
assert model.transformer_blocks[0].attn.processor.__class__.__name__ == "XFormersJointAttnProcessor", (
"xformers is not enabled"
)
class TestSD3Transformer(SD3TransformerTesterConfig, ModelTesterMixin):
pass
@unittest.skip("SD3Transformer2DModel uses a dedicated attention processor. This test doesn't apply")
def test_set_attn_processor_for_determinism(self):
pass
class TestSD3TransformerTraining(SD3TransformerTesterConfig, TrainingTesterMixin):
def test_gradient_checkpointing_is_applied(self):
expected_set = {"SD3Transformer2DModel"}
super().test_gradient_checkpointing_is_applied(expected_set=expected_set)
class TestSD3TransformerCompile(SD3TransformerTesterConfig, TorchCompileTesterMixin):
pass
# ======================== SD3.5 Transformer ========================
class SD35TransformerTesterConfig(BaseModelTesterConfig):
@property
def model_class(self):
return SD3Transformer2DModel
class SD35TransformerTests(ModelTesterMixin, unittest.TestCase):
model_class = SD3Transformer2DModel
main_input_name = "hidden_states"
model_split_percents = [0.8, 0.8, 0.9]
@property
def pretrained_model_name_or_path(self):
return "hf-internal-testing/tiny-sd35-pipe"
def dummy_input(self):
batch_size = 2
num_channels = 4
height = width = embedding_dim = 32
pooled_embedding_dim = embedding_dim * 2
sequence_length = 154
@property
def pretrained_model_kwargs(self):
return {"subfolder": "transformer"}
hidden_states = torch.randn((batch_size, num_channels, height, width)).to(torch_device)
encoder_hidden_states = torch.randn((batch_size, sequence_length, embedding_dim)).to(torch_device)
pooled_prompt_embeds = torch.randn((batch_size, pooled_embedding_dim)).to(torch_device)
timestep = torch.randint(0, 1000, size=(batch_size,)).to(torch_device)
@property
def main_input_name(self) -> str:
return "hidden_states"
@property
def model_split_percents(self) -> list:
return [0.8, 0.8, 0.9]
@property
def output_shape(self) -> tuple:
return (4, 32, 32)
@property
def input_shape(self) -> tuple:
return (4, 32, 32)
@property
def generator(self):
return torch.Generator("cpu").manual_seed(0)
def get_init_dict(self) -> dict:
return {
"hidden_states": hidden_states,
"encoder_hidden_states": encoder_hidden_states,
"pooled_projections": pooled_prompt_embeds,
"timestep": timestep,
}
@property
def input_shape(self):
return (4, 32, 32)
@property
def output_shape(self):
return (4, 32, 32)
def prepare_init_args_and_inputs_for_common(self):
init_dict = {
"sample_size": 32,
"patch_size": 1,
"in_channels": 4,
@@ -171,56 +154,47 @@ class SD35TransformerTesterConfig(BaseModelTesterConfig):
"dual_attention_layers": (0,),
"qk_norm": "rms_norm",
}
inputs_dict = self.dummy_input
return init_dict, inputs_dict
def get_dummy_inputs(self, batch_size: int = 2) -> dict[str, torch.Tensor]:
num_channels = 4
height = width = embedding_dim = 32
pooled_embedding_dim = embedding_dim * 2
sequence_length = 154
@unittest.skipIf(
torch_device != "cuda" or not is_xformers_available(),
reason="XFormers attention is only available with CUDA and `xformers` installed",
)
def test_xformers_enable_works(self):
init_dict, inputs_dict = self.prepare_init_args_and_inputs_for_common()
model = self.model_class(**init_dict)
return {
"hidden_states": randn_tensor(
(batch_size, num_channels, height, width), generator=self.generator, device=torch_device
),
"encoder_hidden_states": randn_tensor(
(batch_size, sequence_length, embedding_dim), generator=self.generator, device=torch_device
),
"pooled_projections": randn_tensor(
(batch_size, pooled_embedding_dim), generator=self.generator, device=torch_device
),
"timestep": torch.randint(0, 1000, size=(batch_size,), generator=self.generator).to(torch_device),
}
model.enable_xformers_memory_efficient_attention()
assert model.transformer_blocks[0].attn.processor.__class__.__name__ == "XFormersJointAttnProcessor", (
"xformers is not enabled"
)
class TestSD35Transformer(SD35TransformerTesterConfig, ModelTesterMixin):
def test_skip_layers(self):
init_dict = self.get_init_dict()
inputs_dict = self.get_dummy_inputs()
model = self.model_class(**init_dict).to(torch_device)
@unittest.skip("SD3Transformer2DModel uses a dedicated attention processor. This test doesn't apply")
def test_set_attn_processor_for_determinism(self):
pass
output_full = model(**inputs_dict).sample
inputs_dict_with_skip = inputs_dict.copy()
inputs_dict_with_skip["skip_layers"] = [0]
output_skip = model(**inputs_dict_with_skip).sample
assert not torch.allclose(output_full, output_skip, atol=1e-5), "Outputs should differ when layers are skipped"
assert output_full.shape == output_skip.shape, "Outputs should have the same shape"
class TestSD35TransformerTraining(SD35TransformerTesterConfig, TrainingTesterMixin):
def test_gradient_checkpointing_is_applied(self):
expected_set = {"SD3Transformer2DModel"}
super().test_gradient_checkpointing_is_applied(expected_set=expected_set)
def test_skip_layers(self):
init_dict, inputs_dict = self.prepare_init_args_and_inputs_for_common()
model = self.model_class(**init_dict).to(torch_device)
class TestSD35TransformerCompile(SD35TransformerTesterConfig, TorchCompileTesterMixin):
pass
# Forward pass without skipping layers
output_full = model(**inputs_dict).sample
# Forward pass with skipping layers 0 (since there's only one layer in this test setup)
inputs_dict_with_skip = inputs_dict.copy()
inputs_dict_with_skip["skip_layers"] = [0]
output_skip = model(**inputs_dict_with_skip).sample
class TestSD35TransformerBitsAndBytes(SD35TransformerTesterConfig, BitsAndBytesTesterMixin):
"""BitsAndBytes quantization tests for SD3.5 Transformer."""
# Check that the outputs are different
self.assertFalse(
torch.allclose(output_full, output_skip, atol=1e-5), "Outputs should differ when layers are skipped"
)
class TestSD35TransformerTorchAo(SD35TransformerTesterConfig, TorchAoTesterMixin):
"""TorchAO quantization tests for SD3.5 Transformer."""
# Check that the outputs have the same shape
self.assertEqual(output_full.shape, output_skip.shape, "Outputs should have the same shape")