Compare commits

..

21 Commits

Author SHA1 Message Date
Sayak Paul
60e3284003 Merge branch 'main' into requirements-custom-blocks 2026-01-20 19:10:24 +05:30
sayakpaul
7b43d0e409 add tests 2026-01-20 09:29:32 +05:30
Sayak Paul
3879e32254 Merge branch 'main' into requirements-custom-blocks 2026-01-20 08:20:38 +05:30
sayakpaul
a88d11bc90 resolve conflicts. 2025-11-06 10:29:24 +05:30
Sayak Paul
a9165eb749 Merge branch 'main' into requirements-custom-blocks 2025-11-03 12:12:08 +05:30
Sayak Paul
eeb3445444 Merge branch 'main' into requirements-custom-blocks 2025-11-01 08:36:16 +05:30
Sayak Paul
5b7d0dfab6 Merge branch 'main' into requirements-custom-blocks 2025-10-29 16:30:46 +05:30
sayakpaul
1de4402c26 up 2025-10-27 13:55:17 +05:30
sayakpaul
024c2b9839 Merge branch 'main' into requirements-custom-blocks 2025-10-27 11:56:00 +05:30
Sayak Paul
35d8d97c02 Merge branch 'main' into requirements-custom-blocks 2025-10-22 21:57:45 +05:30
Sayak Paul
e52cabeff2 Merge branch 'main' into requirements-custom-blocks 2025-10-22 06:23:40 +05:30
Sayak Paul
2c4d73d72d Merge branch 'main' into requirements-custom-blocks 2025-10-21 01:54:38 +05:30
sayakpaul
046be83946 up 2025-10-02 15:43:44 +05:30
Sayak Paul
b7fba892f5 Merge branch 'main' into requirements-custom-blocks 2025-09-23 13:35:49 +05:30
Sayak Paul
ecbd907e76 Merge branch 'main' into requirements-custom-blocks 2025-09-12 15:47:22 +05:30
Sayak Paul
d159ae025d Merge branch 'main' into requirements-custom-blocks 2025-09-02 10:04:22 +05:30
Sayak Paul
756a1567f5 Merge branch 'main' into requirements-custom-blocks 2025-08-29 08:03:00 +02:00
Sayak Paul
d2731ababa Merge branch 'main' into requirements-custom-blocks 2025-08-21 07:59:54 +05:30
sayakpaul
37d3887194 unify. 2025-08-20 12:09:33 +05:30
sayakpaul
127e9a39d8 up 2025-08-20 11:51:15 +05:30
sayakpaul
12ceecf077 feat: implement requirements validation for custom blocks. 2025-08-20 11:04:28 +05:30
44 changed files with 392 additions and 1588 deletions

View File

@@ -413,9 +413,6 @@ else:
_import_structure["modular_pipelines"].extend(
[
"Flux2AutoBlocks",
"Flux2KleinAutoBlocks",
"Flux2KleinBaseAutoBlocks",
"Flux2KleinModularPipeline",
"Flux2ModularPipeline",
"FluxAutoBlocks",
"FluxKontextAutoBlocks",
@@ -1149,9 +1146,6 @@ if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
else:
from .modular_pipelines import (
Flux2AutoBlocks,
Flux2KleinAutoBlocks,
Flux2KleinBaseAutoBlocks,
Flux2KleinModularPipeline,
Flux2ModularPipeline,
FluxAutoBlocks,
FluxKontextAutoBlocks,

View File

@@ -89,8 +89,6 @@ class CustomBlocksCommand(BaseDiffusersCLICommand):
# automap = self._create_automap(parent_class=parent_class, child_class=child_class)
# with open(CONFIG, "w") as f:
# json.dump(automap, f)
with open("requirements.txt", "w") as f:
f.write("")
def _choose_block(self, candidates, chosen=None):
for cls, base in candidates:

View File

@@ -54,10 +54,7 @@ else:
]
_import_structure["flux2"] = [
"Flux2AutoBlocks",
"Flux2KleinAutoBlocks",
"Flux2KleinBaseAutoBlocks",
"Flux2ModularPipeline",
"Flux2KleinModularPipeline",
]
_import_structure["qwenimage"] = [
"QwenImageAutoBlocks",
@@ -84,13 +81,7 @@ if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
else:
from .components_manager import ComponentsManager
from .flux import FluxAutoBlocks, FluxKontextAutoBlocks, FluxKontextModularPipeline, FluxModularPipeline
from .flux2 import (
Flux2AutoBlocks,
Flux2KleinAutoBlocks,
Flux2KleinBaseAutoBlocks,
Flux2KleinModularPipeline,
Flux2ModularPipeline,
)
from .flux2 import Flux2AutoBlocks, Flux2ModularPipeline
from .modular_pipeline import (
AutoPipelineBlocks,
BlockState,

View File

@@ -43,7 +43,7 @@ else:
"Flux2ProcessImagesInputStep",
"Flux2TextInputStep",
]
_import_structure["modular_blocks_flux2"] = [
_import_structure["modular_blocks"] = [
"ALL_BLOCKS",
"AUTO_BLOCKS",
"REMOTE_AUTO_BLOCKS",
@@ -51,11 +51,10 @@ else:
"IMAGE_CONDITIONED_BLOCKS",
"Flux2AutoBlocks",
"Flux2AutoVaeEncoderStep",
"Flux2CoreDenoiseStep",
"Flux2BeforeDenoiseStep",
"Flux2VaeEncoderSequentialStep",
]
_import_structure["modular_blocks_flux2_klein"] = ["Flux2KleinAutoBlocks", "Flux2KleinBaseAutoBlocks"]
_import_structure["modular_pipeline"] = ["Flux2ModularPipeline", "Flux2KleinModularPipeline"]
_import_structure["modular_pipeline"] = ["Flux2ModularPipeline"]
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
try:
@@ -86,7 +85,7 @@ if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
Flux2ProcessImagesInputStep,
Flux2TextInputStep,
)
from .modular_blocks_flux2 import (
from .modular_blocks import (
ALL_BLOCKS,
AUTO_BLOCKS,
IMAGE_CONDITIONED_BLOCKS,
@@ -94,14 +93,10 @@ if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
TEXT2IMAGE_BLOCKS,
Flux2AutoBlocks,
Flux2AutoVaeEncoderStep,
Flux2CoreDenoiseStep,
Flux2BeforeDenoiseStep,
Flux2VaeEncoderSequentialStep,
)
from .modular_blocks_flux2_klein import (
Flux2KleinAutoBlocks,
Flux2KleinBaseAutoBlocks,
)
from .modular_pipeline import Flux2KleinModularPipeline, Flux2ModularPipeline
from .modular_pipeline import Flux2ModularPipeline
else:
import sys

View File

@@ -129,9 +129,17 @@ class Flux2SetTimestepsStep(ModularPipelineBlocks):
InputParam("num_inference_steps", default=50),
InputParam("timesteps"),
InputParam("sigmas"),
InputParam("guidance_scale", default=4.0),
InputParam("latents", type_hint=torch.Tensor),
InputParam("num_images_per_prompt", default=1),
InputParam("height", type_hint=int),
InputParam("width", type_hint=int),
InputParam(
"batch_size",
required=True,
type_hint=int,
description="Number of prompts, the final batch size of model inputs should be `batch_size * num_images_per_prompt`.",
),
]
@property
@@ -143,12 +151,13 @@ class Flux2SetTimestepsStep(ModularPipelineBlocks):
type_hint=int,
description="The number of denoising steps to perform at inference time",
),
OutputParam("guidance", type_hint=torch.Tensor, description="Guidance scale tensor"),
]
@torch.no_grad()
def __call__(self, components: Flux2ModularPipeline, state: PipelineState) -> PipelineState:
block_state = self.get_block_state(state)
device = components._execution_device
block_state.device = components._execution_device
scheduler = components.scheduler
@@ -174,7 +183,7 @@ class Flux2SetTimestepsStep(ModularPipelineBlocks):
timesteps, num_inference_steps = retrieve_timesteps(
scheduler,
num_inference_steps,
device,
block_state.device,
timesteps=timesteps,
sigmas=sigmas,
mu=mu,
@@ -182,6 +191,11 @@ class Flux2SetTimestepsStep(ModularPipelineBlocks):
block_state.timesteps = timesteps
block_state.num_inference_steps = num_inference_steps
batch_size = block_state.batch_size * block_state.num_images_per_prompt
guidance = torch.full([1], block_state.guidance_scale, device=block_state.device, dtype=torch.float32)
guidance = guidance.expand(batch_size)
block_state.guidance = guidance
components.scheduler.set_begin_index(0)
self.set_block_state(state, block_state)
@@ -339,6 +353,7 @@ class Flux2RoPEInputsStep(ModularPipelineBlocks):
def inputs(self) -> List[InputParam]:
return [
InputParam(name="prompt_embeds", required=True),
InputParam(name="latent_ids"),
]
@property
@@ -350,6 +365,12 @@ class Flux2RoPEInputsStep(ModularPipelineBlocks):
type_hint=torch.Tensor,
description="4D position IDs (T, H, W, L) for text tokens, used for RoPE calculation.",
),
OutputParam(
name="latent_ids",
kwargs_type="denoiser_input_fields",
type_hint=torch.Tensor,
description="4D position IDs (T, H, W, L) for image latents, used for RoPE calculation.",
),
]
@staticmethod
@@ -382,72 +403,6 @@ class Flux2RoPEInputsStep(ModularPipelineBlocks):
return components, state
class Flux2KleinBaseRoPEInputsStep(ModularPipelineBlocks):
model_name = "flux2-klein"
@property
def description(self) -> str:
return "Step that prepares the 4D RoPE position IDs for Flux2-Klein base model denoising. Should be placed after text encoder and latent preparation steps."
@property
def inputs(self) -> List[InputParam]:
return [
InputParam(name="prompt_embeds", required=True),
InputParam(name="negative_prompt_embeds", required=False),
]
@property
def intermediate_outputs(self) -> List[OutputParam]:
return [
OutputParam(
name="txt_ids",
kwargs_type="denoiser_input_fields",
type_hint=torch.Tensor,
description="4D position IDs (T, H, W, L) for text tokens, used for RoPE calculation.",
),
OutputParam(
name="negative_txt_ids",
kwargs_type="denoiser_input_fields",
type_hint=torch.Tensor,
description="4D position IDs (T, H, W, L) for negative text tokens, used for RoPE calculation.",
),
]
@staticmethod
def _prepare_text_ids(x: torch.Tensor, t_coord: Optional[torch.Tensor] = None):
"""Prepare 4D position IDs for text tokens."""
B, L, _ = x.shape
out_ids = []
for i in range(B):
t = torch.arange(1) if t_coord is None else t_coord[i]
h = torch.arange(1)
w = torch.arange(1)
seq_l = torch.arange(L)
coords = torch.cartesian_prod(t, h, w, seq_l)
out_ids.append(coords)
return torch.stack(out_ids)
def __call__(self, components: Flux2ModularPipeline, state: PipelineState) -> PipelineState:
block_state = self.get_block_state(state)
prompt_embeds = block_state.prompt_embeds
device = prompt_embeds.device
block_state.txt_ids = self._prepare_text_ids(prompt_embeds)
block_state.txt_ids = block_state.txt_ids.to(device)
block_state.negative_txt_ids = None
if block_state.negative_prompt_embeds is not None:
block_state.negative_txt_ids = self._prepare_text_ids(block_state.negative_prompt_embeds)
block_state.negative_txt_ids = block_state.negative_txt_ids.to(device)
self.set_block_state(state, block_state)
return components, state
class Flux2PrepareImageLatentsStep(ModularPipelineBlocks):
model_name = "flux2"
@@ -551,42 +506,3 @@ class Flux2PrepareImageLatentsStep(ModularPipelineBlocks):
self.set_block_state(state, block_state)
return components, state
class Flux2PrepareGuidanceStep(ModularPipelineBlocks):
model_name = "flux2"
@property
def description(self) -> str:
return "Step that prepares the guidance scale tensor for Flux2 inference"
@property
def inputs(self) -> List[InputParam]:
return [
InputParam("guidance_scale", default=4.0),
InputParam("num_images_per_prompt", default=1),
InputParam(
"batch_size",
required=True,
type_hint=int,
description="Number of prompts, the final batch size of model inputs should be `batch_size * num_images_per_prompt`.",
),
]
@property
def intermediate_outputs(self) -> List[OutputParam]:
return [
OutputParam("guidance", type_hint=torch.Tensor, description="Guidance scale tensor"),
]
@torch.no_grad()
def __call__(self, components: Flux2ModularPipeline, state: PipelineState) -> PipelineState:
block_state = self.get_block_state(state)
device = components._execution_device
batch_size = block_state.batch_size * block_state.num_images_per_prompt
guidance = torch.full([1], block_state.guidance_scale, device=device, dtype=torch.float32)
guidance = guidance.expand(batch_size)
block_state.guidance = guidance
self.set_block_state(state, block_state)
return components, state

