Compare commits

..

5 Commits

Author SHA1 Message Date
Sayak Paul  3b1ccd79a5  Merge branch 'main' into cp-fixes-attn-backends  2025-12-15 20:30:22 +08:00
sayakpaul   0c35ed4708  up  2025-12-12 15:26:43 +05:30
sayakpaul   738f278d93  gracefully error out when attn-backend x cp combo isn't supported.  2025-12-12 15:25:59 +05:30
sayakpaul   23251d6cf6  Revert "gracefully error out when attn-backend x cp combo isn't supported." (This reverts commit c8abb5d7c0.)  2025-12-12 15:24:09 +05:30
sayakpaul   c8abb5d7c0  gracefully error out when attn-backend x cp combo isn't supported.  2025-12-12 15:20:18 +05:30
5 changed files with 70 additions and 308 deletions

View File

@@ -77,47 +77,63 @@ jobs:
   run_fast_tests:
     needs: [check_code_quality, check_repository_consistency]
-    name: Fast PyTorch Modular Pipeline CPU tests
+    strategy:
+      fail-fast: false
+      matrix:
+        config:
+          - name: Fast PyTorch Modular Pipeline CPU tests
+            framework: pytorch_pipelines
+            runner: aws-highmemory-32-plus
+            image: diffusers/diffusers-pytorch-cpu
+            report: torch_cpu_modular_pipelines
+    name: ${{ matrix.config.name }}
     runs-on:
-      group: aws-highmemory-32-plus
+      group: ${{ matrix.config.runner }}
     container:
-      image: diffusers/diffusers-pytorch-cpu
+      image: ${{ matrix.config.image }}
       options: --shm-size "16gb" --ipc host -v /mnt/hf_cache:/mnt/cache/
     defaults:
       run:
         shell: bash
     steps:
       - name: Checkout diffusers
         uses: actions/checkout@v3
         with:
           fetch-depth: 2
       - name: Install dependencies
         run: |
           uv pip install -e ".[quality]"
           #uv pip uninstall transformers huggingface_hub && uv pip install --prerelease allow -U transformers@git+https://github.com/huggingface/transformers.git
           uv pip uninstall transformers huggingface_hub && uv pip install transformers==4.57.1
           uv pip uninstall accelerate && uv pip install -U accelerate@git+https://github.com/huggingface/accelerate.git --no-deps
       - name: Environment
         run: |
           python utils/print_env.py
       - name: Run fast PyTorch Pipeline CPU tests
+        if: ${{ matrix.config.framework == 'pytorch_pipelines' }}
         run: |
           pytest -n 8 --max-worker-restart=0 --dist=loadfile \
-            -s -v \
-            --make-reports=tests_torch_cpu_modular_pipelines \
+            -k "not Flax and not Onnx" \
+            --make-reports=tests_${{ matrix.config.report }} \
             tests/modular_pipelines
 
       - name: Failure short reports
         if: ${{ failure() }}
-        run: cat reports/tests_torch_cpu_modular_pipelines_failures_short.txt
+        run: cat reports/tests_${{ matrix.config.report }}_failures_short.txt
 
       - name: Test suite reports artifacts
         if: ${{ always() }}
         uses: actions/upload-artifact@v4
         with:
-          name: pr_pytorch_pipelines_torch_cpu_modular_pipelines_test_reports
+          name: pr_${{ matrix.config.framework }}_${{ matrix.config.report }}_test_reports
           path: reports

View File

@@ -235,6 +235,10 @@ class _AttentionBackendRegistry:
     def get_active_backend(cls):
         return cls._active_backend, cls._backends[cls._active_backend]
 
+    @classmethod
+    def set_active_backend(cls, backend: str):
+        cls._active_backend = backend
+
     @classmethod
     def list_backends(cls):
         return list(cls._backends.keys())
@@ -294,12 +298,12 @@ def attention_backend(backend: Union[str, AttentionBackendName] = AttentionBacke
     _maybe_download_kernel_for_backend(backend)
 
     old_backend = _AttentionBackendRegistry._active_backend
-    _AttentionBackendRegistry._active_backend = backend
+    _AttentionBackendRegistry.set_active_backend(backend)
 
     try:
         yield
     finally:
-        _AttentionBackendRegistry._active_backend = old_backend
+        _AttentionBackendRegistry.set_active_backend(old_backend)
 
 
 def dispatch_attention_fn(
@@ -348,6 +352,18 @@ def dispatch_attention_fn(
         check(**kwargs)
 
     kwargs = {k: v for k, v in kwargs.items() if k in _AttentionBackendRegistry._supported_arg_names[backend_name]}
+
+    if "_parallel_config" in kwargs and kwargs["_parallel_config"] is not None:
+        attention_backend = AttentionBackendName(backend_name)
+        if not _AttentionBackendRegistry._is_context_parallel_available(attention_backend):
+            compatible_backends = sorted(_AttentionBackendRegistry._supports_context_parallel)
+            raise ValueError(
+                f"Context parallelism is enabled but backend '{attention_backend.value}' "
+                f"does not support context parallelism. "
+                f"Please set a compatible attention backend: {compatible_backends} using `model.set_attention_backend()` before "
+                f"calling `model.enable_parallelism()`."
+            )
+
     return backend_fn(**kwargs)
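
To make the effect of the new guard in `dispatch_attention_fn` concrete, here is a minimal standalone sketch. It is not the diffusers implementation: the registry, the backend names, and the set of CP-capable backends below are stand-ins; only the control flow mirrors the added lines.

from enum import Enum


class AttentionBackendName(str, Enum):
    FLASH = "flash"
    NATIVE = "native"
    SAGE = "sage"


class _AttentionBackendRegistry:
    # Stand-in registry; the CP-capable set here is an assumption for the demo.
    _active_backend = AttentionBackendName.NATIVE
    _supports_context_parallel = {"flash", "native"}

    @classmethod
    def set_active_backend(cls, backend):
        cls._active_backend = backend

    @classmethod
    def _is_context_parallel_available(cls, backend):
        return backend.value in cls._supports_context_parallel


def dispatch_attention_fn(backend_name, parallel_config=None):
    # The added guard: fail early when context parallelism is enabled but the
    # requested backend cannot shard attention across ranks.
    if parallel_config is not None:
        attention_backend = AttentionBackendName(backend_name)
        if not _AttentionBackendRegistry._is_context_parallel_available(attention_backend):
            compatible = sorted(_AttentionBackendRegistry._supports_context_parallel)
            raise ValueError(
                f"Context parallelism is enabled but backend '{attention_backend.value}' "
                f"does not support context parallelism. Compatible backends: {compatible}."
            )
    return f"dispatching to {backend_name}"


print(dispatch_attention_fn("flash", parallel_config={"ring_degree": 2}))  # supported combo
try:
    dispatch_attention_fn("sage", parallel_config={"ring_degree": 2})      # unsupported combo
except ValueError as err:
    print(err)

With this check in place, an unsupported attention-backend x context-parallel combination errors out at dispatch time with an actionable message instead of failing deeper in the stack.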

View File

@@ -602,6 +602,7 @@ class ModelMixin(torch.nn.Module, PushToHubMixin):
         from .attention import AttentionModuleMixin
         from .attention_dispatch import (
             AttentionBackendName,
+            _AttentionBackendRegistry,
             _check_attention_backend_requirements,
             _maybe_download_kernel_for_backend,
         )
@@ -629,6 +630,9 @@ class ModelMixin(torch.nn.Module, PushToHubMixin):
                 continue
             processor._attention_backend = backend
 
+        # Important to set the active backend so that it propagates gracefully throughout.
+        _AttentionBackendRegistry.set_active_backend(backend)
+
     def reset_attention_backend(self) -> None:
         """
         Resets the attention backend for the model. Following calls to `forward` will use the environment default, if
@@ -1541,7 +1545,7 @@ class ModelMixin(torch.nn.Module, PushToHubMixin):
f"Context parallelism is enabled but the attention processor '{processor.__class__.__name__}' " f"Context parallelism is enabled but the attention processor '{processor.__class__.__name__}' "
f"is using backend '{attention_backend.value}' which does not support context parallelism. " f"is using backend '{attention_backend.value}' which does not support context parallelism. "
f"Please set a compatible attention backend: {compatible_backends} using `model.set_attention_backend()` before " f"Please set a compatible attention backend: {compatible_backends} using `model.set_attention_backend()` before "
f"calling `enable_parallelism()`." f"calling `model.enable_parallelism()`."
) )
# All modules use the same attention processor and backend. We don't need to # All modules use the same attention processor and backend. We don't need to
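
The same requirement surfaces at the `ModelMixin` level: the message edited here asks users to pick a context-parallel-capable backend before enabling parallelism. Below is a hypothetical usage sketch of that ordering; the model class, checkpoint, and the "flash" backend choice are assumptions (flash-attn must be installed), and the exact `enable_parallelism()` arguments depend on the diffusers version, so that call is left commented.

import torch
from diffusers import FluxTransformer2DModel

# Load a transformer that supports set_attention_backend (checkpoint is an example).
model = FluxTransformer2DModel.from_pretrained(
    "black-forest-labs/FLUX.1-dev", subfolder="transformer", torch_dtype=torch.bfloat16
)

# 1) Choose a context-parallel-capable backend first ("flash" assumed here).
#    With this PR, the call also updates the registry's active backend.
model.set_attention_backend("flash")

# 2) Only then enable context parallelism; an incompatible combination now raises
#    a ValueError at dispatch time instead of failing later.
# model.enable_parallelism(...)  # exact config object depends on the diffusers version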

View File

@@ -32,8 +32,6 @@ warnings.simplefilter(action="ignore", category=FutureWarning)
 def pytest_configure(config):
     config.addinivalue_line("markers", "big_accelerator: marks tests as requiring big accelerator resources")
-    config.addinivalue_line("markers", "slow: mark test as slow")
-    config.addinivalue_line("markers", "nightly: mark test as nightly")
 
 
 def pytest_addoption(parser):

View File

@@ -1,272 +0,0 @@
# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import tempfile
from collections import deque
from typing import List

import numpy as np
import torch

from diffusers import FluxTransformer2DModel
from diffusers.modular_pipelines import (
    ComponentSpec,
    InputParam,
    ModularPipelineBlocks,
    OutputParam,
    PipelineState,
    WanModularPipeline,
)

from ..testing_utils import nightly, require_torch, slow


class DummyCustomBlockSimple(ModularPipelineBlocks):
    def __init__(self, use_dummy_model_component=False):
        self.use_dummy_model_component = use_dummy_model_component
        super().__init__()

    @property
    def expected_components(self):
        if self.use_dummy_model_component:
            return [ComponentSpec("transformer", FluxTransformer2DModel)]
        else:
            return []

    @property
    def inputs(self) -> List[InputParam]:
        return [InputParam("prompt", type_hint=str, required=True, description="Prompt to use")]

    @property
    def intermediate_inputs(self) -> List[InputParam]:
        return []

    @property
    def intermediate_outputs(self) -> List[OutputParam]:
        return [
            OutputParam(
                "output_prompt",
                type_hint=str,
                description="Modified prompt",
            )
        ]

    def __call__(self, components, state: PipelineState) -> PipelineState:
        block_state = self.get_block_state(state)

        old_prompt = block_state.prompt
        block_state.output_prompt = "Modular diffusers + " + old_prompt

        self.set_block_state(state, block_state)
        return components, state


CODE_STR = """
from diffusers.modular_pipelines import (
    ComponentSpec,
    InputParam,
    ModularPipelineBlocks,
    OutputParam,
    PipelineState,
    WanModularPipeline,
)
from typing import List


class DummyCustomBlockSimple(ModularPipelineBlocks):
    def __init__(self, use_dummy_model_component=False):
        self.use_dummy_model_component = use_dummy_model_component
        super().__init__()

    @property
    def expected_components(self):
        if self.use_dummy_model_component:
            return [ComponentSpec("transformer", FluxTransformer2DModel)]
        else:
            return []

    @property
    def inputs(self) -> List[InputParam]:
        return [InputParam("prompt", type_hint=str, required=True, description="Prompt to use")]

    @property
    def intermediate_inputs(self) -> List[InputParam]:
        return []

    @property
    def intermediate_outputs(self) -> List[OutputParam]:
        return [
            OutputParam(
                "output_prompt",
                type_hint=str,
                description="Modified prompt",
            )
        ]

    def __call__(self, components, state: PipelineState) -> PipelineState:
        block_state = self.get_block_state(state)

        old_prompt = block_state.prompt
        block_state.output_prompt = "Modular diffusers + " + old_prompt

        self.set_block_state(state, block_state)
        return components, state
"""


class TestModularCustomBlocks:
    def _test_block_properties(self, block):
        assert not block.expected_components
        assert not block.intermediate_inputs

        actual_inputs = [inp.name for inp in block.inputs]
        actual_intermediate_outputs = [out.name for out in block.intermediate_outputs]
        assert actual_inputs == ["prompt"]
        assert actual_intermediate_outputs == ["output_prompt"]

    def test_custom_block_properties(self):
        custom_block = DummyCustomBlockSimple()
        self._test_block_properties(custom_block)

    def test_custom_block_output(self):
        custom_block = DummyCustomBlockSimple()
        pipe = custom_block.init_pipeline()
        prompt = "Diffusers is nice"
        output = pipe(prompt=prompt)

        actual_inputs = [inp.name for inp in custom_block.inputs]
        actual_intermediate_outputs = [out.name for out in custom_block.intermediate_outputs]
        assert sorted(output.values) == sorted(actual_inputs + actual_intermediate_outputs)

        output_prompt = output.values["output_prompt"]
        assert output_prompt.startswith("Modular diffusers + ")

    def test_custom_block_saving_loading(self):
        custom_block = DummyCustomBlockSimple()

        with tempfile.TemporaryDirectory() as tmpdir:
            custom_block.save_pretrained(tmpdir)
            assert any("modular_config.json" in k for k in os.listdir(tmpdir))

            with open(os.path.join(tmpdir, "modular_config.json"), "r") as f:
                config = json.load(f)
            auto_map = config["auto_map"]
            assert auto_map == {"ModularPipelineBlocks": "test_modular_pipelines_custom_blocks.DummyCustomBlockSimple"}

            # For now, the Python script that implements the custom block has to be manually pushed to the Hub.
            # This is why, we have to separately save the Python script here.
            code_path = os.path.join(tmpdir, "test_modular_pipelines_custom_blocks.py")
            with open(code_path, "w") as f:
                f.write(CODE_STR)

            loaded_custom_block = ModularPipelineBlocks.from_pretrained(tmpdir, trust_remote_code=True)

            pipe = loaded_custom_block.init_pipeline()
            prompt = "Diffusers is nice"
            output = pipe(prompt=prompt)

            actual_inputs = [inp.name for inp in loaded_custom_block.inputs]
            actual_intermediate_outputs = [out.name for out in loaded_custom_block.intermediate_outputs]
            assert sorted(output.values) == sorted(actual_inputs + actual_intermediate_outputs)

            output_prompt = output.values["output_prompt"]
            assert output_prompt.startswith("Modular diffusers + ")

    def test_custom_block_supported_components(self):
        custom_block = DummyCustomBlockSimple(use_dummy_model_component=True)
        pipe = custom_block.init_pipeline("hf-internal-testing/tiny-flux-kontext-pipe")
        pipe.load_components()

        assert len(pipe.components) == 1
        assert pipe.component_names[0] == "transformer"

    def test_custom_block_loads_from_hub(self):
        repo_id = "hf-internal-testing/tiny-modular-diffusers-block"
        block = ModularPipelineBlocks.from_pretrained(repo_id, trust_remote_code=True)
        self._test_block_properties(block)

        pipe = block.init_pipeline()
        prompt = "Diffusers is nice"
        output = pipe(prompt=prompt)
        output_prompt = output.values["output_prompt"]
        assert output_prompt.startswith("Modular diffusers + ")


@slow
@nightly
@require_torch
class TestKreaCustomBlocksIntegration:
    repo_id = "krea/krea-realtime-video"

    def test_loading_from_hub(self):
        blocks = ModularPipelineBlocks.from_pretrained(self.repo_id, trust_remote_code=True)
        block_names = sorted(blocks.sub_blocks)
        assert block_names == sorted(["text_encoder", "before_denoise", "denoise", "decode"])

        pipe = WanModularPipeline(blocks, self.repo_id)
        pipe.load_components(
            trust_remote_code=True,
            device_map="cuda",
            torch_dtype={"default": torch.bfloat16, "vae": torch.float16},
        )
        assert len(pipe.components) == 7
        assert sorted(pipe.components) == sorted(
            ["text_encoder", "tokenizer", "guider", "scheduler", "vae", "transformer", "video_processor"]
        )

    def test_forward(self):
        blocks = ModularPipelineBlocks.from_pretrained(self.repo_id, trust_remote_code=True)
        pipe = WanModularPipeline(blocks, self.repo_id)
        pipe.load_components(
            trust_remote_code=True,
            device_map="cuda",
            torch_dtype={"default": torch.bfloat16, "vae": torch.float16},
        )

        num_frames_per_block = 2
        num_blocks = 2

        state = PipelineState()
        state.set("frame_cache_context", deque(maxlen=pipe.config.frame_cache_len))
        prompt = ["a cat sitting on a boat"]

        for block in pipe.transformer.blocks:
            block.self_attn.fuse_projections()

        for block_idx in range(num_blocks):
            state = pipe(
                state,
                prompt=prompt,
                num_inference_steps=2,
                num_blocks=num_blocks,
                num_frames_per_block=num_frames_per_block,
                block_idx=block_idx,
                generator=torch.manual_seed(42),
            )
            current_frames = np.array(state.values["videos"][0])
            current_frames_flat = current_frames.flatten()
            actual_slices = np.concatenate([current_frames_flat[:4], current_frames_flat[-4:]]).tolist()

            if block_idx == 0:
                assert current_frames.shape == (5, 480, 832, 3)
                expected_slices = np.array([211, 229, 238, 208, 195, 180, 188, 193])
            else:
                assert current_frames.shape == (8, 480, 832, 3)
                expected_slices = np.array([179, 203, 214, 176, 194, 181, 187, 191])

            assert np.allclose(actual_slices, expected_slices)