Compare commits

..

7 Commits

Author SHA1 Message Date
Sayak Paul
1e6578bbe3 Merge branch 'main' into refactor-caching-tests 2026-03-10 09:25:51 +05:30
Dhruv Nair
07a63e197e [CI] Potential fix for code scanning alert no. 2150: Workflow does not contain permissions (#13230)
Potential fix for code scanning alert no. 2150: Workflow does not contain permissions

Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
Co-authored-by: Sayak Paul <spsayakpaul@gmail.com>
2026-03-10 09:07:16 +05:30
Sayak Paul
81aa43271b Merge branch 'main' into refactor-caching-tests 2026-03-10 08:57:11 +05:30
sayakpaul
9239908f5d include taylorseer in the caching mixin. 2026-03-10 08:56:42 +05:30
YiYi Xu
068c6ef6c1 [modular] helios (#13216)
* add helios modular

* upup

* revert change in guider

* up

* fix for real

* fix batch test

* Apply suggestion from @yiyixuxu

---------

Co-authored-by: yiyi@huggingface.co <yiyi@ip-26-0-163-127.ec2.internal>
2026-03-09 10:37:56 -10:00
DefTruth
94bcb8941e fix: allow pass cpu generator for helios (#13228)
* allow pass cpu generator for helios

* allow pass cpu generator for helios

* allow pass cpu generator for helios

* patch
2026-03-09 10:30:56 -10:00
sayakpaul
9cd3e6ba88 refactor magcache tests. 2026-03-09 19:26:42 +05:30
22 changed files with 4734 additions and 301 deletions

View File

@@ -1,5 +1,8 @@
name: Fast GPU Tests on PR
permissions:
contents: read
on:
pull_request:
branches: main

View File

@@ -434,6 +434,12 @@ else:
"FluxKontextAutoBlocks",
"FluxKontextModularPipeline",
"FluxModularPipeline",
"HeliosAutoBlocks",
"HeliosModularPipeline",
"HeliosPyramidAutoBlocks",
"HeliosPyramidDistilledAutoBlocks",
"HeliosPyramidDistilledModularPipeline",
"HeliosPyramidModularPipeline",
"QwenImageAutoBlocks",
"QwenImageEditAutoBlocks",
"QwenImageEditModularPipeline",
@@ -1188,6 +1194,12 @@ if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
FluxKontextAutoBlocks,
FluxKontextModularPipeline,
FluxModularPipeline,
HeliosAutoBlocks,
HeliosModularPipeline,
HeliosPyramidAutoBlocks,
HeliosPyramidDistilledAutoBlocks,
HeliosPyramidDistilledModularPipeline,
HeliosPyramidModularPipeline,
QwenImageAutoBlocks,
QwenImageEditAutoBlocks,
QwenImageEditModularPipeline,

View File

@@ -56,6 +56,14 @@ else:
"WanImage2VideoModularPipeline",
"Wan22Image2VideoModularPipeline",
]
_import_structure["helios"] = [
"HeliosAutoBlocks",
"HeliosModularPipeline",
"HeliosPyramidAutoBlocks",
"HeliosPyramidDistilledAutoBlocks",
"HeliosPyramidDistilledModularPipeline",
"HeliosPyramidModularPipeline",
]
_import_structure["flux"] = [
"FluxAutoBlocks",
"FluxModularPipeline",
@@ -103,6 +111,14 @@ if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
Flux2KleinModularPipeline,
Flux2ModularPipeline,
)
from .helios import (
HeliosAutoBlocks,
HeliosModularPipeline,
HeliosPyramidAutoBlocks,
HeliosPyramidDistilledAutoBlocks,
HeliosPyramidDistilledModularPipeline,
HeliosPyramidModularPipeline,
)
from .modular_pipeline import (
AutoPipelineBlocks,
BlockState,

View File

@@ -0,0 +1,59 @@
from typing import TYPE_CHECKING

from ...utils import (
    DIFFUSERS_SLOW_IMPORT,
    OptionalDependencyNotAvailable,
    _LazyModule,
    get_objects_from_module,
    is_torch_available,
    is_transformers_available,
)


# Placeholder objects that are attached to the lazy module at the bottom of this file when torch/transformers are
# not both available.
_dummy_objects = {}
# Maps submodule name -> list of public names it provides; passed to `_LazyModule` below.
_import_structure = {}

try:
    # Both torch and transformers are required for the Helios modular blocks.
    if not (is_transformers_available() and is_torch_available()):
        raise OptionalDependencyNotAvailable()
except OptionalDependencyNotAvailable:
    from ...utils import dummy_torch_and_transformers_objects  # noqa F403

    # Register the objects exposed by the dummy module in place of the real ones.
    _dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
else:
    _import_structure["modular_blocks_helios"] = ["HeliosAutoBlocks"]
    _import_structure["modular_blocks_helios_pyramid"] = ["HeliosPyramidAutoBlocks"]
    _import_structure["modular_blocks_helios_pyramid_distilled"] = ["HeliosPyramidDistilledAutoBlocks"]
    _import_structure["modular_pipeline"] = [
        "HeliosModularPipeline",
        "HeliosPyramidDistilledModularPipeline",
        "HeliosPyramidModularPipeline",
    ]

if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
    # Eager path: import the real names directly (used for type checkers or when slow import is requested).
    try:
        if not (is_transformers_available() and is_torch_available()):
            raise OptionalDependencyNotAvailable()
    except OptionalDependencyNotAvailable:
        from ...utils.dummy_torch_and_transformers_objects import *  # noqa F403
    else:
        from .modular_blocks_helios import HeliosAutoBlocks
        from .modular_blocks_helios_pyramid import HeliosPyramidAutoBlocks
        from .modular_blocks_helios_pyramid_distilled import HeliosPyramidDistilledAutoBlocks
        from .modular_pipeline import (
            HeliosModularPipeline,
            HeliosPyramidDistilledModularPipeline,
            HeliosPyramidModularPipeline,
        )
else:
    import sys

    # Lazy path: replace this module in `sys.modules` with a `_LazyModule` built from `_import_structure`.
    sys.modules[__name__] = _LazyModule(
        __name__,
        globals()["__file__"],
        _import_structure,
        module_spec=__spec__,
    )

    # Attach any dummy placeholders collected above to the replacement module.
    for name, value in _dummy_objects.items():
        setattr(sys.modules[__name__], name, value)

View File

@@ -0,0 +1,836 @@
# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import torch
from ...models import HeliosTransformer3DModel
from ...schedulers import HeliosScheduler
from ...utils import logging
from ...utils.torch_utils import randn_tensor
from ..modular_pipeline import ModularPipelineBlocks, PipelineState
from ..modular_pipeline_utils import ComponentSpec, InputParam, OutputParam
from .modular_pipeline import HeliosModularPipeline
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# Copied from diffusers.pipelines.flux.pipeline_flux.calculate_shift
def calculate_shift(
    image_seq_len,
    base_seq_len: int = 256,
    max_seq_len: int = 4096,
    base_shift: float = 0.5,
    max_shift: float = 1.15,
):
    """Linearly map a sequence length to a shift value.

    The mapping is the straight line through ``(base_seq_len, base_shift)`` and ``(max_seq_len, max_shift)``,
    evaluated at ``image_seq_len``. No clamping is applied, so lengths outside the two anchors extrapolate.

    Args:
        image_seq_len: Sequence length at which to evaluate the line.
        base_seq_len: Sequence length of the first anchor point.
        max_seq_len: Sequence length of the second anchor point.
        base_shift: Shift value at ``base_seq_len``.
        max_shift: Shift value at ``max_seq_len``.

    Returns:
        The interpolated (or extrapolated) shift value.
    """
    slope = (max_shift - base_shift) / (max_seq_len - base_seq_len)
    intercept = base_shift - slope * base_seq_len
    return image_seq_len * slope + intercept
class HeliosTextInputStep(ModularPipelineBlocks):
    """Derive `batch_size` and `dtype` from `prompt_embeds` and expand the text embeddings per video.

    The step reads `prompt_embeds` (and, when present, `negative_prompt_embeds`), records the number of prompts and
    the embedding dtype on the block state, and duplicates each embedding `num_videos_per_prompt` times along the
    batch dimension.
    """

    model_name = "helios"

    @property
    def description(self) -> str:
        return (
            "Input processing step that:\n"
            " 1. Determines `batch_size` and `dtype` based on `prompt_embeds`\n"
            " 2. Adjusts input tensor shapes based on `batch_size` (number of prompts) and `num_videos_per_prompt`\n\n"
            "All input tensors are expected to have either batch_size=1 or match the batch_size\n"
            "of prompt_embeds. The tensors will be duplicated across the batch dimension to\n"
            "have a final batch_size of batch_size * num_videos_per_prompt."
        )

    @property
    def inputs(self) -> list[InputParam]:
        return [
            InputParam(
                "num_videos_per_prompt",
                default=1,
                type_hint=int,
                description="Number of videos to generate per prompt.",
            ),
            InputParam.template("prompt_embeds"),
            InputParam.template("negative_prompt_embeds"),
        ]

    @property
    def intermediate_outputs(self) -> list[OutputParam]:
        # The list holds `OutputParam` objects (not strings), matching the other Helios steps.
        return [
            OutputParam(
                "batch_size",
                type_hint=int,
                description="Number of prompts, the final batch size of model inputs should be batch_size * num_videos_per_prompt",
            ),
            OutputParam(
                "dtype",
                type_hint=torch.dtype,
                description="Data type of model tensor inputs (determined by `prompt_embeds.dtype`)",
            ),
        ]

    def check_inputs(self, components, block_state):
        """Raise `ValueError` when both embeddings are given but their shapes differ."""
        if block_state.prompt_embeds is not None and block_state.negative_prompt_embeds is not None:
            if block_state.prompt_embeds.shape != block_state.negative_prompt_embeds.shape:
                raise ValueError(
                    "`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
                    f" got: `prompt_embeds` {block_state.prompt_embeds.shape} != `negative_prompt_embeds`"
                    f" {block_state.negative_prompt_embeds.shape}."
                )

    @torch.no_grad()
    def __call__(self, components: HeliosModularPipeline, state: PipelineState) -> PipelineState:
        """Run the step and return the `(components, state)` pair with the updated block state written back."""
        block_state = self.get_block_state(state)
        self.check_inputs(components, block_state)

        block_state.batch_size = block_state.prompt_embeds.shape[0]
        block_state.dtype = block_state.prompt_embeds.dtype

        # Tile along dim 1, then fold the copies into the batch dimension so that the copies belonging to one
        # prompt end up adjacent in the batch.
        _, seq_len, _ = block_state.prompt_embeds.shape
        block_state.prompt_embeds = block_state.prompt_embeds.repeat(1, block_state.num_videos_per_prompt, 1)
        block_state.prompt_embeds = block_state.prompt_embeds.view(
            block_state.batch_size * block_state.num_videos_per_prompt, seq_len, -1
        )

        # The negative embeddings are optional; when present they get the same expansion.
        if block_state.negative_prompt_embeds is not None:
            _, seq_len, _ = block_state.negative_prompt_embeds.shape
            block_state.negative_prompt_embeds = block_state.negative_prompt_embeds.repeat(
                1, block_state.num_videos_per_prompt, 1
            )
            block_state.negative_prompt_embeds = block_state.negative_prompt_embeds.view(
                block_state.batch_size * block_state.num_videos_per_prompt, seq_len, -1
            )

        self.set_block_state(state, block_state)
        return components, state
# Copied from diffusers.modular_pipelines.wan.before_denoise.repeat_tensor_to_batch_size
def repeat_tensor_to_batch_size(
    input_name: str,
    input_tensor: torch.Tensor,
    batch_size: int,
    num_videos_per_prompt: int = 1,
) -> torch.Tensor:
    """Repeat tensor elements to match the final batch size.

    This function expands a tensor's batch dimension to match the final batch size (batch_size * num_videos_per_prompt)
    by repeating each element along dimension 0.

    The input tensor must have batch size 1 or batch_size. The function will:
        - If batch size is 1: repeat each element (batch_size * num_videos_per_prompt) times
        - If batch size equals batch_size: repeat each element num_videos_per_prompt times

    Args:
        input_name (str): Name of the input tensor (used for error messages)
        input_tensor (torch.Tensor): The tensor to repeat. Must have batch size 1 or batch_size.
        batch_size (int): The base batch size (number of prompts)
        num_videos_per_prompt (int, optional): Number of videos to generate per prompt. Defaults to 1.

    Returns:
        torch.Tensor: The repeated tensor with final batch size (batch_size * num_videos_per_prompt)

    Raises:
        ValueError: If input_tensor is not a torch.Tensor or has invalid batch size

    Examples:
        Batch size 1 is repeated `batch_size * num_videos_per_prompt` times::

            tensor = torch.tensor([[1, 2, 3]])  # shape: [1, 3]
            repeated = repeat_tensor_to_batch_size("image", tensor, batch_size=2, num_videos_per_prompt=2)
            repeated  # tensor([[1, 2, 3], [1, 2, 3], [1, 2, 3], [1, 2, 3]]) - shape: [4, 3]

        Batch size equal to `batch_size` repeats each row `num_videos_per_prompt` times::

            tensor = torch.tensor([[1, 2, 3], [4, 5, 6]])  # shape: [2, 3]
            repeated = repeat_tensor_to_batch_size("image", tensor, batch_size=2, num_videos_per_prompt=2)
            repeated  # tensor([[1, 2, 3], [1, 2, 3], [4, 5, 6], [4, 5, 6]]) - shape: [4, 3]
    """
    # make sure input is a tensor
    if not isinstance(input_tensor, torch.Tensor):
        raise ValueError(f"`{input_name}` must be a tensor")

    # make sure input tensor e.g. image_latents has batch size 1 or batch_size same as prompts
    if input_tensor.shape[0] == 1:
        repeat_by = batch_size * num_videos_per_prompt
    elif input_tensor.shape[0] == batch_size:
        repeat_by = num_videos_per_prompt
    else:
        raise ValueError(f"`{input_name}` must have batch size 1 or {batch_size}, but got {input_tensor.shape[0]}")

    # expand the tensor to match the batch_size * num_videos_per_prompt
    input_tensor = input_tensor.repeat_interleave(repeat_by, dim=0)
    return input_tensor
# Copied from diffusers.modular_pipelines.wan.before_denoise.calculate_dimension_from_latents
def calculate_dimension_from_latents(
    latents: torch.Tensor, vae_scale_factor_temporal: int, vae_scale_factor_spatial: int
) -> tuple[int, int, int]:
    """Calculate video dimensions from latent tensor dimensions.

    This function converts latent temporal and spatial dimensions to video temporal and spatial dimensions:
    `num_frames = (num_latent_frames - 1) * vae_scale_factor_temporal + 1`, and height/width are the latent
    height/width multiplied by `vae_scale_factor_spatial`.

    Args:
        latents (torch.Tensor): The latent tensor. Must have exactly 5 dimensions; the last three are read as
            (frames, height, width).
        vae_scale_factor_temporal (int): The scale factor used by the VAE to compress the temporal dimension.
        vae_scale_factor_spatial (int): The scale factor used by the VAE to compress each spatial dimension.

    Returns:
        tuple[int, int, int]: The calculated dimensions as (num_frames, height, width)

    Raises:
        ValueError: If latents tensor doesn't have 5 dimensions
    """
    if latents.ndim != 5:
        raise ValueError(f"latents must have 5 dimensions, but got {latents.ndim}")

    _, _, num_latent_frames, latent_height, latent_width = latents.shape

    num_frames = (num_latent_frames - 1) * vae_scale_factor_temporal + 1
    height = latent_height * vae_scale_factor_spatial
    width = latent_width * vae_scale_factor_spatial

    return num_frames, height, width
class HeliosAdditionalInputsStep(ModularPipelineBlocks):
    """Configurable step that brings extra denoiser inputs to the final batch size.

    Two groups of inputs are handled:
        1. Image latent inputs: height/width are derived from the latents, then the batch is expanded.
        2. Additional batch inputs: only the batch dimension is expanded.
    """

    model_name = "helios"

    def __init__(
        self,
        image_latent_inputs: list[InputParam] | None = None,
        additional_batch_inputs: list[InputParam] | None = None,
    ):
        """Store the two input groups after validating that each is a list of `InputParam`.

        Args:
            image_latent_inputs: Latent inputs to resize and expand. Defaults to the `image_latents` template.
            additional_batch_inputs: Inputs whose batch dimension should be expanded. Defaults to an empty list.

        Raises:
            ValueError: If either argument is not a list, or contains something other than `InputParam`.
        """
        if image_latent_inputs is None:
            image_latent_inputs = [InputParam.template("image_latents")]
        if additional_batch_inputs is None:
            additional_batch_inputs = []

        # Validate both groups with the same rules; the label is used verbatim in the error text.
        for label, params in (
            ("image_latent_inputs", image_latent_inputs),
            ("additional_batch_inputs", additional_batch_inputs),
        ):
            if not isinstance(params, list):
                raise ValueError(f"{label} must be a list, but got {type(params)}")
            for param in params:
                if not isinstance(param, InputParam):
                    raise ValueError(f"{label} must be a list of InputParam, but got {type(param)}")

        self._image_latent_inputs = image_latent_inputs
        self._additional_batch_inputs = additional_batch_inputs
        super().__init__()

    @property
    def description(self) -> str:
        pieces = [
            "Input processing step that:\n"
            " 1. For image latent inputs: Computes height/width from latents and expands batch size\n"
            " 2. For additional batch inputs: Expands batch dimensions to match final batch size"
        ]

        # Only list the configured inputs when at least one group is non-empty.
        if self._image_latent_inputs or self._additional_batch_inputs:
            pieces.append("\n\nConfigured inputs:")
            if self._image_latent_inputs:
                pieces.append(f"\n - Image latent inputs: {[p.name for p in self._image_latent_inputs]}")
            if self._additional_batch_inputs:
                pieces.append(f"\n - Additional batch inputs: {[p.name for p in self._additional_batch_inputs]}")

        pieces.append("\n\nThis block should be placed after the encoder steps and the text input step.")
        return "".join(pieces)

    @property
    def inputs(self) -> list[InputParam]:
        return [
            InputParam(name="num_videos_per_prompt", default=1),
            InputParam(name="batch_size", required=True),
            *self._image_latent_inputs,
            *self._additional_batch_inputs,
        ]

    @property
    def intermediate_outputs(self) -> list[OutputParam]:
        fixed_outputs = [
            OutputParam("height", type_hint=int),
            OutputParam("width", type_hint=int),
        ]
        # Every configured input is also re-emitted as a tensor output under the same name.
        tensor_outputs = [
            OutputParam(param.name, type_hint=torch.Tensor)
            for param in self._image_latent_inputs + self._additional_batch_inputs
        ]
        return fixed_outputs + tensor_outputs

    def __call__(self, components: HeliosModularPipeline, state: PipelineState) -> PipelineState:
        """Expand the configured inputs in place on the block state and return `(components, state)`."""
        block_state = self.get_block_state(state)

        for param in self._image_latent_inputs:
            latent = getattr(block_state, param.name)
            if latent is not None:
                # Height/width come from the latents; the frame count returned alongside is not used here.
                _, block_state.height, block_state.width = calculate_dimension_from_latents(
                    latent, components.vae_scale_factor_temporal, components.vae_scale_factor_spatial
                )
                expanded = repeat_tensor_to_batch_size(
                    input_name=param.name,
                    input_tensor=latent,
                    num_videos_per_prompt=block_state.num_videos_per_prompt,
                    batch_size=block_state.batch_size,
                )
                setattr(block_state, param.name, expanded)

        for param in self._additional_batch_inputs:
            tensor = getattr(block_state, param.name)
            if tensor is not None:
                expanded = repeat_tensor_to_batch_size(
                    input_name=param.name,
                    input_tensor=tensor,
                    num_videos_per_prompt=block_state.num_videos_per_prompt,
                    batch_size=block_state.batch_size,
                )
                setattr(block_state, param.name, expanded)

        self.set_block_state(state, block_state)
        return components, state
class HeliosAddNoiseToImageLatentsStep(ModularPipelineBlocks):
    """Adds noise to image_latents and fake_image_latents for I2V conditioning.

    Applies single-sigma noise to image_latents (using image_noise_sigma range) and single-sigma noise to
    fake_image_latents (using video_noise_sigma range).
    """

    model_name = "helios"

    @property
    def description(self) -> str:
        return (
            "Adds noise to image_latents and fake_image_latents for I2V conditioning. "
            "Uses random sigma from configured ranges for each."
        )

    @property
    def inputs(self) -> list[InputParam]:
        return [
            InputParam.template("image_latents"),
            InputParam(
                "fake_image_latents",
                required=True,
                type_hint=torch.Tensor,
                description="Fake image latents used as history seed for I2V generation.",
            ),
            InputParam(
                "image_noise_sigma_min",
                default=0.111,
                type_hint=float,
                description="Minimum sigma for image latent noise.",
            ),
            InputParam(
                "image_noise_sigma_max",
                default=0.135,
                type_hint=float,
                description="Maximum sigma for image latent noise.",
            ),
            InputParam(
                "video_noise_sigma_min",
                default=0.111,
                type_hint=float,
                description="Minimum sigma for video/fake-image latent noise.",
            ),
            InputParam(
                "video_noise_sigma_max",
                default=0.135,
                type_hint=float,
                description="Maximum sigma for video/fake-image latent noise.",
            ),
            InputParam.template("generator"),
        ]

    @property
    def intermediate_outputs(self) -> list[OutputParam]:
        return [
            OutputParam.template("image_latents"),
            OutputParam("fake_image_latents", type_hint=torch.Tensor, description="Noisy fake image latents"),
        ]

    @staticmethod
    def _sample_sigma(num_samples, sigma_min, sigma_max, generator, device):
        """Draw `num_samples` sigmas uniformly from `[sigma_min, sigma_max)` and return them on `device`.

        `torch.rand` rejects a generator that lives on a different device than the requested output, so when a
        `torch.Generator` is supplied the draw happens on the generator's own device and the result is moved to
        `device` afterwards. When the devices already match (or no generator is given) this is the same call as
        sampling directly on `device`.
        """
        sample_device = generator.device if isinstance(generator, torch.Generator) else device
        uniform = torch.rand(num_samples, device=sample_device, generator=generator).to(device)
        return uniform * (sigma_max - sigma_min) + sigma_min

    @torch.no_grad()
    def __call__(self, components: HeliosModularPipeline, state: PipelineState) -> PipelineState:
        """Blend each latent with Gaussian noise as `sigma * noise + (1 - sigma) * latent`.

        Returns the `(components, state)` pair; both noisy latents are stored as float32 on the execution device.
        """
        block_state = self.get_block_state(state)
        device = components._execution_device

        image_latents = block_state.image_latents
        fake_image_latents = block_state.fake_image_latents

        # Add noise to image_latents
        image_noise_sigma = self._sample_sigma(
            1,
            block_state.image_noise_sigma_min,
            block_state.image_noise_sigma_max,
            block_state.generator,
            device,
        )
        image_latents = (
            image_noise_sigma * randn_tensor(image_latents.shape, generator=block_state.generator, device=device)
            + (1 - image_noise_sigma) * image_latents
        )

        # Add noise to fake_image_latents
        fake_image_noise_sigma = self._sample_sigma(
            1,
            block_state.video_noise_sigma_min,
            block_state.video_noise_sigma_max,
            block_state.generator,
            device,
        )
        fake_image_latents = (
            fake_image_noise_sigma
            * randn_tensor(fake_image_latents.shape, generator=block_state.generator, device=device)
            + (1 - fake_image_noise_sigma) * fake_image_latents
        )

        block_state.image_latents = image_latents.to(device=device, dtype=torch.float32)
        block_state.fake_image_latents = fake_image_latents.to(device=device, dtype=torch.float32)

        self.set_block_state(state, block_state)
        return components, state
class HeliosAddNoiseToVideoLatentsStep(ModularPipelineBlocks):
    """Adds noise to image_latents and video_latents for V2V conditioning.

    Applies single-sigma noise to image_latents (using image_noise_sigma range) and per-frame noise to video_latents in
    chunks (using video_noise_sigma range).
    """

    model_name = "helios"

    @property
    def description(self) -> str:
        return (
            "Adds noise to image_latents and video_latents for V2V conditioning. "
            "Uses single-sigma noise for image_latents and per-frame noise for video chunks."
        )

    @property
    def inputs(self) -> list[InputParam]:
        return [
            InputParam.template("image_latents"),
            InputParam(
                "video_latents",
                required=True,
                type_hint=torch.Tensor,
                description="Encoded video latents for V2V generation.",
            ),
            InputParam(
                "num_latent_frames_per_chunk",
                default=9,
                type_hint=int,
                description="Number of latent frames per temporal chunk.",
            ),
            InputParam(
                "image_noise_sigma_min",
                default=0.111,
                type_hint=float,
                description="Minimum sigma for image latent noise.",
            ),
            InputParam(
                "image_noise_sigma_max",
                default=0.135,
                type_hint=float,
                description="Maximum sigma for image latent noise.",
            ),
            InputParam(
                "video_noise_sigma_min",
                default=0.111,
                type_hint=float,
                description="Minimum sigma for video latent noise.",
            ),
            InputParam(
                "video_noise_sigma_max",
                default=0.135,
                type_hint=float,
                description="Maximum sigma for video latent noise.",
            ),
            InputParam.template("generator"),
        ]

    @property
    def intermediate_outputs(self) -> list[OutputParam]:
        return [
            OutputParam.template("image_latents"),
            OutputParam("video_latents", type_hint=torch.Tensor, description="Noisy video latents"),
        ]

    @staticmethod
    def _sample_sigma(num_samples, sigma_min, sigma_max, generator, device):
        """Draw `num_samples` sigmas uniformly from `[sigma_min, sigma_max)` and return them on `device`.

        `torch.rand` rejects a generator that lives on a different device than the requested output, so when a
        `torch.Generator` is supplied the draw happens on the generator's own device and the result is moved to
        `device` afterwards. When the devices already match (or no generator is given) this is the same call as
        sampling directly on `device`.
        """
        sample_device = generator.device if isinstance(generator, torch.Generator) else device
        uniform = torch.rand(num_samples, device=sample_device, generator=generator).to(device)
        return uniform * (sigma_max - sigma_min) + sigma_min

    @torch.no_grad()
    def __call__(self, components: HeliosModularPipeline, state: PipelineState) -> PipelineState:
        """Blend the latents with Gaussian noise as `sigma * noise + (1 - sigma) * latent`.

        `image_latents` uses one sigma for the whole tensor; `video_latents` is processed chunk by chunk along
        dim 2 with one sigma per frame. Returns the `(components, state)` pair; both noisy latents are stored as
        float32 on the execution device.
        """
        block_state = self.get_block_state(state)
        device = components._execution_device

        image_latents = block_state.image_latents
        video_latents = block_state.video_latents
        num_latent_frames_per_chunk = block_state.num_latent_frames_per_chunk

        # Add noise to first frame (single sigma)
        image_noise_sigma = self._sample_sigma(
            1,
            block_state.image_noise_sigma_min,
            block_state.image_noise_sigma_max,
            block_state.generator,
            device,
        )
        image_latents = (
            image_noise_sigma * randn_tensor(image_latents.shape, generator=block_state.generator, device=device)
            + (1 - image_noise_sigma) * image_latents
        )

        # Add per-frame noise to video chunks
        # NOTE(review): floor division means trailing frames that do not fill a whole chunk are not included in
        # the output, and fewer frames than one chunk leaves nothing to concatenate — confirm the encoder step
        # always provides a multiple of `num_latent_frames_per_chunk`.
        noisy_latents_chunks = []
        num_latent_chunks = video_latents.shape[2] // num_latent_frames_per_chunk
        for i in range(num_latent_chunks):
            chunk_start = i * num_latent_frames_per_chunk
            chunk_end = chunk_start + num_latent_frames_per_chunk
            latent_chunk = video_latents[:, :, chunk_start:chunk_end, :, :]

            chunk_frames = latent_chunk.shape[2]
            frame_sigmas = self._sample_sigma(
                chunk_frames,
                block_state.video_noise_sigma_min,
                block_state.video_noise_sigma_max,
                block_state.generator,
                device,
            )
            # Reshape so each sigma broadcasts over one frame (dim 2) of the 5-D chunk.
            frame_sigmas = frame_sigmas.view(1, 1, chunk_frames, 1, 1)

            noisy_chunk = (
                frame_sigmas * randn_tensor(latent_chunk.shape, generator=block_state.generator, device=device)
                + (1 - frame_sigmas) * latent_chunk
            )
            noisy_latents_chunks.append(noisy_chunk)
        video_latents = torch.cat(noisy_latents_chunks, dim=2)

        block_state.image_latents = image_latents.to(device=device, dtype=torch.float32)
        block_state.video_latents = video_latents.to(device=device, dtype=torch.float32)

        self.set_block_state(state, block_state)
        return components, state
class HeliosPrepareHistoryStep(ModularPipelineBlocks):
    """Prepares chunk/history indices and initializes history state for the chunk loop."""

    model_name = "helios"

    @property
    def description(self) -> str:
        return (
            "Prepares the chunk loop by computing latent dimensions, number of chunks, "
            "history indices, and initializing history state (history_latents, image_latents, latent_chunks)."
        )

    @property
    def expected_components(self) -> list[ComponentSpec]:
        return [
            ComponentSpec("transformer", HeliosTransformer3DModel),
        ]

    @property
    def inputs(self) -> list[InputParam]:
        return [
            InputParam.template("height", default=384),
            InputParam.template("width", default=640),
            InputParam(
                "num_frames", default=132, type_hint=int, description="Total number of video frames to generate."
            ),
            InputParam("batch_size", required=True, type_hint=int),
            InputParam(
                "num_latent_frames_per_chunk",
                default=9,
                type_hint=int,
                description="Number of latent frames per temporal chunk.",
            ),
            InputParam(
                "history_sizes",
                default=[16, 2, 1],
                type_hint=list,
                description="Sizes of long/mid/short history buffers for temporal context.",
            ),
            InputParam(
                "keep_first_frame",
                default=True,
                type_hint=bool,
                description="Whether to keep the first frame as a prefix in history.",
            ),
        ]

    @property
    def intermediate_outputs(self) -> list[OutputParam]:
        return [
            OutputParam("num_latent_chunk", type_hint=int, description="Number of temporal chunks"),
            OutputParam("latent_shape", type_hint=tuple, description="Shape of latent tensor per chunk"),
            OutputParam("history_sizes", type_hint=list, description="Adjusted history sizes (sorted, descending)"),
            OutputParam("indices_hidden_states", type_hint=torch.Tensor, kwargs_type="denoiser_input_fields"),
            OutputParam("indices_latents_history_short", type_hint=torch.Tensor, kwargs_type="denoiser_input_fields"),
            OutputParam("indices_latents_history_mid", type_hint=torch.Tensor, kwargs_type="denoiser_input_fields"),
            OutputParam("indices_latents_history_long", type_hint=torch.Tensor, kwargs_type="denoiser_input_fields"),
            OutputParam("history_latents", type_hint=torch.Tensor, description="Initialized zero history latents"),
        ]

    @torch.no_grad()
    def __call__(self, components: HeliosModularPipeline, state: PipelineState) -> PipelineState:
        """Compute chunk count, per-chunk latent shape and history index tensors, and zero-initialize the history.

        Returns the `(components, state)` pair.

        Raises:
            ValueError: If `history_sizes` does not contain exactly three entries.
        """
        block_state = self.get_block_state(state)

        batch_size = block_state.batch_size
        device = components._execution_device

        block_state.num_frames = max(block_state.num_frames, 1)
        # `sorted` returns a new list, so the caller's `history_sizes` is never mutated below.
        history_sizes = sorted(block_state.history_sizes, reverse=True)

        # The index split below unpacks into long/mid/short segments, which only works for three sizes. Fail
        # early with a descriptive message instead of an opaque tuple-unpacking error.
        if len(history_sizes) != 3:
            raise ValueError(
                f"`history_sizes` must contain exactly 3 entries (long, mid, short), but got {len(history_sizes)}."
            )

        num_channels_latents = components.num_channels_latents
        h_latent = block_state.height // components.vae_scale_factor_spatial
        w_latent = block_state.width // components.vae_scale_factor_spatial

        # Compute number of chunks
        block_state.window_num_frames = (
            block_state.num_latent_frames_per_chunk - 1
        ) * components.vae_scale_factor_temporal + 1
        # Ceiling division of num_frames by window_num_frames, with a floor of one chunk.
        block_state.num_latent_chunk = max(
            1, (block_state.num_frames + block_state.window_num_frames - 1) // block_state.window_num_frames
        )

        # Modify history_sizes for non-keep_first_frame (matching pipeline behavior)
        if not block_state.keep_first_frame:
            history_sizes[-1] = history_sizes[-1] + 1

        # Compute indices ONCE (same structure for all chunks)
        if block_state.keep_first_frame:
            # Layout: [prefix(1) | long | mid | short(1x) | current chunk]; the prefix is merged into "short".
            indices = torch.arange(0, sum([1, *history_sizes, block_state.num_latent_frames_per_chunk]))
            (
                indices_prefix,
                indices_latents_history_long,
                indices_latents_history_mid,
                indices_latents_history_1x,
                indices_hidden_states,
            ) = indices.split([1, *history_sizes, block_state.num_latent_frames_per_chunk], dim=0)
            indices_latents_history_short = torch.cat([indices_prefix, indices_latents_history_1x], dim=0)
        else:
            # Layout: [long | mid | short | current chunk]; "short" was enlarged by one above.
            indices = torch.arange(0, sum([*history_sizes, block_state.num_latent_frames_per_chunk]))
            (
                indices_latents_history_long,
                indices_latents_history_mid,
                indices_latents_history_short,
                indices_hidden_states,
            ) = indices.split([*history_sizes, block_state.num_latent_frames_per_chunk], dim=0)

        # Latent shape per chunk
        block_state.latent_shape = (
            batch_size,
            num_channels_latents,
            block_state.num_latent_frames_per_chunk,
            h_latent,
            w_latent,
        )

        # Set outputs
        block_state.history_sizes = history_sizes
        # Each index tensor gets a leading dimension of size 1.
        block_state.indices_hidden_states = indices_hidden_states.unsqueeze(0)
        block_state.indices_latents_history_short = indices_latents_history_short.unsqueeze(0)
        block_state.indices_latents_history_mid = indices_latents_history_mid.unsqueeze(0)
        block_state.indices_latents_history_long = indices_latents_history_long.unsqueeze(0)
        block_state.history_latents = torch.zeros(
            batch_size,
            num_channels_latents,
            sum(history_sizes),
            h_latent,
            w_latent,
            device=device,
            dtype=torch.float32,
        )

        self.set_block_state(state, block_state)
        return components, state
class HeliosI2VSeedHistoryStep(ModularPipelineBlocks):
    """I2V-only step that extends the history buffer with the fake image latents.

    Reads `history_latents` and `fake_image_latents` from the block state and writes back their concatenation
    along dim 2, with the fake image latents placed after the existing history.
    """

    model_name = "helios"

    @property
    def description(self) -> str:
        return "I2V history seeding: appends fake_image_latents to history_latents."

    @property
    def inputs(self) -> list[InputParam]:
        history_param = InputParam("history_latents", required=True, type_hint=torch.Tensor)
        fake_image_param = InputParam("fake_image_latents", required=True, type_hint=torch.Tensor)
        return [history_param, fake_image_param]

    @property
    def intermediate_outputs(self) -> list[OutputParam]:
        seeded_history = OutputParam(
            "history_latents", type_hint=torch.Tensor, description="History latents seeded with fake_image_latents"
        )
        return [seeded_history]

    @torch.no_grad()
    def __call__(self, components: HeliosModularPipeline, state: PipelineState) -> PipelineState:
        """Concatenate the two latents along dim 2 and return `(components, state)`."""
        block_state = self.get_block_state(state)

        pieces = [block_state.history_latents, block_state.fake_image_latents]
        block_state.history_latents = torch.cat(pieces, dim=2)

        self.set_block_state(state, block_state)
        return components, state
class HeliosV2VSeedHistoryStep(ModularPipelineBlocks):
    """V2V-only step that overwrites the end of the history buffer with the video latents.

    Frame counts are compared along dim 2. When the video is shorter than the history, the leading history frames
    are kept and the video latents follow them; otherwise the video latents become the history outright.
    """

    model_name = "helios"

    @property
    def description(self) -> str:
        return "V2V history seeding: replaces the tail of history_latents with video_latents."

    @property
    def inputs(self) -> list[InputParam]:
        history_param = InputParam("history_latents", required=True, type_hint=torch.Tensor)
        video_param = InputParam("video_latents", required=True, type_hint=torch.Tensor)
        return [history_param, video_param]

    @property
    def intermediate_outputs(self) -> list[OutputParam]:
        seeded_history = OutputParam(
            "history_latents", type_hint=torch.Tensor, description="History latents seeded with video_latents"
        )
        return [seeded_history]

    @torch.no_grad()
    def __call__(self, components: HeliosModularPipeline, state: PipelineState) -> PipelineState:
        """Seed the history from the video latents and return `(components, state)`."""
        block_state = self.get_block_state(state)

        history = block_state.history_latents
        video = block_state.video_latents

        # Number of leading history frames that survive once the video occupies the tail.
        surviving_frames = history.shape[2] - video.shape[2]
        if surviving_frames > 0:
            seeded = torch.cat([history[:, :, :surviving_frames, :, :], video], dim=2)
        else:
            # The video is at least as long as the history, so it replaces it entirely.
            seeded = video

        block_state.history_latents = seeded

        self.set_block_state(state, block_state)
        return components, state
class HeliosSetTimestepsStep(ModularPipelineBlocks):
    """Derives the scheduler shift (`mu`) and a default sigma schedule for the chunk loop."""

    model_name = "helios"

    @property
    def description(self) -> str:
        return "Computes scheduler shift parameter (mu) and default sigmas for the Helios chunk loop."

    @property
    def expected_components(self) -> list[ComponentSpec]:
        transformer_spec = ComponentSpec("transformer", HeliosTransformer3DModel)
        scheduler_spec = ComponentSpec("scheduler", HeliosScheduler)
        return [transformer_spec, scheduler_spec]

    @property
    def inputs(self) -> list[InputParam]:
        return [
            InputParam("latent_shape", required=True, type_hint=tuple),
            InputParam.template("num_inference_steps"),
            InputParam.template("sigmas"),
        ]

    @property
    def intermediate_outputs(self) -> list[OutputParam]:
        mu_param = OutputParam("mu", type_hint=float, description="Scheduler shift parameter")
        sigmas_param = OutputParam("sigmas", type_hint=list, description="Sigma schedule for diffusion")
        return [mu_param, sigmas_param]

    @torch.no_grad()
    def __call__(self, components: HeliosModularPipeline, state: PipelineState) -> PipelineState:
        """Fill in `sigmas` when absent, compute `mu`, and return `(components, state)`."""
        block_state = self.get_block_state(state)

        patch_size = components.transformer.config.patch_size
        latent_shape = block_state.latent_shape

        # Sequence length = product of the last three latent dims divided by the product of the patch dims.
        latent_volume = latent_shape[-1] * latent_shape[-2] * latent_shape[-3]
        patch_volume = patch_size[0] * patch_size[1] * patch_size[2]
        image_seq_len = latent_volume // patch_volume

        # Default schedule: evenly spaced from 0.999 towards 0.0, with the final 0.0 endpoint dropped.
        if block_state.sigmas is None:
            block_state.sigmas = np.linspace(0.999, 0.0, block_state.num_inference_steps + 1)[:-1]

        # Missing scheduler config entries fall back to the defaults given here.
        scheduler_config = components.scheduler.config
        block_state.mu = calculate_shift(
            image_seq_len,
            scheduler_config.get("base_image_seq_len", 256),
            scheduler_config.get("max_image_seq_len", 4096),
            scheduler_config.get("base_shift", 0.5),
            scheduler_config.get("max_shift", 1.15),
        )

        self.set_block_state(state, block_state)
        return components, state

View File

@@ -0,0 +1,110 @@
# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import PIL
import torch
from ...configuration_utils import FrozenDict
from ...models import AutoencoderKLWan
from ...utils import logging
from ...video_processor import VideoProcessor
from ..modular_pipeline import ModularPipelineBlocks, PipelineState
from ..modular_pipeline_utils import ComponentSpec, InputParam, OutputParam
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
class HeliosDecodeStep(ModularPipelineBlocks):
    """Decode all chunk latents with VAE, trim frames, and postprocess into final video output."""

    model_name = "helios"

    @property
    def description(self) -> str:
        return (
            "Decodes all chunk latents with the VAE, concatenates them, "
            "trims to the target frame count, and postprocesses into the final video output."
        )

    @property
    def expected_components(self) -> list[ComponentSpec]:
        return [
            ComponentSpec("vae", AutoencoderKLWan),
            ComponentSpec(
                "video_processor",
                VideoProcessor,
                config=FrozenDict({"vae_scale_factor": 8}),
                default_creation_method="from_config",
            ),
        ]

    @property
    def inputs(self) -> list[InputParam]:
        return [
            InputParam(
                "latent_chunks", required=True, type_hint=list, description="List of per-chunk denoised latent tensors"
            ),
            InputParam("num_frames", required=True, type_hint=int, description="The target number of output frames"),
            InputParam.template("output_type", default="np"),
        ]

    @property
    def intermediate_outputs(self) -> list[OutputParam]:
        return [
            OutputParam(
                "videos",
                type_hint=list[list[PIL.Image.Image]] | list[torch.Tensor] | list[np.ndarray],
                description="The generated videos, can be a PIL.Image.Image, torch.Tensor or a numpy array",
            ),
        ]

    @torch.no_grad()
    def __call__(self, components, state: PipelineState) -> PipelineState:
        """Decode every latent chunk, join the results along dim 2, trim, and postprocess.

        Reads `latent_chunks` and `output_type` from the block state and writes `videos`.

        Raises:
            ValueError: If `latent_chunks` yields no chunks, since there is nothing to decode.
        """
        block_state = self.get_block_state(state)
        vae = components.vae

        latents_mean = (
            torch.tensor(vae.config.latents_mean).view(1, vae.config.z_dim, 1, 1, 1).to(vae.device, vae.dtype)
        )
        # Stored as the reciprocal of the std, so dividing by it below multiplies by the std.
        latents_std = 1.0 / torch.tensor(vae.config.latents_std).view(1, vae.config.z_dim, 1, 1, 1).to(
            vae.device, vae.dtype
        )

        # Collect decoded chunks and concatenate once, instead of re-copying the growing video on every iteration.
        decoded_chunks = []
        for chunk_latents in block_state.latent_chunks:
            current_latents = chunk_latents.to(vae.dtype) / latents_std + latents_mean
            decoded_chunks.append(vae.decode(current_latents, return_dict=False)[0])

        if not decoded_chunks:
            raise ValueError("`latent_chunks` is empty; at least one chunk of latents is required for decoding.")

        history_video = decoded_chunks[0] if len(decoded_chunks) == 1 else torch.cat(decoded_chunks, dim=2)

        # Trim to the largest frame count of the form k * vae_scale_factor_temporal + 1.
        # NOTE(review): `num_frames` is declared as a required input but is not read here — confirm intent.
        generated_frames = history_video.size(2)
        generated_frames = (
            generated_frames - 1
        ) // components.vae_scale_factor_temporal * components.vae_scale_factor_temporal + 1
        history_video = history_video[:, :, :generated_frames]

        block_state.videos = components.video_processor.postprocess_video(
            history_video, output_type=block_state.output_type
        )

        self.set_block_state(state, block_state)
        return components, state

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,392 @@
# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import html
import regex as re
import torch
from transformers import AutoTokenizer, UMT5EncoderModel
from ...configuration_utils import FrozenDict
from ...guiders import ClassifierFreeGuidance
from ...models import AutoencoderKLWan
from ...utils import is_ftfy_available, logging
from ...video_processor import VideoProcessor
from ..modular_pipeline import ModularPipelineBlocks, PipelineState
from ..modular_pipeline_utils import ComponentSpec, InputParam, OutputParam
from .modular_pipeline import HeliosModularPipeline
if is_ftfy_available():
import ftfy
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
def basic_clean(text):
    """Repair text encoding (when `ftfy` is installed), undo HTML escaping twice, and strip both ends.

    Args:
        text: The raw prompt string.

    Returns:
        The cleaned string.
    """
    # `ftfy` is only imported when available, so calling it unguarded would raise NameError
    # on installations without the optional dependency.
    if is_ftfy_available():
        text = ftfy.fix_text(text)
    # Unescape twice so doubly-escaped entities (e.g. "&amp;amp;") are fully resolved.
    text = html.unescape(html.unescape(text))
    return text.strip()
def whitespace_clean(text):
    """Collapse each run of whitespace into one space and trim leading/trailing whitespace."""
    return re.sub(r"\s+", " ", text).strip()
def prompt_clean(text):
    """Run `basic_clean` and then `whitespace_clean` on a prompt string."""
    cleaned = basic_clean(text)
    return whitespace_clean(cleaned)
def get_t5_prompt_embeds(
    text_encoder: UMT5EncoderModel,
    tokenizer: AutoTokenizer,
    prompt: str | list[str],
    max_sequence_length: int,
    device: torch.device,
    dtype: torch.dtype | None = None,
):
    """Tokenize and encode prompts, zeroing the embedding rows that correspond to padding.

    Args:
        text_encoder: The T5 text encoder model.
        tokenizer: The tokenizer for the text encoder.
        prompt: A single prompt or a list of prompts.
        max_sequence_length: Length every prompt is padded or truncated to.
        device: Device the encoder inputs and resulting embeddings are moved to.
        dtype: Optional dtype for the embeddings. Falls back to `text_encoder.dtype`.

    Returns:
        A tuple `(prompt_embeds, attention_mask)`: the stacked embeddings and the tokenizer's attention mask
        converted to bool.
    """
    dtype = dtype or text_encoder.dtype

    prompts = [prompt] if isinstance(prompt, str) else prompt
    prompts = [prompt_clean(p) for p in prompts]

    text_inputs = tokenizer(
        prompts,
        padding="max_length",
        max_length=max_sequence_length,
        truncation=True,
        add_special_tokens=True,
        return_attention_mask=True,
        return_tensors="pt",
    )
    input_ids = text_inputs.input_ids
    attention_mask = text_inputs.attention_mask
    # Number of attended tokens per prompt.
    token_counts = attention_mask.gt(0).sum(dim=1).long()

    hidden_states = text_encoder(input_ids.to(device), attention_mask.to(device)).last_hidden_state
    hidden_states = hidden_states.to(dtype=dtype, device=device)

    # Keep only the attended rows of each sequence, then pad back to full length with zeros.
    padded = []
    for embeds, count in zip(hidden_states, token_counts):
        valid = embeds[:count]
        filler = valid.new_zeros(max_sequence_length - valid.size(0), valid.size(1))
        padded.append(torch.cat([valid, filler]))
    prompt_embeds = torch.stack(padded, dim=0)

    return prompt_embeds, attention_mask.bool()
class HeliosTextEncoderStep(ModularPipelineBlocks):
    """Encode `prompt` (and, when the guider needs it, `negative_prompt`) into T5 embeddings."""

    model_name = "helios"

    @property
    def description(self) -> str:
        return "Text Encoder step that generates text embeddings to guide the video generation"

    @property
    def expected_components(self) -> list[ComponentSpec]:
        return [
            ComponentSpec("text_encoder", UMT5EncoderModel),
            ComponentSpec("tokenizer", AutoTokenizer),
            ComponentSpec(
                "guider",
                ClassifierFreeGuidance,
                config=FrozenDict({"guidance_scale": 5.0}),
                default_creation_method="from_config",
            ),
        ]

    @property
    def inputs(self) -> list[InputParam]:
        return [
            InputParam.template("prompt"),
            InputParam.template("negative_prompt"),
            InputParam.template("max_sequence_length"),
        ]

    @property
    def intermediate_outputs(self) -> list[OutputParam]:
        return [
            OutputParam.template("prompt_embeds"),
            OutputParam.template("negative_prompt_embeds"),
        ]

    @staticmethod
    def check_inputs(prompt, negative_prompt):
        """Validate the types and batch sizes of `prompt` and `negative_prompt`.

        A string `negative_prompt` is always accepted alongside a list `prompt`, because `__call__` repeats it
        once per prompt. Only an explicit list of negative prompts has to match the prompt batch size.

        Raises:
            ValueError: If either argument is neither `str` nor `list`, or if a list `negative_prompt` does not
                have the same number of entries as `prompt`.
        """
        if prompt is not None and not isinstance(prompt, (str, list)):
            raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")

        if negative_prompt is not None and not isinstance(negative_prompt, (str, list)):
            raise ValueError(f"`negative_prompt` has to be of type `str` or `list` but is {type(negative_prompt)}")

        if prompt is not None and isinstance(negative_prompt, list):
            batch_size = 1 if isinstance(prompt, str) else len(prompt)
            if len(negative_prompt) != batch_size:
                raise ValueError(
                    f"`negative_prompt` has batch size {len(negative_prompt)}, but `prompt` has batch size"
                    f" {batch_size}. Please make sure that passed `negative_prompt` matches"
                    " the batch size of `prompt`."
                )

    @torch.no_grad()
    def __call__(self, components: HeliosModularPipeline, state: PipelineState) -> PipelineState:
        """Write `prompt_embeds` and `negative_prompt_embeds` into the block state.

        `negative_prompt_embeds` stays `None` unless `components.requires_unconditional_embeds` is truthy.
        """
        block_state = self.get_block_state(state)

        prompt = block_state.prompt
        negative_prompt = block_state.negative_prompt
        max_sequence_length = block_state.max_sequence_length
        device = components._execution_device

        self.check_inputs(prompt, negative_prompt)

        # Encode prompt
        block_state.prompt_embeds, _ = get_t5_prompt_embeds(
            text_encoder=components.text_encoder,
            tokenizer=components.tokenizer,
            prompt=prompt,
            max_sequence_length=max_sequence_length,
            device=device,
        )

        # Encode negative prompt
        block_state.negative_prompt_embeds = None
        if components.requires_unconditional_embeds:
            # A missing or empty negative prompt falls back to the empty string.
            negative_prompt = negative_prompt or ""
            # Repeat a single negative prompt so it lines up with every prompt in the batch.
            if isinstance(prompt, list) and isinstance(negative_prompt, str):
                negative_prompt = len(prompt) * [negative_prompt]

            block_state.negative_prompt_embeds, _ = get_t5_prompt_embeds(
                text_encoder=components.text_encoder,
                tokenizer=components.tokenizer,
                prompt=negative_prompt,
                max_sequence_length=max_sequence_length,
                device=device,
            )

        self.set_block_state(state, block_state)
        return components, state
class HeliosImageVaeEncoderStep(ModularPipelineBlocks):
    """VAE-encode a conditioning image into `image_latents` and `fake_image_latents` for image-to-video."""

    model_name = "helios"

    @property
    def description(self) -> str:
        return (
            "Image Encoder step that encodes an input image into VAE latent space, "
            "producing image_latents (first frame prefix) and fake_image_latents (history seed) "
            "for image-to-video generation."
        )

    @property
    def expected_components(self) -> list[ComponentSpec]:
        vae_spec = ComponentSpec("vae", AutoencoderKLWan)
        processor_spec = ComponentSpec(
            "video_processor",
            VideoProcessor,
            config=FrozenDict({"vae_scale_factor": 8}),
            default_creation_method="from_config",
        )
        return [vae_spec, processor_spec]

    @property
    def inputs(self) -> list[InputParam]:
        chunk_param = InputParam(
            "num_latent_frames_per_chunk",
            default=9,
            type_hint=int,
            description="Number of latent frames per temporal chunk.",
        )
        return [
            InputParam.template("image"),
            InputParam.template("height", default=384),
            InputParam.template("width", default=640),
            chunk_param,
            InputParam.template("generator"),
        ]

    @property
    def intermediate_outputs(self) -> list[OutputParam]:
        fake_param = OutputParam(
            "fake_image_latents", type_hint=torch.Tensor, description="Fake image latents for history seeding"
        )
        return [OutputParam.template("image_latents"), fake_param]

    @torch.no_grad()
    def __call__(self, components: HeliosModularPipeline, state: PipelineState) -> PipelineState:
        """Encode the image alone and as a repeated still clip; store both normalized results as float32."""
        block_state = self.get_block_state(state)
        vae = components.vae
        device = components._execution_device
        generator = block_state.generator

        # Per-channel statistics broadcastable over (B, C, T, H, W); the std is kept as its reciprocal.
        stats_shape = (1, vae.config.z_dim, 1, 1, 1)
        latents_mean = torch.tensor(vae.config.latents_mean).view(*stats_shape).to(vae.device, vae.dtype)
        inv_latents_std = 1.0 / torch.tensor(vae.config.latents_std).view(*stats_shape).to(vae.device, vae.dtype)

        # Preprocess to (B, C, H, W), then add a singleton frame axis -> (B, C, 1, H, W).
        pixels = components.video_processor.preprocess(
            block_state.image, height=block_state.height, width=block_state.width
        )
        single_frame = pixels.unsqueeze(2).to(device=device, dtype=vae.dtype)

        # First draw from the generator: latents of the single image.
        image_latents = vae.encode(single_frame).latent_dist.sample(generator=generator)
        image_latents = (image_latents - latents_mean) * inv_latents_std

        # Second draw: encode the image repeated over one chunk's worth of frames and keep the last latent frame.
        frames_per_chunk = (block_state.num_latent_frames_per_chunk - 1) * components.vae_scale_factor_temporal + 1
        still_clip = single_frame.repeat(1, 1, frames_per_chunk, 1, 1)  # (B, C, frames_per_chunk, H, W)
        clip_latents = vae.encode(still_clip).latent_dist.sample(generator=generator)
        clip_latents = (clip_latents - latents_mean) * inv_latents_std
        fake_image_latents = clip_latents[:, :, -1:, :, :]

        block_state.image_latents = image_latents.to(device=device, dtype=torch.float32)
        block_state.fake_image_latents = fake_image_latents.to(device=device, dtype=torch.float32)

        self.set_block_state(state, block_state)
        return components, state
class HeliosVideoVaeEncoderStep(ModularPipelineBlocks):
    """Encodes an input video into VAE latent space for video-to-video generation.

    Produces `image_latents` (first frame) and `video_latents` (the trailing frames of the video, encoded in
    fixed-size chunks and concatenated along the frame axis).
    """

    model_name = "helios"

    @property
    def description(self) -> str:
        return (
            "Video Encoder step that encodes an input video into VAE latent space, "
            "producing image_latents (first frame) and video_latents (chunked video frames) "
            "for video-to-video generation."
        )

    @property
    def expected_components(self) -> list[ComponentSpec]:
        return [
            ComponentSpec("vae", AutoencoderKLWan),
            ComponentSpec(
                "video_processor",
                VideoProcessor,
                config=FrozenDict({"vae_scale_factor": 8}),
                default_creation_method="from_config",
            ),
        ]

    @property
    def inputs(self) -> list[InputParam]:
        return [
            InputParam("video", required=True, description="Input video for video-to-video generation"),
            InputParam.template("height", default=384),
            InputParam.template("width", default=640),
            InputParam(
                "num_latent_frames_per_chunk",
                default=9,
                type_hint=int,
                description="Number of latent frames per temporal chunk.",
            ),
            InputParam.template("generator"),
        ]

    @property
    def intermediate_outputs(self) -> list[OutputParam]:
        return [
            OutputParam.template("image_latents"),
            OutputParam("video_latents", type_hint=torch.Tensor, description="Encoded video latents (chunked)"),
        ]

    @torch.no_grad()
    def __call__(self, components: HeliosModularPipeline, state: PipelineState) -> PipelineState:
        """Encode the first frame and the chunked video, storing both normalized results as float32.

        Raises:
            ValueError: If the video has fewer frames than a single chunk requires.
        """
        block_state = self.get_block_state(state)
        vae = components.vae
        device = components._execution_device
        num_latent_frames_per_chunk = block_state.num_latent_frames_per_chunk
        # Use the pipeline's temporal compression factor (as the image encoder and decoder do)
        # rather than a hard-coded constant.
        temporal_scale = components.vae_scale_factor_temporal

        latents_mean = (
            torch.tensor(vae.config.latents_mean).view(1, vae.config.z_dim, 1, 1, 1).to(vae.device, vae.dtype)
        )
        # Kept as the reciprocal of the std so normalization below is a multiplication.
        latents_std = 1.0 / torch.tensor(vae.config.latents_std).view(1, vae.config.z_dim, 1, 1, 1).to(
            vae.device, vae.dtype
        )

        # Preprocess video
        video = components.video_processor.preprocess_video(
            block_state.video, height=block_state.height, width=block_state.width
        )
        video = video.to(device=device, dtype=vae.dtype)

        # Work out how many whole chunks fit into the video.
        num_frames = video.shape[2]
        min_frames = (num_latent_frames_per_chunk - 1) * temporal_scale + 1
        num_chunks = num_frames // min_frames

        if num_chunks == 0:
            raise ValueError(
                f"Video must have at least {min_frames} frames "
                f"(got {num_frames} frames). "
                f"Required: (num_latent_frames_per_chunk - 1) * {temporal_scale} + 1 = "
                f"({num_latent_frames_per_chunk} - 1) * {temporal_scale} + 1 = {min_frames}"
            )

        # Frames that do not fill a whole chunk are dropped from the START of the video.
        total_valid_frames = num_chunks * min_frames
        start_frame = num_frames - total_valid_frames

        # Encode first frame
        first_frame = video[:, :, 0:1, :, :]
        image_latents = vae.encode(first_frame).latent_dist.sample(generator=block_state.generator)
        image_latents = (image_latents - latents_mean) * latents_std

        # Encode the trailing `total_valid_frames` frames chunk by chunk (sequentially, so generator draws stay
        # in order).
        latents_chunks = []
        for chunk_start in range(start_frame, num_frames, min_frames):
            video_chunk = video[:, :, chunk_start : chunk_start + min_frames, :, :]
            chunk_latents = vae.encode(video_chunk).latent_dist.sample(generator=block_state.generator)
            chunk_latents = (chunk_latents - latents_mean) * latents_std
            latents_chunks.append(chunk_latents)
        video_latents = torch.cat(latents_chunks, dim=2)

        block_state.image_latents = image_latents.to(device=device, dtype=torch.float32)
        block_state.video_latents = video_latents.to(device=device, dtype=torch.float32)

        self.set_block_state(state, block_state)
        return components, state

View File

@@ -0,0 +1,542 @@
# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
from ...utils import logging
from ..modular_pipeline import AutoPipelineBlocks, ConditionalPipelineBlocks, SequentialPipelineBlocks
from ..modular_pipeline_utils import InputParam, InsertableDict, OutputParam
from .before_denoise import (
HeliosAdditionalInputsStep,
HeliosAddNoiseToImageLatentsStep,
HeliosAddNoiseToVideoLatentsStep,
HeliosI2VSeedHistoryStep,
HeliosPrepareHistoryStep,
HeliosSetTimestepsStep,
HeliosTextInputStep,
HeliosV2VSeedHistoryStep,
)
from .decoders import HeliosDecodeStep
from .denoise import HeliosChunkDenoiseStep, HeliosI2VChunkDenoiseStep
from .encoders import HeliosImageVaeEncoderStep, HeliosTextEncoderStep, HeliosVideoVaeEncoderStep
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# ====================
# 1. VAE Encoder
# ====================
# auto_docstring
class HeliosAutoVaeEncoderStep(AutoPipelineBlocks):
    """
    Encoder step that encodes video or image inputs. This is an auto pipeline block.
     - `HeliosVideoVaeEncoderStep` (video_encoder) is used when `video` is provided.
     - `HeliosImageVaeEncoderStep` (image_encoder) is used when `image` is provided.
     - If neither is provided, step will be skipped.

      Components:
          vae (`AutoencoderKLWan`) video_processor (`VideoProcessor`)

      Inputs:
          video (`None`, *optional*):
              Input video for video-to-video generation
          height (`int`, *optional*, defaults to 384):
              The height in pixels of the generated image.
          width (`int`, *optional*, defaults to 640):
              The width in pixels of the generated image.
          num_latent_frames_per_chunk (`int`, *optional*, defaults to 9):
              Number of latent frames per temporal chunk.
          generator (`Generator`, *optional*):
              Torch generator for deterministic generation.
          image (`Image | list`, *optional*):
              Reference image(s) for denoising. Can be a single image or list of images.

      Outputs:
          image_latents (`Tensor`):
              The latent representation of the input image.
          video_latents (`Tensor`):
              Encoded video latents (chunked)
          fake_image_latents (`Tensor`):
              Fake image latents for history seeding
    """

    # The three lists are parallel: entry i of each describes the same candidate block.
    block_classes = [HeliosVideoVaeEncoderStep, HeliosImageVaeEncoderStep]
    block_names = ["video_encoder", "image_encoder"]
    block_trigger_inputs = ["video", "image"]

    @property
    def description(self):
        summary_lines = (
            "Encoder step that encodes video or image inputs. This is an auto pipeline block.",
            " - `HeliosVideoVaeEncoderStep` (video_encoder) is used when `video` is provided.",
            " - `HeliosImageVaeEncoderStep` (image_encoder) is used when `image` is provided.",
            " - If neither is provided, step will be skipped.",
        )
        return "\n".join(summary_lines)
# ====================
# 2. DENOISE
# ====================
# DENOISE (T2V)
# auto_docstring
class HeliosCoreDenoiseStep(SequentialPipelineBlocks):
    """
    Denoise block that takes encoded conditions and runs the chunk-based denoising process.

      Components:
          transformer (`HeliosTransformer3DModel`) scheduler (`HeliosScheduler`) guider (`ClassifierFreeGuidance`)

      Inputs:
          num_videos_per_prompt (`int`, *optional*, defaults to 1):
              Number of videos to generate per prompt.
          prompt_embeds (`Tensor`):
              text embeddings used to guide the image generation. Can be generated from text_encoder step.
          negative_prompt_embeds (`Tensor`, *optional*):
              negative text embeddings used to guide the image generation. Can be generated from text_encoder step.
          height (`int`, *optional*, defaults to 384):
              The height in pixels of the generated image.
          width (`int`, *optional*, defaults to 640):
              The width in pixels of the generated image.
          num_frames (`int`, *optional*, defaults to 132):
              Total number of video frames to generate.
          num_latent_frames_per_chunk (`int`, *optional*, defaults to 9):
              Number of latent frames per temporal chunk.
          history_sizes (`list`, *optional*, defaults to [16, 2, 1]):
              Sizes of long/mid/short history buffers for temporal context.
          keep_first_frame (`bool`, *optional*, defaults to True):
              Whether to keep the first frame as a prefix in history.
          num_inference_steps (`int`, *optional*, defaults to 50):
              The number of denoising steps.
          sigmas (`list`, *optional*):
              Custom sigmas for the denoising process.
          generator (`Generator`, *optional*):
              Torch generator for deterministic generation.
          latents (`Tensor`, *optional*):
              Pre-generated noisy latents for image generation.
          timesteps (`Tensor`, *optional*):
              Timesteps for the denoising process.
          **denoiser_input_fields (`None`, *optional*):
              conditional model inputs for the denoiser: e.g. prompt_embeds, negative_prompt_embeds, etc.
          attention_kwargs (`dict`, *optional*):
              Additional kwargs for attention processors.

      Outputs:
          latent_chunks (`list`):
              List of per-chunk denoised latent tensors
    """

    model_name = "helios"
    # Text-to-video sub-blocks; `block_names` lists one name per entry, in the same order.
    # NOTE(review): presumably executed in list order by SequentialPipelineBlocks — confirm.
    block_classes = [
        HeliosTextInputStep,
        HeliosPrepareHistoryStep,
        HeliosSetTimestepsStep,
        HeliosChunkDenoiseStep,
    ]
    block_names = ["input", "prepare_history", "set_timesteps", "chunk_denoise"]

    @property
    def description(self):
        return "Denoise block that takes encoded conditions and runs the chunk-based denoising process."

    @property
    def outputs(self):
        # Only the per-chunk denoised latents are declared as this block's output.
        return [OutputParam("latent_chunks", type_hint=list, description="List of per-chunk denoised latent tensors")]
# DENOISE (I2V)
# auto_docstring
class HeliosI2VCoreDenoiseStep(SequentialPipelineBlocks):
    """
    I2V denoise block that seeds history with image latents and uses I2V-aware chunk preparation.

      Components:
          transformer (`HeliosTransformer3DModel`) scheduler (`HeliosScheduler`) guider (`ClassifierFreeGuidance`)

      Inputs:
          num_videos_per_prompt (`int`, *optional*, defaults to 1):
              Number of videos to generate per prompt.
          prompt_embeds (`Tensor`):
              text embeddings used to guide the image generation. Can be generated from text_encoder step.
          negative_prompt_embeds (`Tensor`, *optional*):
              negative text embeddings used to guide the image generation. Can be generated from text_encoder step.
          image_latents (`Tensor`):
              image latents used to guide the image generation. Can be generated from vae_encoder step.
          fake_image_latents (`Tensor`, *optional*):
              Fake image latents used as history seed for I2V generation.
          image_noise_sigma_min (`float`, *optional*, defaults to 0.111):
              Minimum sigma for image latent noise.
          image_noise_sigma_max (`float`, *optional*, defaults to 0.135):
              Maximum sigma for image latent noise.
          video_noise_sigma_min (`float`, *optional*, defaults to 0.111):
              Minimum sigma for video/fake-image latent noise.
          video_noise_sigma_max (`float`, *optional*, defaults to 0.135):
              Maximum sigma for video/fake-image latent noise.
          generator (`Generator`, *optional*):
              Torch generator for deterministic generation.
          num_frames (`int`, *optional*, defaults to 132):
              Total number of video frames to generate.
          num_latent_frames_per_chunk (`int`, *optional*, defaults to 9):
              Number of latent frames per temporal chunk.
          history_sizes (`list`, *optional*, defaults to [16, 2, 1]):
              Sizes of long/mid/short history buffers for temporal context.
          keep_first_frame (`bool`, *optional*, defaults to True):
              Whether to keep the first frame as a prefix in history.
          num_inference_steps (`int`, *optional*, defaults to 50):
              The number of denoising steps.
          sigmas (`list`, *optional*):
              Custom sigmas for the denoising process.
          latents (`Tensor`, *optional*):
              Pre-generated noisy latents for image generation.
          timesteps (`Tensor`, *optional*):
              Timesteps for the denoising process.
          **denoiser_input_fields (`None`, *optional*):
              conditional model inputs for the denoiser: e.g. prompt_embeds, negative_prompt_embeds, etc.
          attention_kwargs (`dict`, *optional*):
              Additional kwargs for attention processors.

      Outputs:
          latent_chunks (`list`):
              List of per-chunk denoised latent tensors
    """

    model_name = "helios"
    # Image-to-video sub-blocks; `block_names` lists one name per entry, in the same order.
    # Unlike the other entries, `HeliosAdditionalInputsStep` is given as a configured instance that declares
    # `image_latents` as an image-latent input and `fake_image_latents` as an additional batch input.
    # NOTE(review): presumably executed in list order by SequentialPipelineBlocks — confirm.
    block_classes = [
        HeliosTextInputStep,
        HeliosAdditionalInputsStep(
            image_latent_inputs=[InputParam.template("image_latents")],
            additional_batch_inputs=[
                InputParam(
                    "fake_image_latents",
                    type_hint=torch.Tensor,
                    description="Fake image latents used as history seed for I2V generation.",
                ),
            ],
        ),
        HeliosAddNoiseToImageLatentsStep,
        HeliosPrepareHistoryStep,
        HeliosI2VSeedHistoryStep,
        HeliosSetTimestepsStep,
        HeliosI2VChunkDenoiseStep,
    ]
    block_names = [
        "input",
        "additional_inputs",
        "add_noise_image",
        "prepare_history",
        "seed_history",
        "set_timesteps",
        "chunk_denoise",
    ]

    @property
    def description(self):
        return "I2V denoise block that seeds history with image latents and uses I2V-aware chunk preparation."

    @property
    def outputs(self):
        # Only the per-chunk denoised latents are declared as this block's output.
        return [OutputParam("latent_chunks", type_hint=list, description="List of per-chunk denoised latent tensors")]
# DENOISE (V2V)
# auto_docstring
class HeliosV2VCoreDenoiseStep(SequentialPipelineBlocks):
    """
    V2V denoise block that seeds history with video latents and uses I2V-aware chunk preparation.

      Components:
          transformer (`HeliosTransformer3DModel`) scheduler (`HeliosScheduler`) guider (`ClassifierFreeGuidance`)

      Inputs:
          num_videos_per_prompt (`int`, *optional*, defaults to 1):
              Number of videos to generate per prompt.
          prompt_embeds (`Tensor`):
              text embeddings used to guide the image generation. Can be generated from text_encoder step.
          negative_prompt_embeds (`Tensor`, *optional*):
              negative text embeddings used to guide the image generation. Can be generated from text_encoder step.
          image_latents (`Tensor`, *optional*):
              image latents used to guide the image generation. Can be generated from vae_encoder step.
          video_latents (`Tensor`, *optional*):
              Encoded video latents for V2V generation.
          num_latent_frames_per_chunk (`int`, *optional*, defaults to 9):
              Number of latent frames per temporal chunk.
          image_noise_sigma_min (`float`, *optional*, defaults to 0.111):
              Minimum sigma for image latent noise.
          image_noise_sigma_max (`float`, *optional*, defaults to 0.135):
              Maximum sigma for image latent noise.
          video_noise_sigma_min (`float`, *optional*, defaults to 0.111):
              Minimum sigma for video latent noise.
          video_noise_sigma_max (`float`, *optional*, defaults to 0.135):
              Maximum sigma for video latent noise.
          generator (`Generator`, *optional*):
              Torch generator for deterministic generation.
          num_frames (`int`, *optional*, defaults to 132):
              Total number of video frames to generate.
          history_sizes (`list`, *optional*, defaults to [16, 2, 1]):
              Sizes of long/mid/short history buffers for temporal context.
          keep_first_frame (`bool`, *optional*, defaults to True):
              Whether to keep the first frame as a prefix in history.
          num_inference_steps (`int`, *optional*, defaults to 50):
              The number of denoising steps.
          sigmas (`list`, *optional*):
              Custom sigmas for the denoising process.
          latents (`Tensor`, *optional*):
              Pre-generated noisy latents for image generation.
          timesteps (`Tensor`, *optional*):
              Timesteps for the denoising process.
          **denoiser_input_fields (`None`, *optional*):
              conditional model inputs for the denoiser: e.g. prompt_embeds, negative_prompt_embeds, etc.
          attention_kwargs (`dict`, *optional*):
              Additional kwargs for attention processors.

      Outputs:
          latent_chunks (`list`):
              List of per-chunk denoised latent tensors
    """

    model_name = "helios"
    # Video-to-video sub-blocks; `block_names` lists one name per entry, in the same order.
    # `HeliosAdditionalInputsStep` is given as a configured instance that declares `image_latents` as an
    # image-latent input and `video_latents` as an additional batch input. The final step reuses
    # `HeliosI2VChunkDenoiseStep`, the same chunk-denoise block the I2V variant ends with.
    # NOTE(review): presumably executed in list order by SequentialPipelineBlocks — confirm.
    block_classes = [
        HeliosTextInputStep,
        HeliosAdditionalInputsStep(
            image_latent_inputs=[InputParam.template("image_latents")],
            additional_batch_inputs=[
                InputParam(
                    "video_latents", type_hint=torch.Tensor, description="Encoded video latents for V2V generation."
                ),
            ],
        ),
        HeliosAddNoiseToVideoLatentsStep,
        HeliosPrepareHistoryStep,
        HeliosV2VSeedHistoryStep,
        HeliosSetTimestepsStep,
        HeliosI2VChunkDenoiseStep,
    ]
    block_names = [
        "input",
        "additional_inputs",
        "add_noise_video",
        "prepare_history",
        "seed_history",
        "set_timesteps",
        "chunk_denoise",
    ]

    @property
    def description(self):
        return "V2V denoise block that seeds history with video latents and uses I2V-aware chunk preparation."

    @property
    def outputs(self):
        # Only the per-chunk denoised latents are declared as this block's output.
        return [OutputParam("latent_chunks", type_hint=list, description="List of per-chunk denoised latent tensors")]
# AUTO DENOISE
# auto_docstring
class HeliosAutoCoreDenoiseStep(ConditionalPipelineBlocks):
    """
    Core denoise step that selects the appropriate denoising block.
     - `HeliosV2VCoreDenoiseStep` (video2video) for video-to-video tasks.
     - `HeliosI2VCoreDenoiseStep` (image2video) for image-to-video tasks.
     - `HeliosCoreDenoiseStep` (text2video) for text-to-video tasks.

      Components:
          transformer (`HeliosTransformer3DModel`) scheduler (`HeliosScheduler`) guider (`ClassifierFreeGuidance`)

      Inputs:
          num_videos_per_prompt (`int`, *optional*, defaults to 1):
              Number of videos to generate per prompt.
          prompt_embeds (`Tensor`):
              text embeddings used to guide the image generation. Can be generated from text_encoder step.
          negative_prompt_embeds (`Tensor`, *optional*):
              negative text embeddings used to guide the image generation. Can be generated from text_encoder step.
          image_latents (`Tensor`, *optional*):
              image latents used to guide the image generation. Can be generated from vae_encoder step.
          video_latents (`Tensor`, *optional*):
              Encoded video latents for V2V generation.
          num_latent_frames_per_chunk (`int`, *optional*, defaults to 9):
              Number of latent frames per temporal chunk.
          image_noise_sigma_min (`float`, *optional*, defaults to 0.111):
              Minimum sigma for image latent noise.
          image_noise_sigma_max (`float`, *optional*, defaults to 0.135):
              Maximum sigma for image latent noise.
          video_noise_sigma_min (`float`, *optional*, defaults to 0.111):
              Minimum sigma for video latent noise.
          video_noise_sigma_max (`float`, *optional*, defaults to 0.135):
              Maximum sigma for video latent noise.
          generator (`Generator`, *optional*):
              Torch generator for deterministic generation.
          num_frames (`int`, *optional*, defaults to 132):
              Total number of video frames to generate.
          history_sizes (`list`):
              Sizes of long/mid/short history buffers for temporal context.
          keep_first_frame (`bool`, *optional*, defaults to True):
              Whether to keep the first frame as a prefix in history.
          num_inference_steps (`int`, *optional*, defaults to 50):
              The number of denoising steps.
          sigmas (`list`):
              Custom sigmas for the denoising process.
          latents (`Tensor`, *optional*):
              Pre-generated noisy latents for image generation.
          timesteps (`Tensor`, *optional*):
              Timesteps for the denoising process.
          **denoiser_input_fields (`None`, *optional*):
              conditional model inputs for the denoiser: e.g. prompt_embeds, negative_prompt_embeds, etc.
          attention_kwargs (`dict`, *optional*):
              Additional kwargs for attention processors.
          fake_image_latents (`Tensor`, *optional*):
              Fake image latents used as history seed for I2V generation.
          height (`int`, *optional*, defaults to 384):
              The height in pixels of the generated image.
          width (`int`, *optional*, defaults to 640):
              The width in pixels of the generated image.

      Outputs:
          latent_chunks (`list`):
              List of per-chunk denoised latent tensors
    """

    # Candidate blocks and their names, listed in matching order.
    block_classes = [HeliosV2VCoreDenoiseStep, HeliosI2VCoreDenoiseStep, HeliosCoreDenoiseStep]
    block_names = ["video2video", "image2video", "text2video"]
    # Inputs inspected by `select_block`; text2video has no trigger and is named as the default instead.
    block_trigger_inputs = ["video_latents", "fake_image_latents"]
    default_block_name = "text2video"

    def select_block(self, video_latents=None, fake_image_latents=None):
        """Return the name of the block to run, or `None` when neither trigger input is set.

        `video_latents` takes precedence over `fake_image_latents` when both are given.
        """
        if video_latents is not None:
            return "video2video"
        if fake_image_latents is not None:
            return "image2video"
        return None

    @property
    def description(self):
        summary_lines = (
            "Core denoise step that selects the appropriate denoising block.",
            " - `HeliosV2VCoreDenoiseStep` (video2video) for video-to-video tasks.",
            " - `HeliosI2VCoreDenoiseStep` (image2video) for image-to-video tasks.",
            " - `HeliosCoreDenoiseStep` (text2video) for text-to-video tasks.",
        )
        return "\n".join(summary_lines)
# Named top-level steps of the Helios auto pipeline, listed as (name, block instance) pairs:
# text encoding, image/video VAE encoding (skipped when neither input is given), workflow-specific
# denoising, and decoding.
AUTO_BLOCKS = InsertableDict(
    [
        ("text_encoder", HeliosTextEncoderStep()),
        ("vae_encoder", HeliosAutoVaeEncoderStep()),
        ("denoise", HeliosAutoCoreDenoiseStep()),
        ("decode", HeliosDecodeStep()),
    ]
)
# ====================
# 3. Auto Blocks
# ====================
# auto_docstring
class HeliosAutoBlocks(SequentialPipelineBlocks):
    """
    Auto Modular pipeline for text-to-video, image-to-video, and video-to-video tasks using Helios.

    Supported workflows:
        - `text2video`: requires `prompt`
        - `image2video`: requires `prompt`, `image`
        - `video2video`: requires `prompt`, `video`

    Components:
        text_encoder (`UMT5EncoderModel`) tokenizer (`AutoTokenizer`) guider (`ClassifierFreeGuidance`) vae
        (`AutoencoderKLWan`) video_processor (`VideoProcessor`) transformer (`HeliosTransformer3DModel`) scheduler
        (`HeliosScheduler`)

    Inputs:
        prompt (`str`):
            The prompt or prompts to guide image generation.
        negative_prompt (`str`, *optional*):
            The prompt or prompts not to guide the image generation.
        max_sequence_length (`int`, *optional*, defaults to 512):
            Maximum sequence length for prompt encoding.
        video (`None`, *optional*):
            Input video for video-to-video generation
        height (`int`, *optional*, defaults to 384):
            The height in pixels of the generated image.
        width (`int`, *optional*, defaults to 640):
            The width in pixels of the generated image.
        num_latent_frames_per_chunk (`int`, *optional*, defaults to 9):
            Number of latent frames per temporal chunk.
        generator (`Generator`, *optional*):
            Torch generator for deterministic generation.
        image (`Image | list`, *optional*):
            Reference image(s) for denoising. Can be a single image or list of images.
        num_videos_per_prompt (`int`, *optional*, defaults to 1):
            Number of videos to generate per prompt.
        image_latents (`Tensor`, *optional*):
            image latents used to guide the image generation. Can be generated from vae_encoder step.
        video_latents (`Tensor`, *optional*):
            Encoded video latents for V2V generation.
        image_noise_sigma_min (`float`, *optional*, defaults to 0.111):
            Minimum sigma for image latent noise.
        image_noise_sigma_max (`float`, *optional*, defaults to 0.135):
            Maximum sigma for image latent noise.
        video_noise_sigma_min (`float`, *optional*, defaults to 0.111):
            Minimum sigma for video latent noise.
        video_noise_sigma_max (`float`, *optional*, defaults to 0.135):
            Maximum sigma for video latent noise.
        num_frames (`int`, *optional*, defaults to 132):
            Total number of video frames to generate.
        history_sizes (`list`):
            Sizes of long/mid/short history buffers for temporal context.
        keep_first_frame (`bool`, *optional*, defaults to True):
            Whether to keep the first frame as a prefix in history.
        num_inference_steps (`int`, *optional*, defaults to 50):
            The number of denoising steps.
        sigmas (`list`):
            Custom sigmas for the denoising process.
        latents (`Tensor`, *optional*):
            Pre-generated noisy latents for image generation.
        timesteps (`Tensor`, *optional*):
            Timesteps for the denoising process.
        **denoiser_input_fields (`None`, *optional*):
            conditional model inputs for the denoiser: e.g. prompt_embeds, negative_prompt_embeds, etc.
        attention_kwargs (`dict`, *optional*):
            Additional kwargs for attention processors.
        fake_image_latents (`Tensor`, *optional*):
            Fake image latents used as history seed for I2V generation.
        output_type (`str`, *optional*, defaults to np):
            Output format: 'pil', 'np', 'pt'.

    Outputs:
        videos (`list`):
            The generated videos.
    """

    model_name = "helios"

    # Stage instances and their names both come from the module-level AUTO_BLOCKS mapping.
    block_classes = AUTO_BLOCKS.values()
    block_names = AUTO_BLOCKS.keys()

    # Workflow name -> inputs flagged True for that workflow (listed as "requires" in the docstring).
    _workflow_map = {
        "text2video": {"prompt": True},
        "image2video": {"prompt": True, "image": True},
        "video2video": {"prompt": True, "video": True},
    }

    @property
    def description(self):
        summary = "Auto Modular pipeline for text-to-video, image-to-video, and video-to-video tasks using Helios."
        return summary

    @property
    def outputs(self):
        # Declares a single output built from the "videos" template.
        videos = OutputParam.template("videos")
        return [videos]

View File

@@ -0,0 +1,520 @@
# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
from ...utils import logging
from ..modular_pipeline import AutoPipelineBlocks, ConditionalPipelineBlocks, SequentialPipelineBlocks
from ..modular_pipeline_utils import InputParam, InsertableDict, OutputParam
from .before_denoise import (
HeliosAdditionalInputsStep,
HeliosAddNoiseToImageLatentsStep,
HeliosAddNoiseToVideoLatentsStep,
HeliosI2VSeedHistoryStep,
HeliosPrepareHistoryStep,
HeliosTextInputStep,
HeliosV2VSeedHistoryStep,
)
from .decoders import HeliosDecodeStep
from .denoise import HeliosPyramidChunkDenoiseStep, HeliosPyramidI2VChunkDenoiseStep
from .encoders import HeliosImageVaeEncoderStep, HeliosTextEncoderStep, HeliosVideoVaeEncoderStep
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# ====================
# 1. Vae Encoder
# ====================
# auto_docstring
class HeliosPyramidAutoVaeEncoderStep(AutoPipelineBlocks):
    """
    Encoder step that encodes video or image inputs. This is an auto pipeline block.
     - `HeliosVideoVaeEncoderStep` (video_encoder) is used when `video` is provided.
     - `HeliosImageVaeEncoderStep` (image_encoder) is used when `image` is provided.
     - If neither is provided, step will be skipped.

    Components:
        vae (`AutoencoderKLWan`) video_processor (`VideoProcessor`)

    Inputs:
        video (`None`, *optional*):
            Input video for video-to-video generation
        height (`int`, *optional*, defaults to 384):
            The height in pixels of the generated image.
        width (`int`, *optional*, defaults to 640):
            The width in pixels of the generated image.
        num_latent_frames_per_chunk (`int`, *optional*, defaults to 9):
            Number of latent frames per temporal chunk.
        generator (`Generator`, *optional*):
            Torch generator for deterministic generation.
        image (`Image | list`, *optional*):
            Reference image(s) for denoising. Can be a single image or list of images.

    Outputs:
        image_latents (`Tensor`):
            The latent representation of the input image.
        video_latents (`Tensor`):
            Encoded video latents (chunked)
        fake_image_latents (`Tensor`):
            Fake image latents for history seeding
    """

    # Sub-blocks, their names, and the inputs that trigger them are listed in matching order.
    block_classes = [HeliosVideoVaeEncoderStep, HeliosImageVaeEncoderStep]
    block_names = ["video_encoder", "image_encoder"]
    block_trigger_inputs = ["video", "image"]

    @property
    def description(self):
        headline = "Encoder step that encodes video or image inputs. This is an auto pipeline block.\n"
        routing = (
            " - `HeliosVideoVaeEncoderStep` (video_encoder) is used when `video` is provided.\n"
            " - `HeliosImageVaeEncoderStep` (image_encoder) is used when `image` is provided.\n"
            " - If neither is provided, step will be skipped."
        )
        return headline + routing
# ====================
# 2. DENOISE
# ====================
# DENOISE (T2V)
# auto_docstring
class HeliosPyramidCoreDenoiseStep(SequentialPipelineBlocks):
    """
    T2V pyramid denoise block with progressive multi-resolution denoising.

    Components:
        transformer (`HeliosTransformer3DModel`) scheduler (`HeliosScheduler`) guider
        (`ClassifierFreeZeroStarGuidance`)

    Inputs:
        num_videos_per_prompt (`int`, *optional*, defaults to 1):
            Number of videos to generate per prompt.
        prompt_embeds (`Tensor`):
            text embeddings used to guide the image generation. Can be generated from text_encoder step.
        negative_prompt_embeds (`Tensor`, *optional*):
            negative text embeddings used to guide the image generation. Can be generated from text_encoder step.
        height (`int`, *optional*, defaults to 384):
            The height in pixels of the generated image.
        width (`int`, *optional*, defaults to 640):
            The width in pixels of the generated image.
        num_frames (`int`, *optional*, defaults to 132):
            Total number of video frames to generate.
        num_latent_frames_per_chunk (`int`, *optional*, defaults to 9):
            Number of latent frames per temporal chunk.
        history_sizes (`list`, *optional*, defaults to [16, 2, 1]):
            Sizes of long/mid/short history buffers for temporal context.
        keep_first_frame (`bool`, *optional*, defaults to True):
            Whether to keep the first frame as a prefix in history.
        pyramid_num_inference_steps_list (`list`, *optional*, defaults to [10, 10, 10]):
            Number of denoising steps per pyramid stage.
        generator (`Generator`, *optional*):
            Torch generator for deterministic generation.
        latents (`Tensor`, *optional*):
            Pre-generated noisy latents for image generation.
        **denoiser_input_fields (`None`, *optional*):
            conditional model inputs for the denoiser: e.g. prompt_embeds, negative_prompt_embeds, etc.
        attention_kwargs (`dict`, *optional*):
            Additional kwargs for attention processors.

    Outputs:
        latent_chunks (`list`):
            List of per-chunk denoised latent tensors
    """

    model_name = "helios-pyramid"

    # Sub-blocks and their names, listed in matching order.
    block_classes = [
        HeliosTextInputStep,
        HeliosPrepareHistoryStep,
        HeliosPyramidChunkDenoiseStep,
    ]
    block_names = ["input", "prepare_history", "pyramid_chunk_denoise"]

    @property
    def description(self):
        summary = "T2V pyramid denoise block with progressive multi-resolution denoising."
        return summary

    @property
    def outputs(self):
        # Declares a single output: the list of denoised chunks.
        latent_chunks = OutputParam(
            "latent_chunks",
            type_hint=list,
            description="List of per-chunk denoised latent tensors",
        )
        return [latent_chunks]
# DENOISE (I2V)
# auto_docstring
class HeliosPyramidI2VCoreDenoiseStep(SequentialPipelineBlocks):
    """
    I2V pyramid denoise block with progressive multi-resolution denoising.

    Components:
        transformer (`HeliosTransformer3DModel`) scheduler (`HeliosScheduler`) guider
        (`ClassifierFreeZeroStarGuidance`)

    Inputs:
        num_videos_per_prompt (`int`, *optional*, defaults to 1):
            Number of videos to generate per prompt.
        prompt_embeds (`Tensor`):
            text embeddings used to guide the image generation. Can be generated from text_encoder step.
        negative_prompt_embeds (`Tensor`, *optional*):
            negative text embeddings used to guide the image generation. Can be generated from text_encoder step.
        image_latents (`Tensor`):
            image latents used to guide the image generation. Can be generated from vae_encoder step.
        fake_image_latents (`Tensor`, *optional*):
            Fake image latents used as history seed for I2V generation.
        image_noise_sigma_min (`float`, *optional*, defaults to 0.111):
            Minimum sigma for image latent noise.
        image_noise_sigma_max (`float`, *optional*, defaults to 0.135):
            Maximum sigma for image latent noise.
        video_noise_sigma_min (`float`, *optional*, defaults to 0.111):
            Minimum sigma for video/fake-image latent noise.
        video_noise_sigma_max (`float`, *optional*, defaults to 0.135):
            Maximum sigma for video/fake-image latent noise.
        generator (`Generator`, *optional*):
            Torch generator for deterministic generation.
        num_frames (`int`, *optional*, defaults to 132):
            Total number of video frames to generate.
        num_latent_frames_per_chunk (`int`, *optional*, defaults to 9):
            Number of latent frames per temporal chunk.
        history_sizes (`list`, *optional*, defaults to [16, 2, 1]):
            Sizes of long/mid/short history buffers for temporal context.
        keep_first_frame (`bool`, *optional*, defaults to True):
            Whether to keep the first frame as a prefix in history.
        pyramid_num_inference_steps_list (`list`, *optional*, defaults to [10, 10, 10]):
            Number of denoising steps per pyramid stage.
        latents (`Tensor`, *optional*):
            Pre-generated noisy latents for image generation.
        **denoiser_input_fields (`None`, *optional*):
            conditional model inputs for the denoiser: e.g. prompt_embeds, negative_prompt_embeds, etc.
        attention_kwargs (`dict`, *optional*):
            Additional kwargs for attention processors.

    Outputs:
        latent_chunks (`list`):
            List of per-chunk denoised latent tensors
    """

    model_name = "helios-pyramid"

    # Sub-blocks and their names, listed in matching order. The additional-inputs step is the only
    # entry given as a configured instance rather than a class.
    block_classes = [
        HeliosTextInputStep,
        HeliosAdditionalInputsStep(
            image_latent_inputs=[InputParam.template("image_latents")],
            additional_batch_inputs=[
                InputParam(
                    "fake_image_latents",
                    type_hint=torch.Tensor,
                    description="Fake image latents used as history seed for I2V generation.",
                ),
            ],
        ),
        HeliosAddNoiseToImageLatentsStep,
        HeliosPrepareHistoryStep,
        HeliosI2VSeedHistoryStep,
        HeliosPyramidI2VChunkDenoiseStep,
    ]
    block_names = [
        "input",
        "additional_inputs",
        "add_noise_image",
        "prepare_history",
        "seed_history",
        "pyramid_chunk_denoise",
    ]

    @property
    def description(self):
        summary = "I2V pyramid denoise block with progressive multi-resolution denoising."
        return summary

    @property
    def outputs(self):
        # Declares a single output: the list of denoised chunks.
        latent_chunks = OutputParam(
            "latent_chunks",
            type_hint=list,
            description="List of per-chunk denoised latent tensors",
        )
        return [latent_chunks]
# DENOISE (V2V)
# auto_docstring
class HeliosPyramidV2VCoreDenoiseStep(SequentialPipelineBlocks):
    """
    V2V pyramid denoise block with progressive multi-resolution denoising.

    Components:
        transformer (`HeliosTransformer3DModel`) scheduler (`HeliosScheduler`) guider
        (`ClassifierFreeZeroStarGuidance`)

    Inputs:
        num_videos_per_prompt (`int`, *optional*, defaults to 1):
            Number of videos to generate per prompt.
        prompt_embeds (`Tensor`):
            text embeddings used to guide the image generation. Can be generated from text_encoder step.
        negative_prompt_embeds (`Tensor`, *optional*):
            negative text embeddings used to guide the image generation. Can be generated from text_encoder step.
        image_latents (`Tensor`, *optional*):
            image latents used to guide the image generation. Can be generated from vae_encoder step.
        video_latents (`Tensor`, *optional*):
            Encoded video latents for V2V generation.
        num_latent_frames_per_chunk (`int`, *optional*, defaults to 9):
            Number of latent frames per temporal chunk.
        image_noise_sigma_min (`float`, *optional*, defaults to 0.111):
            Minimum sigma for image latent noise.
        image_noise_sigma_max (`float`, *optional*, defaults to 0.135):
            Maximum sigma for image latent noise.
        video_noise_sigma_min (`float`, *optional*, defaults to 0.111):
            Minimum sigma for video latent noise.
        video_noise_sigma_max (`float`, *optional*, defaults to 0.135):
            Maximum sigma for video latent noise.
        generator (`Generator`, *optional*):
            Torch generator for deterministic generation.
        num_frames (`int`, *optional*, defaults to 132):
            Total number of video frames to generate.
        history_sizes (`list`, *optional*, defaults to [16, 2, 1]):
            Sizes of long/mid/short history buffers for temporal context.
        keep_first_frame (`bool`, *optional*, defaults to True):
            Whether to keep the first frame as a prefix in history.
        pyramid_num_inference_steps_list (`list`, *optional*, defaults to [10, 10, 10]):
            Number of denoising steps per pyramid stage.
        latents (`Tensor`, *optional*):
            Pre-generated noisy latents for image generation.
        **denoiser_input_fields (`None`, *optional*):
            conditional model inputs for the denoiser: e.g. prompt_embeds, negative_prompt_embeds, etc.
        attention_kwargs (`dict`, *optional*):
            Additional kwargs for attention processors.

    Outputs:
        latent_chunks (`list`):
            List of per-chunk denoised latent tensors
    """

    model_name = "helios-pyramid"

    # Sub-blocks and their names, listed in matching order.
    # NOTE(review): the final step is the I2V chunk denoiser, same as the I2V block - presumably
    # intentional because both branches start from a seeded history; confirm.
    block_classes = [
        HeliosTextInputStep,
        HeliosAdditionalInputsStep(
            image_latent_inputs=[InputParam.template("image_latents")],
            additional_batch_inputs=[
                InputParam(
                    "video_latents", type_hint=torch.Tensor, description="Encoded video latents for V2V generation."
                ),
            ],
        ),
        HeliosAddNoiseToVideoLatentsStep,
        HeliosPrepareHistoryStep,
        HeliosV2VSeedHistoryStep,
        HeliosPyramidI2VChunkDenoiseStep,
    ]
    block_names = [
        "input",
        "additional_inputs",
        "add_noise_video",
        "prepare_history",
        "seed_history",
        "pyramid_chunk_denoise",
    ]

    @property
    def description(self):
        summary = "V2V pyramid denoise block with progressive multi-resolution denoising."
        return summary

    @property
    def outputs(self):
        # Declares a single output: the list of denoised chunks.
        latent_chunks = OutputParam(
            "latent_chunks",
            type_hint=list,
            description="List of per-chunk denoised latent tensors",
        )
        return [latent_chunks]
# AUTO DENOISE
# auto_docstring
class HeliosPyramidAutoCoreDenoiseStep(ConditionalPipelineBlocks):
    """
    Pyramid core denoise step that selects the appropriate denoising block.
     - `HeliosPyramidV2VCoreDenoiseStep` (video2video) for video-to-video tasks.
     - `HeliosPyramidI2VCoreDenoiseStep` (image2video) for image-to-video tasks.
     - `HeliosPyramidCoreDenoiseStep` (text2video) for text-to-video tasks.

    Components:
        transformer (`HeliosTransformer3DModel`) scheduler (`HeliosScheduler`) guider
        (`ClassifierFreeZeroStarGuidance`)

    Inputs:
        num_videos_per_prompt (`int`, *optional*, defaults to 1):
            Number of videos to generate per prompt.
        prompt_embeds (`Tensor`):
            text embeddings used to guide the image generation. Can be generated from text_encoder step.
        negative_prompt_embeds (`Tensor`, *optional*):
            negative text embeddings used to guide the image generation. Can be generated from text_encoder step.
        image_latents (`Tensor`, *optional*):
            image latents used to guide the image generation. Can be generated from vae_encoder step.
        video_latents (`Tensor`, *optional*):
            Encoded video latents for V2V generation.
        num_latent_frames_per_chunk (`int`, *optional*, defaults to 9):
            Number of latent frames per temporal chunk.
        image_noise_sigma_min (`float`, *optional*, defaults to 0.111):
            Minimum sigma for image latent noise.
        image_noise_sigma_max (`float`, *optional*, defaults to 0.135):
            Maximum sigma for image latent noise.
        video_noise_sigma_min (`float`, *optional*, defaults to 0.111):
            Minimum sigma for video latent noise.
        video_noise_sigma_max (`float`, *optional*, defaults to 0.135):
            Maximum sigma for video latent noise.
        generator (`Generator`, *optional*):
            Torch generator for deterministic generation.
        num_frames (`int`, *optional*, defaults to 132):
            Total number of video frames to generate.
        history_sizes (`list`):
            Sizes of long/mid/short history buffers for temporal context.
        keep_first_frame (`bool`, *optional*, defaults to True):
            Whether to keep the first frame as a prefix in history.
        pyramid_num_inference_steps_list (`list`, *optional*, defaults to [10, 10, 10]):
            Number of denoising steps per pyramid stage.
        latents (`Tensor`, *optional*):
            Pre-generated noisy latents for image generation.
        **denoiser_input_fields (`None`, *optional*):
            conditional model inputs for the denoiser: e.g. prompt_embeds, negative_prompt_embeds, etc.
        attention_kwargs (`dict`, *optional*):
            Additional kwargs for attention processors.
        fake_image_latents (`Tensor`, *optional*):
            Fake image latents used as history seed for I2V generation.
        height (`int`, *optional*, defaults to 384):
            The height in pixels of the generated image.
        width (`int`, *optional*, defaults to 640):
            The width in pixels of the generated image.

    Outputs:
        latent_chunks (`list`):
            List of per-chunk denoised latent tensors
    """

    # Candidate sub-blocks and their names, listed in matching order.
    block_classes = [HeliosPyramidV2VCoreDenoiseStep, HeliosPyramidI2VCoreDenoiseStep, HeliosPyramidCoreDenoiseStep]
    block_names = ["video2video", "image2video", "text2video"]
    # Inputs that `select_block` inspects to decide which sub-block runs.
    block_trigger_inputs = ["video_latents", "fake_image_latents"]
    # NOTE(review): presumably the block used when `select_block` returns None - confirm against the base class.
    default_block_name = "text2video"

    def select_block(self, video_latents=None, fake_image_latents=None):
        """
        Pick the sub-block name from the trigger inputs; `video_latents` takes precedence.

        Returns `None` when neither trigger input is provided.
        """
        if video_latents is not None:
            return "video2video"
        if fake_image_latents is not None:
            return "image2video"
        return None

    @property
    def description(self):
        headline = "Pyramid core denoise step that selects the appropriate denoising block.\n"
        routing = (
            " - `HeliosPyramidV2VCoreDenoiseStep` (video2video) for video-to-video tasks.\n"
            " - `HeliosPyramidI2VCoreDenoiseStep` (image2video) for image-to-video tasks.\n"
            " - `HeliosPyramidCoreDenoiseStep` (text2video) for text-to-video tasks."
        )
        return headline + routing
# ====================
# 3. Auto Blocks
# ====================
# Named stages of the Helios pyramid auto pipeline. `HeliosPyramidAutoBlocks` takes its
# `block_names` and `block_classes` from the keys and values of this mapping, so the entry
# order here matters.
PYRAMID_AUTO_BLOCKS = InsertableDict(
    [
        ("text_encoder", HeliosTextEncoderStep()),
        ("vae_encoder", HeliosPyramidAutoVaeEncoderStep()),
        ("denoise", HeliosPyramidAutoCoreDenoiseStep()),
        ("decode", HeliosDecodeStep()),
    ]
)
# auto_docstring
class HeliosPyramidAutoBlocks(SequentialPipelineBlocks):
    """
    Auto Modular pipeline for pyramid progressive generation (T2V/I2V/V2V) using Helios.

    Supported workflows:
        - `text2video`: requires `prompt`
        - `image2video`: requires `prompt`, `image`
        - `video2video`: requires `prompt`, `video`

    Components:
        text_encoder (`UMT5EncoderModel`) tokenizer (`AutoTokenizer`) guider (`ClassifierFreeGuidance`) vae
        (`AutoencoderKLWan`) video_processor (`VideoProcessor`) transformer (`HeliosTransformer3DModel`) scheduler
        (`HeliosScheduler`)

    Inputs:
        prompt (`str`):
            The prompt or prompts to guide image generation.
        negative_prompt (`str`, *optional*):
            The prompt or prompts not to guide the image generation.
        max_sequence_length (`int`, *optional*, defaults to 512):
            Maximum sequence length for prompt encoding.
        video (`None`, *optional*):
            Input video for video-to-video generation
        height (`int`, *optional*, defaults to 384):
            The height in pixels of the generated image.
        width (`int`, *optional*, defaults to 640):
            The width in pixels of the generated image.
        num_latent_frames_per_chunk (`int`, *optional*, defaults to 9):
            Number of latent frames per temporal chunk.
        generator (`Generator`, *optional*):
            Torch generator for deterministic generation.
        image (`Image | list`, *optional*):
            Reference image(s) for denoising. Can be a single image or list of images.
        num_videos_per_prompt (`int`, *optional*, defaults to 1):
            Number of videos to generate per prompt.
        image_latents (`Tensor`, *optional*):
            image latents used to guide the image generation. Can be generated from vae_encoder step.
        video_latents (`Tensor`, *optional*):
            Encoded video latents for V2V generation.
        image_noise_sigma_min (`float`, *optional*, defaults to 0.111):
            Minimum sigma for image latent noise.
        image_noise_sigma_max (`float`, *optional*, defaults to 0.135):
            Maximum sigma for image latent noise.
        video_noise_sigma_min (`float`, *optional*, defaults to 0.111):
            Minimum sigma for video latent noise.
        video_noise_sigma_max (`float`, *optional*, defaults to 0.135):
            Maximum sigma for video latent noise.
        num_frames (`int`, *optional*, defaults to 132):
            Total number of video frames to generate.
        history_sizes (`list`):
            Sizes of long/mid/short history buffers for temporal context.
        keep_first_frame (`bool`, *optional*, defaults to True):
            Whether to keep the first frame as a prefix in history.
        pyramid_num_inference_steps_list (`list`, *optional*, defaults to [10, 10, 10]):
            Number of denoising steps per pyramid stage.
        latents (`Tensor`, *optional*):
            Pre-generated noisy latents for image generation.
        **denoiser_input_fields (`None`, *optional*):
            conditional model inputs for the denoiser: e.g. prompt_embeds, negative_prompt_embeds, etc.
        attention_kwargs (`dict`, *optional*):
            Additional kwargs for attention processors.
        fake_image_latents (`Tensor`, *optional*):
            Fake image latents used as history seed for I2V generation.
        output_type (`str`, *optional*, defaults to np):
            Output format: 'pil', 'np', 'pt'.

    Outputs:
        videos (`list`):
            The generated videos.
    """

    model_name = "helios-pyramid"

    # Stage instances and their names both come from the module-level PYRAMID_AUTO_BLOCKS mapping.
    block_classes = PYRAMID_AUTO_BLOCKS.values()
    block_names = PYRAMID_AUTO_BLOCKS.keys()

    # Workflow name -> inputs flagged True for that workflow (listed as "requires" in the docstring).
    _workflow_map = {
        "text2video": {"prompt": True},
        "image2video": {"prompt": True, "image": True},
        "video2video": {"prompt": True, "video": True},
    }

    @property
    def description(self):
        summary = "Auto Modular pipeline for pyramid progressive generation (T2V/I2V/V2V) using Helios."
        return summary

    @property
    def outputs(self):
        # Declares a single output built from the "videos" template.
        videos = OutputParam.template("videos")
        return [videos]

View File

@@ -0,0 +1,530 @@
# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
from ...utils import logging
from ..modular_pipeline import AutoPipelineBlocks, ConditionalPipelineBlocks, SequentialPipelineBlocks
from ..modular_pipeline_utils import InputParam, InsertableDict, OutputParam
from .before_denoise import (
HeliosAdditionalInputsStep,
HeliosAddNoiseToImageLatentsStep,
HeliosAddNoiseToVideoLatentsStep,
HeliosI2VSeedHistoryStep,
HeliosPrepareHistoryStep,
HeliosTextInputStep,
HeliosV2VSeedHistoryStep,
)
from .decoders import HeliosDecodeStep
from .denoise import HeliosPyramidDistilledChunkDenoiseStep, HeliosPyramidDistilledI2VChunkDenoiseStep
from .encoders import HeliosImageVaeEncoderStep, HeliosTextEncoderStep, HeliosVideoVaeEncoderStep
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# ====================
# 1. Vae Encoder
# ====================
# auto_docstring
class HeliosPyramidDistilledAutoVaeEncoderStep(AutoPipelineBlocks):
    """
    Encoder step for distilled pyramid pipeline.
     - `HeliosVideoVaeEncoderStep` (video_encoder) is used when `video` is provided.
     - `HeliosImageVaeEncoderStep` (image_encoder) is used when `image` is provided.
     - If neither is provided, step will be skipped.

    Components:
        vae (`AutoencoderKLWan`) video_processor (`VideoProcessor`)

    Inputs:
        video (`None`, *optional*):
            Input video for video-to-video generation
        height (`int`, *optional*, defaults to 384):
            The height in pixels of the generated image.
        width (`int`, *optional*, defaults to 640):
            The width in pixels of the generated image.
        num_latent_frames_per_chunk (`int`, *optional*, defaults to 9):
            Number of latent frames per temporal chunk.
        generator (`Generator`, *optional*):
            Torch generator for deterministic generation.
        image (`Image | list`, *optional*):
            Reference image(s) for denoising. Can be a single image or list of images.

    Outputs:
        image_latents (`Tensor`):
            The latent representation of the input image.
        video_latents (`Tensor`):
            Encoded video latents (chunked)
        fake_image_latents (`Tensor`):
            Fake image latents for history seeding
    """

    # Sub-blocks, their names, and the inputs that trigger them are listed in matching order.
    block_classes = [HeliosVideoVaeEncoderStep, HeliosImageVaeEncoderStep]
    block_names = ["video_encoder", "image_encoder"]
    block_trigger_inputs = ["video", "image"]

    @property
    def description(self):
        headline = "Encoder step for distilled pyramid pipeline.\n"
        routing = (
            " - `HeliosVideoVaeEncoderStep` (video_encoder) is used when `video` is provided.\n"
            " - `HeliosImageVaeEncoderStep` (image_encoder) is used when `image` is provided.\n"
            " - If neither is provided, step will be skipped."
        )
        return headline + routing
# ====================
# 2. DENOISE
# ====================
# DENOISE (T2V)
# auto_docstring
class HeliosPyramidDistilledCoreDenoiseStep(SequentialPipelineBlocks):
    """
    T2V distilled pyramid denoise block with DMD scheduler and no CFG.

    Components:
        transformer (`HeliosTransformer3DModel`) scheduler (`HeliosScheduler`) guider (`ClassifierFreeGuidance`)

    Inputs:
        num_videos_per_prompt (`int`, *optional*, defaults to 1):
            Number of videos to generate per prompt.
        prompt_embeds (`Tensor`):
            text embeddings used to guide the image generation. Can be generated from text_encoder step.
        negative_prompt_embeds (`Tensor`, *optional*):
            negative text embeddings used to guide the image generation. Can be generated from text_encoder step.
        height (`int`, *optional*, defaults to 384):
            The height in pixels of the generated image.
        width (`int`, *optional*, defaults to 640):
            The width in pixels of the generated image.
        num_frames (`int`, *optional*, defaults to 132):
            Total number of video frames to generate.
        num_latent_frames_per_chunk (`int`, *optional*, defaults to 9):
            Number of latent frames per temporal chunk.
        history_sizes (`list`, *optional*, defaults to [16, 2, 1]):
            Sizes of long/mid/short history buffers for temporal context.
        keep_first_frame (`bool`, *optional*, defaults to True):
            Whether to keep the first frame as a prefix in history.
        pyramid_num_inference_steps_list (`list`, *optional*, defaults to [10, 10, 10]):
            Number of denoising steps per pyramid stage.
        generator (`Generator`, *optional*):
            Torch generator for deterministic generation.
        latents (`Tensor`, *optional*):
            Pre-generated noisy latents for image generation.
        **denoiser_input_fields (`None`, *optional*):
            conditional model inputs for the denoiser: e.g. prompt_embeds, negative_prompt_embeds, etc.
        is_amplify_first_chunk (`bool`, *optional*, defaults to True):
            Whether to double the first chunk's timesteps via the scheduler for amplified generation.
        attention_kwargs (`dict`, *optional*):
            Additional kwargs for attention processors.

    Outputs:
        latent_chunks (`list`):
            List of per-chunk denoised latent tensors
    """

    model_name = "helios-pyramid"

    # Sub-blocks and their names, listed in matching order.
    block_classes = [
        HeliosTextInputStep,
        HeliosPrepareHistoryStep,
        HeliosPyramidDistilledChunkDenoiseStep,
    ]
    block_names = ["input", "prepare_history", "pyramid_chunk_denoise"]

    @property
    def description(self):
        summary = "T2V distilled pyramid denoise block with DMD scheduler and no CFG."
        return summary

    @property
    def outputs(self):
        # Declares a single output: the list of denoised chunks.
        latent_chunks = OutputParam(
            "latent_chunks",
            type_hint=list,
            description="List of per-chunk denoised latent tensors",
        )
        return [latent_chunks]
# DENOISE (I2V)
# auto_docstring
class HeliosPyramidDistilledI2VCoreDenoiseStep(SequentialPipelineBlocks):
    """
    I2V distilled pyramid denoise block with DMD scheduler and no CFG.

    Components:
        transformer (`HeliosTransformer3DModel`) scheduler (`HeliosScheduler`) guider (`ClassifierFreeGuidance`)

    Inputs:
        num_videos_per_prompt (`int`, *optional*, defaults to 1):
            Number of videos to generate per prompt.
        prompt_embeds (`Tensor`):
            text embeddings used to guide the image generation. Can be generated from text_encoder step.
        negative_prompt_embeds (`Tensor`, *optional*):
            negative text embeddings used to guide the image generation. Can be generated from text_encoder step.
        image_latents (`Tensor`):
            image latents used to guide the image generation. Can be generated from vae_encoder step.
        fake_image_latents (`Tensor`, *optional*):
            Fake image latents used as history seed for I2V generation.
        image_noise_sigma_min (`float`, *optional*, defaults to 0.111):
            Minimum sigma for image latent noise.
        image_noise_sigma_max (`float`, *optional*, defaults to 0.135):
            Maximum sigma for image latent noise.
        video_noise_sigma_min (`float`, *optional*, defaults to 0.111):
            Minimum sigma for video/fake-image latent noise.
        video_noise_sigma_max (`float`, *optional*, defaults to 0.135):
            Maximum sigma for video/fake-image latent noise.
        generator (`Generator`, *optional*):
            Torch generator for deterministic generation.
        num_frames (`int`, *optional*, defaults to 132):
            Total number of video frames to generate.
        num_latent_frames_per_chunk (`int`, *optional*, defaults to 9):
            Number of latent frames per temporal chunk.
        history_sizes (`list`, *optional*, defaults to [16, 2, 1]):
            Sizes of long/mid/short history buffers for temporal context.
        keep_first_frame (`bool`, *optional*, defaults to True):
            Whether to keep the first frame as a prefix in history.
        pyramid_num_inference_steps_list (`list`, *optional*, defaults to [10, 10, 10]):
            Number of denoising steps per pyramid stage.
        latents (`Tensor`, *optional*):
            Pre-generated noisy latents for image generation.
        **denoiser_input_fields (`None`, *optional*):
            conditional model inputs for the denoiser: e.g. prompt_embeds, negative_prompt_embeds, etc.
        is_amplify_first_chunk (`bool`, *optional*, defaults to True):
            Whether to double the first chunk's timesteps via the scheduler for amplified generation.
        attention_kwargs (`dict`, *optional*):
            Additional kwargs for attention processors.

    Outputs:
        latent_chunks (`list`):
            List of per-chunk denoised latent tensors
    """

    model_name = "helios-pyramid"

    # Sub-blocks and their names, listed in matching order. The additional-inputs step is the only
    # entry given as a configured instance rather than a class.
    block_classes = [
        HeliosTextInputStep,
        HeliosAdditionalInputsStep(
            image_latent_inputs=[InputParam.template("image_latents")],
            additional_batch_inputs=[
                InputParam(
                    "fake_image_latents",
                    type_hint=torch.Tensor,
                    description="Fake image latents used as history seed for I2V generation.",
                ),
            ],
        ),
        HeliosAddNoiseToImageLatentsStep,
        HeliosPrepareHistoryStep,
        HeliosI2VSeedHistoryStep,
        HeliosPyramidDistilledI2VChunkDenoiseStep,
    ]
    block_names = [
        "input",
        "additional_inputs",
        "add_noise_image",
        "prepare_history",
        "seed_history",
        "pyramid_chunk_denoise",
    ]

    @property
    def description(self):
        summary = "I2V distilled pyramid denoise block with DMD scheduler and no CFG."
        return summary

    @property
    def outputs(self):
        # Declares a single output: the list of denoised chunks.
        latent_chunks = OutputParam(
            "latent_chunks",
            type_hint=list,
            description="List of per-chunk denoised latent tensors",
        )
        return [latent_chunks]
# DENOISE (V2V)
# auto_docstring
class HeliosPyramidDistilledV2VCoreDenoiseStep(SequentialPipelineBlocks):
    """
    V2V distilled pyramid denoise block with DMD scheduler and no CFG.

      Components:
          transformer (`HeliosTransformer3DModel`) scheduler (`HeliosScheduler`) guider (`ClassifierFreeGuidance`)

      Inputs:
          num_videos_per_prompt (`int`, *optional*, defaults to 1):
              Number of videos to generate per prompt.
          prompt_embeds (`Tensor`):
              text embeddings used to guide the image generation. Can be generated from text_encoder step.
          negative_prompt_embeds (`Tensor`, *optional*):
              negative text embeddings used to guide the image generation. Can be generated from text_encoder step.
          image_latents (`Tensor`, *optional*):
              image latents used to guide the image generation. Can be generated from vae_encoder step.
          video_latents (`Tensor`, *optional*):
              Encoded video latents for V2V generation.
          num_latent_frames_per_chunk (`int`, *optional*, defaults to 9):
              Number of latent frames per temporal chunk.
          image_noise_sigma_min (`float`, *optional*, defaults to 0.111):
              Minimum sigma for image latent noise.
          image_noise_sigma_max (`float`, *optional*, defaults to 0.135):
              Maximum sigma for image latent noise.
          video_noise_sigma_min (`float`, *optional*, defaults to 0.111):
              Minimum sigma for video latent noise.
          video_noise_sigma_max (`float`, *optional*, defaults to 0.135):
              Maximum sigma for video latent noise.
          generator (`Generator`, *optional*):
              Torch generator for deterministic generation.
          num_frames (`int`, *optional*, defaults to 132):
              Total number of video frames to generate.
          history_sizes (`list`, *optional*, defaults to [16, 2, 1]):
              Sizes of long/mid/short history buffers for temporal context.
          keep_first_frame (`bool`, *optional*, defaults to True):
              Whether to keep the first frame as a prefix in history.
          pyramid_num_inference_steps_list (`list`, *optional*, defaults to [10, 10, 10]):
              Number of denoising steps per pyramid stage.
          latents (`Tensor`, *optional*):
              Pre-generated noisy latents for image generation.
          **denoiser_input_fields (`None`, *optional*):
              conditional model inputs for the denoiser: e.g. prompt_embeds, negative_prompt_embeds, etc.
          is_amplify_first_chunk (`bool`, *optional*, defaults to True):
              Whether to double the first chunk's timesteps via the scheduler for amplified generation.
          attention_kwargs (`dict`, *optional*):
              Additional kwargs for attention processors.

      Outputs:
          latent_chunks (`list`):
              List of per-chunk denoised latent tensors
    """

    model_name = "helios-pyramid"
    # Sub-blocks of this step; `block_names` below has one entry per item here
    # (presumably paired by position -- confirm against SequentialPipelineBlocks).
    block_classes = [
        HeliosTextInputStep,
        # Configured instance rather than a class: declares `image_latents` as an image-latent input
        # and `video_latents` as an additional batch input.
        HeliosAdditionalInputsStep(
            image_latent_inputs=[InputParam.template("image_latents")],
            additional_batch_inputs=[
                InputParam(
                    "video_latents", type_hint=torch.Tensor, description="Encoded video latents for V2V generation."
                ),
            ],
        ),
        HeliosAddNoiseToVideoLatentsStep,
        HeliosPrepareHistoryStep,
        HeliosV2VSeedHistoryStep,
        # NOTE(review): the V2V flow ends with the *I2V* chunk denoise step, the same class the I2V
        # block uses -- confirm this reuse is intentional.
        HeliosPyramidDistilledI2VChunkDenoiseStep,
    ]
    block_names = [
        "input",
        "additional_inputs",
        "add_noise_video",
        "prepare_history",
        "seed_history",
        "pyramid_chunk_denoise",
    ]

    @property
    def description(self):
        # One-line summary; identical to the first line of the class docstring.
        return "V2V distilled pyramid denoise block with DMD scheduler and no CFG."

    @property
    def outputs(self):
        # Declares the single output of this block: the list of per-chunk denoised latents.
        return [OutputParam("latent_chunks", type_hint=list, description="List of per-chunk denoised latent tensors")]
# AUTO DENOISE
# auto_docstring
class HeliosPyramidDistilledAutoCoreDenoiseStep(ConditionalPipelineBlocks):
    """
    Distilled pyramid core denoise step that selects the appropriate denoising block.
      - `HeliosPyramidDistilledV2VCoreDenoiseStep` (video2video) for video-to-video tasks.
      - `HeliosPyramidDistilledI2VCoreDenoiseStep` (image2video) for image-to-video tasks.
      - `HeliosPyramidDistilledCoreDenoiseStep` (text2video) for text-to-video tasks.

      Components:
          transformer (`HeliosTransformer3DModel`) scheduler (`HeliosScheduler`) guider (`ClassifierFreeGuidance`)

      Inputs:
          num_videos_per_prompt (`int`, *optional*, defaults to 1):
              Number of videos to generate per prompt.
          prompt_embeds (`Tensor`):
              text embeddings used to guide the image generation. Can be generated from text_encoder step.
          negative_prompt_embeds (`Tensor`, *optional*):
              negative text embeddings used to guide the image generation. Can be generated from text_encoder step.
          image_latents (`Tensor`, *optional*):
              image latents used to guide the image generation. Can be generated from vae_encoder step.
          video_latents (`Tensor`, *optional*):
              Encoded video latents for V2V generation.
          num_latent_frames_per_chunk (`int`, *optional*, defaults to 9):
              Number of latent frames per temporal chunk.
          image_noise_sigma_min (`float`, *optional*, defaults to 0.111):
              Minimum sigma for image latent noise.
          image_noise_sigma_max (`float`, *optional*, defaults to 0.135):
              Maximum sigma for image latent noise.
          video_noise_sigma_min (`float`, *optional*, defaults to 0.111):
              Minimum sigma for video latent noise.
          video_noise_sigma_max (`float`, *optional*, defaults to 0.135):
              Maximum sigma for video latent noise.
          generator (`Generator`, *optional*):
              Torch generator for deterministic generation.
          num_frames (`int`, *optional*, defaults to 132):
              Total number of video frames to generate.
          history_sizes (`list`):
              Sizes of long/mid/short history buffers for temporal context.
          keep_first_frame (`bool`, *optional*, defaults to True):
              Whether to keep the first frame as a prefix in history.
          pyramid_num_inference_steps_list (`list`, *optional*, defaults to [10, 10, 10]):
              Number of denoising steps per pyramid stage.
          latents (`Tensor`, *optional*):
              Pre-generated noisy latents for image generation.
          **denoiser_input_fields (`None`, *optional*):
              conditional model inputs for the denoiser: e.g. prompt_embeds, negative_prompt_embeds, etc.
          is_amplify_first_chunk (`bool`, *optional*, defaults to True):
              Whether to double the first chunk's timesteps via the scheduler for amplified generation.
          attention_kwargs (`dict`, *optional*):
              Additional kwargs for attention processors.
          fake_image_latents (`Tensor`, *optional*):
              Fake image latents used as history seed for I2V generation.
          height (`int`, *optional*, defaults to 384):
              The height in pixels of the generated image.
          width (`int`, *optional*, defaults to 640):
              The width in pixels of the generated image.

      Outputs:
          latent_chunks (`list`):
              List of per-chunk denoised latent tensors
    """

    # Candidate blocks; `block_names` lists their names in the same order.
    block_classes = [
        HeliosPyramidDistilledV2VCoreDenoiseStep,
        HeliosPyramidDistilledI2VCoreDenoiseStep,
        HeliosPyramidDistilledCoreDenoiseStep,
    ]
    block_names = ["video2video", "image2video", "text2video"]
    # Only the V2V and I2V branches have a trigger input; text2video has none and is the default.
    block_trigger_inputs = ["video_latents", "fake_image_latents"]
    default_block_name = "text2video"

    def select_block(self, video_latents=None, fake_image_latents=None):
        """Return the name of the block to run, based on which trigger input is provided.

        `video_latents` is checked first, so "video2video" is chosen when both inputs are given.
        Returns `None` when neither is given.
        """
        if video_latents is not None:
            return "video2video"
        elif fake_image_latents is not None:
            return "image2video"
        # NOTE(review): presumably `None` makes the caller fall back to `default_block_name`
        # ("text2video") -- confirm against ConditionalPipelineBlocks.
        return None

    @property
    def description(self):
        # Same text as the summary at the top of the class docstring.
        return (
            "Distilled pyramid core denoise step that selects the appropriate denoising block.\n"
            " - `HeliosPyramidDistilledV2VCoreDenoiseStep` (video2video) for video-to-video tasks.\n"
            " - `HeliosPyramidDistilledI2VCoreDenoiseStep` (image2video) for image-to-video tasks.\n"
            " - `HeliosPyramidDistilledCoreDenoiseStep` (text2video) for text-to-video tasks."
        )
# ====================
# 3. Auto Blocks
# ====================
# Stage name -> block instance for the distilled pyramid auto pipeline. Each block is
# instantiated here, once, at import time.
DISTILLED_PYRAMID_AUTO_BLOCKS = InsertableDict(
    [
        ("text_encoder", HeliosTextEncoderStep()),
        ("vae_encoder", HeliosPyramidDistilledAutoVaeEncoderStep()),
        ("denoise", HeliosPyramidDistilledAutoCoreDenoiseStep()),
        ("decode", HeliosDecodeStep()),
    ]
)
# auto_docstring
class HeliosPyramidDistilledAutoBlocks(SequentialPipelineBlocks):
    """
    Auto Modular pipeline for distilled pyramid progressive generation (T2V/I2V/V2V) using Helios.

      Supported workflows:
        - `text2video`: requires `prompt`
        - `image2video`: requires `prompt`, `image`
        - `video2video`: requires `prompt`, `video`

      Components:
          text_encoder (`UMT5EncoderModel`) tokenizer (`AutoTokenizer`) guider (`ClassifierFreeGuidance`) vae
          (`AutoencoderKLWan`) video_processor (`VideoProcessor`) transformer (`HeliosTransformer3DModel`) scheduler
          (`HeliosScheduler`)

      Inputs:
          prompt (`str`):
              The prompt or prompts to guide image generation.
          negative_prompt (`str`, *optional*):
              The prompt or prompts not to guide the image generation.
          max_sequence_length (`int`, *optional*, defaults to 512):
              Maximum sequence length for prompt encoding.
          video (`None`, *optional*):
              Input video for video-to-video generation
          height (`int`, *optional*, defaults to 384):
              The height in pixels of the generated image.
          width (`int`, *optional*, defaults to 640):
              The width in pixels of the generated image.
          num_latent_frames_per_chunk (`int`, *optional*, defaults to 9):
              Number of latent frames per temporal chunk.
          generator (`Generator`, *optional*):
              Torch generator for deterministic generation.
          image (`Image | list`, *optional*):
              Reference image(s) for denoising. Can be a single image or list of images.
          num_videos_per_prompt (`int`, *optional*, defaults to 1):
              Number of videos to generate per prompt.
          image_latents (`Tensor`, *optional*):
              image latents used to guide the image generation. Can be generated from vae_encoder step.
          video_latents (`Tensor`, *optional*):
              Encoded video latents for V2V generation.
          image_noise_sigma_min (`float`, *optional*, defaults to 0.111):
              Minimum sigma for image latent noise.
          image_noise_sigma_max (`float`, *optional*, defaults to 0.135):
              Maximum sigma for image latent noise.
          video_noise_sigma_min (`float`, *optional*, defaults to 0.111):
              Minimum sigma for video latent noise.
          video_noise_sigma_max (`float`, *optional*, defaults to 0.135):
              Maximum sigma for video latent noise.
          num_frames (`int`, *optional*, defaults to 132):
              Total number of video frames to generate.
          history_sizes (`list`):
              Sizes of long/mid/short history buffers for temporal context.
          keep_first_frame (`bool`, *optional*, defaults to True):
              Whether to keep the first frame as a prefix in history.
          pyramid_num_inference_steps_list (`list`, *optional*, defaults to [10, 10, 10]):
              Number of denoising steps per pyramid stage.
          latents (`Tensor`, *optional*):
              Pre-generated noisy latents for image generation.
          **denoiser_input_fields (`None`, *optional*):
              conditional model inputs for the denoiser: e.g. prompt_embeds, negative_prompt_embeds, etc.
          is_amplify_first_chunk (`bool`, *optional*, defaults to True):
              Whether to double the first chunk's timesteps via the scheduler for amplified generation.
          attention_kwargs (`dict`, *optional*):
              Additional kwargs for attention processors.
          fake_image_latents (`Tensor`, *optional*):
              Fake image latents used as history seed for I2V generation.
          output_type (`str`, *optional*, defaults to np):
              Output format: 'pil', 'np', 'pt'.

      Outputs:
          videos (`list`):
              The generated videos.
    """

    model_name = "helios-pyramid"
    # Blocks and their names both come from DISTILLED_PYRAMID_AUTO_BLOCKS (its values and its keys).
    block_classes = DISTILLED_PYRAMID_AUTO_BLOCKS.values()
    block_names = DISTILLED_PYRAMID_AUTO_BLOCKS.keys()
    # Workflow name -> inputs that workflow requires; mirrors "Supported workflows" in the docstring.
    _workflow_map = {
        "text2video": {"prompt": True},
        "image2video": {"prompt": True, "image": True},
        "video2video": {"prompt": True, "video": True},
    }

    @property
    def description(self):
        # One-line summary; identical to the first line of the class docstring.
        return "Auto Modular pipeline for distilled pyramid progressive generation (T2V/I2V/V2V) using Helios."

    @property
    def outputs(self):
        # Single output, built from the shared "videos" output template.
        return [OutputParam.template("videos")]

View File

@@ -0,0 +1,87 @@
# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ...loaders import HeliosLoraLoaderMixin
from ...utils import logging
from ..modular_pipeline import ModularPipeline
# Module-level logger, named after this module via `__name__`.
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
class HeliosModularPipeline(ModularPipeline, HeliosLoraLoaderMixin):
    """
    Modular pipeline for Helios text-to-video generation; `HeliosAutoBlocks` is its default block set.

    Each property below reads its value from a loaded component's config and falls back to a fixed
    default when that component is missing or `None`.

    > [!WARNING]
    > This is an experimental feature and is likely to change in the future.
    """

    default_blocks_name = "HeliosAutoBlocks"

    @property
    def vae_scale_factor_spatial(self):
        """Spatial scale factor from the VAE config, or 8 when no VAE is set."""
        vae = getattr(self, "vae", None)
        if vae is None:
            return 8
        return vae.config.scale_factor_spatial

    @property
    def vae_scale_factor_temporal(self):
        """Temporal scale factor from the VAE config, or 4 when no VAE is set."""
        vae = getattr(self, "vae", None)
        if vae is None:
            return 4
        return vae.config.scale_factor_temporal

    @property
    def num_channels_latents(self):
        """`in_channels` from the transformer config, or 16 when no transformer is set."""
        transformer = getattr(self, "transformer", None)
        if transformer is None:
            # TODO(YiYi): find out default value
            return 16
        return transformer.config.in_channels

    @property
    def requires_unconditional_embeds(self):
        """Whether the guider is enabled and has more than one condition; `False` without a guider."""
        guider = getattr(self, "guider", None)
        if guider is None:
            return False
        return guider._enabled and guider.num_conditions > 1
class HeliosPyramidModularPipeline(HeliosModularPipeline):
    """
    Modular pipeline for Helios pyramid (progressive resolution) video generation.

    Differs from `HeliosModularPipeline` only in its default block set, `HeliosPyramidAutoBlocks`.

    > [!WARNING]
    > This is an experimental feature and is likely to change in the future.
    """

    default_blocks_name = "HeliosPyramidAutoBlocks"
class HeliosPyramidDistilledModularPipeline(HeliosModularPipeline):
    """
    Modular pipeline for Helios distilled pyramid video generation using DMD scheduler.

    Uses guidance_scale=1.0 (no CFG) and supports is_amplify_first_chunk for the DMD scheduler. Its
    default block set is `HeliosPyramidDistilledAutoBlocks`.

    > [!WARNING]
    > This is an experimental feature and is likely to change in the future.
    """

    default_blocks_name = "HeliosPyramidDistilledAutoBlocks"

View File

@@ -106,6 +106,16 @@ def _wan_i2v_map_fn(config_dict=None):
return "WanImage2VideoModularPipeline"
def _helios_pyramid_map_fn(config_dict=None):
if config_dict is None:
return "HeliosPyramidModularPipeline"
if config_dict.get("is_distilled", False):
return "HeliosPyramidDistilledModularPipeline"
else:
return "HeliosPyramidModularPipeline"
MODULAR_PIPELINE_MAPPING = OrderedDict(
[
("stable-diffusion-xl", _create_default_map_fn("StableDiffusionXLModularPipeline")),
@@ -120,6 +130,8 @@ MODULAR_PIPELINE_MAPPING = OrderedDict(
("qwenimage-edit-plus", _create_default_map_fn("QwenImageEditPlusModularPipeline")),
("qwenimage-layered", _create_default_map_fn("QwenImageLayeredModularPipeline")),
("z-image", _create_default_map_fn("ZImageModularPipeline")),
("helios", _create_default_map_fn("HeliosModularPipeline")),
("helios-pyramid", _helios_pyramid_map_fn),
]
)

View File

@@ -456,6 +456,8 @@ class HeliosPyramidPipeline(DiffusionPipeline, HeliosLoraLoaderMixin):
# the output will be non-deterministic and may produce incorrect results in CP context.
if generator is None:
generator = torch.Generator(device=device)
elif isinstance(generator, list):
generator = generator[0]
gamma = self.scheduler.config.gamma
_, ph, pw = patch_size
@@ -470,7 +472,8 @@ class HeliosPyramidPipeline(DiffusionPipeline, HeliosLoraLoaderMixin):
L = torch.linalg.cholesky(cov)
block_number = batch_size * channel * num_frames * (height // ph) * (width // pw)
z = torch.randn(block_number, block_size, device=device, generator=generator)
z = torch.randn(block_number, block_size, generator=generator, device=generator.device)
z = z.to(device=device)
noise = z @ L.T
noise = noise.view(batch_size, channel, num_frames, height // ph, width // pw, ph, pw)

View File

@@ -152,6 +152,96 @@ class FluxModularPipeline(metaclass=DummyObject):
requires_backends(cls, ["torch", "transformers"])
class HeliosAutoBlocks(metaclass=DummyObject):
    # Stand-in for `HeliosAutoBlocks`: the constructor and both loader classmethods only call
    # `requires_backends` with the "torch" and "transformers" backends.
    # NOTE(review): presumably `requires_backends` raises when a backend is missing -- confirm.
    _backends = ["torch", "transformers"]

    def __init__(self, *args, **kwargs):
        requires_backends(self, ["torch", "transformers"])

    @classmethod
    def from_config(cls, *args, **kwargs):
        requires_backends(cls, ["torch", "transformers"])

    @classmethod
    def from_pretrained(cls, *args, **kwargs):
        requires_backends(cls, ["torch", "transformers"])
class HeliosModularPipeline(metaclass=DummyObject):
    # Stand-in for `HeliosModularPipeline`: the constructor and both loader classmethods only call
    # `requires_backends` with the "torch" and "transformers" backends.
    # NOTE(review): presumably `requires_backends` raises when a backend is missing -- confirm.
    _backends = ["torch", "transformers"]

    def __init__(self, *args, **kwargs):
        requires_backends(self, ["torch", "transformers"])

    @classmethod
    def from_config(cls, *args, **kwargs):
        requires_backends(cls, ["torch", "transformers"])

    @classmethod
    def from_pretrained(cls, *args, **kwargs):
        requires_backends(cls, ["torch", "transformers"])
class HeliosPyramidAutoBlocks(metaclass=DummyObject):
    # Stand-in for `HeliosPyramidAutoBlocks`: the constructor and both loader classmethods only call
    # `requires_backends` with the "torch" and "transformers" backends.
    # NOTE(review): presumably `requires_backends` raises when a backend is missing -- confirm.
    _backends = ["torch", "transformers"]

    def __init__(self, *args, **kwargs):
        requires_backends(self, ["torch", "transformers"])

    @classmethod
    def from_config(cls, *args, **kwargs):
        requires_backends(cls, ["torch", "transformers"])

    @classmethod
    def from_pretrained(cls, *args, **kwargs):
        requires_backends(cls, ["torch", "transformers"])
class HeliosPyramidDistilledAutoBlocks(metaclass=DummyObject):
    # Stand-in for `HeliosPyramidDistilledAutoBlocks`: the constructor and both loader classmethods
    # only call `requires_backends` with the "torch" and "transformers" backends.
    # NOTE(review): presumably `requires_backends` raises when a backend is missing -- confirm.
    _backends = ["torch", "transformers"]

    def __init__(self, *args, **kwargs):
        requires_backends(self, ["torch", "transformers"])

    @classmethod
    def from_config(cls, *args, **kwargs):
        requires_backends(cls, ["torch", "transformers"])

    @classmethod
    def from_pretrained(cls, *args, **kwargs):
        requires_backends(cls, ["torch", "transformers"])
class HeliosPyramidDistilledModularPipeline(metaclass=DummyObject):
    # Stand-in for `HeliosPyramidDistilledModularPipeline`: the constructor and both loader
    # classmethods only call `requires_backends` with the "torch" and "transformers" backends.
    # NOTE(review): presumably `requires_backends` raises when a backend is missing -- confirm.
    _backends = ["torch", "transformers"]

    def __init__(self, *args, **kwargs):
        requires_backends(self, ["torch", "transformers"])

    @classmethod
    def from_config(cls, *args, **kwargs):
        requires_backends(cls, ["torch", "transformers"])

    @classmethod
    def from_pretrained(cls, *args, **kwargs):
        requires_backends(cls, ["torch", "transformers"])
class HeliosPyramidModularPipeline(metaclass=DummyObject):
    # Stand-in for `HeliosPyramidModularPipeline`: the constructor and both loader classmethods
    # only call `requires_backends` with the "torch" and "transformers" backends.
    # NOTE(review): presumably `requires_backends` raises when a backend is missing -- confirm.
    _backends = ["torch", "transformers"]

    def __init__(self, *args, **kwargs):
        requires_backends(self, ["torch", "transformers"])

    @classmethod
    def from_config(cls, *args, **kwargs):
        requires_backends(cls, ["torch", "transformers"])

    @classmethod
    def from_pretrained(cls, *args, **kwargs):
        requires_backends(cls, ["torch", "transformers"])
class QwenImageAutoBlocks(metaclass=DummyObject):
_backends = ["torch", "transformers"]

View File

@@ -14,15 +14,16 @@
import contextlib
import gc
import logging
import unittest
import pytest
import torch
from parameterized import parameterized
from diffusers import AutoencoderKL
from diffusers.hooks import HookRegistry, ModelHook
from diffusers.models import ModelMixin
from diffusers.pipelines.pipeline_utils import DiffusionPipeline
from diffusers.utils import get_logger
from diffusers.utils.import_utils import compare_versions
from ..testing_utils import (
@@ -218,18 +219,20 @@ class NestedContainer(torch.nn.Module):
@require_torch_accelerator
class TestGroupOffload:
class GroupOffloadTests(unittest.TestCase):
in_features = 64
hidden_features = 256
out_features = 64
num_layers = 4
def setup_method(self):
def setUp(self):
with torch.no_grad():
self.model = self.get_model()
self.input = torch.randn((4, self.in_features)).to(torch_device)
def teardown_method(self):
def tearDown(self):
super().tearDown()
del self.model
del self.input
gc.collect()
@@ -245,20 +248,18 @@ class TestGroupOffload:
num_layers=self.num_layers,
)
@pytest.mark.skipif(
torch.device(torch_device).type not in ["cuda", "xpu"],
reason="Test requires a CUDA or XPU device.",
)
def test_offloading_forward_pass(self):
@torch.no_grad()
def run_forward(model):
gc.collect()
backend_empty_cache(torch_device)
backend_reset_peak_memory_stats(torch_device)
assert all(
module._diffusers_hook.get_hook("group_offloading") is not None
for module in model.modules()
if hasattr(module, "_diffusers_hook")
self.assertTrue(
all(
module._diffusers_hook.get_hook("group_offloading") is not None
for module in model.modules()
if hasattr(module, "_diffusers_hook")
)
)
model.eval()
output = model(self.input)[0].cpu()
@@ -290,37 +291,41 @@ class TestGroupOffload:
output_with_group_offloading5, mem5 = run_forward(model)
# Precision assertions - offloading should not impact the output
assert torch.allclose(output_without_group_offloading, output_with_group_offloading1, atol=1e-5)
assert torch.allclose(output_without_group_offloading, output_with_group_offloading2, atol=1e-5)
assert torch.allclose(output_without_group_offloading, output_with_group_offloading3, atol=1e-5)
assert torch.allclose(output_without_group_offloading, output_with_group_offloading4, atol=1e-5)
assert torch.allclose(output_without_group_offloading, output_with_group_offloading5, atol=1e-5)
self.assertTrue(torch.allclose(output_without_group_offloading, output_with_group_offloading1, atol=1e-5))
self.assertTrue(torch.allclose(output_without_group_offloading, output_with_group_offloading2, atol=1e-5))
self.assertTrue(torch.allclose(output_without_group_offloading, output_with_group_offloading3, atol=1e-5))
self.assertTrue(torch.allclose(output_without_group_offloading, output_with_group_offloading4, atol=1e-5))
self.assertTrue(torch.allclose(output_without_group_offloading, output_with_group_offloading5, atol=1e-5))
# Memory assertions - offloading should reduce memory usage
assert mem4 <= mem5 < mem2 <= mem3 < mem1 < mem_baseline
self.assertTrue(mem4 <= mem5 < mem2 <= mem3 < mem1 < mem_baseline)
def test_warning_logged_if_group_offloaded_module_moved_to_accelerator(self, caplog):
def test_warning_logged_if_group_offloaded_module_moved_to_accelerator(self):
if torch.device(torch_device).type not in ["cuda", "xpu"]:
return
self.model.enable_group_offload(torch_device, offload_type="block_level", num_blocks_per_group=3)
with caplog.at_level(logging.WARNING, logger="diffusers.models.modeling_utils"):
logger = get_logger("diffusers.models.modeling_utils")
logger.setLevel("INFO")
with self.assertLogs(logger, level="WARNING") as cm:
self.model.to(torch_device)
assert f"The module '{self.model.__class__.__name__}' is group offloaded" in caplog.text
self.assertIn(f"The module '{self.model.__class__.__name__}' is group offloaded", cm.output[0])
def test_warning_logged_if_group_offloaded_pipe_moved_to_accelerator(self, caplog):
def test_warning_logged_if_group_offloaded_pipe_moved_to_accelerator(self):
if torch.device(torch_device).type not in ["cuda", "xpu"]:
return
pipe = DummyPipeline(self.model)
self.model.enable_group_offload(torch_device, offload_type="block_level", num_blocks_per_group=3)
with caplog.at_level(logging.WARNING, logger="diffusers.pipelines.pipeline_utils"):
logger = get_logger("diffusers.pipelines.pipeline_utils")
logger.setLevel("INFO")
with self.assertLogs(logger, level="WARNING") as cm:
pipe.to(torch_device)
assert f"The module '{self.model.__class__.__name__}' is group offloaded" in caplog.text
self.assertIn(f"The module '{self.model.__class__.__name__}' is group offloaded", cm.output[0])
def test_error_raised_if_streams_used_and_no_accelerator_device(self):
torch_accelerator_module = getattr(torch, torch_device, torch.cuda)
original_is_available = torch_accelerator_module.is_available
torch_accelerator_module.is_available = lambda: False
with pytest.raises(ValueError):
with self.assertRaises(ValueError):
self.model.enable_group_offload(
onload_device=torch.device(torch_device), offload_type="leaf_level", use_stream=True
)
@@ -328,31 +333,31 @@ class TestGroupOffload:
def test_error_raised_if_supports_group_offloading_false(self):
self.model._supports_group_offloading = False
with pytest.raises(ValueError, match="does not support group offloading"):
with self.assertRaisesRegex(ValueError, "does not support group offloading"):
self.model.enable_group_offload(onload_device=torch.device(torch_device))
def test_error_raised_if_model_offloading_applied_on_group_offloaded_module(self):
pipe = DummyPipeline(self.model)
pipe.model.enable_group_offload(torch_device, offload_type="block_level", num_blocks_per_group=3)
with pytest.raises(ValueError, match="You are trying to apply model/sequential CPU offloading"):
with self.assertRaisesRegex(ValueError, "You are trying to apply model/sequential CPU offloading"):
pipe.enable_model_cpu_offload()
def test_error_raised_if_sequential_offloading_applied_on_group_offloaded_module(self):
pipe = DummyPipeline(self.model)
pipe.model.enable_group_offload(torch_device, offload_type="block_level", num_blocks_per_group=3)
with pytest.raises(ValueError, match="You are trying to apply model/sequential CPU offloading"):
with self.assertRaisesRegex(ValueError, "You are trying to apply model/sequential CPU offloading"):
pipe.enable_sequential_cpu_offload()
def test_error_raised_if_group_offloading_applied_on_model_offloaded_module(self):
pipe = DummyPipeline(self.model)
pipe.enable_model_cpu_offload()
with pytest.raises(ValueError, match="Cannot apply group offloading"):
with self.assertRaisesRegex(ValueError, "Cannot apply group offloading"):
pipe.model.enable_group_offload(torch_device, offload_type="block_level", num_blocks_per_group=3)
def test_error_raised_if_group_offloading_applied_on_sequential_offloaded_module(self):
pipe = DummyPipeline(self.model)
pipe.enable_sequential_cpu_offload()
with pytest.raises(ValueError, match="Cannot apply group offloading"):
with self.assertRaisesRegex(ValueError, "Cannot apply group offloading"):
pipe.model.enable_group_offload(torch_device, offload_type="block_level", num_blocks_per_group=3)
def test_block_level_stream_with_invocation_order_different_from_initialization_order(self):
@@ -371,12 +376,12 @@ class TestGroupOffload:
context = contextlib.nullcontext()
if compare_versions("diffusers", "<=", "0.33.0"):
# Will raise a device mismatch RuntimeError mentioning weights are on CPU but input is on device
context = pytest.raises(RuntimeError, match="Expected all tensors to be on the same device")
context = self.assertRaisesRegex(RuntimeError, "Expected all tensors to be on the same device")
with context:
model(self.input)
@pytest.mark.parametrize("offload_type", ["block_level", "leaf_level"])
@parameterized.expand([("block_level",), ("leaf_level",)])
def test_block_level_offloading_with_parameter_only_module_group(self, offload_type: str):
if torch.device(torch_device).type not in ["cuda", "xpu"]:
return
@@ -402,14 +407,14 @@ class TestGroupOffload:
out_ref = model_ref(x)
out = model(x)
assert torch.allclose(out_ref, out, atol=1e-5), "Outputs do not match."
self.assertTrue(torch.allclose(out_ref, out, atol=1e-5), "Outputs do not match.")
num_repeats = 2
for i in range(num_repeats):
out_ref = model_ref(x)
out = model(x)
assert torch.allclose(out_ref, out, atol=1e-5), "Outputs do not match after multiple invocations."
self.assertTrue(torch.allclose(out_ref, out, atol=1e-5), "Outputs do not match after multiple invocations.")
for (ref_name, ref_module), (name, module) in zip(model_ref.named_modules(), model.named_modules()):
assert ref_name == name
@@ -423,7 +428,9 @@ class TestGroupOffload:
absdiff = diff.abs()
absmax = absdiff.max().item()
cumulated_absmax += absmax
assert cumulated_absmax < 1e-5, f"Output differences for {name} exceeded threshold: {cumulated_absmax:.5f}"
self.assertLess(
cumulated_absmax, 1e-5, f"Output differences for {name} exceeded threshold: {cumulated_absmax:.5f}"
)
def test_vae_like_model_without_streams(self):
"""Test VAE-like model with block-level offloading but without streams."""
@@ -445,7 +452,9 @@ class TestGroupOffload:
out_ref = model_ref(x).sample
out = model(x).sample
assert torch.allclose(out_ref, out, atol=1e-5), "Outputs do not match for VAE-like model without streams."
self.assertTrue(
torch.allclose(out_ref, out, atol=1e-5), "Outputs do not match for VAE-like model without streams."
)
def test_model_with_only_standalone_layers(self):
"""Test that models with only standalone layers (no ModuleList/Sequential) work with block-level offloading."""
@@ -466,11 +475,12 @@ class TestGroupOffload:
for i in range(2):
out_ref = model_ref(x)
out = model(x)
assert torch.allclose(out_ref, out, atol=1e-5), (
f"Outputs do not match at iteration {i} for model with standalone layers."
self.assertTrue(
torch.allclose(out_ref, out, atol=1e-5),
f"Outputs do not match at iteration {i} for model with standalone layers.",
)
@pytest.mark.parametrize("offload_type", ["block_level", "leaf_level"])
@parameterized.expand([("block_level",), ("leaf_level",)])
def test_standalone_conv_layers_with_both_offload_types(self, offload_type: str):
"""Test that standalone Conv2d layers work correctly with both block-level and leaf-level offloading."""
if torch.device(torch_device).type not in ["cuda", "xpu"]:
@@ -491,8 +501,9 @@ class TestGroupOffload:
out_ref = model_ref(x).sample
out = model(x).sample
assert torch.allclose(out_ref, out, atol=1e-5), (
f"Outputs do not match for standalone Conv layers with {offload_type}."
self.assertTrue(
torch.allclose(out_ref, out, atol=1e-5),
f"Outputs do not match for standalone Conv layers with {offload_type}.",
)
def test_multiple_invocations_with_vae_like_model(self):
@@ -515,7 +526,7 @@ class TestGroupOffload:
for i in range(2):
out_ref = model_ref(x).sample
out = model(x).sample
assert torch.allclose(out_ref, out, atol=1e-5), f"Outputs do not match at iteration {i}."
self.assertTrue(torch.allclose(out_ref, out, atol=1e-5), f"Outputs do not match at iteration {i}.")
def test_nested_container_parameters_offloading(self):
"""Test that parameters from non-computational layers in nested containers are handled correctly."""
@@ -536,8 +547,9 @@ class TestGroupOffload:
for i in range(2):
out_ref = model_ref(x)
out = model(x)
assert torch.allclose(out_ref, out, atol=1e-5), (
f"Outputs do not match at iteration {i} for nested parameters."
self.assertTrue(
torch.allclose(out_ref, out, atol=1e-5),
f"Outputs do not match at iteration {i} for nested parameters.",
)
def get_autoencoder_kl_config(self, block_out_channels=None, norm_num_groups=None):
@@ -590,7 +602,7 @@ class DummyModelWithConditionalModules(ModelMixin):
return x
class TestConditionalModuleGroupOffload(TestGroupOffload):
class ConditionalModuleGroupOffloadTests(GroupOffloadTests):
"""Tests for conditionally-executed modules under group offloading with streams.
Regression tests for the case where a module is not executed during the first forward pass
@@ -608,10 +620,10 @@ class TestConditionalModuleGroupOffload(TestGroupOffload):
num_layers=self.num_layers,
)
@pytest.mark.parametrize("offload_type", ["leaf_level", "block_level"])
@pytest.mark.skipif(
@parameterized.expand([("leaf_level",), ("block_level",)])
@unittest.skipIf(
torch.device(torch_device).type not in ["cuda", "xpu"],
reason="Test requires a CUDA or XPU device.",
"Test requires a CUDA or XPU device.",
)
def test_conditional_modules_with_stream(self, offload_type: str):
"""Regression test: conditionally-executed modules must not cause device mismatch when using streams.
@@ -658,20 +670,23 @@ class TestConditionalModuleGroupOffload(TestGroupOffload):
# execution order is traced. optional_proj_1/2 are NOT in the traced order.
out_ref_no_opt = model_ref(x, optional_input=None)
out_no_opt = model(x, optional_input=None)
assert torch.allclose(out_ref_no_opt, out_no_opt, atol=1e-5), (
f"[{offload_type}] Outputs do not match on first pass (no optional_input)."
self.assertTrue(
torch.allclose(out_ref_no_opt, out_no_opt, atol=1e-5),
f"[{offload_type}] Outputs do not match on first pass (no optional_input).",
)
# Second forward pass WITH optional_input — optional_proj_1/2 ARE now called.
out_ref_with_opt = model_ref(x, optional_input=optional_input)
out_with_opt = model(x, optional_input=optional_input)
assert torch.allclose(out_ref_with_opt, out_with_opt, atol=1e-5), (
f"[{offload_type}] Outputs do not match on second pass (with optional_input)."
self.assertTrue(
torch.allclose(out_ref_with_opt, out_with_opt, atol=1e-5),
f"[{offload_type}] Outputs do not match on second pass (with optional_input).",
)
# Third pass again without optional_input — verify stable behavior.
out_ref_no_opt2 = model_ref(x, optional_input=None)
out_no_opt2 = model(x, optional_input=None)
assert torch.allclose(out_ref_no_opt2, out_no_opt2, atol=1e-5), (
f"[{offload_type}] Outputs do not match on third pass (back to no optional_input)."
self.assertTrue(
torch.allclose(out_ref_no_opt2, out_no_opt2, atol=1e-5),
f"[{offload_type}] Outputs do not match on third pass (back to no optional_input).",
)

View File

@@ -1,244 +0,0 @@
# Copyright 2025 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import torch
from diffusers import MagCacheConfig, apply_mag_cache
from diffusers.hooks._helpers import TransformerBlockMetadata, TransformerBlockRegistry
from diffusers.models import ModelMixin
from diffusers.utils import logging
logger = logging.get_logger(__name__)
class DummyBlock(torch.nn.Module):
    """Parameter-free block that returns its input multiplied by two.

    Because the output is ``2 * input``, the block residual (output minus
    input) equals the input itself, which keeps expected values easy to
    compute by hand in the tests.
    """

    def __init__(self):
        super().__init__()

    def forward(self, hidden_states, encoder_hidden_states=None, **kwargs):
        # encoder_hidden_states and any extra kwargs are accepted but unused.
        doubled = 2.0 * hidden_states
        return doubled
class DummyTransformer(ModelMixin):
    """Minimal model made of two ``DummyBlock`` layers applied in sequence.

    Each block doubles its input, so a full forward pass multiplies the
    input by four.
    """

    def __init__(self):
        super().__init__()
        self.transformer_blocks = torch.nn.ModuleList([DummyBlock() for _ in range(2)])

    def forward(self, hidden_states, encoder_hidden_states=None):
        current = hidden_states
        for layer in self.transformer_blocks:
            current = layer(current, encoder_hidden_states=encoder_hidden_states)
        return current
class TupleOutputBlock(torch.nn.Module):
    """Block returning ``(2 * hidden_states, encoder_hidden_states)`` as a tuple."""

    def __init__(self):
        super().__init__()

    def forward(self, hidden_states, encoder_hidden_states=None, **kwargs):
        # Two-element tuple: doubled hidden states first, then the encoder
        # states exactly as they were passed in.
        doubled = 2.0 * hidden_states
        return (doubled, encoder_hidden_states)
class TupleTransformer(ModelMixin):
    """Single-block model whose block returns a (hidden, encoder_hidden) tuple."""

    def __init__(self):
        super().__init__()
        self.transformer_blocks = torch.nn.ModuleList([TupleOutputBlock()])

    def forward(self, hidden_states, encoder_hidden_states=None):
        for layer in self.transformer_blocks:
            # Flux-like convention: index 0 is the hidden states, index 1 the
            # encoder hidden states.
            result = layer(hidden_states, encoder_hidden_states=encoder_hidden_states)
            hidden_states, encoder_hidden_states = result[0], result[1]
        return hidden_states, encoder_hidden_states
class MagCacheTests(unittest.TestCase):
"""Unit tests for MagCache using tiny dummy transformers whose outputs can be computed by hand."""
def setUp(self):
# Register the plain dummy block: its forward returns a bare tensor, so no output indices are given.
TransformerBlockRegistry.register(
DummyBlock,
TransformerBlockMetadata(return_hidden_states_index=None, return_encoder_hidden_states_index=None),
)
# Register the tuple block (Flux style): hidden states at index 0, encoder hidden states at index 1.
TransformerBlockRegistry.register(
TupleOutputBlock,
TransformerBlockMetadata(return_hidden_states_index=0, return_encoder_hidden_states_index=1),
)
def _set_context(self, model, context_name):
"""Helper to set context on all hooks in the model."""
for module in model.modules():
if hasattr(module, "_diffusers_hook"):
module._diffusers_hook._set_context(context_name)
def _get_calibration_data(self, model):
"""Return `calibration_ratios` from the first "mag_cache_block_hook" found on the model, or [] if none is found."""
for module in model.modules():
if hasattr(module, "_diffusers_hook"):
hook = module._diffusers_hook.get_hook("mag_cache_block_hook")
if hook:
return hook.state_manager.get_state().calibration_ratios
return []
def test_mag_cache_validation(self):
"""Test that missing mag_ratios raises ValueError."""
with self.assertRaises(ValueError):
MagCacheConfig(num_inference_steps=10, calibrate=False)
def test_mag_cache_skipping_logic(self):
"""
Tests that MagCache correctly calculates residuals and skips blocks when conditions are met.
"""
model = DummyTransformer()
# Dummy ratios: [1.0, 1.0] implies 0 accumulated error if we skip
ratios = np.array([1.0, 1.0])
config = MagCacheConfig(
threshold=100.0,
num_inference_steps=2,
retention_ratio=0.0, # Enable immediate skipping
max_skip_steps=5,
mag_ratios=ratios,
)
apply_mag_cache(model, config)
self._set_context(model, "test_context")
# Step 0: Input 10.0 -> Output 40.0 (2 blocks * 2x each)
# HeadInput=10. Output=40. Residual=30.
input_t0 = torch.tensor([[[10.0]]])
output_t0 = model(input_t0)
self.assertTrue(torch.allclose(output_t0, torch.tensor([[[40.0]]])), "Step 0 failed")
# Step 1: Input 11.0.
# If Skipped: Output = Input(11) + Residual(30) = 41.0
# If Computed: Output = 11 * 4 = 44.0
input_t1 = torch.tensor([[[11.0]]])
output_t1 = model(input_t1)
self.assertTrue(
torch.allclose(output_t1, torch.tensor([[[41.0]]])), f"Expected Skip (41.0), got {output_t1.item()}"
)
def test_mag_cache_retention(self):
"""Test that retention_ratio prevents skipping even if error is low."""
model = DummyTransformer()
# Ratios that imply 0 error, so it *would* skip if retention allowed it
ratios = np.array([1.0, 1.0])
config = MagCacheConfig(
threshold=100.0,
num_inference_steps=2,
retention_ratio=1.0, # Force retention for ALL steps
mag_ratios=ratios,
)
apply_mag_cache(model, config)
self._set_context(model, "test_context")
# Step 0
model(torch.tensor([[[10.0]]]))
# Step 1: Should COMPUTE (44.0) not SKIP (41.0) because of retention
input_t1 = torch.tensor([[[11.0]]])
output_t1 = model(input_t1)
self.assertTrue(
torch.allclose(output_t1, torch.tensor([[[44.0]]])),
f"Expected Compute (44.0) due to retention, got {output_t1.item()}",
)
def test_mag_cache_tuple_outputs(self):
"""Test compatibility with models returning (hidden, encoder_hidden) like Flux."""
model = TupleTransformer()
ratios = np.array([1.0, 1.0])
config = MagCacheConfig(threshold=100.0, num_inference_steps=2, retention_ratio=0.0, mag_ratios=ratios)
apply_mag_cache(model, config)
self._set_context(model, "test_context")
# Step 0: Compute. Input 10.0 -> Output 20.0 (1 block * 2x)
# Residual = 10.0
input_t0 = torch.tensor([[[10.0]]])
enc_t0 = torch.tensor([[[1.0]]])
out_0, _ = model(input_t0, encoder_hidden_states=enc_t0)
self.assertTrue(torch.allclose(out_0, torch.tensor([[[20.0]]])))
# Step 1: Skip. Input 11.0.
# Skipped Output = 11 + 10 = 21.0
input_t1 = torch.tensor([[[11.0]]])
out_1, _ = model(input_t1, encoder_hidden_states=enc_t0)
self.assertTrue(
torch.allclose(out_1, torch.tensor([[[21.0]]])), f"Tuple skip failed. Expected 21.0, got {out_1.item()}"
)
def test_mag_cache_reset(self):
"""Test that state resets correctly after num_inference_steps."""
model = DummyTransformer()
config = MagCacheConfig(
threshold=100.0, num_inference_steps=2, retention_ratio=0.0, mag_ratios=np.array([1.0, 1.0])
)
apply_mag_cache(model, config)
self._set_context(model, "test_context")
input_t = torch.ones(1, 1, 1)
model(input_t) # Step 0
model(input_t) # Step 1 (Skipped)
# Step 2 (Reset -> Step 0) -> Should Compute
# Input 2.0 -> Output 8.0 (a skip would instead add the stale residual to the input)
input_t2 = torch.tensor([[[2.0]]])
output_t2 = model(input_t2)
self.assertTrue(torch.allclose(output_t2, torch.tensor([[[8.0]]])), "State did not reset correctly")
def test_mag_cache_calibration(self):
"""Test that calibration mode records ratios."""
model = DummyTransformer()
config = MagCacheConfig(num_inference_steps=2, calibrate=True)
apply_mag_cache(model, config)
self._set_context(model, "test_context")
# Step 0
# HeadInput = 10. Output = 40. Residual = 30.
# Ratio 0 is placeholder 1.0
model(torch.tensor([[[10.0]]]))
# Check intermediate state: exactly one ratio recorded so far, the 1.0 placeholder.
ratios = self._get_calibration_data(model)
self.assertEqual(len(ratios), 1)
self.assertEqual(ratios[0], 1.0)
# Step 1
# Same input as step 0 (10.0), so Output = 40 and Residual = 30 again.
# PrevResidual = 30. CurrResidual = 30.
# Ratio = 30/30 = 1.0
model(torch.tensor([[[10.0]]]))
# NOTE(review): the output of this second call is not asserted; only the
# calibration list is checked below.
# With num_inference_steps=2 this was the final step, so the test expects the
# calibration list to be empty again after the reset.
ratios_after = self._get_calibration_data(model)
self.assertEqual(ratios_after, [])

View File

@@ -5,8 +5,12 @@ from .cache import (
FasterCacheTesterMixin,
FirstBlockCacheConfigMixin,
FirstBlockCacheTesterMixin,
MagCacheConfigMixin,
MagCacheTesterMixin,
PyramidAttentionBroadcastConfigMixin,
PyramidAttentionBroadcastTesterMixin,
TaylorSeerCacheConfigMixin,
TaylorSeerCacheTesterMixin,
)
from .common import BaseModelTesterConfig, ModelTesterMixin
from .compile import TorchCompileTesterMixin
@@ -50,6 +54,8 @@ __all__ = [
"FasterCacheTesterMixin",
"FirstBlockCacheConfigMixin",
"FirstBlockCacheTesterMixin",
"MagCacheConfigMixin",
"MagCacheTesterMixin",
"GGUFCompileTesterMixin",
"GGUFConfigMixin",
"GGUFTesterMixin",
@@ -65,6 +71,8 @@ __all__ = [
"ModelTesterMixin",
"PyramidAttentionBroadcastConfigMixin",
"PyramidAttentionBroadcastTesterMixin",
"TaylorSeerCacheConfigMixin",
"TaylorSeerCacheTesterMixin",
"QuantizationCompileTesterMixin",
"QuantizationTesterMixin",
"QuantoCompileTesterMixin",

View File

@@ -18,10 +18,18 @@ import gc
import pytest
import torch
from diffusers.hooks import FasterCacheConfig, FirstBlockCacheConfig, PyramidAttentionBroadcastConfig
from diffusers.hooks import (
FasterCacheConfig,
FirstBlockCacheConfig,
MagCacheConfig,
PyramidAttentionBroadcastConfig,
TaylorSeerCacheConfig,
)
from diffusers.hooks.faster_cache import _FASTER_CACHE_BLOCK_HOOK, _FASTER_CACHE_DENOISER_HOOK
from diffusers.hooks.first_block_cache import _FBC_BLOCK_HOOK, _FBC_LEADER_BLOCK_HOOK
from diffusers.hooks.mag_cache import _MAG_CACHE_BLOCK_HOOK, _MAG_CACHE_LEADER_BLOCK_HOOK
from diffusers.hooks.pyramid_attention_broadcast import _PYRAMID_ATTENTION_BROADCAST_HOOK
from diffusers.hooks.taylorseer_cache import _TAYLORSEER_CACHE_HOOK
from diffusers.models.cache_utils import CacheMixin
from ...testing_utils import assert_tensors_close, backend_empty_cache, is_cache, torch_device
@@ -554,3 +562,192 @@ class FasterCacheTesterMixin(FasterCacheConfigMixin, CacheTesterMixin):
@require_cache_mixin
def test_faster_cache_reset_stateful_cache(self):
self._test_reset_stateful_cache()
@is_cache
class MagCacheConfigMixin:
    """
    Configuration mixin that supplies the MagCache config and hook names.

    Expected class attributes:
        - model_class: The model class to test (must use CacheMixin)
    """

    # Overridable default. Neutral ratios [1.0, 1.0] combined with a high
    # threshold mean the second inference step is always skipped, which
    # _test_cache_inference requires.
    MAG_CACHE_CONFIG = dict(
        num_inference_steps=2,
        retention_ratio=0.0,
        threshold=100.0,
        mag_ratios=[1.0, 1.0],
    )

    def _get_cache_config(self):
        settings = self.MAG_CACHE_CONFIG
        return MagCacheConfig(**settings)

    def _get_hook_names(self):
        hook_names = [_MAG_CACHE_LEADER_BLOCK_HOOK, _MAG_CACHE_BLOCK_HOOK]
        return hook_names
@is_cache
class MagCacheTesterMixin(MagCacheConfigMixin, CacheTesterMixin):
    """
    Test mixin that runs the shared cache checks with the MagCache config.

    Every test method delegates to the matching ``_test_*`` helper on the
    instance.

    Expected class attributes:
        - model_class: The model class to test (must use CacheMixin)

    Expected methods to be implemented by subclasses:
        - get_init_dict(): Returns dict of arguments to initialize the model
        - get_dummy_inputs(): Returns dict of inputs to pass to the model forward pass

    Pytest mark: cache
        Use `pytest -m "not cache"` to skip these tests
    """

    @require_cache_mixin
    def test_mag_cache_enable_disable_state(self):
        self._test_cache_enable_disable_state()

    @require_cache_mixin
    def test_mag_cache_double_enable_raises_error(self):
        self._test_cache_double_enable_raises_error()

    @require_cache_mixin
    def test_mag_cache_hooks_registered(self):
        self._test_cache_hooks_registered()

    @require_cache_mixin
    def test_mag_cache_inference(self):
        self._test_cache_inference()

    @require_cache_mixin
    def test_mag_cache_context_manager(self):
        self._test_cache_context_manager()

    @require_cache_mixin
    def test_mag_cache_reset_stateful_cache(self):
        self._test_reset_stateful_cache()
@is_cache
class TaylorSeerCacheConfigMixin:
"""
Base mixin providing TaylorSeerCache config.
Expected class attributes:
- model_class: The model class to test (must use CacheMixin)
"""
# Default TaylorSeerCache config - can be overridden by subclasses.
# Uses cache_interval=3 and disable_cache_before_step=1; the intent is that the second
# inference step is predicted rather than fully computed, which _test_cache_inference relies on.
# NOTE(review): this comment previously said disable_cache_before_step=0, which did not match
# the value configured below - confirm that 1 is the intended setting.
TAYLORSEER_CACHE_CONFIG = {
"cache_interval": 3,
"disable_cache_before_step": 1,
"max_order": 1,
}
def _get_cache_config(self):
# Build the config object from the (possibly subclass-overridden) class-level dict.
return TaylorSeerCacheConfig(**self.TAYLORSEER_CACHE_CONFIG)
def _get_hook_names(self):
# Hook names this mixin reports for the TaylorSeer cache.
return [_TAYLORSEER_CACHE_HOOK]
@is_cache
class TaylorSeerCacheTesterMixin(TaylorSeerCacheConfigMixin, CacheTesterMixin):
"""
Mixin class for testing TaylorSeerCache on models.
Defines its own `_test_cache_inference` and `_test_reset_stateful_cache` helpers, which run the
model under `model.cache_context("taylorseer_test")`.
Expected class attributes:
- model_class: The model class to test (must use CacheMixin)
Expected methods to be implemented by subclasses:
- get_init_dict(): Returns dict of arguments to initialize the model
- get_dummy_inputs(): Returns dict of inputs to pass to the model forward pass
Pytest mark: cache
Use `pytest -m "not cache"` to skip these tests
"""
@torch.no_grad()
def _test_cache_inference(self):
"""Test that model can run inference with TaylorSeer cache enabled (requires cache_context)."""
init_dict = self.get_init_dict()
inputs_dict = self.get_dummy_inputs()
model = self.model_class(**init_dict).to(torch_device)
model.eval()
config = self._get_cache_config()
model.enable_cache(config)
# TaylorSeer requires cache_context to be set for inference
with model.cache_context("taylorseer_test"):
# First pass populates the cache
_ = model(**inputs_dict, return_dict=False)[0]
# Create modified inputs for second pass: random noise is added to the tensor stored under
# `self.cache_input_key`, if that key is present (the noise is not seeded in this method).
inputs_dict_step2 = inputs_dict.copy()
if self.cache_input_key in inputs_dict_step2:
inputs_dict_step2[self.cache_input_key] = inputs_dict_step2[self.cache_input_key] + torch.randn_like(
inputs_dict_step2[self.cache_input_key]
)
# Second pass - TaylorSeer should use cached Taylor series predictions
output_with_cache = model(**inputs_dict_step2, return_dict=False)[0]
assert output_with_cache is not None, "Model output should not be None with cache enabled."
assert not torch.isnan(output_with_cache).any(), "Model output contains NaN with cache enabled."
# Run same inputs without cache to compare
model.disable_cache()
output_without_cache = model(**inputs_dict_step2, return_dict=False)[0]
# Cached output should be different from non-cached output (due to approximation)
assert not torch.allclose(output_without_cache, output_with_cache, atol=1e-5), (
"Cached output should be different from non-cached output due to cache approximation."
)
@torch.no_grad()
def _test_reset_stateful_cache(self):
"""Test that _reset_stateful_cache resets the TaylorSeer cache state (requires cache_context)."""
init_dict = self.get_init_dict()
inputs_dict = self.get_dummy_inputs()
model = self.model_class(**init_dict).to(torch_device)
model.eval()
config = self._get_cache_config()
model.enable_cache(config)
# One forward pass under a cache context, then reset and disable the cache.
# NOTE(review): no assertion follows; this only checks that the sequence runs without raising.
with model.cache_context("taylorseer_test"):
_ = model(**inputs_dict, return_dict=False)[0]
model._reset_stateful_cache()
model.disable_cache()
@require_cache_mixin
def test_taylorseer_cache_enable_disable_state(self):
self._test_cache_enable_disable_state()
@require_cache_mixin
def test_taylorseer_cache_double_enable_raises_error(self):
self._test_cache_double_enable_raises_error()
@require_cache_mixin
def test_taylorseer_cache_hooks_registered(self):
self._test_cache_hooks_registered()
@require_cache_mixin
def test_taylorseer_cache_inference(self):
self._test_cache_inference()
@require_cache_mixin
def test_taylorseer_cache_context_manager(self):
self._test_cache_context_manager()
@require_cache_mixin
def test_taylorseer_cache_reset_stateful_cache(self):
self._test_reset_stateful_cache()

View File

@@ -37,6 +37,7 @@ from ..testing_utils import (
IPAdapterTesterMixin,
LoraHotSwappingForModelTesterMixin,
LoraTesterMixin,
MagCacheTesterMixin,
MemoryTesterMixin,
ModelOptCompileTesterMixin,
ModelOptTesterMixin,
@@ -45,6 +46,7 @@ from ..testing_utils import (
QuantoCompileTesterMixin,
QuantoTesterMixin,
SingleFileTesterMixin,
TaylorSeerCacheTesterMixin,
TorchAoCompileTesterMixin,
TorchAoTesterMixin,
TorchCompileTesterMixin,
@@ -430,3 +432,11 @@ class TestFluxTransformerFasterCache(FluxTransformerTesterConfig, FasterCacheTes
"tensor_format": "BCHW",
"is_guidance_distilled": True,
}
class TestFluxTransformerMagCache(FluxTransformerTesterConfig, MagCacheTesterMixin):
"""MagCache tests for Flux Transformer.

Combines the Flux transformer tester config with the MagCache test mixin; no overrides are defined here.
"""
class TestFluxTransformerTaylorSeerCache(FluxTransformerTesterConfig, TaylorSeerCacheTesterMixin):
"""TaylorSeerCache tests for Flux Transformer.

Combines the Flux transformer tester config with the TaylorSeerCache test mixin; no overrides are defined here.
"""

View File

@@ -0,0 +1,166 @@
# coding=utf-8
# Copyright 2025 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from diffusers.modular_pipelines import (
HeliosAutoBlocks,
HeliosModularPipeline,
HeliosPyramidAutoBlocks,
HeliosPyramidModularPipeline,
)
from ..test_modular_pipelines_common import ModularPipelineTesterMixin
# Expected (block path, block class name) sequence for each Helios workflow.
HELIOS_WORKFLOWS = dict(
    text2video=[
        ("text_encoder", "HeliosTextEncoderStep"),
        ("denoise.input", "HeliosTextInputStep"),
        ("denoise.prepare_history", "HeliosPrepareHistoryStep"),
        ("denoise.set_timesteps", "HeliosSetTimestepsStep"),
        ("denoise.chunk_denoise", "HeliosChunkDenoiseStep"),
        ("decode", "HeliosDecodeStep"),
    ],
    image2video=[
        ("text_encoder", "HeliosTextEncoderStep"),
        ("vae_encoder", "HeliosImageVaeEncoderStep"),
        ("denoise.input", "HeliosTextInputStep"),
        ("denoise.additional_inputs", "HeliosAdditionalInputsStep"),
        ("denoise.add_noise_image", "HeliosAddNoiseToImageLatentsStep"),
        ("denoise.prepare_history", "HeliosPrepareHistoryStep"),
        ("denoise.seed_history", "HeliosI2VSeedHistoryStep"),
        ("denoise.set_timesteps", "HeliosSetTimestepsStep"),
        ("denoise.chunk_denoise", "HeliosI2VChunkDenoiseStep"),
        ("decode", "HeliosDecodeStep"),
    ],
    video2video=[
        ("text_encoder", "HeliosTextEncoderStep"),
        ("vae_encoder", "HeliosVideoVaeEncoderStep"),
        ("denoise.input", "HeliosTextInputStep"),
        ("denoise.additional_inputs", "HeliosAdditionalInputsStep"),
        ("denoise.add_noise_video", "HeliosAddNoiseToVideoLatentsStep"),
        ("denoise.prepare_history", "HeliosPrepareHistoryStep"),
        ("denoise.seed_history", "HeliosV2VSeedHistoryStep"),
        ("denoise.set_timesteps", "HeliosSetTimestepsStep"),
        ("denoise.chunk_denoise", "HeliosI2VChunkDenoiseStep"),
        ("decode", "HeliosDecodeStep"),
    ],
)
class TestHeliosModularPipelineFast(ModularPipelineTesterMixin):
    """Modular-pipeline tests for ``HeliosModularPipeline`` with ``HeliosAutoBlocks``."""

    pipeline_class = HeliosModularPipeline
    pipeline_blocks_class = HeliosAutoBlocks
    pretrained_model_name_or_path = "hf-internal-testing/tiny-helios-modular-pipe"

    params = frozenset(("prompt", "height", "width", "num_frames"))
    batch_params = frozenset(("prompt",))
    optional_params = frozenset(("num_inference_steps", "num_videos_per_prompt", "latents"))
    output_name = "videos"
    expected_workflow_blocks = HELIOS_WORKFLOWS

    def get_dummy_inputs(self, seed=0):
        """Return the pipeline call kwargs, with a generator built from ``seed``."""
        return {
            "prompt": "A painting of a squirrel eating a burger",
            "generator": self.get_generator(seed),
            "num_inference_steps": 2,
            "height": 16,
            "width": 16,
            "num_frames": 9,
            "max_sequence_length": 16,
            "output_type": "pt",
        }

    @pytest.mark.skip(reason="num_videos_per_prompt")
    def test_num_images_per_prompt(self):
        pass
# Expected (block path, block class name) sequence for each Helios pyramid workflow.
HELIOS_PYRAMID_WORKFLOWS = dict(
    text2video=[
        ("text_encoder", "HeliosTextEncoderStep"),
        ("denoise.input", "HeliosTextInputStep"),
        ("denoise.prepare_history", "HeliosPrepareHistoryStep"),
        ("denoise.pyramid_chunk_denoise", "HeliosPyramidChunkDenoiseStep"),
        ("decode", "HeliosDecodeStep"),
    ],
    image2video=[
        ("text_encoder", "HeliosTextEncoderStep"),
        ("vae_encoder", "HeliosImageVaeEncoderStep"),
        ("denoise.input", "HeliosTextInputStep"),
        ("denoise.additional_inputs", "HeliosAdditionalInputsStep"),
        ("denoise.add_noise_image", "HeliosAddNoiseToImageLatentsStep"),
        ("denoise.prepare_history", "HeliosPrepareHistoryStep"),
        ("denoise.seed_history", "HeliosI2VSeedHistoryStep"),
        ("denoise.pyramid_chunk_denoise", "HeliosPyramidI2VChunkDenoiseStep"),
        ("decode", "HeliosDecodeStep"),
    ],
    video2video=[
        ("text_encoder", "HeliosTextEncoderStep"),
        ("vae_encoder", "HeliosVideoVaeEncoderStep"),
        ("denoise.input", "HeliosTextInputStep"),
        ("denoise.additional_inputs", "HeliosAdditionalInputsStep"),
        ("denoise.add_noise_video", "HeliosAddNoiseToVideoLatentsStep"),
        ("denoise.prepare_history", "HeliosPrepareHistoryStep"),
        ("denoise.seed_history", "HeliosV2VSeedHistoryStep"),
        ("denoise.pyramid_chunk_denoise", "HeliosPyramidI2VChunkDenoiseStep"),
        ("decode", "HeliosDecodeStep"),
    ],
)
class TestHeliosPyramidModularPipelineFast(ModularPipelineTesterMixin):
    """Modular-pipeline tests for ``HeliosPyramidModularPipeline`` with ``HeliosPyramidAutoBlocks``."""

    pipeline_class = HeliosPyramidModularPipeline
    pipeline_blocks_class = HeliosPyramidAutoBlocks
    pretrained_model_name_or_path = "hf-internal-testing/tiny-helios-pyramid-modular-pipe"

    params = frozenset(("prompt", "height", "width", "num_frames"))
    batch_params = frozenset(("prompt",))
    optional_params = frozenset(("pyramid_num_inference_steps_list", "num_videos_per_prompt", "latents"))
    output_name = "videos"
    expected_workflow_blocks = HELIOS_PYRAMID_WORKFLOWS

    def get_dummy_inputs(self, seed=0):
        """Return the pipeline call kwargs, with a generator built from ``seed``."""
        return {
            "prompt": "A painting of a squirrel eating a burger",
            "generator": self.get_generator(seed),
            "pyramid_num_inference_steps_list": [2, 2],
            "height": 64,
            "width": 64,
            "num_frames": 9,
            "max_sequence_length": 16,
            "output_type": "pt",
        }

    def test_inference_batch_single_identical(self):
        # The pyramid pipeline injects noise at every stage, so batched and
        # single runs may diverge more; use a looser tolerance.
        super().test_inference_batch_single_identical(expected_max_diff=0.5)

    @pytest.mark.skip(reason="Pyramid multi-stage noise makes offload comparison unreliable with tiny models")
    def test_components_auto_cpu_offload_inference_consistent(self):
        pass

    @pytest.mark.skip(reason="Pyramid multi-stage noise makes save/load comparison unreliable with tiny models")
    def test_save_from_pretrained(self):
        pass

    @pytest.mark.skip(reason="num_videos_per_prompt")
    def test_num_images_per_prompt(self):
        pass