View File

@@ -29,16 +29,29 @@ from ..modular_pipeline_utils import ComponentSpec, InputParam, OutputParam
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
class Flux2UnpackLatentsStep(ModularPipelineBlocks):
class Flux2DecodeStep(ModularPipelineBlocks):
model_name = "flux2"
@property
def expected_components(self) -> List[ComponentSpec]:
return [
ComponentSpec("vae", AutoencoderKLFlux2),
ComponentSpec(
"image_processor",
Flux2ImageProcessor,
config=FrozenDict({"vae_scale_factor": 16, "vae_latent_channels": 32}),
default_creation_method="from_config",
),
]
@property
def description(self) -> str:
return "Step that unpacks the latents from the denoising step"
return "Step that decodes the denoised latents into images using Flux2 VAE with batch norm denormalization"
@property
def inputs(self) -> List[Tuple[str, Any]]:
return [
InputParam("output_type", default="pil"),
InputParam(
"latents",
required=True,
@@ -57,9 +70,9 @@ class Flux2UnpackLatentsStep(ModularPipelineBlocks):
def intermediate_outputs(self) -> List[str]:
return [
OutputParam(
"latents",
type_hint=torch.Tensor,
description="The denoise latents from denoising step, unpacked with position IDs.",
"images",
type_hint=Union[List[PIL.Image.Image], torch.Tensor, np.ndarray],
description="The generated images, can be a list of PIL.Image.Image, torch.Tensor or a numpy array",
)
]
@@ -94,62 +107,6 @@ class Flux2UnpackLatentsStep(ModularPipelineBlocks):
return torch.stack(x_list, dim=0)
@torch.no_grad()
def __call__(self, components, state: PipelineState) -> PipelineState:
block_state = self.get_block_state(state)
latents = block_state.latents
latent_ids = block_state.latent_ids
latents = self._unpack_latents_with_ids(latents, latent_ids)
block_state.latents = latents
self.set_block_state(state, block_state)
return components, state
class Flux2DecodeStep(ModularPipelineBlocks):
model_name = "flux2"
@property
def expected_components(self) -> List[ComponentSpec]:
return [
ComponentSpec("vae", AutoencoderKLFlux2),
ComponentSpec(
"image_processor",
Flux2ImageProcessor,
config=FrozenDict({"vae_scale_factor": 16, "vae_latent_channels": 32}),
default_creation_method="from_config",
),
]
@property
def description(self) -> str:
return "Step that decodes the denoised latents into images using Flux2 VAE with batch norm denormalization"
@property
def inputs(self) -> List[Tuple[str, Any]]:
return [
InputParam("output_type", default="pil"),
InputParam(
"latents",
required=True,
type_hint=torch.Tensor,
description="The denoised latents from the denoising step",
),
]
@property
def intermediate_outputs(self) -> List[str]:
return [
OutputParam(
"images",
type_hint=Union[List[PIL.Image.Image], torch.Tensor, np.ndarray],
description="The generated images, can be a list of PIL.Image.Image, torch.Tensor or a numpy array",
)
]
@staticmethod
def _unpatchify_latents(latents):
"""Convert patchified latents back to regular format."""
@@ -164,20 +121,26 @@ class Flux2DecodeStep(ModularPipelineBlocks):
block_state = self.get_block_state(state)
vae = components.vae
latents = block_state.latents
if block_state.output_type == "latent":
block_state.images = block_state.latents
else:
latents = block_state.latents
latent_ids = block_state.latent_ids
latents_bn_mean = vae.bn.running_mean.view(1, -1, 1, 1).to(latents.device, latents.dtype)
latents_bn_std = torch.sqrt(vae.bn.running_var.view(1, -1, 1, 1) + vae.config.batch_norm_eps).to(
latents.device, latents.dtype
)
latents = latents * latents_bn_std + latents_bn_mean
latents = self._unpack_latents_with_ids(latents, latent_ids)
latents = self._unpatchify_latents(latents)
latents_bn_mean = vae.bn.running_mean.view(1, -1, 1, 1).to(latents.device, latents.dtype)
latents_bn_std = torch.sqrt(vae.bn.running_var.view(1, -1, 1, 1) + vae.config.batch_norm_eps).to(
latents.device, latents.dtype
)
latents = latents * latents_bn_std + latents_bn_mean
block_state.images = vae.decode(latents, return_dict=False)[0]
block_state.images = components.image_processor.postprocess(
block_state.images, output_type=block_state.output_type
)
latents = self._unpatchify_latents(latents)
block_state.images = vae.decode(latents, return_dict=False)[0]
block_state.images = components.image_processor.postprocess(
block_state.images, output_type=block_state.output_type
)
self.set_block_state(state, block_state)
return components, state

View File

@@ -16,8 +16,6 @@ from typing import Any, List, Tuple
import torch
from ...configuration_utils import FrozenDict
from ...guiders import ClassifierFreeGuidance
from ...models import Flux2Transformer2DModel
from ...schedulers import FlowMatchEulerDiscreteScheduler
from ...utils import is_torch_xla_available, logging
@@ -27,8 +25,8 @@ from ..modular_pipeline import (
ModularPipelineBlocks,
PipelineState,
)
from ..modular_pipeline_utils import ComponentSpec, ConfigSpec, InputParam, OutputParam
from .modular_pipeline import Flux2KleinModularPipeline, Flux2ModularPipeline
from ..modular_pipeline_utils import ComponentSpec, InputParam, OutputParam
from .modular_pipeline import Flux2ModularPipeline
if is_torch_xla_available():
@@ -136,229 +134,6 @@ class Flux2LoopDenoiser(ModularPipelineBlocks):
return components, block_state
# same as Flux2LoopDenoiser but guidance=None
class Flux2KleinLoopDenoiser(ModularPipelineBlocks):
model_name = "flux2-klein"
@property
def expected_components(self) -> List[ComponentSpec]:
return [ComponentSpec("transformer", Flux2Transformer2DModel)]
@property
def description(self) -> str:
return (
"Step within the denoising loop that denoises the latents for Flux2. "
"This block should be used to compose the `sub_blocks` attribute of a `LoopSequentialPipelineBlocks` "
"object (e.g. `Flux2DenoiseLoopWrapper`)"
)
@property
def inputs(self) -> List[Tuple[str, Any]]:
return [
InputParam("joint_attention_kwargs"),
InputParam(
"latents",
required=True,
type_hint=torch.Tensor,
description="The latents to denoise. Shape: (B, seq_len, C)",
),
InputParam(
"image_latents",
type_hint=torch.Tensor,
description="Packed image latents for conditioning. Shape: (B, img_seq_len, C)",
),
InputParam(
"image_latent_ids",
type_hint=torch.Tensor,
description="Position IDs for image latents. Shape: (B, img_seq_len, 4)",
),
InputParam(
"prompt_embeds",
required=True,
type_hint=torch.Tensor,
description="Text embeddings from Qwen3",
),
InputParam(
"txt_ids",
required=True,
type_hint=torch.Tensor,
description="4D position IDs for text tokens (T, H, W, L)",
),
InputParam(
"latent_ids",
required=True,
type_hint=torch.Tensor,
description="4D position IDs for latent tokens (T, H, W, L)",
),
]
@torch.no_grad()
def __call__(
self, components: Flux2KleinModularPipeline, block_state: BlockState, i: int, t: torch.Tensor
) -> PipelineState:
latents = block_state.latents
latent_model_input = latents.to(components.transformer.dtype)
img_ids = block_state.latent_ids
image_latents = getattr(block_state, "image_latents", None)
if image_latents is not None:
latent_model_input = torch.cat([latents, image_latents], dim=1).to(components.transformer.dtype)
image_latent_ids = block_state.image_latent_ids
img_ids = torch.cat([img_ids, image_latent_ids], dim=1)
timestep = t.expand(latents.shape[0]).to(latents.dtype)
noise_pred = components.transformer(
hidden_states=latent_model_input,
timestep=timestep / 1000,
guidance=None,
encoder_hidden_states=block_state.prompt_embeds,
txt_ids=block_state.txt_ids,
img_ids=img_ids,
joint_attention_kwargs=block_state.joint_attention_kwargs,
return_dict=False,
)[0]
noise_pred = noise_pred[:, : latents.size(1)]
block_state.noise_pred = noise_pred
return components, block_state
# support CFG for Flux2-Klein base model
class Flux2KleinBaseLoopDenoiser(ModularPipelineBlocks):
model_name = "flux2-klein"
@property
def expected_components(self) -> List[ComponentSpec]:
return [
ComponentSpec("transformer", Flux2Transformer2DModel),
ComponentSpec(
"guider",
ClassifierFreeGuidance,
config=FrozenDict({"guidance_scale": 4.0}),
default_creation_method="from_config",
),
]
@property
def expected_configs(self) -> List[ConfigSpec]:
return [
ConfigSpec(name="is_distilled", default=False),
]
@property
def description(self) -> str:
return (
"Step within the denoising loop that denoises the latents for Flux2. "
"This block should be used to compose the `sub_blocks` attribute of a `LoopSequentialPipelineBlocks` "
"object (e.g. `Flux2DenoiseLoopWrapper`)"
)
@property
def inputs(self) -> List[Tuple[str, Any]]:
return [
InputParam("joint_attention_kwargs"),
InputParam(
"latents",
required=True,
type_hint=torch.Tensor,
description="The latents to denoise. Shape: (B, seq_len, C)",
),
InputParam(
"image_latents",
type_hint=torch.Tensor,
description="Packed image latents for conditioning. Shape: (B, img_seq_len, C)",
),
InputParam(
"image_latent_ids",
type_hint=torch.Tensor,
description="Position IDs for image latents. Shape: (B, img_seq_len, 4)",
),
InputParam(
"prompt_embeds",
required=True,
type_hint=torch.Tensor,
description="Text embeddings from Qwen3",
),
InputParam(
"negative_prompt_embeds",
required=False,
type_hint=torch.Tensor,
description="Negative text embeddings from Qwen3",
),
InputParam(
"txt_ids",
required=True,
type_hint=torch.Tensor,
description="4D position IDs for text tokens (T, H, W, L)",
),
InputParam(
"negative_txt_ids",
required=False,
type_hint=torch.Tensor,
description="4D position IDs for negative text tokens (T, H, W, L)",
),
InputParam(
"latent_ids",
required=True,
type_hint=torch.Tensor,
description="4D position IDs for latent tokens (T, H, W, L)",
),
]
@torch.no_grad()
def __call__(
self, components: Flux2KleinModularPipeline, block_state: BlockState, i: int, t: torch.Tensor
) -> PipelineState:
latents = block_state.latents
latent_model_input = latents.to(components.transformer.dtype)
img_ids = block_state.latent_ids
image_latents = getattr(block_state, "image_latents", None)
if image_latents is not None:
latent_model_input = torch.cat([latents, image_latents], dim=1).to(components.transformer.dtype)
image_latent_ids = block_state.image_latent_ids
img_ids = torch.cat([img_ids, image_latent_ids], dim=1)
timestep = t.expand(latents.shape[0]).to(latents.dtype)
guider_inputs = {
"encoder_hidden_states": (
getattr(block_state, "prompt_embeds", None),
getattr(block_state, "negative_prompt_embeds", None),
),
"txt_ids": (
getattr(block_state, "txt_ids", None),
getattr(block_state, "negative_txt_ids", None),
),
}
components.guider.set_state(step=i, num_inference_steps=block_state.num_inference_steps, timestep=t)
guider_state = components.guider.prepare_inputs(guider_inputs)
for guider_state_batch in guider_state:
components.guider.prepare_models(components.transformer)
cond_kwargs = {input_name: getattr(guider_state_batch, input_name) for input_name in guider_inputs.keys()}
noise_pred = components.transformer(
hidden_states=latent_model_input,
timestep=timestep / 1000,
guidance=None,
img_ids=img_ids,
joint_attention_kwargs=block_state.joint_attention_kwargs,
return_dict=False,
**cond_kwargs,
)[0]
guider_state_batch.noise_pred = noise_pred[:, : latents.size(1)]
components.guider.cleanup_models(components.transformer)
# perform guidance
block_state.noise_pred = components.guider(guider_state)[0]
return components, block_state
class Flux2LoopAfterDenoiser(ModularPipelineBlocks):
model_name = "flux2"
@@ -475,35 +250,3 @@ class Flux2DenoiseStep(Flux2DenoiseLoopWrapper):
" - `Flux2LoopAfterDenoiser`\n"
"This block supports both text-to-image and image-conditioned generation."
)
class Flux2KleinDenoiseStep(Flux2DenoiseLoopWrapper):
block_classes = [Flux2KleinLoopDenoiser, Flux2LoopAfterDenoiser]
block_names = ["denoiser", "after_denoiser"]
@property
def description(self) -> str:
return (
"Denoise step that iteratively denoises the latents for Flux2. \n"
"Its loop logic is defined in `Flux2DenoiseLoopWrapper.__call__` method \n"
"At each iteration, it runs blocks defined in `sub_blocks` sequentially:\n"
" - `Flux2KleinLoopDenoiser`\n"
" - `Flux2LoopAfterDenoiser`\n"
"This block supports both text-to-image and image-conditioned generation."
)
class Flux2KleinBaseDenoiseStep(Flux2DenoiseLoopWrapper):
block_classes = [Flux2KleinBaseLoopDenoiser, Flux2LoopAfterDenoiser]
block_names = ["denoiser", "after_denoiser"]
@property
def description(self) -> str:
return (
"Denoise step that iteratively denoises the latents for Flux2. \n"
"Its loop logic is defined in `Flux2DenoiseLoopWrapper.__call__` method \n"
"At each iteration, it runs blocks defined in `sub_blocks` sequentially:\n"
" - `Flux2KleinBaseLoopDenoiser`\n"
" - `Flux2LoopAfterDenoiser`\n"
"This block supports both text-to-image and image-conditioned generation."
)

View File

@@ -15,15 +15,13 @@
from typing import List, Optional, Tuple, Union
import torch
from transformers import AutoProcessor, Mistral3ForConditionalGeneration, Qwen2TokenizerFast, Qwen3ForCausalLM
from transformers import AutoProcessor, Mistral3ForConditionalGeneration
from ...configuration_utils import FrozenDict
from ...guiders import ClassifierFreeGuidance
from ...models import AutoencoderKLFlux2
from ...utils import logging
from ..modular_pipeline import ModularPipelineBlocks, PipelineState
from ..modular_pipeline_utils import ComponentSpec, ConfigSpec, InputParam, OutputParam
from .modular_pipeline import Flux2KleinModularPipeline, Flux2ModularPipeline
from ..modular_pipeline_utils import ComponentSpec, InputParam, OutputParam
from .modular_pipeline import Flux2ModularPipeline
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
@@ -81,8 +79,10 @@ class Flux2TextEncoderStep(ModularPipelineBlocks):
def inputs(self) -> List[InputParam]:
return [
InputParam("prompt"),
InputParam("prompt_embeds", type_hint=torch.Tensor, required=False),
InputParam("max_sequence_length", type_hint=int, default=512, required=False),
InputParam("text_encoder_out_layers", type_hint=Tuple[int], default=(10, 20, 30), required=False),
InputParam("joint_attention_kwargs"),
]
@property
@@ -99,7 +99,14 @@ class Flux2TextEncoderStep(ModularPipelineBlocks):
@staticmethod
def check_inputs(block_state):
prompt = block_state.prompt
if prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
prompt_embeds = getattr(block_state, "prompt_embeds", None)
if prompt is not None and prompt_embeds is not None:
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. "
"Please make sure to only forward one of the two."
)
elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
@staticmethod
@@ -158,6 +165,10 @@ class Flux2TextEncoderStep(ModularPipelineBlocks):
block_state.device = components._execution_device
if block_state.prompt_embeds is not None:
self.set_block_state(state, block_state)
return components, state
prompt = block_state.prompt
if prompt is None:
prompt = ""
@@ -194,6 +205,7 @@ class Flux2RemoteTextEncoderStep(ModularPipelineBlocks):
def inputs(self) -> List[InputParam]:
return [
InputParam("prompt"),
InputParam("prompt_embeds", type_hint=torch.Tensor, required=False),
]
@property
@@ -210,8 +222,15 @@ class Flux2RemoteTextEncoderStep(ModularPipelineBlocks):
@staticmethod
def check_inputs(block_state):
prompt = block_state.prompt
if prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(block_state.prompt)}")
prompt_embeds = getattr(block_state, "prompt_embeds", None)
if prompt is not None and prompt_embeds is not None:
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. "
"Please make sure to only forward one of the two."
)
elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
@torch.no_grad()
def __call__(self, components: Flux2ModularPipeline, state: PipelineState) -> PipelineState:
@@ -225,6 +244,10 @@ class Flux2RemoteTextEncoderStep(ModularPipelineBlocks):
block_state.device = components._execution_device
if block_state.prompt_embeds is not None:
self.set_block_state(state, block_state)
return components, state
prompt = block_state.prompt
if prompt is None:
prompt = ""
@@ -247,289 +270,6 @@ class Flux2RemoteTextEncoderStep(ModularPipelineBlocks):
return components, state
class Flux2KleinTextEncoderStep(ModularPipelineBlocks):
model_name = "flux2-klein"
@property
def description(self) -> str:
return "Text Encoder step that generates text embeddings using Qwen3 to guide the image generation"
@property
def expected_components(self) -> List[ComponentSpec]:
return [
ComponentSpec("text_encoder", Qwen3ForCausalLM),
ComponentSpec("tokenizer", Qwen2TokenizerFast),
]
@property
def expected_configs(self) -> List[ConfigSpec]:
return [
ConfigSpec(name="is_distilled", default=True),
]
@property
def inputs(self) -> List[InputParam]:
return [
InputParam("prompt"),
InputParam("max_sequence_length", type_hint=int, default=512, required=False),
InputParam("text_encoder_out_layers", type_hint=Tuple[int], default=(9, 18, 27), required=False),
]
@property
def intermediate_outputs(self) -> List[OutputParam]:
return [
OutputParam(
"prompt_embeds",
kwargs_type="denoiser_input_fields",
type_hint=torch.Tensor,
description="Text embeddings from qwen3 used to guide the image generation",
),
]
@staticmethod
def check_inputs(block_state):
prompt = block_state.prompt
if prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
@staticmethod
# Copied from diffusers.pipelines.flux2.pipeline_flux2_klein.Flux2KleinPipeline._get_qwen3_prompt_embeds
def _get_qwen3_prompt_embeds(
text_encoder: Qwen3ForCausalLM,
tokenizer: Qwen2TokenizerFast,
prompt: Union[str, List[str]],
dtype: Optional[torch.dtype] = None,
device: Optional[torch.device] = None,
max_sequence_length: int = 512,
hidden_states_layers: List[int] = (9, 18, 27),
):
dtype = text_encoder.dtype if dtype is None else dtype
device = text_encoder.device if device is None else device
prompt = [prompt] if isinstance(prompt, str) else prompt
all_input_ids = []
all_attention_masks = []
for single_prompt in prompt:
messages = [{"role": "user", "content": single_prompt}]
text = tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True,
enable_thinking=False,
)
inputs = tokenizer(
text,
return_tensors="pt",
padding="max_length",
truncation=True,
max_length=max_sequence_length,
)
all_input_ids.append(inputs["input_ids"])
all_attention_masks.append(inputs["attention_mask"])
input_ids = torch.cat(all_input_ids, dim=0).to(device)
attention_mask = torch.cat(all_attention_masks, dim=0).to(device)
# Forward pass through the model
output = text_encoder(
input_ids=input_ids,
attention_mask=attention_mask,
output_hidden_states=True,
use_cache=False,
)
# Only use outputs from intermediate layers and stack them
out = torch.stack([output.hidden_states[k] for k in hidden_states_layers], dim=1)
out = out.to(dtype=dtype, device=device)
batch_size, num_channels, seq_len, hidden_dim = out.shape
prompt_embeds = out.permute(0, 2, 1, 3).reshape(batch_size, seq_len, num_channels * hidden_dim)
return prompt_embeds
@torch.no_grad()
def __call__(self, components: Flux2KleinModularPipeline, state: PipelineState) -> PipelineState:
block_state = self.get_block_state(state)
self.check_inputs(block_state)
device = components._execution_device
prompt = block_state.prompt
if prompt is None:
prompt = ""
prompt = [prompt] if isinstance(prompt, str) else prompt
block_state.prompt_embeds = self._get_qwen3_prompt_embeds(
text_encoder=components.text_encoder,
tokenizer=components.tokenizer,
prompt=prompt,
device=device,
max_sequence_length=block_state.max_sequence_length,
hidden_states_layers=block_state.text_encoder_out_layers,
)
self.set_block_state(state, block_state)
return components, state
class Flux2KleinBaseTextEncoderStep(ModularPipelineBlocks):
model_name = "flux2-klein"
@property
def description(self) -> str:
return "Text Encoder step that generates text embeddings using Qwen3 to guide the image generation"
@property
def expected_components(self) -> List[ComponentSpec]:
return [
ComponentSpec("text_encoder", Qwen3ForCausalLM),
ComponentSpec("tokenizer", Qwen2TokenizerFast),
ComponentSpec(
"guider",
ClassifierFreeGuidance,
config=FrozenDict({"guidance_scale": 4.0}),
default_creation_method="from_config",
),
]
@property
def expected_configs(self) -> List[ConfigSpec]:
return [
ConfigSpec(name="is_distilled", default=False),
]
@property
def inputs(self) -> List[InputParam]:
return [
InputParam("prompt"),
InputParam("max_sequence_length", type_hint=int, default=512, required=False),
InputParam("text_encoder_out_layers", type_hint=Tuple[int], default=(9, 18, 27), required=False),
]
@property
def intermediate_outputs(self) -> List[OutputParam]:
return [
OutputParam(
"prompt_embeds",
kwargs_type="denoiser_input_fields",
type_hint=torch.Tensor,
description="Text embeddings from qwen3 used to guide the image generation",
),
OutputParam(
"negative_prompt_embeds",
kwargs_type="denoiser_input_fields",
type_hint=torch.Tensor,
description="Negative text embeddings from qwen3 used to guide the image generation",
),
]
@staticmethod
def check_inputs(block_state):
prompt = block_state.prompt
if prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
@staticmethod
# Copied from diffusers.pipelines.flux2.pipeline_flux2_klein.Flux2KleinPipeline._get_qwen3_prompt_embeds
def _get_qwen3_prompt_embeds(
text_encoder: Qwen3ForCausalLM,
tokenizer: Qwen2TokenizerFast,
prompt: Union[str, List[str]],
dtype: Optional[torch.dtype] = None,
device: Optional[torch.device] = None,
max_sequence_length: int = 512,
hidden_states_layers: List[int] = (9, 18, 27),
):
dtype = text_encoder.dtype if dtype is None else dtype
device = text_encoder.device if device is None else device
prompt = [prompt] if isinstance(prompt, str) else prompt
all_input_ids = []
all_attention_masks = []
for single_prompt in prompt:
messages = [{"role": "user", "content": single_prompt}]
text = tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True,
enable_thinking=False,
)
inputs = tokenizer(
text,
return_tensors="pt",
padding="max_length",
truncation=True,
max_length=max_sequence_length,
)
all_input_ids.append(inputs["input_ids"])
all_attention_masks.append(inputs["attention_mask"])
input_ids = torch.cat(all_input_ids, dim=0).to(device)
attention_mask = torch.cat(all_attention_masks, dim=0).to(device)
# Forward pass through the model
output = text_encoder(
input_ids=input_ids,
attention_mask=attention_mask,
output_hidden_states=True,
use_cache=False,
)
# Only use outputs from intermediate layers and stack them
out = torch.stack([output.hidden_states[k] for k in hidden_states_layers], dim=1)
out = out.to(dtype=dtype, device=device)
batch_size, num_channels, seq_len, hidden_dim = out.shape
prompt_embeds = out.permute(0, 2, 1, 3).reshape(batch_size, seq_len, num_channels * hidden_dim)
return prompt_embeds
@torch.no_grad()
def __call__(self, components: Flux2KleinModularPipeline, state: PipelineState) -> PipelineState:
block_state = self.get_block_state(state)
self.check_inputs(block_state)
device = components._execution_device
prompt = block_state.prompt
if prompt is None:
prompt = ""
prompt = [prompt] if isinstance(prompt, str) else prompt
block_state.prompt_embeds = self._get_qwen3_prompt_embeds(
text_encoder=components.text_encoder,
tokenizer=components.tokenizer,
prompt=prompt,
device=device,
max_sequence_length=block_state.max_sequence_length,
hidden_states_layers=block_state.text_encoder_out_layers,
)
if components.requires_unconditional_embeds:
negative_prompt = [""] * len(prompt)
block_state.negative_prompt_embeds = self._get_qwen3_prompt_embeds(
text_encoder=components.text_encoder,
tokenizer=components.tokenizer,
prompt=negative_prompt,
device=device,
max_sequence_length=block_state.max_sequence_length,
hidden_states_layers=block_state.text_encoder_out_layers,
)
else:
block_state.negative_prompt_embeds = None
self.set_block_state(state, block_state)
return components, state
class Flux2VaeEncoderStep(ModularPipelineBlocks):
model_name = "flux2"

