Compare commits


1 Commit

Author SHA1 Message Date
Álvaro Somoza  84718e0d6c  change lora mixin  2026-02-13 23:57:20 -03:00
33 changed files with 1114 additions and 1576 deletions

View File

@@ -294,17 +294,10 @@ else:
)
_import_structure["modular_pipelines"].extend(
[
"AutoPipelineBlocks",
"ComponentsManager",
"ComponentSpec",
"ConditionalPipelineBlocks",
"ConfigSpec",
"InputParam",
"LoopSequentialPipelineBlocks",
"ModularPipeline",
"ModularPipelineBlocks",
"OutputParam",
"SequentialPipelineBlocks",
]
)
_import_structure["optimization"] = [
@@ -1070,19 +1063,7 @@ if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
ZImageTransformer2DModel,
attention_backend,
)
from .modular_pipelines import (
AutoPipelineBlocks,
ComponentsManager,
ComponentSpec,
ConditionalPipelineBlocks,
ConfigSpec,
InputParam,
LoopSequentialPipelineBlocks,
ModularPipeline,
ModularPipelineBlocks,
OutputParam,
SequentialPipelineBlocks,
)
from .modular_pipelines import ComponentsManager, ComponentSpec, ModularPipeline, ModularPipelineBlocks
from .optimization import (
get_constant_schedule,
get_constant_schedule_with_warmup,
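
Note for downstream users: after this change only the pipeline-level classes remain importable from the package root, while the block-authoring primitives stay in the `diffusers.modular_pipelines` subpackage. A minimal sketch of the resulting import surface, assuming the subpackage exports shown further below are otherwise unchanged:

# Still exported from the package root after this PR:
from diffusers import ComponentsManager, ComponentSpec, ModularPipeline, ModularPipelineBlocks

# Block-authoring primitives come from the subpackage instead:
from diffusers.modular_pipelines import (
    AutoPipelineBlocks,
    LoopSequentialPipelineBlocks,
    SequentialPipelineBlocks,
)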

View File

@@ -33,7 +33,6 @@ else:
"ModularPipeline",
"AutoPipelineBlocks",
"SequentialPipelineBlocks",
"ConditionalPipelineBlocks",
"LoopSequentialPipelineBlocks",
"PipelineState",
"BlockState",
@@ -106,7 +105,6 @@ if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
from .modular_pipeline import (
AutoPipelineBlocks,
BlockState,
ConditionalPipelineBlocks,
LoopSequentialPipelineBlocks,
ModularPipeline,
ModularPipelineBlocks,

View File

@@ -21,8 +21,21 @@ except OptionalDependencyNotAvailable:
_dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
else:
_import_structure["modular_blocks_flux"] = ["FluxAutoBlocks"]
_import_structure["modular_blocks_flux_kontext"] = ["FluxKontextAutoBlocks"]
_import_structure["encoders"] = ["FluxTextEncoderStep"]
_import_structure["modular_blocks"] = [
"ALL_BLOCKS",
"AUTO_BLOCKS",
"AUTO_BLOCKS_KONTEXT",
"FLUX_KONTEXT_BLOCKS",
"TEXT2IMAGE_BLOCKS",
"FluxAutoBeforeDenoiseStep",
"FluxAutoBlocks",
"FluxAutoDecodeStep",
"FluxAutoDenoiseStep",
"FluxKontextAutoBlocks",
"FluxKontextAutoDenoiseStep",
"FluxKontextBeforeDenoiseStep",
]
_import_structure["modular_pipeline"] = ["FluxKontextModularPipeline", "FluxModularPipeline"]
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
@@ -32,8 +45,21 @@ if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
except OptionalDependencyNotAvailable:
from ...utils.dummy_torch_and_transformers_objects import * # noqa F403
else:
from .modular_blocks_flux import FluxAutoBlocks
from .modular_blocks_flux_kontext import FluxKontextAutoBlocks
from .encoders import FluxTextEncoderStep
from .modular_blocks import (
ALL_BLOCKS,
AUTO_BLOCKS,
AUTO_BLOCKS_KONTEXT,
FLUX_KONTEXT_BLOCKS,
TEXT2IMAGE_BLOCKS,
FluxAutoBeforeDenoiseStep,
FluxAutoBlocks,
FluxAutoDecodeStep,
FluxAutoDenoiseStep,
FluxKontextAutoBlocks,
FluxKontextAutoDenoiseStep,
FluxKontextBeforeDenoiseStep,
)
from .modular_pipeline import FluxKontextModularPipeline, FluxModularPipeline
else:
import sys

View File

@@ -205,7 +205,7 @@ class FluxKontextProcessImagesInputStep(ModularPipelineBlocks):
return components, state
class FluxVaeEncoderStep(ModularPipelineBlocks):
class FluxVaeEncoderDynamicStep(ModularPipelineBlocks):
model_name = "flux"
def __init__(

View File

@@ -121,7 +121,7 @@ class FluxTextInputStep(ModularPipelineBlocks):
# Adapted from `QwenImageAdditionalInputsStep`
class FluxAdditionalInputsStep(ModularPipelineBlocks):
class FluxInputsDynamicStep(ModularPipelineBlocks):
model_name = "flux"
def __init__(
@@ -243,7 +243,7 @@ class FluxAdditionalInputsStep(ModularPipelineBlocks):
return components, state
class FluxKontextAdditionalInputsStep(FluxAdditionalInputsStep):
class FluxKontextInputsDynamicStep(FluxInputsDynamicStep):
model_name = "flux-kontext"
def __call__(self, components: FluxModularPipeline, state: PipelineState) -> PipelineState:
@@ -256,7 +256,7 @@ class FluxKontextAdditionalInputsStep(FluxAdditionalInputsStep):
continue
# 1. Calculate height/width from latents
# Unlike the `FluxAdditionalInputsStep`, we don't overwrite the `block.height` and `block.width`
# Unlike the `FluxInputsDynamicStep`, we don't overwrite the `block.height` and `block.width`
height, width = calculate_dimension_from_latents(image_latent_tensor, components.vae_scale_factor)
if not hasattr(block_state, "image_height"):
block_state.image_height = height
@@ -303,7 +303,6 @@ class FluxKontextAdditionalInputsStep(FluxAdditionalInputsStep):
class FluxKontextSetResolutionStep(ModularPipelineBlocks):
model_name = "flux-kontext"
@property
def description(self):
return (
"Determines the height and width to be used during the subsequent computations.\n"

View File

@@ -0,0 +1,446 @@
# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ...utils import logging
from ..modular_pipeline import AutoPipelineBlocks, SequentialPipelineBlocks
from ..modular_pipeline_utils import InsertableDict
from .before_denoise import (
FluxImg2ImgPrepareLatentsStep,
FluxImg2ImgSetTimestepsStep,
FluxKontextRoPEInputsStep,
FluxPrepareLatentsStep,
FluxRoPEInputsStep,
FluxSetTimestepsStep,
)
from .decoders import FluxDecodeStep
from .denoise import FluxDenoiseStep, FluxKontextDenoiseStep
from .encoders import (
FluxKontextProcessImagesInputStep,
FluxProcessImagesInputStep,
FluxTextEncoderStep,
FluxVaeEncoderDynamicStep,
)
from .inputs import (
FluxInputsDynamicStep,
FluxKontextInputsDynamicStep,
FluxKontextSetResolutionStep,
FluxTextInputStep,
)
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# vae encoder (run before before_denoise)
FluxImg2ImgVaeEncoderBlocks = InsertableDict(
[("preprocess", FluxProcessImagesInputStep()), ("encode", FluxVaeEncoderDynamicStep())]
)
class FluxImg2ImgVaeEncoderStep(SequentialPipelineBlocks):
model_name = "flux"
block_classes = FluxImg2ImgVaeEncoderBlocks.values()
block_names = FluxImg2ImgVaeEncoderBlocks.keys()
@property
def description(self) -> str:
return "Vae encoder step that preprocess andencode the image inputs into their latent representations."
class FluxAutoVaeEncoderStep(AutoPipelineBlocks):
block_classes = [FluxImg2ImgVaeEncoderStep]
block_names = ["img2img"]
block_trigger_inputs = ["image"]
@property
def description(self):
return (
"Vae encoder step that encode the image inputs into their latent representations.\n"
+ "This is an auto pipeline block that works for img2img tasks.\n"
+ " - `FluxImg2ImgVaeEncoderStep` (img2img) is used when only `image` is provided."
+ " - if `image` is not provided, step will be skipped."
)
# Flux Kontext vae encoder (run before before_denoise)
FluxKontextVaeEncoderBlocks = InsertableDict(
[("preprocess", FluxKontextProcessImagesInputStep()), ("encode", FluxVaeEncoderDynamicStep(sample_mode="argmax"))]
)
class FluxKontextVaeEncoderStep(SequentialPipelineBlocks):
model_name = "flux-kontext"
block_classes = FluxKontextVaeEncoderBlocks.values()
block_names = FluxKontextVaeEncoderBlocks.keys()
@property
def description(self) -> str:
return "Vae encoder step that preprocess andencode the image inputs into their latent representations."
class FluxKontextAutoVaeEncoderStep(AutoPipelineBlocks):
block_classes = [FluxKontextVaeEncoderStep]
block_names = ["img2img"]
block_trigger_inputs = ["image"]
@property
def description(self):
return (
"Vae encoder step that encode the image inputs into their latent representations.\n"
+ "This is an auto pipeline block that works for img2img tasks.\n"
+ " - `FluxKontextVaeEncoderStep` (img2img) is used when only `image` is provided."
+ " - if `image` is not provided, step will be skipped."
)
# before_denoise: text2img
FluxBeforeDenoiseBlocks = InsertableDict(
[
("prepare_latents", FluxPrepareLatentsStep()),
("set_timesteps", FluxSetTimestepsStep()),
("prepare_rope_inputs", FluxRoPEInputsStep()),
]
)
class FluxBeforeDenoiseStep(SequentialPipelineBlocks):
block_classes = FluxBeforeDenoiseBlocks.values()
block_names = FluxBeforeDenoiseBlocks.keys()
@property
def description(self):
return "Before denoise step that prepares the inputs for the denoise step in text-to-image generation."
# before_denoise: img2img
FluxImg2ImgBeforeDenoiseBlocks = InsertableDict(
[
("prepare_latents", FluxPrepareLatentsStep()),
("set_timesteps", FluxImg2ImgSetTimestepsStep()),
("prepare_img2img_latents", FluxImg2ImgPrepareLatentsStep()),
("prepare_rope_inputs", FluxRoPEInputsStep()),
]
)
class FluxImg2ImgBeforeDenoiseStep(SequentialPipelineBlocks):
block_classes = FluxImg2ImgBeforeDenoiseBlocks.values()
block_names = FluxImg2ImgBeforeDenoiseBlocks.keys()
@property
def description(self):
return "Before denoise step that prepare the inputs for the denoise step for img2img task."
# before_denoise: all task (text2img, img2img)
class FluxAutoBeforeDenoiseStep(AutoPipelineBlocks):
model_name = "flux-kontext"
block_classes = [FluxImg2ImgBeforeDenoiseStep, FluxBeforeDenoiseStep]
block_names = ["img2img", "text2image"]
block_trigger_inputs = ["image_latents", None]
@property
def description(self):
return (
"Before denoise step that prepare the inputs for the denoise step.\n"
+ "This is an auto pipeline block that works for text2image.\n"
+ " - `FluxBeforeDenoiseStep` (text2image) is used.\n"
+ " - `FluxImg2ImgBeforeDenoiseStep` (img2img) is used when only `image_latents` is provided.\n"
)
# before_denoise: FluxKontext
FluxKontextBeforeDenoiseBlocks = InsertableDict(
[
("prepare_latents", FluxPrepareLatentsStep()),
("set_timesteps", FluxSetTimestepsStep()),
("prepare_rope_inputs", FluxKontextRoPEInputsStep()),
]
)
class FluxKontextBeforeDenoiseStep(SequentialPipelineBlocks):
block_classes = FluxKontextBeforeDenoiseBlocks.values()
block_names = FluxKontextBeforeDenoiseBlocks.keys()
@property
def description(self):
return (
"Before denoise step that prepare the inputs for the denoise step\n"
"for img2img/text2img task for Flux Kontext."
)
class FluxKontextAutoBeforeDenoiseStep(AutoPipelineBlocks):
block_classes = [FluxKontextBeforeDenoiseStep, FluxBeforeDenoiseStep]
block_names = ["img2img", "text2image"]
block_trigger_inputs = ["image_latents", None]
@property
def description(self):
return (
"Before denoise step that prepare the inputs for the denoise step.\n"
+ "This is an auto pipeline block that works for text2image.\n"
+ " - `FluxBeforeDenoiseStep` (text2image) is used.\n"
+ " - `FluxKontextBeforeDenoiseStep` (img2img) is used when only `image_latents` is provided.\n"
)
# denoise: text2image
class FluxAutoDenoiseStep(AutoPipelineBlocks):
block_classes = [FluxDenoiseStep]
block_names = ["denoise"]
block_trigger_inputs = [None]
@property
def description(self) -> str:
return (
"Denoise step that iteratively denoise the latents. "
"This is a auto pipeline block that works for text2image and img2img tasks."
" - `FluxDenoiseStep` (denoise) for text2image and img2img tasks."
)
# denoise: Flux Kontext
class FluxKontextAutoDenoiseStep(AutoPipelineBlocks):
block_classes = [FluxKontextDenoiseStep]
block_names = ["denoise"]
block_trigger_inputs = [None]
@property
def description(self) -> str:
return (
"Denoise step that iteratively denoise the latents for Flux Kontext. "
"This is a auto pipeline block that works for text2image and img2img tasks."
" - `FluxDenoiseStep` (denoise) for text2image and img2img tasks."
)
# decode: all task (text2img, img2img)
class FluxAutoDecodeStep(AutoPipelineBlocks):
block_classes = [FluxDecodeStep]
block_names = ["non-inpaint"]
block_trigger_inputs = [None]
@property
def description(self):
return "Decode step that decode the denoised latents into image outputs.\n - `FluxDecodeStep`"
# inputs: text2image/img2img
FluxImg2ImgBlocks = InsertableDict(
[("text_inputs", FluxTextInputStep()), ("additional_inputs", FluxInputsDynamicStep())]
)
class FluxImg2ImgInputStep(SequentialPipelineBlocks):
model_name = "flux"
block_classes = FluxImg2ImgBlocks.values()
block_names = FluxImg2ImgBlocks.keys()
@property
def description(self):
return "Input step that prepares the inputs for the img2img denoising step. It:\n"
" - make sure the text embeddings have consistent batch size as well as the additional inputs (`image_latents`).\n"
" - update height/width based `image_latents`, patchify `image_latents`."
class FluxAutoInputStep(AutoPipelineBlocks):
block_classes = [FluxImg2ImgInputStep, FluxTextInputStep]
block_names = ["img2img", "text2image"]
block_trigger_inputs = ["image_latents", None]
@property
def description(self):
return (
"Input step that standardize the inputs for the denoising step, e.g. make sure inputs have consistent batch size, and patchified. \n"
" This is an auto pipeline block that works for text2image/img2img tasks.\n"
+ " - `FluxImg2ImgInputStep` (img2img) is used when `image_latents` is provided.\n"
+ " - `FluxTextInputStep` (text2image) is used when `image_latents` are not provided.\n"
)
# inputs: Flux Kontext
FluxKontextBlocks = InsertableDict(
[
("set_resolution", FluxKontextSetResolutionStep()),
("text_inputs", FluxTextInputStep()),
("additional_inputs", FluxKontextInputsDynamicStep()),
]
)
class FluxKontextInputStep(SequentialPipelineBlocks):
model_name = "flux-kontext"
block_classes = FluxKontextBlocks.values()
block_names = FluxKontextBlocks.keys()
@property
def description(self):
return (
"Input step that prepares the inputs for the both text2img and img2img denoising step. It:\n"
" - make sure the text embeddings have consistent batch size as well as the additional inputs (`image_latents`).\n"
" - update height/width based `image_latents`, patchify `image_latents`."
)
class FluxKontextAutoInputStep(AutoPipelineBlocks):
block_classes = [FluxKontextInputStep, FluxTextInputStep]
block_names = ["img2img", "text2img"]
block_trigger_inputs = ["image_latents", None]
@property
def description(self):
return (
"Input step that standardize the inputs for the denoising step, e.g. make sure inputs have consistent batch size, and patchified. \n"
" This is an auto pipeline block that works for text2image/img2img tasks.\n"
+ " - `FluxKontextInputStep` (img2img) is used when `image_latents` is provided.\n"
+ " - `FluxKontextInputStep` is also capable of handling text2image task when `image_latent` isn't present."
)
class FluxCoreDenoiseStep(SequentialPipelineBlocks):
model_name = "flux"
block_classes = [FluxAutoInputStep, FluxAutoBeforeDenoiseStep, FluxAutoDenoiseStep]
block_names = ["input", "before_denoise", "denoise"]
@property
def description(self):
return (
"Core step that performs the denoising process. \n"
+ " - `FluxAutoInputStep` (input) standardizes the inputs for the denoising step.\n"
+ " - `FluxAutoBeforeDenoiseStep` (before_denoise) prepares the inputs for the denoising step.\n"
+ " - `FluxAutoDenoiseStep` (denoise) iteratively denoises the latents.\n"
+ "This step supports text-to-image and image-to-image tasks for Flux:\n"
+ " - for image-to-image generation, you need to provide `image_latents`\n"
+ " - for text-to-image generation, all you need to provide is prompt embeddings."
)
class FluxKontextCoreDenoiseStep(SequentialPipelineBlocks):
model_name = "flux-kontext"
block_classes = [FluxKontextAutoInputStep, FluxKontextAutoBeforeDenoiseStep, FluxKontextAutoDenoiseStep]
block_names = ["input", "before_denoise", "denoise"]
@property
def description(self):
return (
"Core step that performs the denoising process. \n"
+ " - `FluxKontextAutoInputStep` (input) standardizes the inputs for the denoising step.\n"
+ " - `FluxKontextAutoBeforeDenoiseStep` (before_denoise) prepares the inputs for the denoising step.\n"
+ " - `FluxKontextAutoDenoiseStep` (denoise) iteratively denoises the latents.\n"
+ "This step supports text-to-image and image-to-image tasks for Flux:\n"
+ " - for image-to-image generation, you need to provide `image_latents`\n"
+ " - for text-to-image generation, all you need to provide is prompt embeddings."
)
# Auto blocks (text2image and img2img)
AUTO_BLOCKS = InsertableDict(
[
("text_encoder", FluxTextEncoderStep()),
("vae_encoder", FluxAutoVaeEncoderStep()),
("denoise", FluxCoreDenoiseStep()),
("decode", FluxDecodeStep()),
]
)
AUTO_BLOCKS_KONTEXT = InsertableDict(
[
("text_encoder", FluxTextEncoderStep()),
("vae_encoder", FluxKontextAutoVaeEncoderStep()),
("denoise", FluxKontextCoreDenoiseStep()),
("decode", FluxDecodeStep()),
]
)
class FluxAutoBlocks(SequentialPipelineBlocks):
model_name = "flux"
block_classes = AUTO_BLOCKS.values()
block_names = AUTO_BLOCKS.keys()
@property
def description(self):
return (
"Auto Modular pipeline for text-to-image and image-to-image using Flux.\n"
+ "- for text-to-image generation, all you need to provide is `prompt`\n"
+ "- for image-to-image generation, you need to provide either `image` or `image_latents`"
)
class FluxKontextAutoBlocks(FluxAutoBlocks):
model_name = "flux-kontext"
block_classes = AUTO_BLOCKS_KONTEXT.values()
block_names = AUTO_BLOCKS_KONTEXT.keys()
TEXT2IMAGE_BLOCKS = InsertableDict(
[
("text_encoder", FluxTextEncoderStep()),
("input", FluxTextInputStep()),
("prepare_latents", FluxPrepareLatentsStep()),
("set_timesteps", FluxSetTimestepsStep()),
("prepare_rope_inputs", FluxRoPEInputsStep()),
("denoise", FluxDenoiseStep()),
("decode", FluxDecodeStep()),
]
)
IMAGE2IMAGE_BLOCKS = InsertableDict(
[
("text_encoder", FluxTextEncoderStep()),
("vae_encoder", FluxVaeEncoderDynamicStep()),
("input", FluxImg2ImgInputStep()),
("prepare_latents", FluxPrepareLatentsStep()),
("set_timesteps", FluxImg2ImgSetTimestepsStep()),
("prepare_img2img_latents", FluxImg2ImgPrepareLatentsStep()),
("prepare_rope_inputs", FluxRoPEInputsStep()),
("denoise", FluxDenoiseStep()),
("decode", FluxDecodeStep()),
]
)
FLUX_KONTEXT_BLOCKS = InsertableDict(
[
("text_encoder", FluxTextEncoderStep()),
("vae_encoder", FluxVaeEncoderDynamicStep(sample_mode="argmax")),
("input", FluxKontextInputStep()),
("prepare_latents", FluxPrepareLatentsStep()),
("set_timesteps", FluxSetTimestepsStep()),
("prepare_rope_inputs", FluxKontextRoPEInputsStep()),
("denoise", FluxKontextDenoiseStep()),
("decode", FluxDecodeStep()),
]
)
ALL_BLOCKS = {
"text2image": TEXT2IMAGE_BLOCKS,
"img2img": IMAGE2IMAGE_BLOCKS,
"auto": AUTO_BLOCKS,
"auto_kontext": AUTO_BLOCKS_KONTEXT,
"kontext": FLUX_KONTEXT_BLOCKS,
}
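
Usage sketch: a blocks preset or auto-blocks class is assembled into a `ModularPipeline` before running. The snippet below is illustrative rather than part of this PR; it assumes the existing modular-pipelines API (`from_blocks_dict`, `init_pipeline`, `load_default_components`, and the `output="images"` call convention), and the repo id is only an example.

import torch

from diffusers.modular_pipelines import SequentialPipelineBlocks
from diffusers.modular_pipelines.flux.modular_blocks import TEXT2IMAGE_BLOCKS, FluxAutoBlocks

# Auto blocks route text2image/img2img at runtime based on trigger inputs...
blocks = FluxAutoBlocks()
# ...or build a fixed text2image workflow from the preset dict:
# blocks = SequentialPipelineBlocks.from_blocks_dict(TEXT2IMAGE_BLOCKS)

pipe = blocks.init_pipeline("black-forest-labs/FLUX.1-dev")  # repo id is illustrative
pipe.load_default_components(torch_dtype=torch.bfloat16)

image = pipe(prompt="a photo of a cat", num_inference_steps=28, output="images")[0]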

View File

@@ -1,192 +0,0 @@
# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ...utils import logging
from ..modular_pipeline import AutoPipelineBlocks, SequentialPipelineBlocks
from ..modular_pipeline_utils import InsertableDict
from .before_denoise import (
FluxImg2ImgPrepareLatentsStep,
FluxImg2ImgSetTimestepsStep,
FluxKontextRoPEInputsStep,
FluxPrepareLatentsStep,
FluxRoPEInputsStep,
FluxSetTimestepsStep,
)
from .decoders import FluxDecodeStep
from .denoise import FluxDenoiseStep, FluxKontextDenoiseStep
from .encoders import (
FluxKontextProcessImagesInputStep,
FluxProcessImagesInputStep,
FluxTextEncoderStep,
FluxVaeEncoderStep,
)
from .inputs import (
FluxAdditionalInputsStep,
FluxKontextAdditionalInputsStep,
FluxKontextSetResolutionStep,
FluxTextInputStep,
)
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# vae encoder (run before before_denoise)
# auto_docstring
class FluxImg2ImgVaeEncoderStep(SequentialPipelineBlocks):
model_name = "flux"
block_classes = [FluxProcessImagesInputStep(), FluxVaeEncoderStep()]
block_names = ["preprocess", "encode"]
@property
def description(self) -> str:
return "Vae encoder step that preprocess andencode the image inputs into their latent representations."
# auto_docstring
class FluxAutoVaeEncoderStep(AutoPipelineBlocks):
model_name = "flux"
block_classes = [FluxImg2ImgVaeEncoderStep]
block_names = ["img2img"]
block_trigger_inputs = ["image"]
@property
def description(self):
return (
"Vae encoder step that encode the image inputs into their latent representations.\n"
+ "This is an auto pipeline block that works for img2img tasks.\n"
+ " - `FluxImg2ImgVaeEncoderStep` (img2img) is used when only `image` is provided."
+ " - if `image` is not provided, step will be skipped."
)
# before_denoise: text2img
# auto_docstring
class FluxBeforeDenoiseStep(SequentialPipelineBlocks):
model_name = "flux"
block_classes = [FluxPrepareLatentsStep(), FluxSetTimestepsStep(), FluxRoPEInputsStep()]
block_names = ["prepare_latents", "set_timesteps", "prepare_rope_inputs"]
@property
def description(self):
return "Before denoise step that prepares the inputs for the denoise step in text-to-image generation."
# before_denoise: img2img
# auto_docstring
class FluxImg2ImgBeforeDenoiseStep(SequentialPipelineBlocks):
model_name = "flux"
block_classes = [FluxPrepareLatentsStep(), FluxImg2ImgSetTimestepsStep(), FluxImg2ImgPrepareLatentsStep(), FluxRoPEInputsStep()]
block_names = ["prepare_latents", "set_timesteps", "prepare_img2img_latents", "prepare_rope_inputs"]
@property
def description(self):
return "Before denoise step that prepare the inputs for the denoise step for img2img task."
# before_denoise: all task (text2img, img2img)
# auto_docstring
class FluxAutoBeforeDenoiseStep(AutoPipelineBlocks):
model_name = "flux"
block_classes = [FluxImg2ImgBeforeDenoiseStep, FluxBeforeDenoiseStep]
block_names = ["img2img", "text2image"]
block_trigger_inputs = ["image_latents", None]
@property
def description(self):
return (
"Before denoise step that prepare the inputs for the denoise step.\n"
+ "This is an auto pipeline block that works for text2image.\n"
+ " - `FluxBeforeDenoiseStep` (text2image) is used.\n"
+ " - `FluxImg2ImgBeforeDenoiseStep` (img2img) is used when only `image_latents` is provided.\n"
)
# inputs: text2image/img2img
# auto_docstring
class FluxImg2ImgInputStep(SequentialPipelineBlocks):
model_name = "flux"
block_classes = [FluxTextInputStep(), FluxAdditionalInputsStep()]
block_names = ["text_inputs", "additional_inputs"]
@property
def description(self):
return "Input step that prepares the inputs for the img2img denoising step. It:\n"
" - make sure the text embeddings have consistent batch size as well as the additional inputs (`image_latents`).\n"
" - update height/width based `image_latents`, patchify `image_latents`."
# auto_docstring
class FluxAutoInputStep(AutoPipelineBlocks):
model_name = "flux"
block_classes = [FluxImg2ImgInputStep, FluxTextInputStep]
block_names = ["img2img", "text2image"]
block_trigger_inputs = ["image_latents", None]
@property
def description(self):
return (
"Input step that standardize the inputs for the denoising step, e.g. make sure inputs have consistent batch size, and patchified. \n"
" This is an auto pipeline block that works for text2image/img2img tasks.\n"
+ " - `FluxImg2ImgInputStep` (img2img) is used when `image_latents` is provided.\n"
+ " - `FluxTextInputStep` (text2image) is used when `image_latents` are not provided.\n"
)
# auto_docstring
class FluxCoreDenoiseStep(SequentialPipelineBlocks):
model_name = "flux"
block_classes = [FluxAutoInputStep, FluxAutoBeforeDenoiseStep, FluxDenoiseStep]
block_names = ["input", "before_denoise", "denoise"]
@property
def description(self):
return (
"Core step that performs the denoising process for Flux.\n"
+ "This step supports text-to-image and image-to-image tasks for Flux:\n"
+ " - for image-to-image generation, you need to provide `image_latents`\n"
+ " - for text-to-image generation, all you need to provide is prompt embeddings."
)
# Auto blocks (text2image and img2img)
AUTO_BLOCKS = InsertableDict(
[
("text_encoder", FluxTextEncoderStep()),
("vae_encoder", FluxAutoVaeEncoderStep()),
("denoise", FluxCoreDenoiseStep()),
("decode", FluxDecodeStep()),
]
)
# auto_docstring
class FluxAutoBlocks(SequentialPipelineBlocks):
model_name = "flux"
block_classes = AUTO_BLOCKS.values()
block_names = AUTO_BLOCKS.keys()
_workflow_map = {
"text2image": {"prompt": True},
"image2image": {"image": True, "prompt": True},
}
@property
def description(self):
return (
"Auto Modular pipeline for text-to-image and image-to-image using Flux."
)

View File

@@ -1,189 +0,0 @@
# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ...utils import logging
from ..modular_pipeline import AutoPipelineBlocks, SequentialPipelineBlocks
from ..modular_pipeline_utils import InsertableDict
from .before_denoise import (
FluxImg2ImgPrepareLatentsStep,
FluxImg2ImgSetTimestepsStep,
FluxKontextRoPEInputsStep,
FluxPrepareLatentsStep,
FluxRoPEInputsStep,
FluxSetTimestepsStep,
)
from .decoders import FluxDecodeStep
from .denoise import FluxDenoiseStep, FluxKontextDenoiseStep
from .encoders import (
FluxKontextProcessImagesInputStep,
FluxProcessImagesInputStep,
FluxTextEncoderStep,
FluxVaeEncoderStep,
)
from .inputs import (
FluxAdditionalInputsStep,
FluxKontextAdditionalInputsStep,
FluxKontextSetResolutionStep,
FluxTextInputStep,
)
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# Flux Kontext vae encoder (run before before_denoise)
class FluxKontextVaeEncoderStep(SequentialPipelineBlocks):
model_name = "flux-kontext"
block_classes = [FluxKontextProcessImagesInputStep(), FluxVaeEncoderStep(sample_mode="argmax")]
block_names = ["preprocess", "encode"]
@property
def description(self) -> str:
return "Vae encoder step that preprocess andencode the image inputs into their latent representations."
class FluxKontextAutoVaeEncoderStep(AutoPipelineBlocks):
model_name = "flux-kontext"
block_classes = [FluxKontextVaeEncoderStep]
block_names = ["image_conditioned"]
block_trigger_inputs = ["image"]
@property
def description(self):
return (
"Vae encoder step that encode the image inputs into their latent representations.\n"
+ "This is an auto pipeline block that works for img2img tasks.\n"
+ " - `FluxKontextVaeEncoderStep` (img2img) is used when only `image` is provided."
+ " - if `image` is not provided, step will be skipped."
)
# before_denoise: text2img
class FluxKontextBeforeDenoiseStep(SequentialPipelineBlocks):
model_name = "flux-kontext"
block_classes = [FluxPrepareLatentsStep(), FluxSetTimestepsStep(), FluxRoPEInputsStep()]
block_names = ["prepare_latents", "set_timesteps", "prepare_rope_inputs"]
@property
def description(self):
return "Before denoise step that prepares the inputs for the denoise step in text-to-image generation."
# before_denoise: FluxKontext
class FluxKontextImageConditionedBeforeDenoiseStep(SequentialPipelineBlocks):
model_name = "flux-kontext"
block_classes = [FluxPrepareLatentsStep(), FluxSetTimestepsStep(), FluxKontextRoPEInputsStep()]
block_names = ["prepare_latents", "set_timesteps", "prepare_rope_inputs"]
@property
def description(self):
return (
"Before denoise step that prepare the inputs for the denoise step\n"
"for img2img/text2img task for Flux Kontext."
)
class FluxKontextAutoBeforeDenoiseStep(AutoPipelineBlocks):
model_name = "flux-kontext"
block_classes = [FluxKontextImageConditionedBeforeDenoiseStep, FluxKontextBeforeDenoiseStep]
block_names = ["image_conditioned", "text2image"]
block_trigger_inputs = ["image_latents", None]
@property
def description(self):
return (
"Before denoise step that prepare the inputs for the denoise step.\n"
+ "This is an auto pipeline block that works for text2image.\n"
+ " - `FluxKontextBeforeDenoiseStep` (text2image) is used.\n"
+ " - `FluxKontextImageConditionedBeforeDenoiseStep` (image_conditioned) is used when only `image_latents` is provided.\n"
)
# inputs: Flux Kontext
class FluxKontextInputStep(SequentialPipelineBlocks):
model_name = "flux-kontext"
block_classes = [FluxKontextSetResolutionStep(), FluxTextInputStep(), FluxKontextAdditionalInputsStep()]
block_names = ["set_resolution", "text_inputs", "additional_inputs"]
@property
def description(self):
return (
"Input step that prepares the inputs for the both text2img and img2img denoising step. It:\n"
" - make sure the text embeddings have consistent batch size as well as the additional inputs (`image_latents`).\n"
" - update height/width based `image_latents`, patchify `image_latents`."
)
class FluxKontextAutoInputStep(AutoPipelineBlocks):
model_name = "flux-kontext"
block_classes = [FluxKontextInputStep, FluxTextInputStep]
block_names = ["image_conditioned", "text2image"]
block_trigger_inputs = ["image_latents", None]
@property
def description(self):
return (
"Input step that standardize the inputs for the denoising step, e.g. make sure inputs have consistent batch size, and patchified. \n"
" This is an auto pipeline block that works for text2image/img2img tasks.\n"
+ " - `FluxKontextInputStep` (image_conditioned) is used when `image_latents` is provided.\n"
+ " - `FluxKontextInputStep` is also capable of handling text2image task when `image_latent` isn't present."
)
# auto_docstring
class FluxKontextCoreDenoiseStep(SequentialPipelineBlocks):
model_name = "flux-kontext"
block_classes = [FluxKontextAutoInputStep, FluxKontextAutoBeforeDenoiseStep, FluxKontextDenoiseStep]
block_names = ["input", "before_denoise", "denoise"]
@property
def description(self):
return (
"Core step that performs the denoising process for Flux Kontext.\n"
+ "This step supports text-to-image and image-conditioned tasks for Flux Kontext:\n"
+ " - for image-conditioned generation, you need to provide `image_latents`\n"
+ " - for text-to-image generation, all you need to provide is prompt embeddings."
)
AUTO_BLOCKS_KONTEXT = InsertableDict(
[
("text_encoder", FluxTextEncoderStep()),
("vae_encoder", FluxKontextAutoVaeEncoderStep()),
("denoise", FluxKontextCoreDenoiseStep()),
("decode", FluxDecodeStep()),
]
)
class FluxKontextAutoBlocks(SequentialPipelineBlocks):
model_name = "flux-kontext"
block_classes = AUTO_BLOCKS_KONTEXT.values()
block_names = AUTO_BLOCKS_KONTEXT.keys()
_workflow_map = {
"image_conditioned": {"image": True, "prompt": True},
"text2image": {"prompt": True},
}
@property
def description(self):
return (
"Modular pipeline for image-to-image using Flux Kontext."
)

View File

@@ -21,11 +21,45 @@ except OptionalDependencyNotAvailable:
_dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
else:
_import_structure["encoders"] = ["Flux2RemoteTextEncoderStep"]
_import_structure["modular_blocks_flux2"] = ["Flux2AutoBlocks"]
_import_structure["modular_blocks_flux2_klein_base"] = ["Flux2KleinBaseAutoBlocks"]
_import_structure["modular_blocks_flux2_klein"] = ["Flux2KleinAutoBlocks"]
_import_structure["modular_pipeline"] = ["Flux2ModularPipeline", "Flux2KleinModularPipeline", "Flux2KleinBaseModularPipeline"]
_import_structure["encoders"] = [
"Flux2TextEncoderStep",
"Flux2RemoteTextEncoderStep",
"Flux2VaeEncoderStep",
]
_import_structure["before_denoise"] = [
"Flux2SetTimestepsStep",
"Flux2PrepareLatentsStep",
"Flux2RoPEInputsStep",
"Flux2PrepareImageLatentsStep",
]
_import_structure["denoise"] = [
"Flux2LoopDenoiser",
"Flux2LoopAfterDenoiser",
"Flux2DenoiseLoopWrapper",
"Flux2DenoiseStep",
]
_import_structure["decoders"] = ["Flux2DecodeStep"]
_import_structure["inputs"] = [
"Flux2ProcessImagesInputStep",
"Flux2TextInputStep",
]
_import_structure["modular_blocks_flux2"] = [
"ALL_BLOCKS",
"AUTO_BLOCKS",
"REMOTE_AUTO_BLOCKS",
"TEXT2IMAGE_BLOCKS",
"IMAGE_CONDITIONED_BLOCKS",
"Flux2AutoBlocks",
"Flux2AutoVaeEncoderStep",
"Flux2CoreDenoiseStep",
"Flux2VaeEncoderSequentialStep",
]
_import_structure["modular_blocks_flux2_klein"] = ["Flux2KleinAutoBlocks", "Flux2KleinBaseAutoBlocks"]
_import_structure["modular_pipeline"] = [
"Flux2ModularPipeline",
"Flux2KleinModularPipeline",
"Flux2KleinBaseModularPipeline",
]
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
try:
@@ -34,10 +68,43 @@ if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
except OptionalDependencyNotAvailable:
from ...utils.dummy_torch_and_transformers_objects import * # noqa F403
else:
from .encoders import Flux2RemoteTextEncoderStep
from .modular_blocks_flux2 import Flux2AutoBlocks
from .modular_blocks_flux2_klein_base import Flux2KleinBaseAutoBlocks
from .modular_blocks_flux2_klein import Flux2KleinAutoBlocks
from .before_denoise import (
Flux2PrepareImageLatentsStep,
Flux2PrepareLatentsStep,
Flux2RoPEInputsStep,
Flux2SetTimestepsStep,
)
from .decoders import Flux2DecodeStep
from .denoise import (
Flux2DenoiseLoopWrapper,
Flux2DenoiseStep,
Flux2LoopAfterDenoiser,
Flux2LoopDenoiser,
)
from .encoders import (
Flux2RemoteTextEncoderStep,
Flux2TextEncoderStep,
Flux2VaeEncoderStep,
)
from .inputs import (
Flux2ProcessImagesInputStep,
Flux2TextInputStep,
)
from .modular_blocks_flux2 import (
ALL_BLOCKS,
AUTO_BLOCKS,
IMAGE_CONDITIONED_BLOCKS,
REMOTE_AUTO_BLOCKS,
TEXT2IMAGE_BLOCKS,
Flux2AutoBlocks,
Flux2AutoVaeEncoderStep,
Flux2CoreDenoiseStep,
Flux2VaeEncoderSequentialStep,
)
from .modular_blocks_flux2_klein import (
Flux2KleinAutoBlocks,
Flux2KleinBaseAutoBlocks,
)
from .modular_pipeline import Flux2KleinBaseModularPipeline, Flux2KleinModularPipeline, Flux2ModularPipeline
else:
import sys

View File

@@ -51,7 +51,6 @@ Flux2VaeEncoderBlocks = InsertableDict(
)
# auto_docstring
class Flux2VaeEncoderSequentialStep(SequentialPipelineBlocks):
model_name = "flux2"
@@ -63,7 +62,6 @@ class Flux2VaeEncoderSequentialStep(SequentialPipelineBlocks):
return "VAE encoder step that preprocesses, encodes, and prepares image latents for Flux2 conditioning."
# auto_docstring
class Flux2AutoVaeEncoderStep(AutoPipelineBlocks):
block_classes = [Flux2VaeEncoderSequentialStep]
block_names = ["img_conditioning"]
@@ -80,43 +78,6 @@ class Flux2AutoVaeEncoderStep(AutoPipelineBlocks):
Flux2CoreDenoiseBlocks = InsertableDict(
[
("input", Flux2TextInputStep()),
("prepare_latents", Flux2PrepareLatentsStep()),
("set_timesteps", Flux2SetTimestepsStep()),
("prepare_guidance", Flux2PrepareGuidanceStep()),
("prepare_rope_inputs", Flux2RoPEInputsStep()),
("denoise", Flux2DenoiseStep()),
("after_denoise", Flux2UnpackLatentsStep()),
]
)
# auto_docstring
class Flux2CoreDenoiseStep(SequentialPipelineBlocks):
model_name = "flux2"
block_classes = Flux2CoreDenoiseBlocks.values()
block_names = Flux2CoreDenoiseBlocks.keys()
@property
def description(self):
return (
"Core denoise step that performs the denoising process for Flux2-dev."
)
@property
def outputs(self):
return [
OutputParam(
name="latents",
type_hint=torch.Tensor,
description="The latents from the denoising step.",
)
]
Flux2ImageConditionedCoreDenoiseBlocks = InsertableDict(
[
("input", Flux2TextInputStep()),
("prepare_image_latents", Flux2PrepareImageLatentsStep()),
@@ -130,17 +91,24 @@ Flux2ImageConditionedCoreDenoiseBlocks = InsertableDict(
)
# auto_docstring
class Flux2ImageConditionedCoreDenoiseStep(SequentialPipelineBlocks):
class Flux2CoreDenoiseStep(SequentialPipelineBlocks):
model_name = "flux2"
block_classes = Flux2ImageConditionedCoreDenoiseBlocks.values()
block_names = Flux2ImageConditionedCoreDenoiseBlocks.keys()
block_classes = Flux2CoreDenoiseBlocks.values()
block_names = Flux2CoreDenoiseBlocks.keys()
@property
def description(self):
return (
"Core denoise step that performs the denoising process for Flux2-dev with image conditioning."
"Core denoise step that performs the denoising process for Flux2-dev.\n"
" - `Flux2TextInputStep` (input) standardizes the text inputs (prompt_embeds) for the denoising step.\n"
" - `Flux2PrepareImageLatentsStep` (prepare_image_latents) prepares the image latents and image_latent_ids for the denoising step.\n"
" - `Flux2PrepareLatentsStep` (prepare_latents) prepares the initial latents (latents) and latent_ids for the denoising step.\n"
" - `Flux2SetTimestepsStep` (set_timesteps) sets the timesteps for the denoising step.\n"
" - `Flux2PrepareGuidanceStep` (prepare_guidance) prepares the guidance tensor for the denoising step.\n"
" - `Flux2RoPEInputsStep` (prepare_rope_inputs) prepares the RoPE inputs (txt_ids) for the denoising step.\n"
" - `Flux2DenoiseStep` (denoise) iteratively denoises the latents.\n"
" - `Flux2UnpackLatentsStep` (after_denoise) unpacks the latents from the denoising step.\n"
)
@property
@@ -153,45 +121,39 @@ class Flux2ImageConditionedCoreDenoiseStep(SequentialPipelineBlocks):
)
]
class Flux2AutoCoreDenoiseStep(AutoPipelineBlocks):
model_name = "flux2"
block_classes = [Flux2ImageConditionedCoreDenoiseStep, Flux2CoreDenoiseStep]
block_names = ["image_conditioned", "text2image"]
block_trigger_inputs = ["image_latents", None]
@property
def description(self):
return (
"Auto core denoise step that performs the denoising process for Flux2-dev."
"This is an auto pipeline block that works for text-to-image and image-conditioned generation."
" - `Flux2CoreDenoiseStep` is used for text-to-image generation.\n"
" - `Flux2ImageConditionedCoreDenoiseStep` is used for image-conditioned generation.\n"
)
AUTO_BLOCKS = InsertableDict(
[
("text_encoder", Flux2TextEncoderStep()),
("vae_encoder", Flux2AutoVaeEncoderStep()),
("denoise", Flux2AutoCoreDenoiseStep()),
("denoise", Flux2CoreDenoiseStep()),
("decode", Flux2DecodeStep()),
]
)
# auto_docstring
REMOTE_AUTO_BLOCKS = InsertableDict(
[
("text_encoder", Flux2RemoteTextEncoderStep()),
("vae_encoder", Flux2AutoVaeEncoderStep()),
("denoise", Flux2CoreDenoiseStep()),
("decode", Flux2DecodeStep()),
]
)
class Flux2AutoBlocks(SequentialPipelineBlocks):
model_name = "flux2"
block_classes = AUTO_BLOCKS.values()
block_names = AUTO_BLOCKS.keys()
_workflow_map = {
"text2image": {"prompt": True},
"image_conditioned": {"image": True, "prompt": True},
}
@property
def description(self):
return (
"Auto Modular pipeline for text-to-image and image-conditioned generation using Flux2."
"Auto Modular pipeline for text-to-image and image-conditioned generation using Flux2.\n"
"- For text-to-image generation, all you need to provide is `prompt`.\n"
"- For image-conditioned generation, you need to provide `image` (list of PIL images)."
)
@property
@@ -203,3 +165,42 @@ class Flux2AutoBlocks(SequentialPipelineBlocks):
description="The images from the decoding step.",
)
]
TEXT2IMAGE_BLOCKS = InsertableDict(
[
("text_encoder", Flux2TextEncoderStep()),
("text_input", Flux2TextInputStep()),
("prepare_latents", Flux2PrepareLatentsStep()),
("set_timesteps", Flux2SetTimestepsStep()),
("prepare_guidance", Flux2PrepareGuidanceStep()),
("prepare_rope_inputs", Flux2RoPEInputsStep()),
("denoise", Flux2DenoiseStep()),
("after_denoise", Flux2UnpackLatentsStep()),
("decode", Flux2DecodeStep()),
]
)
IMAGE_CONDITIONED_BLOCKS = InsertableDict(
[
("text_encoder", Flux2TextEncoderStep()),
("text_input", Flux2TextInputStep()),
("preprocess_images", Flux2ProcessImagesInputStep()),
("vae_encoder", Flux2VaeEncoderStep()),
("prepare_image_latents", Flux2PrepareImageLatentsStep()),
("prepare_latents", Flux2PrepareLatentsStep()),
("set_timesteps", Flux2SetTimestepsStep()),
("prepare_guidance", Flux2PrepareGuidanceStep()),
("prepare_rope_inputs", Flux2RoPEInputsStep()),
("denoise", Flux2DenoiseStep()),
("after_denoise", Flux2UnpackLatentsStep()),
("decode", Flux2DecodeStep()),
]
)
ALL_BLOCKS = {
"text2image": TEXT2IMAGE_BLOCKS,
"image_conditioned": IMAGE_CONDITIONED_BLOCKS,
"auto": AUTO_BLOCKS,
"remote": REMOTE_AUTO_BLOCKS,
}
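
Since these presets are plain `InsertableDict`s, a workflow can be customized before assembly, e.g. by dropping or inserting a step. A hedged sketch (the module path and `from_blocks_dict` are assumed from the existing modular-pipelines utilities):

from diffusers.modular_pipelines import SequentialPipelineBlocks
from diffusers.modular_pipelines.flux2.modular_blocks_flux2 import ALL_BLOCKS

# Copy the text2image preset and drop the decode step to get raw latents out.
blocks_dict = ALL_BLOCKS["text2image"].copy()
blocks_dict.pop("decode")

blocks = SequentialPipelineBlocks.from_blocks_dict(blocks_dict)
print(blocks.sub_blocks)  # ordered steps: text_encoder ... after_denoise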

View File

@@ -47,12 +47,19 @@ logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# VAE encoder
################
Flux2KleinVaeEncoderBlocks = InsertableDict(
[
("preprocess", Flux2ProcessImagesInputStep()),
("encode", Flux2VaeEncoderStep()),
]
)
class Flux2KleinVaeEncoderSequentialStep(SequentialPipelineBlocks):
model_name = "flux2"
block_classes = [Flux2ProcessImagesInputStep(), Flux2VaeEncoderStep()]
block_names = ["preprocess", "encode"]
block_classes = Flux2KleinVaeEncoderBlocks.values()
block_names = Flux2KleinVaeEncoderBlocks.keys()
@property
def description(self) -> str:
@@ -100,7 +107,14 @@ class Flux2KleinCoreDenoiseStep(SequentialPipelineBlocks):
@property
def description(self):
return (
"Core denoise step that performs the denoising process for Flux2-Klein (distilled model)."
"Core denoise step that performs the denoising process for Flux2-Klein (distilled model).\n"
" - `Flux2KleinTextInputStep` (input) standardizes the text inputs (prompt_embeds) for the denoising step.\n"
" - `Flux2PrepareImageLatentsStep` (prepare_image_latents) prepares the image latents and image_latent_ids for the denoising step.\n"
" - `Flux2PrepareLatentsStep` (prepare_latents) prepares the initial latents (latents) and latent_ids for the denoising step.\n"
" - `Flux2SetTimestepsStep` (set_timesteps) sets the timesteps for the denoising step.\n"
" - `Flux2RoPEInputsStep` (prepare_rope_inputs) prepares the RoPE inputs (txt_ids) for the denoising step.\n"
" - `Flux2KleinDenoiseStep` (denoise) iteratively denoises the latents.\n"
" - `Flux2UnpackLatentsStep` (after_denoise) unpacks the latents from the denoising step.\n"
)
@property
@@ -114,12 +128,52 @@ class Flux2KleinCoreDenoiseStep(SequentialPipelineBlocks):
]
Flux2KleinBaseCoreDenoiseBlocks = InsertableDict(
[
("input", Flux2KleinBaseTextInputStep()),
("prepare_latents", Flux2PrepareLatentsStep()),
("prepare_image_latents", Flux2PrepareImageLatentsStep()),
("set_timesteps", Flux2SetTimestepsStep()),
("prepare_rope_inputs", Flux2KleinBaseRoPEInputsStep()),
("denoise", Flux2KleinBaseDenoiseStep()),
("after_denoise", Flux2UnpackLatentsStep()),
]
)
class Flux2KleinBaseCoreDenoiseStep(SequentialPipelineBlocks):
model_name = "flux2-klein"
block_classes = Flux2KleinBaseCoreDenoiseBlocks.values()
block_names = Flux2KleinBaseCoreDenoiseBlocks.keys()
@property
def description(self):
return "Core denoise step that performs the denoising process for Flux2-Klein (base model)."
return (
"Core denoise step that performs the denoising process for Flux2-Klein (base model).\n"
" - `Flux2KleinBaseTextInputStep` (input) standardizes the text inputs (prompt_embeds + negative_prompt_embeds) for the denoising step.\n"
" - `Flux2PrepareImageLatentsStep` (prepare_image_latents) prepares the image latents and image_latent_ids for the denoising step.\n"
" - `Flux2PrepareLatentsStep` (prepare_latents) prepares the initial latents (latents) and latent_ids for the denoising step.\n"
" - `Flux2SetTimestepsStep` (set_timesteps) sets the timesteps for the denoising step.\n"
" - `Flux2KleinBaseRoPEInputsStep` (prepare_rope_inputs) prepares the RoPE inputs (txt_ids + negative_txt_ids) for the denoising step.\n"
" - `Flux2KleinBaseDenoiseStep` (denoise) iteratively denoises the latents using Classifier-Free Guidance.\n"
" - `Flux2UnpackLatentsStep` (after_denoise) unpacks the latents from the denoising step.\n"
)
@property
def outputs(self):
return [
OutputParam(
name="latents",
type_hint=torch.Tensor,
description="The latents from the denoising step.",
)
]
###
### Auto blocks
###
# auto_docstring
class Flux2KleinAutoBlocks(SequentialPipelineBlocks):
model_name = "flux2-klein"
block_classes = [
@@ -129,15 +183,42 @@ class Flux2KleinAutoBlocks(SequentialPipelineBlocks):
Flux2DecodeStep(),
]
block_names = ["text_encoder", "vae_encoder", "denoise", "decode"]
_workflow_map = {
"text2image": {"prompt": True},
"image_conditioned": {"image": True, "prompt": True},
}
@property
def description(self):
return (
"Auto blocks that perform the text-to-image and image-conditioned generation using Flux2-Klein."
"Auto blocks that perform the text-to-image and image-conditioned generation using Flux2-Klein.\n"
+ " - for image-conditioned generation, you need to provide `image` (list of PIL images).\n"
+ " - for text-to-image generation, all you need to provide is `prompt`.\n"
)
@property
def outputs(self):
return [
OutputParam(
name="images",
type_hint=List[PIL.Image.Image],
description="The images from the decoding step.",
)
]
class Flux2KleinBaseAutoBlocks(SequentialPipelineBlocks):
model_name = "flux2-klein"
block_classes = [
Flux2KleinBaseTextEncoderStep(),
Flux2KleinAutoVaeEncoderStep(),
Flux2KleinBaseCoreDenoiseStep(),
Flux2DecodeStep(),
]
block_names = ["text_encoder", "vae_encoder", "denoise", "decode"]
@property
def description(self):
return (
"Auto blocks that perform the text-to-image and image-conditioned generation using Flux2-Klein (base model).\n"
+ " - for image-conditioned generation, you need to provide `image` (list of PIL images).\n"
+ " - for text-to-image generation, all you need to provide is `prompt`.\n"
)
@property

View File

@@ -1,149 +0,0 @@
# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
import PIL.Image
import torch
from ...utils import logging
from ..modular_pipeline import AutoPipelineBlocks, SequentialPipelineBlocks
from ..modular_pipeline_utils import InsertableDict, OutputParam
from .before_denoise import (
Flux2KleinBaseRoPEInputsStep,
Flux2PrepareImageLatentsStep,
Flux2PrepareLatentsStep,
Flux2RoPEInputsStep,
Flux2SetTimestepsStep,
)
from .decoders import Flux2DecodeStep, Flux2UnpackLatentsStep
from .denoise import Flux2KleinBaseDenoiseStep, Flux2KleinDenoiseStep
from .encoders import (
Flux2KleinBaseTextEncoderStep,
Flux2KleinTextEncoderStep,
Flux2VaeEncoderStep,
)
from .inputs import (
Flux2KleinBaseTextInputStep,
Flux2ProcessImagesInputStep,
Flux2TextInputStep,
)
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
################
# VAE encoder
################
class Flux2KleinBaseVaeEncoderSequentialStep(SequentialPipelineBlocks):
model_name = "flux2"
block_classes = [Flux2ProcessImagesInputStep(), Flux2VaeEncoderStep()]
block_names = ["preprocess", "encode"]
@property
def description(self) -> str:
return "VAE encoder step that preprocesses and encodes the image inputs into their latent representations."
class Flux2KleinBaseAutoVaeEncoderStep(AutoPipelineBlocks):
block_classes = [Flux2KleinBaseVaeEncoderSequentialStep]
block_names = ["img_conditioning"]
block_trigger_inputs = ["image"]
@property
def description(self):
return (
"VAE encoder step that encodes the image inputs into their latent representations.\n"
"This is an auto pipeline block that works for image conditioning tasks.\n"
" - `Flux2KleinBaseVaeEncoderSequentialStep` is used when `image` is provided.\n"
" - If `image` is not provided, step will be skipped."
)
###
### Core denoise
###
Flux2KleinBaseCoreDenoiseBlocks = InsertableDict(
[
("input", Flux2KleinBaseTextInputStep()),
("prepare_latents", Flux2PrepareLatentsStep()),
("prepare_image_latents", Flux2PrepareImageLatentsStep()),
("set_timesteps", Flux2SetTimestepsStep()),
("prepare_rope_inputs", Flux2KleinBaseRoPEInputsStep()),
("denoise", Flux2KleinBaseDenoiseStep()),
("after_denoise", Flux2UnpackLatentsStep()),
]
)
class Flux2KleinBaseCoreDenoiseStep(SequentialPipelineBlocks):
model_name = "flux2-klein"
block_classes = Flux2KleinBaseCoreDenoiseBlocks.values()
block_names = Flux2KleinBaseCoreDenoiseBlocks.keys()
@property
def description(self):
return (
"Core denoise step that performs the denoising process for Flux2-Klein (base model)."
)
@property
def outputs(self):
return [
OutputParam(
name="latents",
type_hint=torch.Tensor,
description="The latents from the denoising step.",
)
]
###
### Auto blocks
###
# auto_docstring
class Flux2KleinBaseAutoBlocks(SequentialPipelineBlocks):
model_name = "flux2-klein"
block_classes = [
Flux2KleinBaseTextEncoderStep(),
Flux2KleinBaseAutoVaeEncoderStep(),
Flux2KleinBaseCoreDenoiseStep(),
Flux2DecodeStep(),
]
block_names = ["text_encoder", "vae_encoder", "denoise", "decode"]
_workflow_map = {
"text2image": {"prompt": True},
"image_conditioned": {"image": True, "prompt": True},
}
@property
def description(self):
return (
"Auto blocks that perform the text-to-image and image-conditioned generation using Flux2-Klein (base model)."
)
@property
def outputs(self):
return [
OutputParam(
name="images",
type_hint=List[PIL.Image.Image],
description="The images from the decoding step.",
)
]

View File

@@ -19,7 +19,7 @@ import warnings
from collections import OrderedDict
from copy import deepcopy
from dataclasses import dataclass, field
from typing import Any, Optional
from typing import Any
import torch
from huggingface_hub import create_repo
@@ -40,11 +40,8 @@ from .modular_pipeline_utils import (
InputParam,
InsertableDict,
OutputParam,
combine_inputs,
combine_outputs,
format_components,
format_configs,
format_workflow,
generate_modular_model_card_content,
make_doc_string,
)
@@ -290,7 +287,6 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin):
config_name = "modular_config.json"
model_name = None
_workflow_map = None
@classmethod
def _get_signature_keys(cls, obj):
@@ -346,35 +342,6 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin):
def outputs(self) -> list[OutputParam]:
return self._get_outputs()
# currently only ConditionalPipelineBlocks and SequentialPipelineBlocks support `get_execution_blocks`
def get_execution_blocks(self, **kwargs):
"""
Get the block(s) that would execute given the inputs. Must be implemented by subclasses that support
conditional block selection.
Args:
**kwargs: Input names and values. Only trigger inputs affect block selection.
"""
raise NotImplementedError(f"`get_execution_blocks` is not implemented for {self.__class__.__name__}")
# currently only SequentialPipelineBlocks support workflows
@property
def workflow_names(self):
"""
Returns a list of available workflow names. Must be implemented by subclasses that define `_workflow_map`.
"""
raise NotImplementedError(f"`workflow_names` is not implemented for {self.__class__.__name__}")
def get_workflow(self, workflow_name: str):
"""
Get the execution blocks for a specific workflow. Must be implemented by subclasses that define
`_workflow_map`.
Args:
workflow_name: Name of the workflow to retrieve.
"""
raise NotImplementedError(f"`get_workflow` is not implemented for {self.__class__.__name__}")
@classmethod
def from_pretrained(
cls,
@@ -513,6 +480,72 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin):
if current_value is not param: # Using identity comparison to check if object was modified
state.set(param_name, param, input_param.kwargs_type)
@staticmethod
def combine_inputs(*named_input_lists: list[tuple[str, list[InputParam]]]) -> list[InputParam]:
"""
Combines multiple lists of InputParam objects from different blocks. For duplicate inputs, updates only if
current default value is None and new default value is not None. Warns if multiple non-None default values
exist for the same input.
Args:
named_input_lists: list of tuples containing (block_name, input_param_list) pairs
Returns:
list[InputParam]: Combined list of unique InputParam objects
"""
combined_dict = {} # name -> InputParam
value_sources = {} # name -> block_name
for block_name, inputs in named_input_lists:
for input_param in inputs:
if input_param.name is None and input_param.kwargs_type is not None:
input_name = "*_" + input_param.kwargs_type
else:
input_name = input_param.name
if input_name in combined_dict:
current_param = combined_dict[input_name]
if (
current_param.default is not None
and input_param.default is not None
and current_param.default != input_param.default
):
warnings.warn(
f"Multiple different default values found for input '{input_name}': "
f"{current_param.default} (from block '{value_sources[input_name]}') and "
f"{input_param.default} (from block '{block_name}'). Using {current_param.default}."
)
if current_param.default is None and input_param.default is not None:
combined_dict[input_name] = input_param
value_sources[input_name] = block_name
else:
combined_dict[input_name] = input_param
value_sources[input_name] = block_name
return list(combined_dict.values())
@staticmethod
def combine_outputs(*named_output_lists: list[tuple[str, list[OutputParam]]]) -> list[OutputParam]:
"""
Combines multiple lists of OutputParam objects from different blocks. For duplicate outputs, keeps the first
occurrence of each output name.
Args:
named_output_lists: list of tuples containing (block_name, output_param_list) pairs
Returns:
list[OutputParam]: Combined list of unique OutputParam objects
"""
combined_dict = {} # name -> OutputParam
for block_name, outputs in named_output_lists:
for output_param in outputs:
if (output_param.name not in combined_dict) or (
combined_dict[output_param.name].kwargs_type is None and output_param.kwargs_type is not None
):
combined_dict[output_param.name] = output_param
return list(combined_dict.values())
@property
def input_names(self) -> list[str]:
return [input_param.name for input_param in self.inputs if input_param.name is not None]
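
For illustration, here is how the merge rules above play out; the `InputParam` fields follow its dataclass definition, and the input name is made up:

from diffusers import ModularPipelineBlocks
from diffusers.modular_pipelines.modular_pipeline_utils import InputParam

# Two blocks declare the same input; only one carries a non-None default.
merged = ModularPipelineBlocks.combine_inputs(
    ("block_a", [InputParam(name="guidance_scale")]),
    ("block_b", [InputParam(name="guidance_scale", default=3.5)]),
)
assert len(merged) == 1 and merged[0].default == 3.5  # the None default is upgraded

# Two different non-None defaults would instead emit a warning and keep the first value seen.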
@@ -544,8 +577,7 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin):
class ConditionalPipelineBlocks(ModularPipelineBlocks):
"""
A Pipeline Blocks that conditionally selects a block to run based on the inputs. Subclasses must implement the
`select_block` method to define the logic for selecting the block. Currently, we only support selection logic based
on the presence or absence of inputs (i.e., whether they are `None` or not)
`select_block` method to define the logic for selecting the block.
This class inherits from [`ModularPipelineBlocks`]. Check the superclass documentation for the generic methods the
library implements for all the pipeline blocks (such as loading or saving etc.)
@@ -553,20 +585,15 @@ class ConditionalPipelineBlocks(ModularPipelineBlocks):
> [!WARNING]
> This is an experimental feature and is likely to change in the future.
Attributes:
block_classes: List of block classes to be used. Must have the same length as `block_names`.
block_names: List of names for each block. Must have the same length as `block_classes`.
block_trigger_inputs: List of input names that `select_block()` uses to determine which block to run.
For `ConditionalPipelineBlocks`, this does not need to correspond to `block_names` and `block_classes`. For
`AutoPipelineBlocks`, this must have the same length as `block_names` and `block_classes`, where each
element specifies the trigger input for the corresponding block.
default_block_name: Name of the default block to run when no trigger inputs match.
If None, this block can be skipped entirely when no trigger inputs are provided.
block_classes: List of block classes to be used
block_names: List of prefixes for each block
block_trigger_inputs: List of input names that select_block() uses to determine which block to run
"""
block_classes = []
block_names = []
block_trigger_inputs = []
default_block_name = None
default_block_name = None # name of the default block to run when no trigger inputs match; if None, this block is skipped when no trigger inputs are provided
def __init__(self):
sub_blocks = InsertableDict()
@@ -630,7 +657,7 @@ class ConditionalPipelineBlocks(ModularPipelineBlocks):
@property
def inputs(self) -> list[tuple[str, Any]]:
named_inputs = [(name, block.inputs) for name, block in self.sub_blocks.items()]
combined_inputs = combine_inputs(*named_inputs)
combined_inputs = self.combine_inputs(*named_inputs)
# mark Required inputs only if that input is required by all the blocks
for input_param in combined_inputs:
if input_param.name in self.required_inputs:
@@ -642,16 +669,15 @@ class ConditionalPipelineBlocks(ModularPipelineBlocks):
@property
def intermediate_outputs(self) -> list[str]:
named_outputs = [(name, block.intermediate_outputs) for name, block in self.sub_blocks.items()]
combined_outputs = combine_outputs(*named_outputs)
combined_outputs = self.combine_outputs(*named_outputs)
return combined_outputs
@property
def outputs(self) -> list[str]:
named_outputs = [(name, block.outputs) for name, block in self.sub_blocks.items()]
combined_outputs = combine_outputs(*named_outputs)
combined_outputs = self.combine_outputs(*named_outputs)
return combined_outputs
# used for `__repr__`
def _get_trigger_inputs(self) -> set:
"""
Returns a set of all unique trigger input values found in this block and nested blocks.
@@ -680,7 +706,12 @@ class ConditionalPipelineBlocks(ModularPipelineBlocks):
return all_triggers
def select_block(self, **kwargs) -> Optional[str]:
@property
def trigger_inputs(self):
"""All trigger inputs including from nested blocks."""
return self._get_trigger_inputs()
def select_block(self, **kwargs) -> str | None:
"""
Select the block to run based on the trigger inputs. Subclasses must implement this method to define the logic
for selecting the block.
@@ -719,39 +750,6 @@ class ConditionalPipelineBlocks(ModularPipelineBlocks):
logger.error(error_msg)
raise
def get_execution_blocks(self, **kwargs) -> Optional["ModularPipelineBlocks"]:
"""
Get the block(s) that would execute given the inputs.
Recursively resolves nested ConditionalPipelineBlocks until reaching either:
- A leaf block (no sub_blocks or LoopSequentialPipelineBlocks) → returns single `ModularPipelineBlocks`
- A `SequentialPipelineBlocks` → delegates to its `get_execution_blocks()` which returns
a `SequentialPipelineBlocks` containing the resolved execution blocks
Args:
**kwargs: Input names and values. Only trigger inputs affect block selection.
Returns:
- `ModularPipelineBlocks`: A leaf block or resolved `SequentialPipelineBlocks`
- `None`: If this block would be skipped (no trigger matched and no default)
"""
trigger_kwargs = {name: kwargs.get(name) for name in self.block_trigger_inputs if name is not None}
block_name = self.select_block(**trigger_kwargs)
if block_name is None:
block_name = self.default_block_name
if block_name is None:
return None
block = self.sub_blocks[block_name]
# Recursively resolve until we hit a leaf block
if block.sub_blocks and not isinstance(block, LoopSequentialPipelineBlocks):
return block.get_execution_blocks(**kwargs)
return block
def __repr__(self):
class_name = self.__class__.__name__
base_class = self.__class__.__bases__[0].__name__
@@ -759,11 +757,11 @@ class ConditionalPipelineBlocks(ModularPipelineBlocks):
f"{class_name}(\n Class: {base_class}\n" if base_class and base_class != "object" else f"{class_name}(\n"
)
if self._get_trigger_inputs():
if self.trigger_inputs:
header += "\n"
header += " " + "=" * 100 + "\n"
header += " This pipeline contains blocks that are selected at runtime based on inputs.\n"
header += f" Trigger Inputs: {sorted(self._get_trigger_inputs())}\n"
header += f" Trigger Inputs: {sorted(self.trigger_inputs)}\n"
header += " " + "=" * 100 + "\n\n"
# Format description with proper indentation
@@ -830,56 +828,24 @@ class ConditionalPipelineBlocks(ModularPipelineBlocks):
class AutoPipelineBlocks(ConditionalPipelineBlocks):
"""
A Pipeline Blocks that automatically selects a block to run based on the presence of trigger inputs.
This is a specialized version of `ConditionalPipelineBlocks` where:
- Each block has one corresponding trigger input (1:1 mapping)
- Block selection is automatic: the first block whose trigger input is present gets selected
- `block_trigger_inputs` must have the same length as `block_names` and `block_classes`
- Use `None` in `block_trigger_inputs` to specify the default block, i.e the block that will run if no trigger
inputs are present
Attributes:
block_classes:
List of block classes to be used. Must have the same length as `block_names` and
`block_trigger_inputs`.
block_names:
List of names for each block. Must have the same length as `block_classes` and `block_trigger_inputs`.
block_trigger_inputs:
List of input names where each element specifies the trigger input for the corresponding block. Use
`None` to mark the default block.
Example:
```python
class MyAutoBlock(AutoPipelineBlocks):
block_classes = [InpaintEncoderBlock, ImageEncoderBlock, TextEncoderBlock]
block_names = ["inpaint", "img2img", "text2img"]
block_trigger_inputs = ["mask_image", "image", None] # text2img is the default
```
With this definition:
- As long as `mask_image` is provided, "inpaint" block runs (regardless of `image` being provided or not)
- If `mask_image` is not provided but `image` is provided, "img2img" block runs
- Otherwise, "text2img" block runs (default, trigger is `None`)
A Pipeline Blocks that automatically selects a block to run based on the presence of trigger inputs.
"""
def __init__(self):
super().__init__()
if self.default_block_name is not None:
raise ValueError(
f"In {self.__class__.__name__}, do not set `default_block_name` for AutoPipelineBlocks. "
f"Use `None` in `block_trigger_inputs` to specify the default block."
)
if not (len(self.block_classes) == len(self.block_names) == len(self.block_trigger_inputs)):
raise ValueError(
f"In {self.__class__.__name__}, the number of block_classes, block_names, and block_trigger_inputs must be the same."
)
@property
def default_block_name(self) -> str | None:
"""Derive default_block_name from block_trigger_inputs (None entry)."""
if None in self.block_trigger_inputs:
idx = self.block_trigger_inputs.index(None)
self.default_block_name = self.block_names[idx]
return self.block_names[idx]
return None
def select_block(self, **kwargs) -> str | None:
"""Select block based on which trigger input is present (not None)."""
@@ -933,29 +899,6 @@ class SequentialPipelineBlocks(ModularPipelineBlocks):
expected_configs.append(config)
return expected_configs
@property
def workflow_names(self):
if self._workflow_map is None:
raise NotImplementedError(
f"workflows is not supported because _workflow_map is not set for {self.__class__.__name__}"
)
return list(self._workflow_map.keys())
def get_workflow(self, workflow_name: str):
if self._workflow_map is None:
raise NotImplementedError(
f"workflows is not supported because _workflow_map is not set for {self.__class__.__name__}"
)
if workflow_name not in self._workflow_map:
raise ValueError(f"Workflow {workflow_name} not found in {self.__class__.__name__}")
trigger_inputs = self._workflow_map[workflow_name]
workflow_blocks = self.get_execution_blocks(**trigger_inputs)
return workflow_blocks
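So `get_workflow` is a thin convenience over `get_execution_blocks`. A usage sketch against the pre-change API being removed in this diff (import path and instantiation are assumptions):

```python
# Pre-change API sketch; import path is an assumption.
from diffusers.modular_pipelines import QwenImageAutoBlocks

blocks = QwenImageAutoBlocks()

via_name = blocks.get_workflow("inpainting")
via_triggers = blocks.get_execution_blocks(prompt=True, mask_image=True, image=True)
# Both paths should select the same sub-blocks:
assert list(via_name.sub_blocks.keys()) == list(via_triggers.sub_blocks.keys())
```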
@classmethod
def from_blocks_dict(
cls, blocks_dict: dict[str, Any], description: str | None = None
@@ -1051,7 +994,7 @@ class SequentialPipelineBlocks(ModularPipelineBlocks):
# filter out them here so they do not end up as intermediate_outputs
if name not in inp_names:
named_outputs.append((name, block.intermediate_outputs))
combined_outputs = combine_outputs(*named_outputs)
combined_outputs = self.combine_outputs(*named_outputs)
return combined_outputs
# YiYi TODO: I think we can remove the outputs property
@@ -1075,7 +1018,6 @@ class SequentialPipelineBlocks(ModularPipelineBlocks):
raise
return pipeline, state
# used for `__repr__`
def _get_trigger_inputs(self):
"""
Returns a set of all unique trigger input values found in the blocks.
@@ -1099,50 +1041,89 @@ class SequentialPipelineBlocks(ModularPipelineBlocks):
return fn_recursive_get_trigger(self.sub_blocks)
def get_execution_blocks(self, **kwargs) -> "SequentialPipelineBlocks":
@property
def trigger_inputs(self):
return self._get_trigger_inputs()
def _traverse_trigger_blocks(self, active_inputs):
"""
Get the blocks that would execute given the specified inputs.
Traverse blocks and select which ones would run given the active inputs.
Args:
**kwargs: Input names and values. Only trigger inputs affect block selection.
active_inputs: Dict of input names to values that are "present"
Returns:
SequentialPipelineBlocks containing only the blocks that would execute
OrderedDict of block_name -> block that would execute
"""
# Copy kwargs so we can add outputs as we traverse
active_inputs = dict(kwargs)
def fn_recursive_traverse(block, block_name, active_inputs):
result_blocks = OrderedDict()
# ConditionalPipelineBlocks (includes AutoPipelineBlocks)
if isinstance(block, ConditionalPipelineBlocks):
block = block.get_execution_blocks(**active_inputs)
if block is None:
trigger_kwargs = {name: active_inputs.get(name) for name in block.block_trigger_inputs}
selected_block_name = block.select_block(**trigger_kwargs)
if selected_block_name is None:
selected_block_name = block.default_block_name
if selected_block_name is None:
return result_blocks
# Has sub_blocks (SequentialPipelineBlocks/ConditionalPipelineBlocks)
if block.sub_blocks and not isinstance(block, LoopSequentialPipelineBlocks):
selected_block = block.sub_blocks[selected_block_name]
if selected_block.sub_blocks:
result_blocks.update(fn_recursive_traverse(selected_block, block_name, active_inputs))
else:
result_blocks[block_name] = selected_block
if hasattr(selected_block, "outputs"):
for out in selected_block.outputs:
active_inputs[out.name] = True
return result_blocks
# SequentialPipelineBlocks or LoopSequentialPipelineBlocks
if block.sub_blocks:
for sub_block_name, sub_block in block.sub_blocks.items():
nested_blocks = fn_recursive_traverse(sub_block, sub_block_name, active_inputs)
nested_blocks = {f"{block_name}.{k}": v for k, v in nested_blocks.items()}
result_blocks.update(nested_blocks)
blocks_to_update = fn_recursive_traverse(sub_block, sub_block_name, active_inputs)
blocks_to_update = {f"{block_name}.{k}": v for k, v in blocks_to_update.items()}
result_blocks.update(blocks_to_update)
else:
# Leaf block: single ModularPipelineBlocks or LoopSequentialPipelineBlocks
result_blocks[block_name] = block
# Add outputs to active_inputs so subsequent blocks can use them as triggers
if hasattr(block, "intermediate_outputs"):
for out in block.intermediate_outputs:
if hasattr(block, "outputs"):
for out in block.outputs:
active_inputs[out.name] = True
return result_blocks
all_blocks = OrderedDict()
for block_name, block in self.sub_blocks.items():
nested_blocks = fn_recursive_traverse(block, block_name, active_inputs)
all_blocks.update(nested_blocks)
blocks_to_update = fn_recursive_traverse(block, block_name, active_inputs)
all_blocks.update(blocks_to_update)
return all_blocks
return SequentialPipelineBlocks.from_blocks_dict(all_blocks)
def get_execution_blocks(self, **kwargs):
"""
Get the blocks that would execute given the specified inputs.
Args:
**kwargs: Input names and values. Only trigger inputs affect block selection.
Pass any inputs that would be non-None at runtime.
Returns:
SequentialPipelineBlocks containing only the blocks that would execute
Example:
# Get blocks for inpainting workflow
blocks = pipeline.get_execution_blocks(prompt="a cat", mask=mask, image=image)
# Get blocks for text2image workflow
blocks = pipeline.get_execution_blocks(prompt="a cat")
"""
# Filter out None values
active_inputs = {k: v for k, v in kwargs.items() if v is not None}
blocks_triggered = self._traverse_trigger_blocks(active_inputs)
return SequentialPipelineBlocks.from_blocks_dict(blocks_triggered)
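The dotted keys the traversal produces (e.g. `denoise.input` in the test fixtures later in this diff) come from the prefixing in `fn_recursive_traverse`. A toy illustration of just the naming scheme, with plain dicts standing in for composite blocks:

```python
from collections import OrderedDict


def flatten(blocks, prefix=""):
    # Mirrors only the naming scheme: nested composites contribute their
    # block name as a dotted prefix; leaves map to the selected block.
    out = OrderedDict()
    for name, block in blocks.items():
        key = f"{prefix}.{name}" if prefix else name
        if isinstance(block, dict):  # composite block with sub_blocks
            out.update(flatten(block, key))
        else:  # leaf block
            out[key] = block
    return out


nested = OrderedDict(
    text_encoder="FluxTextEncoderStep",
    denoise=OrderedDict(input="FluxTextInputStep", denoise="FluxDenoiseStep"),
    decode="FluxDecodeStep",
)
print(list(flatten(nested)))
# ['text_encoder', 'denoise.input', 'denoise.denoise', 'decode']
```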
def __repr__(self):
class_name = self.__class__.__name__
@@ -1151,23 +1132,18 @@ class SequentialPipelineBlocks(ModularPipelineBlocks):
f"{class_name}(\n Class: {base_class}\n" if base_class and base_class != "object" else f"{class_name}(\n"
)
if self._workflow_map is None and self._get_trigger_inputs():
if self.trigger_inputs:
header += "\n"
header += " " + "=" * 100 + "\n"
header += " This pipeline contains blocks that are selected at runtime based on inputs.\n"
header += f" Trigger Inputs: {[inp for inp in self._get_trigger_inputs() if inp is not None]}\n"
header += f" Trigger Inputs: {[inp for inp in self.trigger_inputs if inp is not None]}\n"
# Get first trigger input as example
example_input = next(t for t in self._get_trigger_inputs() if t is not None)
example_input = next(t for t in self.trigger_inputs if t is not None)
header += f" Use `get_execution_blocks()` to see selected blocks (e.g. `get_execution_blocks({example_input}=...)`).\n"
header += " " + "=" * 100 + "\n\n"
description = self.description
if self._workflow_map is not None:
workflow_str = format_workflow(self._workflow_map)
description = f"{self.description}\n\n{workflow_str}"
# Format description with proper indentation
desc_lines = description.split("\n")
desc_lines = self.description.split("\n")
desc = []
# First line with "Description:" label
desc.append(f" Description: {desc_lines[0]}")
@@ -1215,15 +1191,10 @@ class SequentialPipelineBlocks(ModularPipelineBlocks):
@property
def doc(self):
description = self.description
if self._workflow_map is not None:
workflow_str = format_workflow(self._workflow_map)
description = f"{self.description}\n\n{workflow_str}"
return make_doc_string(
self.inputs,
self.outputs,
description=description,
self.description,
class_name=self.__class__.__name__,
expected_components=self.expected_components,
expected_configs=self.expected_configs,
@@ -1356,7 +1327,7 @@ class LoopSequentialPipelineBlocks(ModularPipelineBlocks):
@property
def intermediate_outputs(self) -> list[str]:
named_outputs = [(name, block.intermediate_outputs) for name, block in self.sub_blocks.items()]
combined_outputs = combine_outputs(*named_outputs)
combined_outputs = self.combine_outputs(*named_outputs)
for output in self.loop_intermediate_outputs:
if output.name not in {output.name for output in combined_outputs}:
combined_outputs.append(output)

View File

@@ -14,10 +14,10 @@
import inspect
import re
import warnings
from collections import OrderedDict
from dataclasses import dataclass, field, fields
from typing import Any, Dict, List, Literal, Optional, Tuple, Type, Union
from dataclasses import dataclass, field
from types import UnionType
from typing import Any, Literal, Type, Union, get_args, get_origin
import PIL.Image
import torch
@@ -887,30 +887,6 @@ def format_configs(configs, indent_level=4, max_line_length=115, add_empty_lines
return "\n".join(formatted_configs)
def format_workflow(workflow_map):
"""Format a workflow map into a readable string representation.
Args:
workflow_map: Dictionary mapping workflow names to trigger inputs
Returns:
A formatted string representing all workflows
"""
if workflow_map is None:
return ""
lines = ["Supported workflows:"]
for workflow_name, trigger_inputs in workflow_map.items():
required_inputs = [k for k, v in trigger_inputs.items() if v]
if required_inputs:
inputs_str = ", ".join(f"`{t}`" for t in required_inputs)
lines.append(f" - `{workflow_name}`: requires {inputs_str}")
else:
lines.append(f" - `{workflow_name}`: default (no additional inputs required)")
return "\n".join(lines)
def make_doc_string(
inputs,
outputs,
@@ -967,71 +943,7 @@ def make_doc_string(
return output
def combine_inputs(*named_input_lists: List[Tuple[str, List[InputParam]]]) -> List[InputParam]:
"""
Combines multiple lists of InputParam objects from different blocks. For duplicate inputs, updates only if current
default value is None and new default value is not None. Warns if multiple non-None default values exist for the
same input.
Args:
named_input_lists: List of tuples containing (block_name, input_param_list) pairs
Returns:
List[InputParam]: Combined list of unique InputParam objects
"""
combined_dict = {} # name -> InputParam
value_sources = {} # name -> block_name
for block_name, inputs in named_input_lists:
for input_param in inputs:
if input_param.name is None and input_param.kwargs_type is not None:
input_name = "*_" + input_param.kwargs_type
else:
input_name = input_param.name
if input_name in combined_dict:
current_param = combined_dict[input_name]
if (
current_param.default is not None
and input_param.default is not None
and current_param.default != input_param.default
):
warnings.warn(
f"Multiple different default values found for input '{input_name}': "
f"{current_param.default} (from block '{value_sources[input_name]}') and "
f"{input_param.default} (from block '{block_name}'). Using {current_param.default}."
)
if current_param.default is None and input_param.default is not None:
combined_dict[input_name] = input_param
value_sources[input_name] = block_name
else:
combined_dict[input_name] = input_param
value_sources[input_name] = block_name
return list(combined_dict.values())
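A minimal sketch of the conflict rule above, assuming `combine_inputs` is in scope; `InputParam` here is a stand-in dataclass, not the library class. Two different non-None defaults for the same input keep the first and emit a warning:

```python
import warnings
from dataclasses import dataclass


@dataclass
class InputParam:  # stand-in with only the fields combine_inputs reads
    name: str | None
    default: object = None
    kwargs_type: str | None = None


named = [
    ("text_encoder", [InputParam("prompt"), InputParam("guidance_scale", default=7.5)]),
    ("denoise", [InputParam("guidance_scale", default=5.0)]),
]
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    merged = combine_inputs(*named)  # the function defined above
assert any("guidance_scale" in str(w.message) for w in caught)
assert {p.name: p.default for p in merged} == {"prompt": None, "guidance_scale": 7.5}
```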
def combine_outputs(*named_output_lists: List[Tuple[str, List[OutputParam]]]) -> List[OutputParam]:
"""
Combines multiple lists of OutputParam objects from different blocks. For duplicate outputs, keeps the first
occurrence of each output name.
Args:
named_output_lists: List of tuples containing (block_name, output_param_list) pairs
Returns:
List[OutputParam]: Combined list of unique OutputParam objects
"""
combined_dict = {} # name -> OutputParam
for block_name, outputs in named_output_lists:
for output_param in outputs:
if (output_param.name not in combined_dict) or (
combined_dict[output_param.name].kwargs_type is None and output_param.kwargs_type is not None
):
combined_dict[output_param.name] = output_param
return list(combined_dict.values())
def generate_modular_model_card_content(blocks) -> Dict[str, Any]:
def generate_modular_model_card_content(blocks) -> dict[str, Any]:
"""
Generate model card content for a modular pipeline.

View File

@@ -21,10 +21,22 @@ except OptionalDependencyNotAvailable:
_dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
else:
_import_structure["modular_blocks_qwenimage"] = ["QwenImageAutoBlocks"]
_import_structure["modular_blocks_qwenimage_edit"] = ["QwenImageEditAutoBlocks"]
_import_structure["modular_blocks_qwenimage_edit_plus"] = ["QwenImageEditPlusAutoBlocks"]
_import_structure["modular_blocks_qwenimage_layered"] = ["QwenImageLayeredAutoBlocks"]
_import_structure["modular_blocks_qwenimage"] = [
"AUTO_BLOCKS",
"QwenImageAutoBlocks",
]
_import_structure["modular_blocks_qwenimage_edit"] = [
"EDIT_AUTO_BLOCKS",
"QwenImageEditAutoBlocks",
]
_import_structure["modular_blocks_qwenimage_edit_plus"] = [
"EDIT_PLUS_AUTO_BLOCKS",
"QwenImageEditPlusAutoBlocks",
]
_import_structure["modular_blocks_qwenimage_layered"] = [
"LAYERED_AUTO_BLOCKS",
"QwenImageLayeredAutoBlocks",
]
_import_structure["modular_pipeline"] = [
"QwenImageEditModularPipeline",
"QwenImageEditPlusModularPipeline",
@@ -39,10 +51,22 @@ if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
except OptionalDependencyNotAvailable:
from ...utils.dummy_torch_and_transformers_objects import * # noqa F403
else:
from .modular_blocks_qwenimage import QwenImageAutoBlocks
from .modular_blocks_qwenimage_edit import QwenImageEditAutoBlocks
from .modular_blocks_qwenimage_edit_plus import QwenImageEditPlusAutoBlocks
from .modular_blocks_qwenimage_layered import QwenImageLayeredAutoBlocks
from .modular_blocks_qwenimage import (
AUTO_BLOCKS,
QwenImageAutoBlocks,
)
from .modular_blocks_qwenimage_edit import (
EDIT_AUTO_BLOCKS,
QwenImageEditAutoBlocks,
)
from .modular_blocks_qwenimage_edit_plus import (
EDIT_PLUS_AUTO_BLOCKS,
QwenImageEditPlusAutoBlocks,
)
from .modular_blocks_qwenimage_layered import (
LAYERED_AUTO_BLOCKS,
QwenImageLayeredAutoBlocks,
)
from .modular_pipeline import (
QwenImageEditModularPipeline,
QwenImageEditPlusModularPipeline,

View File

@@ -1113,14 +1113,10 @@ AUTO_BLOCKS = InsertableDict(
class QwenImageAutoBlocks(SequentialPipelineBlocks):
"""
Auto Modular pipeline for text-to-image, image-to-image, inpainting, and controlnet tasks using QwenImage.
Supported workflows:
- `text2image`: requires `prompt`
- `image2image`: requires `prompt`, `image`
- `inpainting`: requires `prompt`, `mask_image`, `image`
- `controlnet_text2image`: requires `prompt`, `control_image`
- `controlnet_image2image`: requires `prompt`, `image`, `control_image`
- `controlnet_inpainting`: requires `prompt`, `mask_image`, `image`, `control_image`
- for image-to-image generation, you need to provide `image`
- for inpainting, you need to provide `mask_image` and `image`; optionally, you can also provide `padding_mask_crop`.
- to run the controlnet workflow, you need to provide `control_image`
- for text-to-image generation, all you need to provide is `prompt`
Components:
text_encoder (`Qwen2_5_VLForConditionalGeneration`): The text encoder to use tokenizer (`Qwen2Tokenizer`):
@@ -1201,23 +1197,15 @@ class QwenImageAutoBlocks(SequentialPipelineBlocks):
block_classes = AUTO_BLOCKS.values()
block_names = AUTO_BLOCKS.keys()
# Workflow map defines the trigger conditions for each workflow.
# How to define:
# - Only include required inputs and trigger inputs (inputs that determine which blocks run)
# - currently, only supports `True` means the workflow triggers when the input is not None
_workflow_map = {
"text2image": {"prompt": True},
"image2image": {"prompt": True, "image": True},
"inpainting": {"prompt": True, "mask_image": True, "image": True},
"controlnet_text2image": {"prompt": True, "control_image": True},
"controlnet_image2image": {"prompt": True, "image": True, "control_image": True},
"controlnet_inpainting": {"prompt": True, "mask_image": True, "image": True, "control_image": True},
}
@property
def description(self):
return "Auto Modular pipeline for text-to-image, image-to-image, inpainting, and controlnet tasks using QwenImage."
return (
"Auto Modular pipeline for text-to-image, image-to-image, inpainting, and controlnet tasks using QwenImage.\n"
+ "- for image-to-image generation, you need to provide `image`\n"
+ "- for inpainting, you need to provide `mask_image` and `image`, optionally you can provide `padding_mask_crop`.\n"
+ "- to run the controlnet workflow, you need to provide `control_image`\n"
+ "- for text-to-image generation, all you need to provide is `prompt`"
)
@property
def outputs(self):

View File

@@ -773,10 +773,6 @@ class QwenImageEditAutoBlocks(SequentialPipelineBlocks):
model_name = "qwenimage-edit"
block_classes = EDIT_AUTO_BLOCKS.values()
block_names = EDIT_AUTO_BLOCKS.keys()
_workflow_map = {
"edit": {"prompt": True, "image": True},
"edit_inpainting": {"prompt": True, "mask_image": True, "image": True},
}
@property
def description(self):

View File

@@ -21,7 +21,21 @@ except OptionalDependencyNotAvailable:
_dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
else:
_import_structure["modular_blocks"] = ["StableDiffusionXLAutoBlocks"]
_import_structure["encoders"] = ["StableDiffusionXLTextEncoderStep"]
_import_structure["modular_blocks"] = [
"ALL_BLOCKS",
"AUTO_BLOCKS",
"CONTROLNET_BLOCKS",
"IMAGE2IMAGE_BLOCKS",
"INPAINT_BLOCKS",
"IP_ADAPTER_BLOCKS",
"TEXT2IMAGE_BLOCKS",
"StableDiffusionXLAutoBlocks",
"StableDiffusionXLAutoControlnetStep",
"StableDiffusionXLAutoDecodeStep",
"StableDiffusionXLAutoIPAdapterStep",
"StableDiffusionXLAutoVaeEncoderStep",
]
_import_structure["modular_pipeline"] = ["StableDiffusionXLModularPipeline"]
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
@@ -31,7 +45,23 @@ if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
except OptionalDependencyNotAvailable:
from ...utils.dummy_torch_and_transformers_objects import * # noqa F403
else:
from .modular_blocks import StableDiffusionXLAutoBlocks
from .encoders import (
StableDiffusionXLTextEncoderStep,
)
from .modular_blocks import (
ALL_BLOCKS,
AUTO_BLOCKS,
CONTROLNET_BLOCKS,
IMAGE2IMAGE_BLOCKS,
INPAINT_BLOCKS,
IP_ADAPTER_BLOCKS,
TEXT2IMAGE_BLOCKS,
StableDiffusionXLAutoBlocks,
StableDiffusionXLAutoControlnetStep,
StableDiffusionXLAutoDecodeStep,
StableDiffusionXLAutoIPAdapterStep,
StableDiffusionXLAutoVaeEncoderStep,
)
from .modular_pipeline import StableDiffusionXLModularPipeline
else:
import sys

View File

@@ -277,7 +277,6 @@ class StableDiffusionXLCoreDenoiseStep(SequentialPipelineBlocks):
# ip-adapter, controlnet, text2img, img2img, inpainting
# auto_docstring
class StableDiffusionXLAutoBlocks(SequentialPipelineBlocks):
block_classes = [
StableDiffusionXLTextEncoderStep,
@@ -294,29 +293,103 @@ class StableDiffusionXLAutoBlocks(SequentialPipelineBlocks):
"decode",
]
_workflow_map = {
"text2image": {"prompt": True},
"image2image": {"image": True, "prompt": True},
"inpainting": {"mask_image": True, "image": True, "prompt": True},
"controlnet_text2image": {"control_image": True, "prompt": True},
"controlnet_image2image": {"control_image": True, "image": True, "prompt": True},
"controlnet_inpainting": {"control_image": True, "mask_image": True, "image": True, "prompt": True},
"controlnet_union_text2image": {"control_image": True, "control_mode": True, "prompt": True},
"controlnet_union_image2image": {"control_image": True, "control_mode": True, "image": True, "prompt": True},
"controlnet_union_inpainting": {"control_image": True, "control_mode": True, "mask_image": True, "image": True, "prompt": True},
"ip_adapter_text2image": {"ip_adapter_image": True, "prompt": True},
"ip_adapter_image2image": {"ip_adapter_image": True, "image": True, "prompt": True},
"ip_adapter_inpainting": {"ip_adapter_image": True, "mask_image": True, "image": True, "prompt": True},
"ip_adapter_controlnet_text2image": {"ip_adapter_image": True, "control_image": True, "prompt": True},
"ip_adapter_controlnet_image2image": {"ip_adapter_image": True, "control_image": True, "image": True, "prompt": True},
"ip_adapter_controlnet_inpainting": {"ip_adapter_image": True, "control_image": True, "mask_image": True, "image": True, "prompt": True},
"ip_adapter_controlnet_union_text2image": {"ip_adapter_image": True, "control_image": True, "control_mode": True, "prompt": True},
"ip_adapter_controlnet_union_image2image": {"ip_adapter_image": True, "control_image": True, "control_mode": True, "image": True, "prompt": True},
"ip_adapter_controlnet_union_inpainting": {"ip_adapter_image": True, "control_image": True, "control_mode": True, "mask_image": True, "image": True, "prompt": True},
}
@property
def description(self):
return (
"Auto Modular pipeline for text-to-image, image-to-image, inpainting, and controlnet tasks using Stable Diffusion XL.\n"
+ "- for image-to-image generation, you need to provide either `image` or `image_latents`\n"
+ "- for inpainting, you need to provide `mask_image` and `image`, optionally you can provide `padding_mask_crop` \n"
+ "- to run the controlnet workflow, you need to provide `control_image`\n"
+ "- to run the controlnet_union workflow, you need to provide `control_image` and `control_mode`\n"
+ "- to run the ip_adapter workflow, you need to provide `ip_adapter_image`\n"
+ "- for text-to-image generation, all you need to provide is `prompt`"
)
# controlnet (input + denoise step)
class StableDiffusionXLAutoControlnetStep(SequentialPipelineBlocks):
block_classes = [
StableDiffusionXLAutoControlNetInputStep,
StableDiffusionXLAutoControlNetDenoiseStep,
]
block_names = ["controlnet_input", "controlnet_denoise"]
@property
def description(self):
return (
"Auto Modular pipeline for text-to-image, image-to-image, inpainting, and controlnet tasks using Stable Diffusion XL."
"Controlnet auto step that prepare the controlnet input and denoise the latents. "
+ "It works for both controlnet and controlnet_union and supports text2img, img2img and inpainting tasks."
+ " (it should be replace at 'denoise' step)"
)
TEXT2IMAGE_BLOCKS = InsertableDict(
[
("text_encoder", StableDiffusionXLTextEncoderStep),
("input", StableDiffusionXLInputStep),
("set_timesteps", StableDiffusionXLSetTimestepsStep),
("prepare_latents", StableDiffusionXLPrepareLatentsStep),
("prepare_add_cond", StableDiffusionXLPrepareAdditionalConditioningStep),
("denoise", StableDiffusionXLDenoiseStep),
("decode", StableDiffusionXLDecodeStep),
]
)
IMAGE2IMAGE_BLOCKS = InsertableDict(
[
("text_encoder", StableDiffusionXLTextEncoderStep),
("vae_encoder", StableDiffusionXLVaeEncoderStep),
("input", StableDiffusionXLInputStep),
("set_timesteps", StableDiffusionXLImg2ImgSetTimestepsStep),
("prepare_latents", StableDiffusionXLImg2ImgPrepareLatentsStep),
("prepare_add_cond", StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep),
("denoise", StableDiffusionXLDenoiseStep),
("decode", StableDiffusionXLDecodeStep),
]
)
INPAINT_BLOCKS = InsertableDict(
[
("text_encoder", StableDiffusionXLTextEncoderStep),
("vae_encoder", StableDiffusionXLInpaintVaeEncoderStep),
("input", StableDiffusionXLInputStep),
("set_timesteps", StableDiffusionXLImg2ImgSetTimestepsStep),
("prepare_latents", StableDiffusionXLInpaintPrepareLatentsStep),
("prepare_add_cond", StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep),
("denoise", StableDiffusionXLInpaintDenoiseStep),
("decode", StableDiffusionXLInpaintDecodeStep),
]
)
CONTROLNET_BLOCKS = InsertableDict(
[
("denoise", StableDiffusionXLAutoControlnetStep),
]
)
IP_ADAPTER_BLOCKS = InsertableDict(
[
("ip_adapter", StableDiffusionXLAutoIPAdapterStep),
]
)
AUTO_BLOCKS = InsertableDict(
[
("text_encoder", StableDiffusionXLTextEncoderStep),
("ip_adapter", StableDiffusionXLAutoIPAdapterStep),
("vae_encoder", StableDiffusionXLAutoVaeEncoderStep),
("denoise", StableDiffusionXLCoreDenoiseStep),
("decode", StableDiffusionXLAutoDecodeStep),
]
)
ALL_BLOCKS = {
"text2img": TEXT2IMAGE_BLOCKS,
"img2img": IMAGE2IMAGE_BLOCKS,
"inpaint": INPAINT_BLOCKS,
"controlnet": CONTROLNET_BLOCKS,
"ip_adapter": IP_ADAPTER_BLOCKS,
"auto": AUTO_BLOCKS,
}
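These presets are plain ordered mappings, so a concrete block graph can be built from one via the `from_blocks_dict` classmethod shown earlier. A sketch, assuming the import path below is correct:

```python
from diffusers.modular_pipelines import SequentialPipelineBlocks

t2i = SequentialPipelineBlocks.from_blocks_dict(TEXT2IMAGE_BLOCKS)
print(list(t2i.sub_blocks.keys()))
# ['text_encoder', 'input', 'set_timesteps', 'prepare_latents',
#  'prepare_add_cond', 'denoise', 'decode']

# From here, `t2i.init_pipeline(...)` would yield a runnable ModularPipeline
# (method name assumed from the modular API).
```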

View File

@@ -37,7 +37,6 @@ logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# inputs(text) -> set_timesteps -> prepare_latents -> denoise
# auto_docstring
class WanCoreDenoiseStep(SequentialPipelineBlocks):
model_name = "wan"
block_classes = [
@@ -65,7 +64,6 @@ class WanCoreDenoiseStep(SequentialPipelineBlocks):
# ====================
# auto_docstring
class WanBlocks(SequentialPipelineBlocks):
model_name = "wan"
block_classes = [

View File

@@ -38,7 +38,6 @@ logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# inputs(text) -> set_timesteps -> prepare_latents -> denoise
# auto_docstring
class Wan22CoreDenoiseStep(SequentialPipelineBlocks):
model_name = "wan"
block_classes = [
@@ -66,8 +65,6 @@ class Wan22CoreDenoiseStep(SequentialPipelineBlocks):
# ====================
# auto_docstring
class Wan22Blocks(SequentialPipelineBlocks):
model_name = "wan"
block_classes = [

View File

@@ -40,7 +40,6 @@ logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# ====================
# auto_docstring
class WanImage2VideoVaeEncoderStep(SequentialPipelineBlocks):
model_name = "wan-i2v"
block_classes = [WanImageResizeStep, WanVaeEncoderStep, WanPrepareFirstFrameLatentsStep]
@@ -57,7 +56,6 @@ class WanImage2VideoVaeEncoderStep(SequentialPipelineBlocks):
# inputs (text + image_condition_latents) -> set_timesteps -> prepare_latents -> denoise (latents)
# auto_docstring
class Wan22Image2VideoCoreDenoiseStep(SequentialPipelineBlocks):
model_name = "wan-i2v"
block_classes = [
@@ -93,7 +91,6 @@ class Wan22Image2VideoCoreDenoiseStep(SequentialPipelineBlocks):
# ====================
# auto_docstring
class Wan22Image2VideoBlocks(SequentialPipelineBlocks):
model_name = "wan-i2v"
block_classes = [

View File

@@ -177,7 +177,6 @@ class WanImage2VideoCoreDenoiseStep(SequentialPipelineBlocks):
# wan2.1 Image2Video Auto Blocks
# auto_docstring
class WanImage2VideoAutoBlocks(SequentialPipelineBlocks):
model_name = "wan-i2v"
block_classes = [
@@ -195,13 +194,10 @@ class WanImage2VideoAutoBlocks(SequentialPipelineBlocks):
"decode",
]
_workflow_map = {
"image2video": {"image": True, "prompt": True},
"flf2v": {"last_image": True, "image": True, "prompt": True},
}
@property
def description(self):
return (
"Auto Modular pipeline for image-to-video using Wan."
"Auto Modular pipeline for image-to-video using Wan.\n"
+ "- for I2V workflow, all you need to provide is `image`"
+ "- for FLF2V workflow, all you need to provide is `last_image` and `image`"
)

View File

@@ -21,7 +21,12 @@ except OptionalDependencyNotAvailable:
_dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
else:
_import_structure["modular_blocks_z_image"] = ["ZImageAutoBlocks"]
_import_structure["decoders"] = ["ZImageVaeDecoderStep"]
_import_structure["encoders"] = ["ZImageTextEncoderStep", "ZImageVaeImageEncoderStep"]
_import_structure["modular_blocks"] = [
"ALL_BLOCKS",
"ZImageAutoBlocks",
]
_import_structure["modular_pipeline"] = ["ZImageModularPipeline"]
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
@@ -31,7 +36,12 @@ if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
except OptionalDependencyNotAvailable:
from ...utils.dummy_torch_and_transformers_objects import * # noqa F403
else:
from .modular_blocks_z_image import ZImageAutoBlocks
from .decoders import ZImageVaeDecoderStep
from .encoders import ZImageTextEncoderStep
from .modular_blocks import (
ALL_BLOCKS,
ZImageAutoBlocks,
)
from .modular_pipeline import ZImageModularPipeline
else:
import sys

View File

@@ -36,12 +36,8 @@ from .encoders import (
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# ====================
# 1. DENOISE
# ====================
# text2image: inputs(text) -> set_timesteps -> prepare_latents -> denoise
# auto_docstring
# z-image
# text2image
class ZImageCoreDenoiseStep(SequentialPipelineBlocks):
block_classes = [
ZImageTextInputStep,
@@ -63,8 +59,8 @@ class ZImageCoreDenoiseStep(SequentialPipelineBlocks):
)
# image2image: inputs(text + image_latents) -> prepare_latents -> set_timesteps -> set_timesteps_with_strength -> prepare_latents_with_image -> denoise
# auto_docstring
# z-image: image2image
## denoise
class ZImageImage2ImageCoreDenoiseStep(SequentialPipelineBlocks):
block_classes = [
ZImageTextInputStep,
@@ -100,7 +96,7 @@ class ZImageImage2ImageCoreDenoiseStep(SequentialPipelineBlocks):
)
# auto_docstring
## auto blocks
class ZImageAutoDenoiseStep(AutoPipelineBlocks):
block_classes = [
ZImageImage2ImageCoreDenoiseStep,
@@ -121,7 +117,6 @@ class ZImageAutoDenoiseStep(AutoPipelineBlocks):
)
# auto_docstring
class ZImageAutoVaeImageEncoderStep(AutoPipelineBlocks):
block_classes = [ZImageVaeImageEncoderStep]
block_names = ["vae_encoder"]
@@ -135,7 +130,6 @@ class ZImageAutoVaeImageEncoderStep(AutoPipelineBlocks):
+" - if `image` is not provided, step will be skipped."
# auto_docstring
class ZImageAutoBlocks(SequentialPipelineBlocks):
block_classes = [
ZImageTextEncoderStep,
@@ -144,12 +138,54 @@ class ZImageAutoBlocks(SequentialPipelineBlocks):
ZImageVaeDecoderStep,
]
block_names = ["text_encoder", "vae_encoder", "denoise", "decode"]
_workflow_map = {
"text2image": {"prompt": True},
"image2image": {"image": True, "prompt": True},
}
@property
def description(self) -> str:
return "Auto Modular pipeline for text-to-image and image-to-image using ZImage."
return "Auto Modular pipeline for text-to-image and image-to-image using ZImage.\n"
+" - for text-to-image generation, all you need to provide is `prompt`\n"
+" - for image-to-image generation, you need to provide `image`\n"
+" - if `image` is not provided, step will be skipped."
# presets
TEXT2IMAGE_BLOCKS = InsertableDict(
[
("text_encoder", ZImageTextEncoderStep),
("input", ZImageTextInputStep),
("prepare_latents", ZImagePrepareLatentsStep),
("set_timesteps", ZImageSetTimestepsStep),
("denoise", ZImageDenoiseStep),
("decode", ZImageVaeDecoderStep),
]
)
IMAGE2IMAGE_BLOCKS = InsertableDict(
[
("text_encoder", ZImageTextEncoderStep),
("vae_encoder", ZImageVaeImageEncoderStep),
("input", ZImageTextInputStep),
("additional_inputs", ZImageAdditionalInputsStep(image_latent_inputs=["image_latents"])),
("prepare_latents", ZImagePrepareLatentsStep),
("set_timesteps", ZImageSetTimestepsStep),
("set_timesteps_with_strength", ZImageSetTimestepsWithStrengthStep),
("prepare_latents_with_image", ZImagePrepareLatentswithImageStep),
("denoise", ZImageDenoiseStep),
("decode", ZImageVaeDecoderStep),
]
)
AUTO_BLOCKS = InsertableDict(
[
("text_encoder", ZImageTextEncoderStep),
("vae_encoder", ZImageAutoVaeImageEncoderStep),
("denoise", ZImageAutoDenoiseStep),
("decode", ZImageVaeDecoderStep),
]
)
ALL_BLOCKS = {
"text2image": TEXT2IMAGE_BLOCKS,
"image2image": IMAGE2IMAGE_BLOCKS,
"auto": AUTO_BLOCKS,
}

View File

@@ -22,7 +22,7 @@ from transformers import Gemma3ForConditionalGeneration, GemmaTokenizer, GemmaTo
from ...callbacks import MultiPipelineCallbacks, PipelineCallback
from ...image_processor import PipelineImageInput
from ...loaders import FromSingleFileMixin, LTXVideoLoraLoaderMixin
from ...loaders import FromSingleFileMixin, LTX2LoraLoaderMixin
from ...models.autoencoders import AutoencoderKLLTX2Audio, AutoencoderKLLTX2Video
from ...models.transformers import LTX2VideoTransformer3DModel
from ...schedulers import FlowMatchEulerDiscreteScheduler
@@ -48,7 +48,7 @@ EXAMPLE_DOC_STRING = """
Examples:
```py
>>> import torch
>>> from diffusers import LTX2ImageToVideoPipeline
>>> from diffusers import LTX2Pipeline
>>> from diffusers.pipelines.ltx2.export_utils import encode_video
>>> from diffusers.utils import load_image
@@ -62,7 +62,7 @@ EXAMPLE_DOC_STRING = """
>>> negative_prompt = "worst quality, inconsistent motion, blurry, jittery, distorted"
>>> frame_rate = 24.0
>>> video, audio = pipe(
>>> video = pipe(
... image=image,
... prompt=prompt,
... negative_prompt=negative_prompt,
@@ -202,7 +202,7 @@ def rescale_noise_cfg(noise_cfg, noise_pred_text, guidance_rescale=0.0):
return noise_cfg
class LTX2ImageToVideoPipeline(DiffusionPipeline, FromSingleFileMixin, LTXVideoLoraLoaderMixin):
class LTX2ImageToVideoPipeline(DiffusionPipeline, FromSingleFileMixin, LTX2LoraLoaderMixin):
r"""
Pipeline for image-to-video generation.

View File

@@ -1905,21 +1905,6 @@ def attention_backend(*args, **kwargs):
requires_backends(attention_backend, ["torch"])
class AutoPipelineBlocks(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
@classmethod
def from_config(cls, *args, **kwargs):
requires_backends(cls, ["torch"])
@classmethod
def from_pretrained(cls, *args, **kwargs):
requires_backends(cls, ["torch"])
class ComponentsManager(metaclass=DummyObject):
_backends = ["torch"]
@@ -1950,66 +1935,6 @@ class ComponentSpec(metaclass=DummyObject):
requires_backends(cls, ["torch"])
class ConditionalPipelineBlocks(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
@classmethod
def from_config(cls, *args, **kwargs):
requires_backends(cls, ["torch"])
@classmethod
def from_pretrained(cls, *args, **kwargs):
requires_backends(cls, ["torch"])
class ConfigSpec(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
@classmethod
def from_config(cls, *args, **kwargs):
requires_backends(cls, ["torch"])
@classmethod
def from_pretrained(cls, *args, **kwargs):
requires_backends(cls, ["torch"])
class InputParam(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
@classmethod
def from_config(cls, *args, **kwargs):
requires_backends(cls, ["torch"])
@classmethod
def from_pretrained(cls, *args, **kwargs):
requires_backends(cls, ["torch"])
class LoopSequentialPipelineBlocks(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
@classmethod
def from_config(cls, *args, **kwargs):
requires_backends(cls, ["torch"])
@classmethod
def from_pretrained(cls, *args, **kwargs):
requires_backends(cls, ["torch"])
class ModularPipeline(metaclass=DummyObject):
_backends = ["torch"]
@@ -2040,36 +1965,6 @@ class ModularPipelineBlocks(metaclass=DummyObject):
requires_backends(cls, ["torch"])
class OutputParam(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
@classmethod
def from_config(cls, *args, **kwargs):
requires_backends(cls, ["torch"])
@classmethod
def from_pretrained(cls, *args, **kwargs):
requires_backends(cls, ["torch"])
class SequentialPipelineBlocks(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
@classmethod
def from_config(cls, *args, **kwargs):
requires_backends(cls, ["torch"])
@classmethod
def from_pretrained(cls, *args, **kwargs):
requires_backends(cls, ["torch"])
def get_constant_schedule(*args, **kwargs):
requires_backends(get_constant_schedule, ["torch"])

View File

@@ -33,20 +33,6 @@ from ...testing_utils import floats_tensor, torch_device
from ..test_modular_pipelines_common import ModularPipelineTesterMixin
FLUX_TEXT2IMAGE_WORKFLOWS = {
"text2image": [
("text_encoder", "FluxTextEncoderStep"),
("input", "FluxTextInputStep"),
("prepare_latents", "FluxPrepareLatentsStep"),
("set_timesteps", "FluxSetTimestepsStep"),
("prepare_rope_inputs", "FluxRoPEInputsStep"),
("denoise", "FluxDenoiseStep"),
("decode", "FluxDecodeStep"),
]
}
class TestFluxModularPipelineFast(ModularPipelineTesterMixin):
pipeline_class = FluxModularPipeline
pipeline_blocks_class = FluxAutoBlocks
@@ -54,7 +40,6 @@ class TestFluxModularPipelineFast(ModularPipelineTesterMixin):
params = frozenset(["prompt", "height", "width", "guidance_scale"])
batch_params = frozenset(["prompt"])
expected_workflow_blocks = FLUX_TEXT2IMAGE_WORKFLOWS
def get_dummy_inputs(self, seed=0):
generator = self.get_generator(seed)
@@ -74,22 +59,6 @@ class TestFluxModularPipelineFast(ModularPipelineTesterMixin):
super().test_float16_inference(9e-2)
FLUX_IMAGE2IMAGE_WORKFLOWS = {
"image2image": [
("text_encoder", "FluxTextEncoderStep"),
("vae_encoder.preprocess", "FluxProcessImagesInputStep"),
("vae_encoder.encode", "FluxVaeEncoderStep"),
("input", "FluxTextInputStep"),
("additional_inputs", "FluxAdditionalInputsStep"),
("prepare_latents", "FluxPrepareLatentsStep"),
("set_timesteps", "FluxImg2ImgSetTimestepsStep"),
("prepare_img2img_latents", "FluxImg2ImgPrepareLatentsStep"),
("prepare_rope_inputs", "FluxRoPEInputsStep"),
("denoise", "FluxDenoiseStep"),
("decode", "FluxDecodeStep"),
]
}
class TestFluxImg2ImgModularPipelineFast(ModularPipelineTesterMixin):
pipeline_class = FluxModularPipeline
pipeline_blocks_class = FluxAutoBlocks
@@ -97,7 +66,6 @@ class TestFluxImg2ImgModularPipelineFast(ModularPipelineTesterMixin):
params = frozenset(["prompt", "height", "width", "guidance_scale", "image"])
batch_params = frozenset(["prompt", "image"])
expected_workflow_blocks = FLUX_IMAGE2IMAGE_WORKFLOWS
def get_pipeline(self, components_manager=None, torch_dtype=torch.float32):
pipeline = super().get_pipeline(components_manager, torch_dtype)
@@ -156,30 +124,6 @@ class TestFluxImg2ImgModularPipelineFast(ModularPipelineTesterMixin):
def test_float16_inference(self):
super().test_float16_inference(8e-2)
FLUX_KONTEXT_WORKFLOWS = {
"text2image": [
("text_encoder", "FluxTextEncoderStep"),
("denoise.input", "FluxTextInputStep"),
("denoise.before_denoise.prepare_latents", "FluxPrepareLatentsStep"),
("denoise.before_denoise.set_timesteps", "FluxSetTimestepsStep"),
("denoise.before_denoise.prepare_rope_inputs", "FluxRoPEInputsStep"),
("denoise.denoise", "FluxKontextDenoiseStep"),
("decode", "FluxDecodeStep"),
],
"image_conditioned": [
("text_encoder", "FluxTextEncoderStep"),
("vae_encoder.preprocess", "FluxKontextProcessImagesInputStep"),
("vae_encoder.encode", "FluxVaeEncoderStep"),
("denoise.input.set_resolution", "FluxKontextSetResolutionStep"),
("denoise.input.text_inputs", "FluxTextInputStep"),
("denoise.input.additional_inputs", "FluxKontextAdditionalInputsStep"),
("denoise.before_denoise.prepare_latents", "FluxPrepareLatentsStep"),
("denoise.before_denoise.set_timesteps", "FluxSetTimestepsStep"),
("denoise.before_denoise.prepare_rope_inputs", "FluxKontextRoPEInputsStep"),
("denoise.denoise", "FluxKontextDenoiseStep"),
("decode", "FluxDecodeStep"),
]
}
class TestFluxKontextModularPipelineFast(ModularPipelineTesterMixin):
pipeline_class = FluxKontextModularPipeline
@@ -188,7 +132,6 @@ class TestFluxKontextModularPipelineFast(ModularPipelineTesterMixin):
params = frozenset(["prompt", "height", "width", "guidance_scale", "image"])
batch_params = frozenset(["prompt", "image"])
expected_workflow_blocks = FLUX_KONTEXT_WORKFLOWS
def get_dummy_inputs(self, seed=0):
generator = self.get_generator(seed)

View File

@@ -27,19 +27,6 @@ from diffusers.modular_pipelines import (
from ...testing_utils import floats_tensor, torch_device
from ..test_modular_pipelines_common import ModularPipelineTesterMixin
FLUX2_TEXT2IMAGE_WORKFLOWS = {
"text2image": [
("text_encoder", "Flux2TextEncoderStep"),
("text_input", "Flux2TextInputStep"),
("prepare_latents", "Flux2PrepareLatentsStep"),
("set_timesteps", "Flux2SetTimestepsStep"),
("prepare_guidance", "Flux2PrepareGuidanceStep"),
("prepare_rope_inputs", "Flux2RoPEInputsStep"),
("denoise", "Flux2DenoiseStep"),
("after_denoise", "Flux2UnpackLatentsStep"),
("decode", "Flux2DecodeStep"),
],
}
class TestFlux2ModularPipelineFast(ModularPipelineTesterMixin):
pipeline_class = Flux2ModularPipeline
@@ -48,7 +35,6 @@ class TestFlux2ModularPipelineFast(ModularPipelineTesterMixin):
params = frozenset(["prompt", "height", "width", "guidance_scale"])
batch_params = frozenset(["prompt"])
expected_workflow_blocks = FLUX2_TEXT2IMAGE_WORKFLOWS
def get_dummy_inputs(self, seed=0):
generator = self.get_generator(seed)
@@ -69,22 +55,6 @@ class TestFlux2ModularPipelineFast(ModularPipelineTesterMixin):
def test_float16_inference(self):
super().test_float16_inference(9e-2)
FLUX2_IMAGE_CONDITIONED_WORKFLOWS = {
"image_conditioned": [
("text_encoder", "Flux2TextEncoderStep"),
("preprocess_images", "Flux2ProcessImagesInputStep"),
("vae_encoder", "Flux2VaeEncoderStep"),
("text_input", "Flux2TextInputStep"),
("prepare_image_latents", "Flux2PrepareImageLatentsStep"),
("prepare_latents", "Flux2PrepareLatentsStep"),
("set_timesteps", "Flux2SetTimestepsStep"),
("prepare_guidance", "Flux2PrepareGuidanceStep"),
("prepare_rope_inputs", "Flux2RoPEInputsStep"),
("denoise", "Flux2DenoiseStep"),
("after_denoise", "Flux2UnpackLatentsStep"),
("decode", "Flux2DecodeStep"),
],
}
class TestFlux2ImageConditionedModularPipelineFast(ModularPipelineTesterMixin):
pipeline_class = Flux2ModularPipeline
@@ -93,7 +63,6 @@ class TestFlux2ImageConditionedModularPipelineFast(ModularPipelineTesterMixin):
params = frozenset(["prompt", "height", "width", "guidance_scale", "image"])
batch_params = frozenset(["prompt", "image"])
expected_workflow_blocks = FLUX2_IMAGE_CONDITIONED_WORKFLOWS
def get_dummy_inputs(self, seed=0):
generator = self.get_generator(seed)

View File

@@ -30,102 +30,6 @@ from ...testing_utils import torch_device
from ..test_modular_pipelines_common import ModularGuiderTesterMixin, ModularPipelineTesterMixin
QWEN_IMAGE_TEXT2IMAGE_WORKFLOWS = {
"text2image": [
("text_encoder", "QwenImageTextEncoderStep"),
("denoise.input", "QwenImageTextInputsStep"),
("denoise.prepare_latents", "QwenImagePrepareLatentsStep"),
("denoise.set_timesteps", "QwenImageSetTimestepsStep"),
("denoise.prepare_rope_inputs", "QwenImageRoPEInputsStep"),
("denoise.denoise", "QwenImageDenoiseStep"),
("denoise.after_denoise", "QwenImageAfterDenoiseStep"),
("decode.decode", "QwenImageDecoderStep"),
("decode.postprocess", "QwenImageProcessImagesOutputStep"),
],
"image2image": [
("text_encoder", "QwenImageTextEncoderStep"),
("vae_encoder.preprocess", "QwenImageProcessImagesInputStep"),
("vae_encoder.encode", "QwenImageVaeEncoderStep"),
("denoise.input.text_inputs", "QwenImageTextInputsStep"),
("denoise.input.additional_inputs", "QwenImageAdditionalInputsStep"),
("denoise.prepare_latents", "QwenImagePrepareLatentsStep"),
("denoise.set_timesteps", "QwenImageSetTimestepsWithStrengthStep"),
("denoise.prepare_img2img_latents", "QwenImagePrepareLatentsWithStrengthStep"),
("denoise.prepare_rope_inputs", "QwenImageRoPEInputsStep"),
("denoise.denoise", "QwenImageDenoiseStep"),
("denoise.after_denoise", "QwenImageAfterDenoiseStep"),
("decode.decode", "QwenImageDecoderStep"),
("decode.postprocess", "QwenImageProcessImagesOutputStep"),
],
"inpainting": [
("text_encoder", "QwenImageTextEncoderStep"),
("vae_encoder.preprocess", "QwenImageInpaintProcessImagesInputStep"),
("vae_encoder.encode", "QwenImageVaeEncoderStep"),
("denoise.input.text_inputs", "QwenImageTextInputsStep"),
("denoise.input.additional_inputs", "QwenImageAdditionalInputsStep"),
("denoise.prepare_latents", "QwenImagePrepareLatentsStep"),
("denoise.set_timesteps", "QwenImageSetTimestepsWithStrengthStep"),
("denoise.prepare_inpaint_latents.add_noise_to_latents", "QwenImagePrepareLatentsWithStrengthStep"),
("denoise.prepare_inpaint_latents.create_mask_latents", "QwenImageCreateMaskLatentsStep"),
("denoise.prepare_rope_inputs", "QwenImageRoPEInputsStep"),
("denoise.denoise", "QwenImageInpaintDenoiseStep"),
("denoise.after_denoise", "QwenImageAfterDenoiseStep"),
("decode.decode", "QwenImageDecoderStep"),
("decode.postprocess", "QwenImageInpaintProcessImagesOutputStep"),
],
"controlnet_text2image": [
("text_encoder", "QwenImageTextEncoderStep"),
("controlnet_vae_encoder", "QwenImageControlNetVaeEncoderStep"),
("denoise.input", "QwenImageTextInputsStep"),
("denoise.controlnet_input", "QwenImageControlNetInputsStep"),
("denoise.prepare_latents", "QwenImagePrepareLatentsStep"),
("denoise.set_timesteps", "QwenImageSetTimestepsStep"),
("denoise.prepare_rope_inputs", "QwenImageRoPEInputsStep"),
("denoise.controlnet_before_denoise", "QwenImageControlNetBeforeDenoiserStep"),
("denoise.controlnet_denoise", "QwenImageControlNetDenoiseStep"),
("denoise.after_denoise", "QwenImageAfterDenoiseStep"),
("decode.decode", "QwenImageDecoderStep"),
("decode.postprocess", "QwenImageProcessImagesOutputStep"),
],
"controlnet_image2image": [
("text_encoder", "QwenImageTextEncoderStep"),
("vae_encoder.preprocess", "QwenImageProcessImagesInputStep"),
("vae_encoder.encode", "QwenImageVaeEncoderStep"),
("controlnet_vae_encoder", "QwenImageControlNetVaeEncoderStep"),
("denoise.input.text_inputs", "QwenImageTextInputsStep"),
("denoise.input.additional_inputs", "QwenImageAdditionalInputsStep"),
("denoise.controlnet_input", "QwenImageControlNetInputsStep"),
("denoise.prepare_latents", "QwenImagePrepareLatentsStep"),
("denoise.set_timesteps", "QwenImageSetTimestepsWithStrengthStep"),
("denoise.prepare_img2img_latents", "QwenImagePrepareLatentsWithStrengthStep"),
("denoise.prepare_rope_inputs", "QwenImageRoPEInputsStep"),
("denoise.controlnet_before_denoise", "QwenImageControlNetBeforeDenoiserStep"),
("denoise.controlnet_denoise", "QwenImageControlNetDenoiseStep"),
("denoise.after_denoise", "QwenImageAfterDenoiseStep"),
("decode.decode", "QwenImageDecoderStep"),
("decode.postprocess", "QwenImageProcessImagesOutputStep"),
],
"controlnet_inpainting": [
("text_encoder", "QwenImageTextEncoderStep"),
("vae_encoder.preprocess", "QwenImageInpaintProcessImagesInputStep"),
("vae_encoder.encode", "QwenImageVaeEncoderStep"),
("controlnet_vae_encoder", "QwenImageControlNetVaeEncoderStep"),
("denoise.input.text_inputs", "QwenImageTextInputsStep"),
("denoise.input.additional_inputs", "QwenImageAdditionalInputsStep"),
("denoise.controlnet_input", "QwenImageControlNetInputsStep"),
("denoise.prepare_latents", "QwenImagePrepareLatentsStep"),
("denoise.set_timesteps", "QwenImageSetTimestepsWithStrengthStep"),
("denoise.prepare_inpaint_latents.add_noise_to_latents", "QwenImagePrepareLatentsWithStrengthStep"),
("denoise.prepare_inpaint_latents.create_mask_latents", "QwenImageCreateMaskLatentsStep"),
("denoise.prepare_rope_inputs", "QwenImageRoPEInputsStep"),
("denoise.controlnet_before_denoise", "QwenImageControlNetBeforeDenoiserStep"),
("denoise.controlnet_denoise", "QwenImageInpaintControlNetDenoiseStep"),
("denoise.after_denoise", "QwenImageAfterDenoiseStep"),
("decode.decode", "QwenImageDecoderStep"),
("decode.postprocess", "QwenImageInpaintProcessImagesOutputStep"),
],
}
class TestQwenImageModularPipelineFast(ModularPipelineTesterMixin, ModularGuiderTesterMixin):
pipeline_class = QwenImageModularPipeline
pipeline_blocks_class = QwenImageAutoBlocks
@@ -133,7 +37,6 @@ class TestQwenImageModularPipelineFast(ModularPipelineTesterMixin, ModularGuider
params = frozenset(["prompt", "height", "width", "negative_prompt", "attention_kwargs", "image", "mask_image"])
batch_params = frozenset(["prompt", "negative_prompt", "image", "mask_image"])
expected_workflow_blocks = QWEN_IMAGE_TEXT2IMAGE_WORKFLOWS
def get_dummy_inputs(self):
generator = self.get_generator()
@@ -152,42 +55,6 @@ class TestQwenImageModularPipelineFast(ModularPipelineTesterMixin, ModularGuider
def test_inference_batch_single_identical(self):
super().test_inference_batch_single_identical(expected_max_diff=5e-4)
QWEN_IMAGE_EDIT_WORKFLOWS = {
"edit": [
("text_encoder.resize", "QwenImageEditResizeStep"),
("text_encoder.encode", "QwenImageEditTextEncoderStep"),
("vae_encoder.resize", "QwenImageEditResizeStep"),
("vae_encoder.preprocess", "QwenImageEditProcessImagesInputStep"),
("vae_encoder.encode", "QwenImageVaeEncoderStep"),
("denoise.input.text_inputs", "QwenImageTextInputsStep"),
("denoise.input.additional_inputs", "QwenImageAdditionalInputsStep"),
("denoise.prepare_latents", "QwenImagePrepareLatentsStep"),
("denoise.set_timesteps", "QwenImageSetTimestepsStep"),
("denoise.prepare_rope_inputs", "QwenImageEditRoPEInputsStep"),
("denoise.denoise", "QwenImageEditDenoiseStep"),
("denoise.after_denoise", "QwenImageAfterDenoiseStep"),
("decode.decode", "QwenImageDecoderStep"),
("decode.postprocess", "QwenImageProcessImagesOutputStep"),
],
"edit_inpainting": [
("text_encoder.resize", "QwenImageEditResizeStep"),
("text_encoder.encode", "QwenImageEditTextEncoderStep"),
("vae_encoder.resize", "QwenImageEditResizeStep"),
("vae_encoder.preprocess", "QwenImageEditInpaintProcessImagesInputStep"),
("vae_encoder.encode", "QwenImageVaeEncoderStep"),
("denoise.input.text_inputs", "QwenImageTextInputsStep"),
("denoise.input.additional_inputs", "QwenImageAdditionalInputsStep"),
("denoise.prepare_latents", "QwenImagePrepareLatentsStep"),
("denoise.set_timesteps", "QwenImageSetTimestepsWithStrengthStep"),
("denoise.prepare_inpaint_latents.add_noise_to_latents", "QwenImagePrepareLatentsWithStrengthStep"),
("denoise.prepare_inpaint_latents.create_mask_latents", "QwenImageCreateMaskLatentsStep"),
("denoise.prepare_rope_inputs", "QwenImageEditRoPEInputsStep"),
("denoise.denoise", "QwenImageEditInpaintDenoiseStep"),
("denoise.after_denoise", "QwenImageAfterDenoiseStep"),
("decode.decode", "QwenImageDecoderStep"),
("decode.postprocess", "QwenImageInpaintProcessImagesOutputStep"),
],
}
class TestQwenImageEditModularPipelineFast(ModularPipelineTesterMixin, ModularGuiderTesterMixin):
pipeline_class = QwenImageEditModularPipeline
@@ -196,7 +63,6 @@ class TestQwenImageEditModularPipelineFast(ModularPipelineTesterMixin, ModularGu
params = frozenset(["prompt", "height", "width", "negative_prompt", "attention_kwargs", "image", "mask_image"])
batch_params = frozenset(["prompt", "negative_prompt", "image", "mask_image"])
expected_workflow_blocks = QWEN_IMAGE_EDIT_WORKFLOWS
def get_dummy_inputs(self):
generator = self.get_generator()

View File

@@ -267,60 +267,6 @@ class SDXLModularControlNetTesterMixin:
assert max_diff > 1e-2, "Output with CFG must be different from normal inference"
TEXT2IMAGE_WORKFLOWS = {
"text2image": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLPrepareAdditionalConditioningStep"),
("denoise", "StableDiffusionXLDenoiseStep"),
("decode", "StableDiffusionXLDecodeStep"),
],
"controlnet_text2image": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLPrepareAdditionalConditioningStep"),
("controlnet_input", "StableDiffusionXLControlNetInputStep"),
("denoise", "StableDiffusionXLControlNetDenoiseStep"),
("decode", "StableDiffusionXLDecodeStep"),
],
"controlnet_union_text2image": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLPrepareAdditionalConditioningStep"),
("controlnet_input", "StableDiffusionXLControlNetUnionInputStep"),
("denoise", "StableDiffusionXLControlNetDenoiseStep"),
("decode", "StableDiffusionXLDecodeStep"),
],
"ip_adapter_text2image": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("ip_adapter", "StableDiffusionXLIPAdapterStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLPrepareAdditionalConditioningStep"),
("denoise", "StableDiffusionXLDenoiseStep"),
("decode", "StableDiffusionXLDecodeStep"),
],
"ip_adapter_controlnet_text2image": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("ip_adapter", "StableDiffusionXLIPAdapterStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLPrepareAdditionalConditioningStep"),
("controlnet_input", "StableDiffusionXLControlNetInputStep"),
("denoise", "StableDiffusionXLControlNetDenoiseStep"),
("decode", "StableDiffusionXLDecodeStep"),
],
}
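Reading the table, the ControlNet and IP-Adapter variants are the base "text2image" chain with steps inserted or swapped, not independent layouts. A quick sketch that makes the delta explicit; it runs as-is against the dict above:

base = dict(TEXT2IMAGE_WORKFLOWS["text2image"])
controlnet = dict(TEXT2IMAGE_WORKFLOWS["controlnet_text2image"])
delta = {k: v for k, v in controlnet.items() if base.get(k) != v}
# delta == {"controlnet_input": "StableDiffusionXLControlNetInputStep",
#           "denoise": "StableDiffusionXLControlNetDenoiseStep"}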
class TestSDXLModularPipelineFast(
SDXLModularTesterMixin,
SDXLModularIPAdapterTesterMixin,
@@ -345,9 +291,6 @@ class TestSDXLModularPipelineFast(
batch_params = frozenset(["prompt", "negative_prompt"])
expected_image_output_shape = (1, 3, 64, 64)
expected_workflow_blocks = TEXT2IMAGE_WORKFLOWS
def get_dummy_inputs(self, seed=0):
generator = self.get_generator(seed)
inputs = {
@@ -370,63 +313,6 @@ class TestSDXLModularPipelineFast(
def test_inference_batch_single_identical(self):
super().test_inference_batch_single_identical(expected_max_diff=3e-3)
IMAGE2IMAGE_WORKFLOWS = {
"image2image": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("vae_encoder", "StableDiffusionXLVaeEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLImg2ImgSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLImg2ImgPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep"),
("denoise", "StableDiffusionXLDenoiseStep"),
("decode", "StableDiffusionXLDecodeStep"),
],
"controlnet_image2image": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("vae_encoder", "StableDiffusionXLVaeEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLImg2ImgSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLImg2ImgPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep"),
("controlnet_input", "StableDiffusionXLControlNetInputStep"),
("denoise", "StableDiffusionXLControlNetDenoiseStep"),
("decode", "StableDiffusionXLDecodeStep"),
],
"controlnet_union_image2image": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("vae_encoder", "StableDiffusionXLVaeEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLImg2ImgSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLImg2ImgPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep"),
("controlnet_input", "StableDiffusionXLControlNetUnionInputStep"),
("denoise", "StableDiffusionXLControlNetDenoiseStep"),
("decode", "StableDiffusionXLDecodeStep"),
],
"ip_adapter_image2image": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("ip_adapter", "StableDiffusionXLIPAdapterStep"),
("vae_encoder", "StableDiffusionXLVaeEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLImg2ImgSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLImg2ImgPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep"),
("denoise", "StableDiffusionXLDenoiseStep"),
("decode", "StableDiffusionXLDecodeStep"),
],
"ip_adapter_controlnet_image2image": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("ip_adapter", "StableDiffusionXLIPAdapterStep"),
("vae_encoder", "StableDiffusionXLVaeEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLImg2ImgSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLImg2ImgPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep"),
("controlnet_input", "StableDiffusionXLControlNetInputStep"),
("denoise", "StableDiffusionXLControlNetDenoiseStep"),
("decode", "StableDiffusionXLDecodeStep"),
],
}
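Likewise, "image2image" keeps the text2image denoise and decode blocks and differs only in the front half of the chain. A sketch of that comparison, using the two dicts defined above:

t2i = dict(TEXT2IMAGE_WORKFLOWS["text2image"])
i2i = dict(IMAGE2IMAGE_WORKFLOWS["image2image"])
changed = sorted(k for k in i2i if t2i.get(k) != i2i[k])
# changed == ["prepare_add_cond", "prepare_latents",
#             "set_timesteps", "vae_encoder"]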
class TestSDXLImg2ImgModularPipelineFast(
SDXLModularTesterMixin,
@@ -452,7 +338,6 @@ class TestSDXLImg2ImgModularPipelineFast(
)
batch_params = frozenset(["prompt", "negative_prompt", "image"])
expected_image_output_shape = (1, 3, 64, 64)
expected_workflow_blocks = IMAGE2IMAGE_WORKFLOWS
def get_dummy_inputs(self, seed=0):
generator = self.get_generator(seed)
@@ -481,63 +366,6 @@ class TestSDXLImg2ImgModularPipelineFast(
def test_inference_batch_single_identical(self):
super().test_inference_batch_single_identical(expected_max_diff=3e-3)
INPAINTING_WORKFLOWS = {
"inpainting": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("vae_encoder", "StableDiffusionXLInpaintVaeEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLImg2ImgSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLInpaintPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep"),
("denoise", "StableDiffusionXLInpaintDenoiseStep"),
("decode", "StableDiffusionXLInpaintDecodeStep"),
],
"controlnet_inpainting": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("vae_encoder", "StableDiffusionXLInpaintVaeEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLImg2ImgSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLInpaintPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep"),
("controlnet_input", "StableDiffusionXLControlNetInputStep"),
("denoise", "StableDiffusionXLInpaintControlNetDenoiseStep"),
("decode", "StableDiffusionXLInpaintDecodeStep"),
],
"controlnet_union_inpainting": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("vae_encoder", "StableDiffusionXLInpaintVaeEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLImg2ImgSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLInpaintPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep"),
("controlnet_input", "StableDiffusionXLControlNetUnionInputStep"),
("denoise", "StableDiffusionXLInpaintControlNetDenoiseStep"),
("decode", "StableDiffusionXLInpaintDecodeStep"),
],
"ip_adapter_inpainting": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("ip_adapter", "StableDiffusionXLIPAdapterStep"),
("vae_encoder", "StableDiffusionXLInpaintVaeEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLImg2ImgSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLInpaintPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep"),
("denoise", "StableDiffusionXLInpaintDenoiseStep"),
("decode", "StableDiffusionXLInpaintDecodeStep"),
],
"ip_adapter_controlnet_inpainting": [
("text_encoder", "StableDiffusionXLTextEncoderStep"),
("ip_adapter", "StableDiffusionXLIPAdapterStep"),
("vae_encoder", "StableDiffusionXLInpaintVaeEncoderStep"),
("input", "StableDiffusionXLInputStep"),
("set_timesteps", "StableDiffusionXLImg2ImgSetTimestepsStep"),
("prepare_latents", "StableDiffusionXLInpaintPrepareLatentsStep"),
("prepare_add_cond", "StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep"),
("controlnet_input", "StableDiffusionXLControlNetInputStep"),
("denoise", "StableDiffusionXLInpaintControlNetDenoiseStep"),
("decode", "StableDiffusionXLInpaintDecodeStep"),
],
}
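Inpainting, in turn, is the img2img chain with the mask-aware blocks swapped in; the set_timesteps and prepare_add_cond steps are shared. The same comparison sketch applied to these tables:

i2i = dict(IMAGE2IMAGE_WORKFLOWS["image2image"])
inpaint = dict(INPAINTING_WORKFLOWS["inpainting"])
swapped = {k: (i2i[k], inpaint[k]) for k in inpaint if i2i.get(k) != inpaint[k]}
# swapped keys: "vae_encoder", "prepare_latents", "denoise", "decode"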
class SDXLInpaintingModularPipelineFastTests(
SDXLModularTesterMixin,
@@ -564,7 +392,6 @@ class SDXLInpaintingModularPipelineFastTests(
)
batch_params = frozenset(["prompt", "negative_prompt", "image", "mask_image"])
expected_image_output_shape = (1, 3, 64, 64)
expected_workflow_blocks = INPAINTING_WORKFLOWS
def get_dummy_inputs(self, device, seed=0):
generator = self.get_generator(seed)

View File

@@ -100,14 +100,6 @@ class ModularPipelineTesterMixin:
"See existing pipeline tests for reference."
)
@property
def expected_workflow_blocks(self) -> dict:
raise NotImplementedError(
"You need to set the attribute `expected_workflow_blocks` in the child test class. "
"`expected_workflow_blocks` is a dictionary that maps workflow names to list of block names. "
"See existing pipeline tests for reference."
)
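The raise above documents the override contract: a child class shadows the property with a plain class attribute, as the SDXL, Qwen, and ZImage tests in this diff do. A minimal sketch with hypothetical block names:

class MyPipelineFastTests(ModularPipelineTesterMixin):
    # Shadows the parent property; maps workflow name to ordered
    # (block_name, block_class_name) pairs. Names here are hypothetical.
    expected_workflow_blocks = {
        "text2image": [
            ("text_encoder", "MyTextEncoderStep"),
            ("denoise", "MyDenoiseStep"),
            ("decode", "MyDecodeStep"),
        ],
    }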
def setup_method(self):
# clean up the VRAM before each test
torch.compiler.reset()
@@ -349,33 +341,6 @@ class ModularPipelineTesterMixin:
assert torch.abs(image_slices[0] - image_slices[1]).max() < 1e-3
def test_workflow_map(self):
blocks = self.pipeline_blocks_class()
if blocks._workflow_map is None:
pytest.skip("Skipping test as _workflow_map is not set")
assert hasattr(self, "expected_workflow_blocks") and self.expected_workflow_blocks, (
"expected_workflow_blocks must be defined in the test class"
)
for workflow_name, expected_blocks in self.expected_workflow_blocks.items():
workflow_blocks = blocks.get_workflow(workflow_name)
actual_blocks = list(workflow_blocks.sub_blocks.items())
# Check that the number of blocks matches
assert len(actual_blocks) == len(expected_blocks), (
f"Workflow '{workflow_name}' has {len(actual_blocks)} blocks, "
f"expected {len(expected_blocks)}"
)
# Check that each block name and type matches
for (actual_name, actual_block), (_expected_name, expected_class_name) in zip(actual_blocks, expected_blocks):
assert actual_block.__class__.__name__ == expected_class_name, (
f"Workflow '{workflow_name}': block '{actual_name}' has type "
f"{actual_block.__class__.__name__}, expected {expected_class_name}"
)
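For reference, the contract the removed test encoded: `get_workflow(name)` is assumed to return a blocks object whose `sub_blocks` is an ordered mapping, so both block order and class identity are checkable. A standalone sketch of the same walk; the function name is ours:

def list_workflow(blocks, workflow_name):
    # Mirrors the traversal in the removed test_workflow_map; assumes
    # `get_workflow` and `sub_blocks` behave as used there.
    workflow = blocks.get_workflow(workflow_name)
    return [(name, block.__class__.__name__) for name, block in workflow.sub_blocks.items()]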
class ModularGuiderTesterMixin:
def test_guider_cfg(self, expected_max_diff=1e-2):

View File

@@ -19,29 +19,6 @@ from diffusers.modular_pipelines import ZImageAutoBlocks, ZImageModularPipeline
from ..test_modular_pipelines_common import ModularPipelineTesterMixin
ZIMAGE_WORKFLOWS = {
"text2image": [
("text_encoder", "ZImageTextEncoderStep"),
("input", "ZImageTextInputStep"),
("prepare_latents", "ZImagePrepareLatentsStep"),
("set_timesteps", "ZImageSetTimestepsStep"),
("denoise", "ZImageDenoiseStep"),
("decode", "ZImageVaeDecoderStep"),
],
"image2image": [
("text_encoder", "ZImageTextEncoderStep"),
("vae_encoder", "ZImageVaeImageEncoderStep"),
("input", "ZImageTextInputStep"),
("additional_inputs", "ZImageAdditionalInputsStep"),
("prepare_latents", "ZImagePrepareLatentsStep"),
("set_timesteps", "ZImageSetTimestepsStep"),
("set_timesteps_with_strength", "ZImageSetTimestepsWithStrengthStep"),
("prepare_latents_with_image", "ZImagePrepareLatentswithImageStep"),
("denoise", "ZImageDenoiseStep"),
("decode", "ZImageVaeDecoderStep"),
],
}
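Here the image2image workflow is a strict superset of text2image: four steps are added and none are swapped, unlike the SDXL tables above. A sketch that checks that reading against the dict as written:

t2i_names = [name for name, _ in ZIMAGE_WORKFLOWS["text2image"]]
i2i_names = [name for name, _ in ZIMAGE_WORKFLOWS["image2image"]]
extra = [n for n in i2i_names if n not in t2i_names]
# extra == ["vae_encoder", "additional_inputs",
#           "set_timesteps_with_strength", "prepare_latents_with_image"]
assert all(n in i2i_names for n in t2i_names)  # every text2image step is kept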
class TestZImageModularPipelineFast(ModularPipelineTesterMixin):
pipeline_class = ZImageModularPipeline
pipeline_blocks_class = ZImageAutoBlocks
@@ -49,7 +26,6 @@ class TestZImageModularPipelineFast(ModularPipelineTesterMixin):
params = frozenset(["prompt", "height", "width"])
batch_params = frozenset(["prompt"])
expected_workflow_blocks = ZIMAGE_WORKFLOWS
def get_dummy_inputs(self, seed=0):
generator = self.get_generator(seed)