View File

@@ -47,7 +47,7 @@ class Flux2TextInputStep(ModularPipelineBlocks):
required=True,
kwargs_type="denoiser_input_fields",
type_hint=torch.Tensor,
description="Pre-generated text embeddings. Can be generated from text_encoder step.",
description="Pre-generated text embeddings from Mistral3. Can be generated from text_encoder step.",
),
]
@@ -89,90 +89,6 @@ class Flux2TextInputStep(ModularPipelineBlocks):
return components, state
class Flux2KleinBaseTextInputStep(ModularPipelineBlocks):
model_name = "flux2-klein"
@property
def description(self) -> str:
return (
"This step:\n"
" 1. Determines `batch_size` and `dtype` based on `prompt_embeds`\n"
" 2. Ensures all text embeddings have consistent batch sizes (batch_size * num_images_per_prompt)"
)
@property
def inputs(self) -> List[InputParam]:
return [
InputParam("num_images_per_prompt", default=1),
InputParam(
"prompt_embeds",
required=True,
kwargs_type="denoiser_input_fields",
type_hint=torch.Tensor,
description="Pre-generated text embeddings. Can be generated from text_encoder step.",
),
InputParam(
"negative_prompt_embeds",
required=False,
kwargs_type="denoiser_input_fields",
type_hint=torch.Tensor,
description="Pre-generated negative text embeddings. Can be generated from text_encoder step.",
),
]
@property
def intermediate_outputs(self) -> List[str]:
return [
OutputParam(
"batch_size",
type_hint=int,
description="Number of prompts, the final batch size of model inputs should be batch_size * num_images_per_prompt",
),
OutputParam(
"dtype",
type_hint=torch.dtype,
description="Data type of model tensor inputs (determined by `prompt_embeds`)",
),
OutputParam(
"prompt_embeds",
type_hint=torch.Tensor,
kwargs_type="denoiser_input_fields",
description="Text embeddings used to guide the image generation",
),
OutputParam(
"negative_prompt_embeds",
type_hint=torch.Tensor,
kwargs_type="denoiser_input_fields",
description="Negative text embeddings used to guide the image generation",
),
]
@torch.no_grad()
def __call__(self, components: Flux2ModularPipeline, state: PipelineState) -> PipelineState:
block_state = self.get_block_state(state)
block_state.batch_size = block_state.prompt_embeds.shape[0]
block_state.dtype = block_state.prompt_embeds.dtype
_, seq_len, _ = block_state.prompt_embeds.shape
block_state.prompt_embeds = block_state.prompt_embeds.repeat(1, block_state.num_images_per_prompt, 1)
block_state.prompt_embeds = block_state.prompt_embeds.view(
block_state.batch_size * block_state.num_images_per_prompt, seq_len, -1
)
if block_state.negative_prompt_embeds is not None:
_, seq_len, _ = block_state.negative_prompt_embeds.shape
block_state.negative_prompt_embeds = block_state.negative_prompt_embeds.repeat(
1, block_state.num_images_per_prompt, 1
)
block_state.negative_prompt_embeds = block_state.negative_prompt_embeds.view(
block_state.batch_size * block_state.num_images_per_prompt, seq_len, -1
)
self.set_block_state(state, block_state)
return components, state
class Flux2ProcessImagesInputStep(ModularPipelineBlocks):
model_name = "flux2"

View File

@@ -12,22 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
import PIL.Image
import torch
from ...utils import logging
from ..modular_pipeline import AutoPipelineBlocks, SequentialPipelineBlocks
from ..modular_pipeline_utils import InsertableDict, OutputParam
from ..modular_pipeline_utils import InsertableDict
from .before_denoise import (
Flux2PrepareGuidanceStep,
Flux2PrepareImageLatentsStep,
Flux2PrepareLatentsStep,
Flux2RoPEInputsStep,
Flux2SetTimestepsStep,
)
from .decoders import Flux2DecodeStep, Flux2UnpackLatentsStep
from .decoders import Flux2DecodeStep
from .denoise import Flux2DenoiseStep
from .encoders import (
Flux2RemoteTextEncoderStep,
@@ -47,6 +41,7 @@ Flux2VaeEncoderBlocks = InsertableDict(
[
("preprocess", Flux2ProcessImagesInputStep()),
("encode", Flux2VaeEncoderStep()),
("prepare_image_latents", Flux2PrepareImageLatentsStep()),
]
)
@@ -77,56 +72,33 @@ class Flux2AutoVaeEncoderStep(AutoPipelineBlocks):
)
Flux2CoreDenoiseBlocks = InsertableDict(
Flux2BeforeDenoiseBlocks = InsertableDict(
[
("input", Flux2TextInputStep()),
("prepare_image_latents", Flux2PrepareImageLatentsStep()),
("prepare_latents", Flux2PrepareLatentsStep()),
("set_timesteps", Flux2SetTimestepsStep()),
("prepare_guidance", Flux2PrepareGuidanceStep()),
("prepare_rope_inputs", Flux2RoPEInputsStep()),
("denoise", Flux2DenoiseStep()),
("after_denoise", Flux2UnpackLatentsStep()),
]
)
class Flux2CoreDenoiseStep(SequentialPipelineBlocks):
class Flux2BeforeDenoiseStep(SequentialPipelineBlocks):
model_name = "flux2"
block_classes = Flux2CoreDenoiseBlocks.values()
block_names = Flux2CoreDenoiseBlocks.keys()
block_classes = Flux2BeforeDenoiseBlocks.values()
block_names = Flux2BeforeDenoiseBlocks.keys()
@property
def description(self):
return (
"Core denoise step that performs the denoising process for Flux2-dev.\n"
" - `Flux2TextInputStep` (input) standardizes the text inputs (prompt_embeds) for the denoising step.\n"
" - `Flux2PrepareImageLatentsStep` (prepare_image_latents) prepares the image latents and image_latent_ids for the denoising step.\n"
" - `Flux2PrepareLatentsStep` (prepare_latents) prepares the initial latents (latents) and latent_ids for the denoising step.\n"
" - `Flux2SetTimestepsStep` (set_timesteps) sets the timesteps for the denoising step.\n"
" - `Flux2PrepareGuidanceStep` (prepare_guidance) prepares the guidance tensor for the denoising step.\n"
" - `Flux2RoPEInputsStep` (prepare_rope_inputs) prepares the RoPE inputs (txt_ids) for the denoising step.\n"
" - `Flux2DenoiseStep` (denoise) iteratively denoises the latents.\n"
" - `Flux2UnpackLatentsStep` (after_denoise) unpacks the latents from the denoising step.\n"
)
@property
def outputs(self):
return [
OutputParam(
name="latents",
type_hint=torch.Tensor,
description="The latents from the denoising step.",
)
]
return "Before denoise step that prepares the inputs for the denoise step in Flux2 generation."
AUTO_BLOCKS = InsertableDict(
[
("text_encoder", Flux2TextEncoderStep()),
("vae_encoder", Flux2AutoVaeEncoderStep()),
("denoise", Flux2CoreDenoiseStep()),
("text_input", Flux2TextInputStep()),
("vae_image_encoder", Flux2AutoVaeEncoderStep()),
("before_denoise", Flux2BeforeDenoiseStep()),
("denoise", Flux2DenoiseStep()),
("decode", Flux2DecodeStep()),
]
)
@@ -135,8 +107,10 @@ AUTO_BLOCKS = InsertableDict(
REMOTE_AUTO_BLOCKS = InsertableDict(
[
("text_encoder", Flux2RemoteTextEncoderStep()),
("vae_encoder", Flux2AutoVaeEncoderStep()),
("denoise", Flux2CoreDenoiseStep()),
("text_input", Flux2TextInputStep()),
("vae_image_encoder", Flux2AutoVaeEncoderStep()),
("before_denoise", Flux2BeforeDenoiseStep()),
("denoise", Flux2DenoiseStep()),
("decode", Flux2DecodeStep()),
]
)
@@ -156,16 +130,6 @@ class Flux2AutoBlocks(SequentialPipelineBlocks):
"- For image-conditioned generation, you need to provide `image` (list of PIL images)."
)
@property
def outputs(self):
return [
OutputParam(
name="images",
type_hint=List[PIL.Image.Image],
description="The images from the decoding step.",
)
]
TEXT2IMAGE_BLOCKS = InsertableDict(
[
@@ -173,10 +137,8 @@ TEXT2IMAGE_BLOCKS = InsertableDict(
("text_input", Flux2TextInputStep()),
("prepare_latents", Flux2PrepareLatentsStep()),
("set_timesteps", Flux2SetTimestepsStep()),
("prepare_guidance", Flux2PrepareGuidanceStep()),
("prepare_rope_inputs", Flux2RoPEInputsStep()),
("denoise", Flux2DenoiseStep()),
("after_denoise", Flux2UnpackLatentsStep()),
("decode", Flux2DecodeStep()),
]
)
@@ -190,10 +152,8 @@ IMAGE_CONDITIONED_BLOCKS = InsertableDict(
("prepare_image_latents", Flux2PrepareImageLatentsStep()),
("prepare_latents", Flux2PrepareLatentsStep()),
("set_timesteps", Flux2SetTimestepsStep()),
("prepare_guidance", Flux2PrepareGuidanceStep()),
("prepare_rope_inputs", Flux2RoPEInputsStep()),
("denoise", Flux2DenoiseStep()),
("after_denoise", Flux2UnpackLatentsStep()),
("decode", Flux2DecodeStep()),
]
)

View File

@@ -1,232 +0,0 @@
# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
import PIL.Image
import torch
from ...utils import logging
from ..modular_pipeline import AutoPipelineBlocks, SequentialPipelineBlocks
from ..modular_pipeline_utils import InsertableDict, OutputParam
from .before_denoise import (
Flux2KleinBaseRoPEInputsStep,
Flux2PrepareImageLatentsStep,
Flux2PrepareLatentsStep,
Flux2RoPEInputsStep,
Flux2SetTimestepsStep,
)
from .decoders import Flux2DecodeStep, Flux2UnpackLatentsStep
from .denoise import Flux2KleinBaseDenoiseStep, Flux2KleinDenoiseStep
from .encoders import (
Flux2KleinBaseTextEncoderStep,
Flux2KleinTextEncoderStep,
Flux2VaeEncoderStep,
)
from .inputs import (
Flux2KleinBaseTextInputStep,
Flux2ProcessImagesInputStep,
Flux2TextInputStep,
)
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
################
# VAE encoder
################
Flux2KleinVaeEncoderBlocks = InsertableDict(
[
("preprocess", Flux2ProcessImagesInputStep()),
("encode", Flux2VaeEncoderStep()),
]
)
class Flux2KleinVaeEncoderSequentialStep(SequentialPipelineBlocks):
model_name = "flux2"
block_classes = Flux2KleinVaeEncoderBlocks.values()
block_names = Flux2KleinVaeEncoderBlocks.keys()
@property
def description(self) -> str:
return "VAE encoder step that preprocesses and encodes the image inputs into their latent representations."
class Flux2KleinAutoVaeEncoderStep(AutoPipelineBlocks):
block_classes = [Flux2KleinVaeEncoderSequentialStep]
block_names = ["img_conditioning"]
block_trigger_inputs = ["image"]
@property
def description(self):
return (
"VAE encoder step that encodes the image inputs into their latent representations.\n"
"This is an auto pipeline block that works for image conditioning tasks.\n"
" - `Flux2KleinVaeEncoderSequentialStep` is used when `image` is provided.\n"
" - If `image` is not provided, step will be skipped."
)
###
### Core denoise
###
Flux2KleinCoreDenoiseBlocks = InsertableDict(
[
("input", Flux2TextInputStep()),
("prepare_image_latents", Flux2PrepareImageLatentsStep()),
("prepare_latents", Flux2PrepareLatentsStep()),
("set_timesteps", Flux2SetTimestepsStep()),
("prepare_rope_inputs", Flux2RoPEInputsStep()),
("denoise", Flux2KleinDenoiseStep()),
("after_denoise", Flux2UnpackLatentsStep()),
]
)
class Flux2KleinCoreDenoiseStep(SequentialPipelineBlocks):
model_name = "flux2-klein"
block_classes = Flux2KleinCoreDenoiseBlocks.values()
block_names = Flux2KleinCoreDenoiseBlocks.keys()
@property
def description(self):
return (
"Core denoise step that performs the denoising process for Flux2-Klein (distilled model).\n"
" - `Flux2KleinTextInputStep` (input) standardizes the text inputs (prompt_embeds) for the denoising step.\n"
" - `Flux2PrepareImageLatentsStep` (prepare_image_latents) prepares the image latents and image_latent_ids for the denoising step.\n"
" - `Flux2PrepareLatentsStep` (prepare_latents) prepares the initial latents (latents) and latent_ids for the denoising step.\n"
" - `Flux2SetTimestepsStep` (set_timesteps) sets the timesteps for the denoising step.\n"
" - `Flux2RoPEInputsStep` (prepare_rope_inputs) prepares the RoPE inputs (txt_ids) for the denoising step.\n"
" - `Flux2KleinDenoiseStep` (denoise) iteratively denoises the latents.\n"
" - `Flux2UnpackLatentsStep` (after_denoise) unpacks the latents from the denoising step.\n"
)
@property
def outputs(self):
return [
OutputParam(
name="latents",
type_hint=torch.Tensor,
description="The latents from the denoising step.",
)
]
Flux2KleinBaseCoreDenoiseBlocks = InsertableDict(
[
("input", Flux2KleinBaseTextInputStep()),
("prepare_latents", Flux2PrepareLatentsStep()),
("prepare_image_latents", Flux2PrepareImageLatentsStep()),
("set_timesteps", Flux2SetTimestepsStep()),
("prepare_rope_inputs", Flux2KleinBaseRoPEInputsStep()),
("denoise", Flux2KleinBaseDenoiseStep()),
("after_denoise", Flux2UnpackLatentsStep()),
]
)
class Flux2KleinBaseCoreDenoiseStep(SequentialPipelineBlocks):
model_name = "flux2-klein"
block_classes = Flux2KleinBaseCoreDenoiseBlocks.values()
block_names = Flux2KleinBaseCoreDenoiseBlocks.keys()
@property
def description(self):
return "Core denoise step that performs the denoising process for Flux2-Klein (base model)."
return (
"Core denoise step that performs the denoising process for Flux2-Klein (base model).\n"
" - `Flux2KleinBaseTextInputStep` (input) standardizes the text inputs (prompt_embeds + negative_prompt_embeds) for the denoising step.\n"
" - `Flux2PrepareImageLatentsStep` (prepare_image_latents) prepares the image latents and image_latent_ids for the denoising step.\n"
" - `Flux2PrepareLatentsStep` (prepare_latents) prepares the initial latents (latents) and latent_ids for the denoising step.\n"
" - `Flux2SetTimestepsStep` (set_timesteps) sets the timesteps for the denoising step.\n"
" - `Flux2KleinBaseRoPEInputsStep` (prepare_rope_inputs) prepares the RoPE inputs (txt_ids + negative_txt_ids) for the denoising step.\n"
" - `Flux2KleinBaseDenoiseStep` (denoise) iteratively denoises the latents using Classifier-Free Guidance.\n"
" - `Flux2UnpackLatentsStep` (after_denoise) unpacks the latents from the denoising step.\n"
)
@property
def outputs(self):
return [
OutputParam(
name="latents",
type_hint=torch.Tensor,
description="The latents from the denoising step.",
)
]
###
### Auto blocks
###
class Flux2KleinAutoBlocks(SequentialPipelineBlocks):
model_name = "flux2-klein"
block_classes = [
Flux2KleinTextEncoderStep(),
Flux2KleinAutoVaeEncoderStep(),
Flux2KleinCoreDenoiseStep(),
Flux2DecodeStep(),
]
block_names = ["text_encoder", "vae_encoder", "denoise", "decode"]
@property
def description(self):
return (
"Auto blocks that perform the text-to-image and image-conditioned generation using Flux2-Klein.\n"
+ " - for image-conditioned generation, you need to provide `image` (list of PIL images).\n"
+ " - for text-to-image generation, all you need to provide is `prompt`.\n"
)
@property
def outputs(self):
return [
OutputParam(
name="images",
type_hint=List[PIL.Image.Image],
description="The images from the decoding step.",
)
]
class Flux2KleinBaseAutoBlocks(SequentialPipelineBlocks):
model_name = "flux2-klein"
block_classes = [
Flux2KleinBaseTextEncoderStep(),
Flux2KleinAutoVaeEncoderStep(),
Flux2KleinBaseCoreDenoiseStep(),
Flux2DecodeStep(),
]
block_names = ["text_encoder", "vae_encoder", "denoise", "decode"]
@property
def description(self):
return (
"Auto blocks that perform the text-to-image and image-conditioned generation using Flux2-Klein (base model).\n"
+ " - for image-conditioned generation, you need to provide `image` (list of PIL images).\n"
+ " - for text-to-image generation, all you need to provide is `prompt`.\n"
)
@property
def outputs(self):
return [
OutputParam(
name="images",
type_hint=List[PIL.Image.Image],
description="The images from the decoding step.",
)
]

View File

@@ -13,8 +13,6 @@
# limitations under the License.
from typing import Any, Dict, Optional
from ...loaders import Flux2LoraLoaderMixin
from ...utils import logging
from ..modular_pipeline import ModularPipeline
@@ -57,56 +55,3 @@ class Flux2ModularPipeline(ModularPipeline, Flux2LoraLoaderMixin):
if getattr(self, "transformer", None):
num_channels_latents = self.transformer.config.in_channels // 4
return num_channels_latents
class Flux2KleinModularPipeline(ModularPipeline, Flux2LoraLoaderMixin):
"""
A ModularPipeline for Flux2-Klein.
> [!WARNING] > This is an experimental feature and is likely to change in the future.
"""
default_blocks_name = "Flux2KleinBaseAutoBlocks"
def get_default_blocks_name(self, config_dict: Optional[Dict[str, Any]]) -> Optional[str]:
if config_dict is not None and "is_distilled" in config_dict and config_dict["is_distilled"]:
return "Flux2KleinAutoBlocks"
else:
return "Flux2KleinBaseAutoBlocks"
@property
def default_height(self):
return self.default_sample_size * self.vae_scale_factor
@property
def default_width(self):
return self.default_sample_size * self.vae_scale_factor
@property
def default_sample_size(self):
return 128
@property
def vae_scale_factor(self):
vae_scale_factor = 8
if getattr(self, "vae", None) is not None:
vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
return vae_scale_factor
@property
def num_channels_latents(self):
num_channels_latents = 32
if getattr(self, "transformer", None):
num_channels_latents = self.transformer.config.in_channels // 4
return num_channels_latents
@property
def requires_unconditional_embeds(self):
if hasattr(self.config, "is_distilled") and self.config.is_distilled:
return False
requires_unconditional_embeds = False
if hasattr(self, "guider") and self.guider is not None:
requires_unconditional_embeds = self.guider._enabled and self.guider.num_conditions > 1
return requires_unconditional_embeds

View File

@@ -39,6 +39,7 @@ from .modular_pipeline_utils import (
InputParam,
InsertableDict,
OutputParam,
_validate_requirements,
format_components,
format_configs,
make_doc_string,
@@ -59,7 +60,6 @@ MODULAR_PIPELINE_MAPPING = OrderedDict(
("flux", "FluxModularPipeline"),
("flux-kontext", "FluxKontextModularPipeline"),
("flux2", "Flux2ModularPipeline"),
("flux2-klein", "Flux2KleinModularPipeline"),
("qwenimage", "QwenImageModularPipeline"),
("qwenimage-edit", "QwenImageEditModularPipeline"),
("qwenimage-edit-plus", "QwenImageEditPlusModularPipeline"),
@@ -243,6 +243,7 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin):
config_name = "modular_config.json"
model_name = None
_requirements: Optional[Dict[str, str]] = None
@classmethod
def _get_signature_keys(cls, obj):
@@ -305,6 +306,19 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin):
trust_remote_code: bool = False,
**kwargs,
):
config = cls.load_config(pretrained_model_name_or_path)
has_remote_code = "auto_map" in config and cls.__name__ in config["auto_map"]
trust_remote_code = resolve_trust_remote_code(
trust_remote_code, pretrained_model_name_or_path, has_remote_code
)
if not (has_remote_code and trust_remote_code):
raise ValueError(
"Selected model repository does not happear to have any custom code or does not have a valid `config.json` file."
)
if "requirements" in config and config["requirements"] is not None:
_ = _validate_requirements(config["requirements"])
hub_kwargs_names = [
"cache_dir",
"force_download",
@@ -317,16 +331,6 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin):
]
hub_kwargs = {name: kwargs.pop(name) for name in hub_kwargs_names if name in kwargs}
config = cls.load_config(pretrained_model_name_or_path, **hub_kwargs)
has_remote_code = "auto_map" in config and cls.__name__ in config["auto_map"]
trust_remote_code = resolve_trust_remote_code(
trust_remote_code, pretrained_model_name_or_path, has_remote_code
)
if not has_remote_code and trust_remote_code:
raise ValueError(
"Selected model repository does not happear to have any custom code or does not have a valid `config.json` file."
)
class_ref = config["auto_map"][cls.__name__]
module_file, class_name = class_ref.split(".")
module_file = module_file + ".py"
@@ -351,8 +355,13 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin):
module = full_mod.rsplit(".", 1)[-1].replace("__dynamic__", "")
parent_module = self.save_pretrained.__func__.__qualname__.split(".", 1)[0]
auto_map = {f"{parent_module}": f"{module}.{cls_name}"}
self.register_to_config(auto_map=auto_map)
# resolve requirements
requirements = _validate_requirements(getattr(self, "_requirements", None))
if requirements:
self.register_to_config(requirements=requirements)
self.save_config(save_directory=save_directory, push_to_hub=push_to_hub, **kwargs)
config = dict(self.config)
self._internal_dict = FrozenDict(config)
@@ -1155,6 +1164,14 @@ class SequentialPipelineBlocks(ModularPipelineBlocks):
expected_configs=self.expected_configs,
)
@property
def _requirements(self) -> Dict[str, str]:
requirements = {}
for block_name, block in self.sub_blocks.items():
if getattr(block, "_requirements", None):
requirements[block_name] = block._requirements
return requirements
class LoopSequentialPipelineBlocks(ModularPipelineBlocks):
"""

View File

@@ -19,10 +19,12 @@ from dataclasses import dataclass, field, fields
from typing import Any, Dict, List, Literal, Optional, Type, Union
import torch
from packaging.specifiers import InvalidSpecifier, SpecifierSet
from ..configuration_utils import ConfigMixin, FrozenDict
from ..loaders.single_file_utils import _is_single_file_path_or_url
from ..utils import is_torch_available, logging
from ..utils.import_utils import _is_package_available
if is_torch_available():
@@ -690,3 +692,86 @@ def make_doc_string(
output += format_output_params(outputs, indent_level=2)
return output
def _validate_requirements(reqs):
if reqs is None:
normalized_reqs = {}
else:
if not isinstance(reqs, dict):
raise ValueError(
"Requirements must be provided as a dictionary mapping package names to version specifiers."
)
normalized_reqs = _normalize_requirements(reqs)
if not normalized_reqs:
return {}
final: Dict[str, str] = {}
for req, specified_ver in normalized_reqs.items():
req_available, req_actual_ver = _is_package_available(req)
if not req_available:
logger.warning(f"{req} was specified in the requirements but wasn't found in the current environment.")
if specified_ver:
try:
specifier = SpecifierSet(specified_ver)
except InvalidSpecifier as err:
raise ValueError(f"Requirement specifier '{specified_ver}' for {req} is invalid.") from err
if req_actual_ver == "N/A":
logger.warning(
f"Version of {req} could not be determined to validate requirement '{specified_ver}'. Things might work unexpected."
)
elif not specifier.contains(req_actual_ver, prereleases=True):
logger.warning(
f"{req} requirement '{specified_ver}' is not satisfied by the installed version {req_actual_ver}. Things might work unexpected."
)
final[req] = specified_ver
return final
def _normalize_requirements(reqs):
if not reqs:
return {}
normalized: "OrderedDict[str, str]" = OrderedDict()
def _accumulate(mapping: Dict[str, Any]):
for pkg, spec in mapping.items():
if isinstance(spec, dict):
# This is recursive because blocks are composable. This way, we can merge requirements
# from multiple blocks.
_accumulate(spec)
continue
pkg_name = str(pkg).strip()
if not pkg_name:
raise ValueError("Requirement package name cannot be empty.")
spec_str = "" if spec is None else str(spec).strip()
if spec_str and not spec_str.startswith(("<", ">", "=", "!", "~")):
spec_str = f"=={spec_str}"
existing_spec = normalized.get(pkg_name)
if existing_spec is not None:
if not existing_spec and spec_str:
normalized[pkg_name] = spec_str
elif existing_spec and spec_str and existing_spec != spec_str:
try:
combined_spec = SpecifierSet(",".join(filter(None, [existing_spec, spec_str])))
except InvalidSpecifier:
logger.warning(
f"Conflicting requirements for '{pkg_name}' detected: '{existing_spec}' vs '{spec_str}'. Keeping '{existing_spec}'."
)
else:
normalized[pkg_name] = str(combined_spec)
continue
normalized[pkg_name] = spec_str
_accumulate(reqs)
return normalized

View File

@@ -14,7 +14,7 @@ from .scheduling_utils import SchedulerMixin
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -28,8 +28,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:

View File

@@ -51,7 +51,7 @@ class DDIMSchedulerOutput(BaseOutput):
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -65,8 +65,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:

View File

@@ -51,7 +51,7 @@ class DDIMSchedulerOutput(BaseOutput):
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -65,8 +65,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:
@@ -100,13 +100,14 @@ def betas_for_alpha_bar(
return torch.tensor(betas, dtype=torch.float32)
def rescale_zero_terminal_snr(alphas_cumprod: torch.Tensor) -> torch.Tensor:
def rescale_zero_terminal_snr(alphas_cumprod):
"""
Rescales betas to have zero terminal SNR Based on (Algorithm 1)[https://huggingface.co/papers/2305.08891]
Rescales betas to have zero terminal SNR Based on https://huggingface.co/papers/2305.08891 (Algorithm 1)
Args:
alphas_cumprod (`torch.Tensor`):
The alphas cumulative products that the scheduler is being initialized with.
betas (`torch.Tensor`):
the betas that the scheduler is being initialized with.
Returns:
`torch.Tensor`: rescaled betas with zero terminal SNR
@@ -141,11 +142,11 @@ class CogVideoXDDIMScheduler(SchedulerMixin, ConfigMixin):
Args:
num_train_timesteps (`int`, defaults to 1000):
The number of diffusion steps to train the model.
beta_start (`float`, defaults to 0.00085):
beta_start (`float`, defaults to 0.0001):
The starting `beta` value of inference.
beta_end (`float`, defaults to 0.0120):
beta_end (`float`, defaults to 0.02):
The final `beta` value.
beta_schedule (`str`, defaults to `"scaled_linear"`):
beta_schedule (`str`, defaults to `"linear"`):
The beta schedule, a mapping from a beta range to a sequence of betas for stepping the model. Choose from
`linear`, `scaled_linear`, or `squaredcos_cap_v2`.
trained_betas (`np.ndarray`, *optional*):
@@ -178,8 +179,6 @@ class CogVideoXDDIMScheduler(SchedulerMixin, ConfigMixin):
Whether to rescale the betas to have zero terminal SNR. This enables the model to generate very bright and
dark samples instead of limiting it to samples with medium brightness. Loosely related to
[`--offset_noise`](https://github.com/huggingface/diffusers/blob/74fd735eb073eb1d774b1ab4154a0876eb82f055/examples/dreambooth/train_dreambooth.py#L506).
snr_shift_scale (`float`, defaults to 3.0):
Shift scale for SNR.
"""
_compatibles = [e.name for e in KarrasDiffusionSchedulers]
@@ -191,15 +190,15 @@ class CogVideoXDDIMScheduler(SchedulerMixin, ConfigMixin):
num_train_timesteps: int = 1000,
beta_start: float = 0.00085,
beta_end: float = 0.0120,
beta_schedule: Literal["linear", "scaled_linear", "squaredcos_cap_v2"] = "scaled_linear",
beta_schedule: str = "scaled_linear",
trained_betas: Optional[Union[np.ndarray, List[float]]] = None,
clip_sample: bool = True,
set_alpha_to_one: bool = True,
steps_offset: int = 0,
prediction_type: Literal["epsilon", "sample", "v_prediction"] = "epsilon",
prediction_type: str = "epsilon",
clip_sample_range: float = 1.0,
sample_max_value: float = 1.0,
timestep_spacing: Literal["linspace", "leading", "trailing"] = "leading",
timestep_spacing: str = "leading",
rescale_betas_zero_snr: bool = False,
snr_shift_scale: float = 3.0,
):
@@ -209,15 +208,7 @@ class CogVideoXDDIMScheduler(SchedulerMixin, ConfigMixin):
self.betas = torch.linspace(beta_start, beta_end, num_train_timesteps, dtype=torch.float32)
elif beta_schedule == "scaled_linear":
# this schedule is very specific to the latent diffusion model.
self.betas = (
torch.linspace(
beta_start**0.5,
beta_end**0.5,
num_train_timesteps,
dtype=torch.float64,
)
** 2
)
self.betas = torch.linspace(beta_start**0.5, beta_end**0.5, num_train_timesteps, dtype=torch.float64) ** 2
elif beta_schedule == "squaredcos_cap_v2":
# Glide cosine schedule
self.betas = betas_for_alpha_bar(num_train_timesteps)
@@ -247,7 +238,7 @@ class CogVideoXDDIMScheduler(SchedulerMixin, ConfigMixin):
self.num_inference_steps = None
self.timesteps = torch.from_numpy(np.arange(0, num_train_timesteps)[::-1].copy().astype(np.int64))
def _get_variance(self, timestep: int, prev_timestep: int) -> torch.Tensor:
def _get_variance(self, timestep, prev_timestep):
alpha_prod_t = self.alphas_cumprod[timestep]
alpha_prod_t_prev = self.alphas_cumprod[prev_timestep] if prev_timestep >= 0 else self.final_alpha_cumprod
beta_prod_t = 1 - alpha_prod_t
@@ -274,11 +265,7 @@ class CogVideoXDDIMScheduler(SchedulerMixin, ConfigMixin):
"""
return sample
def set_timesteps(
self,
num_inference_steps: int,
device: Optional[Union[str, torch.device]] = None,
) -> None:
def set_timesteps(self, num_inference_steps: int, device: Union[str, torch.device] = None):
"""
Sets the discrete timesteps used for the diffusion chain (to be run before inference).
@@ -330,7 +317,7 @@ class CogVideoXDDIMScheduler(SchedulerMixin, ConfigMixin):
sample: torch.Tensor,
eta: float = 0.0,
use_clipped_model_output: bool = False,
generator: Optional[torch.Generator] = None,
generator=None,
variance_noise: Optional[torch.Tensor] = None,
return_dict: bool = True,
) -> Union[DDIMSchedulerOutput, Tuple]:
@@ -341,7 +328,7 @@ class CogVideoXDDIMScheduler(SchedulerMixin, ConfigMixin):
Args:
model_output (`torch.Tensor`):
The direct output from learned diffusion model.
timestep (`int`):
timestep (`float`):
The current discrete timestep in the diffusion chain.
sample (`torch.Tensor`):
A current instance of a sample created by the diffusion process.
@@ -500,5 +487,5 @@ class CogVideoXDDIMScheduler(SchedulerMixin, ConfigMixin):
velocity = sqrt_alpha_prod * noise - sqrt_one_minus_alpha_prod * sample
return velocity
def __len__(self) -> int:
def __len__(self):
return self.config.num_train_timesteps

View File

@@ -49,7 +49,7 @@ class DDIMSchedulerOutput(BaseOutput):
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -63,8 +63,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:

View File

@@ -51,7 +51,7 @@ class DDIMParallelSchedulerOutput(BaseOutput):
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -65,8 +65,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:

View File

@@ -48,7 +48,7 @@ class DDPMSchedulerOutput(BaseOutput):
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -62,8 +62,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:
@@ -192,12 +192,7 @@ class DDPMScheduler(SchedulerMixin, ConfigMixin):
beta_schedule: Literal["linear", "scaled_linear", "squaredcos_cap_v2", "sigmoid"] = "linear",
trained_betas: Optional[Union[np.ndarray, List[float]]] = None,
variance_type: Literal[
"fixed_small",
"fixed_small_log",
"fixed_large",
"fixed_large_log",
"learned",
"learned_range",
"fixed_small", "fixed_small_log", "fixed_large", "fixed_large_log", "learned", "learned_range"
] = "fixed_small",
clip_sample: bool = True,
prediction_type: Literal["epsilon", "sample", "v_prediction"] = "epsilon",
@@ -215,15 +210,7 @@ class DDPMScheduler(SchedulerMixin, ConfigMixin):
self.betas = torch.linspace(beta_start, beta_end, num_train_timesteps, dtype=torch.float32)
elif beta_schedule == "scaled_linear":
# this schedule is very specific to the latent diffusion model.
self.betas = (
torch.linspace(
beta_start**0.5,
beta_end**0.5,
num_train_timesteps,
dtype=torch.float32,
)
** 2
)
self.betas = torch.linspace(beta_start**0.5, beta_end**0.5, num_train_timesteps, dtype=torch.float32) ** 2
elif beta_schedule == "squaredcos_cap_v2":
# Glide cosine schedule
self.betas = betas_for_alpha_bar(num_train_timesteps)
@@ -350,14 +337,7 @@ class DDPMScheduler(SchedulerMixin, ConfigMixin):
t: int,
predicted_variance: Optional[torch.Tensor] = None,
variance_type: Optional[
Literal[
"fixed_small",
"fixed_small_log",
"fixed_large",
"fixed_large_log",
"learned",
"learned_range",
]
Literal["fixed_small", "fixed_small_log", "fixed_large", "fixed_large_log", "learned", "learned_range"]
] = None,
) -> torch.Tensor:
"""
@@ -492,10 +472,7 @@ class DDPMScheduler(SchedulerMixin, ConfigMixin):
prev_t = self.previous_timestep(t)
if model_output.shape[1] == sample.shape[1] * 2 and self.variance_type in [
"learned",
"learned_range",
]:
if model_output.shape[1] == sample.shape[1] * 2 and self.variance_type in ["learned", "learned_range"]:
model_output, predicted_variance = torch.split(model_output, sample.shape[1], dim=1)
else:
predicted_variance = None
@@ -544,10 +521,7 @@ class DDPMScheduler(SchedulerMixin, ConfigMixin):
if t > 0:
device = model_output.device
variance_noise = randn_tensor(
model_output.shape,
generator=generator,
device=device,
dtype=model_output.dtype,
model_output.shape, generator=generator, device=device, dtype=model_output.dtype
)
if self.variance_type == "fixed_small_log":
variance = self._get_variance(t, predicted_variance=predicted_variance) * variance_noise

View File

@@ -50,7 +50,7 @@ class DDPMParallelSchedulerOutput(BaseOutput):
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -64,8 +64,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:
@@ -202,12 +202,7 @@ class DDPMParallelScheduler(SchedulerMixin, ConfigMixin):
beta_schedule: Literal["linear", "scaled_linear", "squaredcos_cap_v2", "sigmoid"] = "linear",
trained_betas: Optional[Union[np.ndarray, List[float]]] = None,
variance_type: Literal[
"fixed_small",
"fixed_small_log",
"fixed_large",
"fixed_large_log",
"learned",
"learned_range",
"fixed_small", "fixed_small_log", "fixed_large", "fixed_large_log", "learned", "learned_range"
] = "fixed_small",
clip_sample: bool = True,
prediction_type: Literal["epsilon", "sample", "v_prediction"] = "epsilon",
@@ -225,15 +220,7 @@ class DDPMParallelScheduler(SchedulerMixin, ConfigMixin):
self.betas = torch.linspace(beta_start, beta_end, num_train_timesteps, dtype=torch.float32)
elif beta_schedule == "scaled_linear":
# this schedule is very specific to the latent diffusion model.
self.betas = (
torch.linspace(
beta_start**0.5,
beta_end**0.5,
num_train_timesteps,
dtype=torch.float32,
)
** 2
)
self.betas = torch.linspace(beta_start**0.5, beta_end**0.5, num_train_timesteps, dtype=torch.float32) ** 2
elif beta_schedule == "squaredcos_cap_v2":
# Glide cosine schedule
self.betas = betas_for_alpha_bar(num_train_timesteps)
@@ -363,14 +350,7 @@ class DDPMParallelScheduler(SchedulerMixin, ConfigMixin):
t: int,
predicted_variance: Optional[torch.Tensor] = None,
variance_type: Optional[
Literal[
"fixed_small",
"fixed_small_log",
"fixed_large",
"fixed_large_log",
"learned",
"learned_range",
]
Literal["fixed_small", "fixed_small_log", "fixed_large", "fixed_large_log", "learned", "learned_range"]
] = None,
) -> torch.Tensor:
"""

View File

@@ -34,7 +34,7 @@ if is_scipy_available():
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -48,8 +48,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:

View File

@@ -52,7 +52,7 @@ class DDIMSchedulerOutput(BaseOutput):
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -66,8 +66,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:

View File

@@ -34,7 +34,7 @@ if is_scipy_available():
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -48,8 +48,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:

View File

@@ -34,7 +34,7 @@ if is_scipy_available():
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -48,8 +48,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:

View File

@@ -117,7 +117,7 @@ class BrownianTreeNoiseSampler:
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -131,8 +131,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:

View File

@@ -36,7 +36,7 @@ logger = logging.get_logger(__name__) # pylint: disable=invalid-name
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -50,8 +50,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:

View File

@@ -51,7 +51,7 @@ class EulerAncestralDiscreteSchedulerOutput(BaseOutput):
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -65,8 +65,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:

View File

@@ -54,7 +54,7 @@ class EulerDiscreteSchedulerOutput(BaseOutput):
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -68,8 +68,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:

View File

@@ -51,7 +51,7 @@ class HeunDiscreteSchedulerOutput(BaseOutput):
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -65,8 +65,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:

View File

@@ -52,7 +52,7 @@ class KDPM2AncestralDiscreteSchedulerOutput(BaseOutput):
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -66,8 +66,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:

View File

@@ -51,7 +51,7 @@ class KDPM2DiscreteSchedulerOutput(BaseOutput):
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -65,8 +65,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:

View File

@@ -53,7 +53,7 @@ class LCMSchedulerOutput(BaseOutput):
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -67,8 +67,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:

View File

@@ -49,7 +49,7 @@ class LMSDiscreteSchedulerOutput(BaseOutput):
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -63,8 +63,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:

View File

@@ -28,7 +28,7 @@ from .scheduling_utils import KarrasDiffusionSchedulers, SchedulerMixin, Schedul
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -42,8 +42,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:

View File

@@ -47,7 +47,7 @@ class RePaintSchedulerOutput(BaseOutput):
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -61,8 +61,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:

View File

@@ -35,7 +35,7 @@ if is_scipy_available():
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -49,8 +49,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:

View File

@@ -52,7 +52,7 @@ class TCDSchedulerOutput(BaseOutput):
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -66,8 +66,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:

View File

@@ -48,7 +48,7 @@ class UnCLIPSchedulerOutput(BaseOutput):
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -62,8 +62,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:

View File

@@ -34,7 +34,7 @@ if is_scipy_available():
def betas_for_alpha_bar(
num_diffusion_timesteps: int,
max_beta: float = 0.999,
alpha_transform_type: Literal["cosine", "exp", "laplace"] = "cosine",
alpha_transform_type: Literal["cosine", "exp"] = "cosine",
) -> torch.Tensor:
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
@@ -48,8 +48,8 @@ def betas_for_alpha_bar(
The number of betas to produce.
max_beta (`float`, defaults to `0.999`):
The maximum beta to use; use values lower than 1 to avoid numerical instability.
alpha_transform_type (`str`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.
alpha_transform_type (`"cosine"` or `"exp"`, defaults to `"cosine"`):
The type of noise schedule for `alpha_bar`. Choose from `cosine` or `exp`.
Returns:
`torch.Tensor`:

View File

@@ -17,51 +17,6 @@ class Flux2AutoBlocks(metaclass=DummyObject):
requires_backends(cls, ["torch", "transformers"])
class Flux2KleinAutoBlocks(metaclass=DummyObject):
_backends = ["torch", "transformers"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch", "transformers"])
@classmethod
def from_config(cls, *args, **kwargs):
requires_backends(cls, ["torch", "transformers"])
@classmethod
def from_pretrained(cls, *args, **kwargs):
requires_backends(cls, ["torch", "transformers"])
class Flux2KleinBaseAutoBlocks(metaclass=DummyObject):
_backends = ["torch", "transformers"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch", "transformers"])
@classmethod
def from_config(cls, *args, **kwargs):
requires_backends(cls, ["torch", "transformers"])
@classmethod
def from_pretrained(cls, *args, **kwargs):
requires_backends(cls, ["torch", "transformers"])
class Flux2KleinModularPipeline(metaclass=DummyObject):
_backends = ["torch", "transformers"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch", "transformers"])
@classmethod
def from_config(cls, *args, **kwargs):
requires_backends(cls, ["torch", "transformers"])
@classmethod
def from_pretrained(cls, *args, **kwargs):
requires_backends(cls, ["torch", "transformers"])
class Flux2ModularPipeline(metaclass=DummyObject):
_backends = ["torch", "transformers"]

View File

@@ -1,91 +0,0 @@
# coding=utf-8
# Copyright 2025 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import numpy as np
import PIL
import pytest
from diffusers.modular_pipelines import (
Flux2KleinAutoBlocks,
Flux2KleinModularPipeline,
)
from ...testing_utils import floats_tensor, torch_device
from ..test_modular_pipelines_common import ModularPipelineTesterMixin
class TestFlux2ModularPipelineFast(ModularPipelineTesterMixin):
pipeline_class = Flux2KleinModularPipeline
pipeline_blocks_class = Flux2KleinAutoBlocks
pretrained_model_name_or_path = "hf-internal-testing/tiny-flux2-klein-modular"
params = frozenset(["prompt", "height", "width"])
batch_params = frozenset(["prompt"])
def get_dummy_inputs(self, seed=0):
generator = self.get_generator(seed)
inputs = {
"prompt": "A painting of a squirrel eating a burger",
# TODO (Dhruv): Update text encoder config so that vocab_size matches tokenizer
"max_sequence_length": 8, # bit of a hack to workaround vocab size mismatch
"text_encoder_out_layers": (1,),
"generator": generator,
"num_inference_steps": 2,
"height": 32,
"width": 32,
"output_type": "pt",
}
return inputs
def test_float16_inference(self):
super().test_float16_inference(9e-2)
class TestFlux2ImageConditionedModularPipelineFast(ModularPipelineTesterMixin):
pipeline_class = Flux2KleinModularPipeline
pipeline_blocks_class = Flux2KleinAutoBlocks
pretrained_model_name_or_path = "hf-internal-testing/tiny-flux2-klein-modular"
params = frozenset(["prompt", "height", "width", "image"])
batch_params = frozenset(["prompt", "image"])
def get_dummy_inputs(self, seed=0):
generator = self.get_generator(seed)
inputs = {
"prompt": "A painting of a squirrel eating a burger",
# TODO (Dhruv): Update text encoder config so that vocab_size matches tokenizer
"max_sequence_length": 8, # bit of a hack to workaround vocab size mismatch
"text_encoder_out_layers": (1,),
"generator": generator,
"num_inference_steps": 2,
"height": 32,
"width": 32,
"output_type": "pt",
}
image = floats_tensor((1, 3, 64, 64), rng=random.Random(seed)).to(torch_device)
image = image.cpu().permute(0, 2, 3, 1)[0]
init_image = PIL.Image.fromarray(np.uint8(image * 255)).convert("RGB")
inputs["image"] = init_image
return inputs
def test_float16_inference(self):
super().test_float16_inference(9e-2)
@pytest.mark.skip(reason="batched inference is currently not supported")
def test_inference_batch_single_identical(self, batch_size=2, expected_max_diff=0.0001):
return

View File

@@ -1,91 +0,0 @@
# coding=utf-8
# Copyright 2025 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import numpy as np
import PIL
import pytest
from diffusers.modular_pipelines import (
Flux2KleinBaseAutoBlocks,
Flux2KleinModularPipeline,
)
from ...testing_utils import floats_tensor, torch_device
from ..test_modular_pipelines_common import ModularPipelineTesterMixin
class TestFlux2ModularPipelineFast(ModularPipelineTesterMixin):
pipeline_class = Flux2KleinModularPipeline
pipeline_blocks_class = Flux2KleinBaseAutoBlocks
pretrained_model_name_or_path = "hf-internal-testing/tiny-flux2-klein-base-modular"
params = frozenset(["prompt", "height", "width"])
batch_params = frozenset(["prompt"])
def get_dummy_inputs(self, seed=0):
generator = self.get_generator(seed)
inputs = {
"prompt": "A painting of a squirrel eating a burger",
# TODO (Dhruv): Update text encoder config so that vocab_size matches tokenizer
"max_sequence_length": 8, # bit of a hack to workaround vocab size mismatch
"text_encoder_out_layers": (1,),
"generator": generator,
"num_inference_steps": 2,
"height": 32,
"width": 32,
"output_type": "pt",
}
return inputs
def test_float16_inference(self):
super().test_float16_inference(9e-2)
class TestFlux2ImageConditionedModularPipelineFast(ModularPipelineTesterMixin):
pipeline_class = Flux2KleinModularPipeline
pipeline_blocks_class = Flux2KleinBaseAutoBlocks
pretrained_model_name_or_path = "hf-internal-testing/tiny-flux2-klein-base-modular"
params = frozenset(["prompt", "height", "width", "image"])
batch_params = frozenset(["prompt", "image"])
def get_dummy_inputs(self, seed=0):
generator = self.get_generator(seed)
inputs = {
"prompt": "A painting of a squirrel eating a burger",
# TODO (Dhruv): Update text encoder config so that vocab_size matches tokenizer
"max_sequence_length": 8, # bit of a hack to workaround vocab size mismatch
"text_encoder_out_layers": (1,),
"generator": generator,
"num_inference_steps": 2,
"height": 32,
"width": 32,
"output_type": "pt",
}
image = floats_tensor((1, 3, 64, 64), rng=random.Random(seed)).to(torch_device)
image = image.cpu().permute(0, 2, 3, 1)[0]
init_image = PIL.Image.fromarray(np.uint8(image * 255)).convert("RGB")
inputs["image"] = init_image
return inputs
def test_float16_inference(self):
super().test_float16_inference(9e-2)
@pytest.mark.skip(reason="batched inference is currently not supported")
def test_inference_batch_single_identical(self, batch_size=2, expected_max_diff=0.0001):
return

View File

@@ -1,4 +1,6 @@
import gc
import json
import os
import tempfile
from typing import Callable, Union
@@ -8,9 +10,16 @@ import torch
import diffusers
from diffusers import ComponentsManager, ModularPipeline, ModularPipelineBlocks
from diffusers.guiders import ClassifierFreeGuidance
from diffusers.modular_pipelines import SequentialPipelineBlocks
from diffusers.utils import logging
from ..testing_utils import backend_empty_cache, numpy_cosine_similarity_distance, require_accelerator, torch_device
from ..testing_utils import (
CaptureLogger,
backend_empty_cache,
numpy_cosine_similarity_distance,
require_accelerator,
torch_device,
)
class ModularPipelineTesterMixin:
@@ -335,3 +344,53 @@ class ModularGuiderTesterMixin:
assert out_cfg.shape == out_no_cfg.shape
max_diff = torch.abs(out_cfg - out_no_cfg).max()
assert max_diff > expected_max_diff, "Output with CFG must be different from normal inference"
class TestCustomBlockRequirements:
def get_dummy_block_pipe(self):
class DummyBlockOne:
# keep two arbitrary deps so that we can test warnings.
_requirements = {"xyz": ">=0.8.0", "abc": ">=10.0.0"}
class DummyBlockTwo:
# keep two dependencies that will be available during testing.
_requirements = {"transformers": ">=4.44.0", "diffusers": ">=0.2.0"}
pipe = SequentialPipelineBlocks.from_blocks_dict(
{"dummy_block_one": DummyBlockOne, "dummy_block_two": DummyBlockTwo}
)
return pipe
def test_custom_requirements_save_load(self):
pipe = self.get_dummy_block_pipe()
with tempfile.TemporaryDirectory() as tmpdir:
pipe.save_pretrained(tmpdir)
config_path = os.path.join(tmpdir, "modular_config.json")
with open(config_path, "r") as f:
config = json.load(f)
assert "requirements" in config
requirements = config["requirements"]
expected_requirements = {
"xyz": ">=0.8.0",
"abc": ">=10.0.0",
"transformers": ">=4.44.0",
"diffusers": ">=0.2.0",
}
assert expected_requirements == requirements
def test_warnings(self):
pipe = self.get_dummy_block_pipe()
with tempfile.TemporaryDirectory() as tmpdir:
logger = logging.get_logger("diffusers.modular_pipelines.modular_pipeline_utils")
logger.setLevel(30)
with CaptureLogger(logger) as cap_logger:
pipe.save_pretrained(tmpdir)
template = "{req} was specified in the requirements but wasn't found in the current environment"
msg_xyz = template.format(req="xyz")
msg_abc = template.format(req="abc")
assert msg_xyz in str(cap_logger.out)
assert msg_abc in str(cap_logger.out)