Compare commits

...

24 Commits

Author SHA1 Message Date
linoytsaban
3ac5a286a4 vace 2025-05-27 20:51:53 +03:00
linoytsaban
b7b1abdf13 vace 2025-05-22 11:06:49 +03:00
linoytsaban
7530559a04 vace 2025-05-21 19:49:10 +03:00
linoytsaban
b8a371ef69 vace 2025-05-21 19:47:02 +03:00
linoytsaban
f81d4c6a5d revert 2025-05-21 19:27:43 +03:00
linoytsaban
f91dae9be3 revert 2025-05-21 19:13:21 +03:00
linoytsaban
c665a0412c revert 2025-05-21 19:11:44 +03:00
linoytsaban
78a8e1e841 vace padding 2025-05-21 18:53:19 +03:00
linoytsaban
5901c2508b vace padding 2025-05-21 18:50:31 +03:00
linoytsaban
46755bdf1a vace padding 2025-05-21 18:24:05 +03:00
linoytsaban
c0d22f8757 vace padding 2025-05-21 18:18:45 +03:00
linoytsaban
a66ad3fe88 vace padding 2025-05-21 18:16:35 +03:00
linoytsaban
0279a067f1 vace padding 2025-05-21 18:07:17 +03:00
linoytsaban
b3394d4275 revert 2025-05-21 17:55:32 +03:00
linoytsaban
26dcfd00a4 test 2025-05-21 17:30:32 +03:00
Linoy Tsaban
4a4b05861e Merge branch 'main' into integrations/wan-vace 2025-05-21 15:45:03 +03:00
Aryan
23f6bc1201 fix bug with mask 2025-05-21 02:19:30 +02:00
Aryan
694dcf2cfd refactor 2025-05-21 00:52:15 +02:00
Aryan
4b14ddd80b add pipeline test 2025-05-20 21:35:30 +02:00
Aryan
ea301df80b refactor 2025-05-20 21:35:23 +02:00
Aryan
834bfc6448 add conversion script 2025-05-19 21:56:11 +02:00
Aryan
32ab1c969c fix no split modules 2025-05-19 21:53:18 +02:00
Aryan
f9865ff1ff make fix-copies 2025-05-19 21:51:16 +02:00
Aryan
d15d090d55 initial support 2025-05-19 21:50:57 +02:00
13 changed files with 1736 additions and 16 deletions

View File

@@ -1,6 +1,6 @@
import argparse
import pathlib
from typing import Any, Dict
from typing import Any, Dict, Tuple
import torch
from accelerate import init_empty_weights
@@ -14,6 +14,8 @@ from diffusers import (
WanImageToVideoPipeline,
WanPipeline,
WanTransformer3DModel,
WanVACEPipeline,
WanVACETransformer3DModel,
)
@@ -59,7 +61,52 @@ TRANSFORMER_KEYS_RENAME_DICT = {
"attn2.norm_k_img": "attn2.norm_added_k",
}
VACE_TRANSFORMER_KEYS_RENAME_DICT = {
"time_embedding.0": "condition_embedder.time_embedder.linear_1",
"time_embedding.2": "condition_embedder.time_embedder.linear_2",
"text_embedding.0": "condition_embedder.text_embedder.linear_1",
"text_embedding.2": "condition_embedder.text_embedder.linear_2",
"time_projection.1": "condition_embedder.time_proj",
"head.modulation": "scale_shift_table",
"head.head": "proj_out",
"modulation": "scale_shift_table",
"ffn.0": "ffn.net.0.proj",
"ffn.2": "ffn.net.2",
# Hack to swap the layer names
# The original model calls the norms in following order: norm1, norm3, norm2
# We convert it to: norm1, norm2, norm3
"norm2": "norm__placeholder",
"norm3": "norm2",
"norm__placeholder": "norm3",
# # For the I2V model
# "img_emb.proj.0": "condition_embedder.image_embedder.norm1",
# "img_emb.proj.1": "condition_embedder.image_embedder.ff.net.0.proj",
# "img_emb.proj.3": "condition_embedder.image_embedder.ff.net.2",
# "img_emb.proj.4": "condition_embedder.image_embedder.norm2",
# # for the FLF2V model
# "img_emb.emb_pos": "condition_embedder.image_embedder.pos_embed",
# Add attention component mappings
"self_attn.q": "attn1.to_q",
"self_attn.k": "attn1.to_k",
"self_attn.v": "attn1.to_v",
"self_attn.o": "attn1.to_out.0",
"self_attn.norm_q": "attn1.norm_q",
"self_attn.norm_k": "attn1.norm_k",
"cross_attn.q": "attn2.to_q",
"cross_attn.k": "attn2.to_k",
"cross_attn.v": "attn2.to_v",
"cross_attn.o": "attn2.to_out.0",
"cross_attn.norm_q": "attn2.norm_q",
"cross_attn.norm_k": "attn2.norm_k",
"attn2.to_k_img": "attn2.add_k_proj",
"attn2.to_v_img": "attn2.add_v_proj",
"attn2.norm_k_img": "attn2.norm_added_k",
"before_proj": "proj_in",
"after_proj": "proj_out",
}
TRANSFORMER_SPECIAL_KEYS_REMAP = {}
VACE_TRANSFORMER_SPECIAL_KEYS_REMAP = {}
def update_state_dict_(state_dict: Dict[str, Any], old_key: str, new_key: str) -> Dict[str, Any]:
@@ -74,7 +121,7 @@ def load_sharded_safetensors(dir: pathlib.Path):
return state_dict
def get_transformer_config(model_type: str) -> Dict[str, Any]:
def get_transformer_config(model_type: str) -> Tuple[Dict[str, Any], ...]:
if model_type == "Wan-T2V-1.3B":
config = {
"model_id": "StevenZhang/Wan2.1-T2V-1.3B-Diff",
@@ -94,6 +141,8 @@ def get_transformer_config(model_type: str) -> Dict[str, Any]:
"text_dim": 4096,
},
}
RENAME_DICT = TRANSFORMER_KEYS_RENAME_DICT
SPECIAL_KEYS_REMAP = TRANSFORMER_SPECIAL_KEYS_REMAP
elif model_type == "Wan-T2V-14B":
config = {
"model_id": "StevenZhang/Wan2.1-T2V-14B-Diff",
@@ -113,6 +162,8 @@ def get_transformer_config(model_type: str) -> Dict[str, Any]:
"text_dim": 4096,
},
}
RENAME_DICT = TRANSFORMER_KEYS_RENAME_DICT
SPECIAL_KEYS_REMAP = TRANSFORMER_SPECIAL_KEYS_REMAP
elif model_type == "Wan-I2V-14B-480p":
config = {
"model_id": "StevenZhang/Wan2.1-I2V-14B-480P-Diff",
@@ -133,6 +184,8 @@ def get_transformer_config(model_type: str) -> Dict[str, Any]:
"text_dim": 4096,
},
}
RENAME_DICT = TRANSFORMER_KEYS_RENAME_DICT
SPECIAL_KEYS_REMAP = TRANSFORMER_SPECIAL_KEYS_REMAP
elif model_type == "Wan-I2V-14B-720p":
config = {
"model_id": "StevenZhang/Wan2.1-I2V-14B-720P-Diff",
@@ -153,6 +206,8 @@ def get_transformer_config(model_type: str) -> Dict[str, Any]:
"text_dim": 4096,
},
}
RENAME_DICT = TRANSFORMER_KEYS_RENAME_DICT
SPECIAL_KEYS_REMAP = TRANSFORMER_SPECIAL_KEYS_REMAP
elif model_type == "Wan-FLF2V-14B-720P":
config = {
"model_id": "ypyp/Wan2.1-FLF2V-14B-720P", # This is just a placeholder
@@ -175,11 +230,60 @@ def get_transformer_config(model_type: str) -> Dict[str, Any]:
"pos_embed_seq_len": 257 * 2,
},
}
return config
RENAME_DICT = TRANSFORMER_KEYS_RENAME_DICT
SPECIAL_KEYS_REMAP = TRANSFORMER_SPECIAL_KEYS_REMAP
elif model_type == "Wan-VACE-1.3B":
config = {
"model_id": "Wan-AI/Wan2.1-VACE-1.3B",
"diffusers_config": {
"added_kv_proj_dim": None,
"attention_head_dim": 128,
"cross_attn_norm": True,
"eps": 1e-06,
"ffn_dim": 8960,
"freq_dim": 256,
"in_channels": 16,
"num_attention_heads": 12,
"num_layers": 30,
"out_channels": 16,
"patch_size": [1, 2, 2],
"qk_norm": "rms_norm_across_heads",
"text_dim": 4096,
"vace_layers": [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28],
"vace_in_channels": 96,
},
}
RENAME_DICT = VACE_TRANSFORMER_KEYS_RENAME_DICT
SPECIAL_KEYS_REMAP = VACE_TRANSFORMER_SPECIAL_KEYS_REMAP
elif model_type == "Wan-VACE-14B":
config = {
"model_id": "Wan-AI/Wan2.1-VACE-14B",
"diffusers_config": {
"added_kv_proj_dim": None,
"attention_head_dim": 128,
"cross_attn_norm": True,
"eps": 1e-06,
"ffn_dim": 13824,
"freq_dim": 256,
"in_channels": 16,
"num_attention_heads": 40,
"num_layers": 40,
"out_channels": 16,
"patch_size": [1, 2, 2],
"qk_norm": "rms_norm_across_heads",
"text_dim": 4096,
"vace_layers": [0, 5, 10, 15, 20, 25, 30, 35],
"vace_in_channels": 96,
},
}
RENAME_DICT = VACE_TRANSFORMER_KEYS_RENAME_DICT
SPECIAL_KEYS_REMAP = VACE_TRANSFORMER_SPECIAL_KEYS_REMAP
return config, RENAME_DICT, SPECIAL_KEYS_REMAP
def convert_transformer(model_type: str):
config = get_transformer_config(model_type)
config, RENAME_DICT, SPECIAL_KEYS_REMAP = get_transformer_config(model_type)
diffusers_config = config["diffusers_config"]
model_id = config["model_id"]
model_dir = pathlib.Path(snapshot_download(model_id, repo_type="model"))
@@ -187,16 +291,19 @@ def convert_transformer(model_type: str):
original_state_dict = load_sharded_safetensors(model_dir)
with init_empty_weights():
transformer = WanTransformer3DModel.from_config(diffusers_config)
if "VACE" not in model_type:
transformer = WanTransformer3DModel.from_config(diffusers_config)
else:
transformer = WanVACETransformer3DModel.from_config(diffusers_config)
for key in list(original_state_dict.keys()):
new_key = key[:]
for replace_key, rename_key in TRANSFORMER_KEYS_RENAME_DICT.items():
for replace_key, rename_key in RENAME_DICT.items():
new_key = new_key.replace(replace_key, rename_key)
update_state_dict_(original_state_dict, key, new_key)
for key in list(original_state_dict.keys()):
for special_key, handler_fn_inplace in TRANSFORMER_SPECIAL_KEYS_REMAP.items():
for special_key, handler_fn_inplace in SPECIAL_KEYS_REMAP.items():
if special_key not in key:
continue
handler_fn_inplace(key, original_state_dict)
@@ -412,7 +519,7 @@ def get_args():
parser = argparse.ArgumentParser()
parser.add_argument("--model_type", type=str, default=None)
parser.add_argument("--output_path", type=str, required=True)
parser.add_argument("--dtype", default="fp32")
parser.add_argument("--dtype", default="fp32", choices=["fp32", "fp16", "bf16", "none"])
return parser.parse_args()
@@ -426,18 +533,20 @@ DTYPE_MAPPING = {
if __name__ == "__main__":
args = get_args()
transformer = None
dtype = DTYPE_MAPPING[args.dtype]
transformer = convert_transformer(args.model_type).to(dtype=dtype)
transformer = convert_transformer(args.model_type)
vae = convert_vae()
text_encoder = UMT5EncoderModel.from_pretrained("google/umt5-xxl")
text_encoder = UMT5EncoderModel.from_pretrained("google/umt5-xxl", torch_dtype=torch.bfloat16)
tokenizer = AutoTokenizer.from_pretrained("google/umt5-xxl")
flow_shift = 16.0 if "FLF2V" in args.model_type else 3.0
scheduler = UniPCMultistepScheduler(
prediction_type="flow_prediction", use_flow_sigmas=True, num_train_timesteps=1000, flow_shift=flow_shift
)
# If user has specified "none", we keep the original dtypes of the state dict without any conversion
if args.dtype != "none":
dtype = DTYPE_MAPPING[args.dtype]
transformer.to(dtype)
if "I2V" in args.model_type or "FLF2V" in args.model_type:
image_encoder = CLIPVisionModelWithProjection.from_pretrained(
"laion/CLIP-ViT-H-14-laion2B-s32B-b79K", torch_dtype=torch.bfloat16
@@ -452,6 +561,14 @@ if __name__ == "__main__":
image_encoder=image_encoder,
image_processor=image_processor,
)
elif "VACE" in args.model_type:
pipe = WanVACEPipeline(
transformer=transformer,
text_encoder=text_encoder,
tokenizer=tokenizer,
vae=vae,
scheduler=scheduler,
)
else:
pipe = WanPipeline(
transformer=transformer,

View File

@@ -215,6 +215,7 @@ else:
"UVit2DModel",
"VQModel",
"WanTransformer3DModel",
"WanVACETransformer3DModel",
]
)
_import_structure["optimization"] = [
@@ -526,6 +527,7 @@ else:
"VQDiffusionPipeline",
"WanImageToVideoPipeline",
"WanPipeline",
"WanVACEPipeline",
"WanVideoToVideoPipeline",
"WuerstchenCombinedPipeline",
"WuerstchenDecoderPipeline",
@@ -819,6 +821,7 @@ if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
UVit2DModel,
VQModel,
WanTransformer3DModel,
WanVACETransformer3DModel,
)
from .optimization import (
get_constant_schedule,
@@ -1109,6 +1112,7 @@ if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
VQDiffusionPipeline,
WanImageToVideoPipeline,
WanPipeline,
WanVACEPipeline,
WanVideoToVideoPipeline,
WuerstchenCombinedPipeline,
WuerstchenDecoderPipeline,

View File

@@ -4852,6 +4852,65 @@ class WanLoraLoaderMixin(LoraBaseMixin):
return state_dict
@classmethod
def _maybe_expand_t2v_lora_for_vace(
cls,
transformer: torch.nn.Module,
state_dict,
):
target_device = transformer.device
if hasattr(transformer, 'vace_blocks'):
print("HERE 1")
inferred_rank_for_vace = None
lora_weights_dtype_for_vace = next(iter(transformer.parameters())).dtype # Fallback dtype
for k_lora_any, v_lora_tensor_any in state_dict.items():
if k_lora_any.endswith(".lora_A.weight"):
print("HERE 2")
inferred_rank_for_vace = v_lora_tensor_any.shape[0]
lora_weights_dtype_for_vace = v_lora_tensor_any.dtype
break # Found one, good enough for rank and dtype
if inferred_rank_for_vace is not None:
print("HERE 3")
# Determine if the LoRA format (as potentially modified by I2V expansion) includes bias
# This re-checks 'has_bias' based on the *current* state_dict.
current_lora_has_bias = any(".lora_B.bias" in k for k in state_dict.keys())
for i, vace_block_module_in_model in enumerate(transformer.vace_blocks):
# Specifically target proj_out as per the error message
if hasattr(vace_block_module_in_model, 'proj_out'):
print("HERE 4")
proj_out_linear_layer_in_model = vace_block_module_in_model.proj_out
vace_lora_A_key = f"vace_blocks.{i}.proj_out.lora_A.weight"
vace_lora_B_key = f"vace_blocks.{i}.proj_out.lora_B.weight"
if vace_lora_A_key not in state_dict:
state_dict[vace_lora_A_key] = torch.zeros(
(inferred_rank_for_vace, proj_out_linear_layer_in_model.in_features),
device=target_device, dtype=lora_weights_dtype_for_vace
)
if vace_lora_B_key not in state_dict:
state_dict[vace_lora_B_key] = torch.zeros(
(proj_out_linear_layer_in_model.out_features, inferred_rank_for_vace),
device=target_device, dtype=lora_weights_dtype_for_vace
)
# Use 'current_lora_has_bias' to decide on padding bias for VACE blocks
if current_lora_has_bias and proj_out_linear_layer_in_model.bias is not None:
print("HERE 5")
vace_lora_B_bias_key = f"vace_blocks.{i}.proj_out.lora_B.bias"
if vace_lora_B_bias_key not in state_dict:
state_dict[vace_lora_B_bias_key] = torch.zeros_like(
proj_out_linear_layer_in_model.bias, # Shape from model's bias
device=target_device # Dtype from model's bias implicitly by zeros_like
)
print("AFTER 2:", list(state_dict.keys()))
return state_dict
def load_lora_weights(
self,
pretrained_model_name_or_path_or_dict: Union[str, Dict[str, torch.Tensor]],
@@ -4900,10 +4959,15 @@ class WanLoraLoaderMixin(LoraBaseMixin):
transformer=getattr(self, self.transformer_name) if not hasattr(self, "transformer") else self.transformer,
state_dict=state_dict,
)
state_dict = self._maybe_expand_t2v_lora_for_vace(
transformer=getattr(self, self.transformer_name) if not hasattr(self, "transformer") else self.transformer,
state_dict=state_dict,
)
is_correct_format = all("lora" in key for key in state_dict.keys())
if not is_correct_format:
raise ValueError("Invalid LoRA checkpoint.")
print("WTF")
self.load_lora_into_transformer(
state_dict,
transformer=getattr(self, self.transformer_name) if not hasattr(self, "transformer") else self.transformer,
@@ -5713,4 +5777,4 @@ class LoraLoaderMixin(StableDiffusionLoraLoaderMixin):
def __init__(self, *args, **kwargs):
deprecation_message = "LoraLoaderMixin is deprecated and this will be removed in a future version. Please use `StableDiffusionLoraLoaderMixin`, instead."
deprecate("LoraLoaderMixin", "1.0.0", deprecation_message)
super().__init__(*args, **kwargs)
super().__init__(*args, **kwargs)

View File

@@ -58,6 +58,7 @@ _SET_ADAPTER_SCALE_FN_MAPPING = {
"CogView4Transformer2DModel": lambda model_cls, weights: weights,
"HiDreamImageTransformer2DModel": lambda model_cls, weights: weights,
"HunyuanVideoFramepackTransformer3DModel": lambda model_cls, weights: weights,
"WanVACETransformer3DModel": lambda model_cls, weights: weights,
}

View File

@@ -89,6 +89,7 @@ if is_torch_available():
_import_structure["transformers.transformer_sd3"] = ["SD3Transformer2DModel"]
_import_structure["transformers.transformer_temporal"] = ["TransformerTemporalModel"]
_import_structure["transformers.transformer_wan"] = ["WanTransformer3DModel"]
_import_structure["transformers.transformer_wan_vace"] = ["WanVACETransformer3DModel"]
_import_structure["unets.unet_1d"] = ["UNet1DModel"]
_import_structure["unets.unet_2d"] = ["UNet2DModel"]
_import_structure["unets.unet_2d_condition"] = ["UNet2DConditionModel"]
@@ -178,6 +179,7 @@ if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
Transformer2DModel,
TransformerTemporalModel,
WanTransformer3DModel,
WanVACETransformer3DModel,
)
from .unets import (
I2VGenXLUNet,

View File

@@ -32,3 +32,4 @@ if is_torch_available():
from .transformer_sd3 import SD3Transformer2DModel
from .transformer_temporal import TransformerTemporalModel
from .transformer_wan import WanTransformer3DModel
from .transformer_wan_vace import WanVACETransformer3DModel

View File

@@ -0,0 +1,393 @@
# Copyright 2025 The Wan Team and The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
from typing import Any, Dict, List, Optional, Tuple, Union
import torch
import torch.nn as nn
from ...configuration_utils import ConfigMixin, register_to_config
from ...loaders import FromOriginalModelMixin, PeftAdapterMixin
from ...utils import USE_PEFT_BACKEND, logging, scale_lora_layers, unscale_lora_layers
from ..attention import FeedForward
from ..attention_processor import Attention
from ..cache_utils import CacheMixin
from ..modeling_outputs import Transformer2DModelOutput
from ..modeling_utils import ModelMixin
from ..normalization import FP32LayerNorm
from .transformer_wan import WanAttnProcessor2_0, WanRotaryPosEmbed, WanTimeTextImageEmbedding, WanTransformerBlock
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
class WanVACETransformerBlock(nn.Module):
def __init__(
self,
dim: int,
ffn_dim: int,
num_heads: int,
qk_norm: str = "rms_norm_across_heads",
cross_attn_norm: bool = False,
eps: float = 1e-6,
added_kv_proj_dim: Optional[int] = None,
apply_input_projection: bool = False,
apply_output_projection: bool = False,
):
super().__init__()
# 1. Input projection
self.proj_in = None
if apply_input_projection:
self.proj_in = nn.Linear(dim, dim)
# 2. Self-attention
self.norm1 = FP32LayerNorm(dim, eps, elementwise_affine=False)
self.attn1 = Attention(
query_dim=dim,
heads=num_heads,
kv_heads=num_heads,
dim_head=dim // num_heads,
qk_norm=qk_norm,
eps=eps,
bias=True,
cross_attention_dim=None,
out_bias=True,
processor=WanAttnProcessor2_0(),
)
# 3. Cross-attention
self.attn2 = Attention(
query_dim=dim,
heads=num_heads,
kv_heads=num_heads,
dim_head=dim // num_heads,
qk_norm=qk_norm,
eps=eps,
bias=True,
cross_attention_dim=None,
out_bias=True,
added_kv_proj_dim=added_kv_proj_dim,
added_proj_bias=True,
processor=WanAttnProcessor2_0(),
)
self.norm2 = FP32LayerNorm(dim, eps, elementwise_affine=True) if cross_attn_norm else nn.Identity()
# 4. Feed-forward
self.ffn = FeedForward(dim, inner_dim=ffn_dim, activation_fn="gelu-approximate")
self.norm3 = FP32LayerNorm(dim, eps, elementwise_affine=False)
# 5. Output projection
self.proj_out = None
if apply_output_projection:
self.proj_out = nn.Linear(dim, dim)
self.scale_shift_table = nn.Parameter(torch.randn(1, 6, dim) / dim**0.5)
def forward(
self,
hidden_states: torch.Tensor,
encoder_hidden_states: torch.Tensor,
control_hidden_states: torch.Tensor,
temb: torch.Tensor,
rotary_emb: torch.Tensor,
) -> torch.Tensor:
if self.proj_in is not None:
control_hidden_states = self.proj_in(control_hidden_states)
control_hidden_states = control_hidden_states + hidden_states
shift_msa, scale_msa, gate_msa, c_shift_msa, c_scale_msa, c_gate_msa = (
self.scale_shift_table + temb.float()
).chunk(6, dim=1)
# 1. Self-attention
norm_hidden_states = (self.norm1(control_hidden_states.float()) * (1 + scale_msa) + shift_msa).type_as(
control_hidden_states
)
attn_output = self.attn1(hidden_states=norm_hidden_states, rotary_emb=rotary_emb)
control_hidden_states = (control_hidden_states.float() + attn_output * gate_msa).type_as(control_hidden_states)
# 2. Cross-attention
norm_hidden_states = self.norm2(control_hidden_states.float()).type_as(control_hidden_states)
attn_output = self.attn2(hidden_states=norm_hidden_states, encoder_hidden_states=encoder_hidden_states)
control_hidden_states = control_hidden_states + attn_output
# 3. Feed-forward
norm_hidden_states = (self.norm3(control_hidden_states.float()) * (1 + c_scale_msa) + c_shift_msa).type_as(
control_hidden_states
)
ff_output = self.ffn(norm_hidden_states)
control_hidden_states = (control_hidden_states.float() + ff_output.float() * c_gate_msa).type_as(
control_hidden_states
)
conditioning_states = None
if self.proj_out is not None:
conditioning_states = self.proj_out(control_hidden_states)
return conditioning_states, control_hidden_states
class WanVACETransformer3DModel(ModelMixin, ConfigMixin, PeftAdapterMixin, FromOriginalModelMixin, CacheMixin):
r"""
A Transformer model for video-like data used in the Wan model.
Args:
patch_size (`Tuple[int]`, defaults to `(1, 2, 2)`):
3D patch dimensions for video embedding (t_patch, h_patch, w_patch).
num_attention_heads (`int`, defaults to `40`):
Fixed length for text embeddings.
attention_head_dim (`int`, defaults to `128`):
The number of channels in each head.
in_channels (`int`, defaults to `16`):
The number of channels in the input.
out_channels (`int`, defaults to `16`):
The number of channels in the output.
text_dim (`int`, defaults to `512`):
Input dimension for text embeddings.
freq_dim (`int`, defaults to `256`):
Dimension for sinusoidal time embeddings.
ffn_dim (`int`, defaults to `13824`):
Intermediate dimension in feed-forward network.
num_layers (`int`, defaults to `40`):
The number of layers of transformer blocks to use.
window_size (`Tuple[int]`, defaults to `(-1, -1)`):
Window size for local attention (-1 indicates global attention).
cross_attn_norm (`bool`, defaults to `True`):
Enable cross-attention normalization.
qk_norm (`bool`, defaults to `True`):
Enable query/key normalization.
eps (`float`, defaults to `1e-6`):
Epsilon value for normalization layers.
add_img_emb (`bool`, defaults to `False`):
Whether to use img_emb.
added_kv_proj_dim (`int`, *optional*, defaults to `None`):
The number of channels to use for the added key and value projections. If `None`, no projection is used.
"""
_supports_gradient_checkpointing = True
_skip_layerwise_casting_patterns = ["patch_embedding", "vace_patch_embedding", "condition_embedder", "norm"]
_no_split_modules = ["WanTransformerBlock", "WanVACETransformerBlock"]
_keep_in_fp32_modules = ["time_embedder", "scale_shift_table", "norm1", "norm2", "norm3"]
_keys_to_ignore_on_load_unexpected = ["norm_added_q"]
@register_to_config
def __init__(
self,
patch_size: Tuple[int] = (1, 2, 2),
num_attention_heads: int = 40,
attention_head_dim: int = 128,
in_channels: int = 16,
out_channels: int = 16,
text_dim: int = 4096,
freq_dim: int = 256,
ffn_dim: int = 13824,
num_layers: int = 40,
cross_attn_norm: bool = True,
qk_norm: Optional[str] = "rms_norm_across_heads",
eps: float = 1e-6,
image_dim: Optional[int] = None,
added_kv_proj_dim: Optional[int] = None,
rope_max_seq_len: int = 1024,
pos_embed_seq_len: Optional[int] = None,
vace_layers: List[int] = [0, 5, 10, 15, 20, 25, 30, 35],
vace_in_channels: int = 96,
) -> None:
super().__init__()
inner_dim = num_attention_heads * attention_head_dim
out_channels = out_channels or in_channels
if max(vace_layers) >= num_layers:
raise ValueError(f"VACE layers {vace_layers} exceed the number of transformer layers {num_layers}.")
if 0 not in vace_layers:
raise ValueError("VACE layers must include layer 0.")
# 1. Patch & position embedding
self.rope = WanRotaryPosEmbed(attention_head_dim, patch_size, rope_max_seq_len)
self.patch_embedding = nn.Conv3d(in_channels, inner_dim, kernel_size=patch_size, stride=patch_size)
self.vace_patch_embedding = nn.Conv3d(vace_in_channels, inner_dim, kernel_size=patch_size, stride=patch_size)
# 2. Condition embeddings
# image_embedding_dim=1280 for I2V model
self.condition_embedder = WanTimeTextImageEmbedding(
dim=inner_dim,
time_freq_dim=freq_dim,
time_proj_dim=inner_dim * 6,
text_embed_dim=text_dim,
image_embed_dim=image_dim,
pos_embed_seq_len=pos_embed_seq_len,
)
# 3. Transformer blocks
self.blocks = nn.ModuleList(
[
WanTransformerBlock(
inner_dim, ffn_dim, num_attention_heads, qk_norm, cross_attn_norm, eps, added_kv_proj_dim
)
for _ in range(num_layers)
]
)
self.vace_blocks = nn.ModuleList(
[
WanVACETransformerBlock(
inner_dim,
ffn_dim,
num_attention_heads,
qk_norm,
cross_attn_norm,
eps,
added_kv_proj_dim,
apply_input_projection=i == 0, # Layer 0 always has input projection and is in vace_layers
apply_output_projection=True,
)
for i in range(len(vace_layers))
]
)
# 4. Output norm & projection
self.norm_out = FP32LayerNorm(inner_dim, eps, elementwise_affine=False)
self.proj_out = nn.Linear(inner_dim, out_channels * math.prod(patch_size))
self.scale_shift_table = nn.Parameter(torch.randn(1, 2, inner_dim) / inner_dim**0.5)
self.gradient_checkpointing = False
def forward(
self,
hidden_states: torch.Tensor,
timestep: torch.LongTensor,
encoder_hidden_states: torch.Tensor,
encoder_hidden_states_image: Optional[torch.Tensor] = None,
control_hidden_states: torch.Tensor = None,
control_hidden_states_scale: torch.Tensor = None,
return_dict: bool = True,
attention_kwargs: Optional[Dict[str, Any]] = None,
) -> Union[torch.Tensor, Dict[str, torch.Tensor]]:
if attention_kwargs is not None:
attention_kwargs = attention_kwargs.copy()
lora_scale = attention_kwargs.pop("scale", 1.0)
else:
lora_scale = 1.0
if USE_PEFT_BACKEND:
# weight the lora layers by setting `lora_scale` for each PEFT layer
scale_lora_layers(self, lora_scale)
else:
if attention_kwargs is not None and attention_kwargs.get("scale", None) is not None:
logger.warning(
"Passing `scale` via `attention_kwargs` when not using the PEFT backend is ineffective."
)
batch_size, num_channels, num_frames, height, width = hidden_states.shape
p_t, p_h, p_w = self.config.patch_size
post_patch_num_frames = num_frames // p_t
post_patch_height = height // p_h
post_patch_width = width // p_w
if control_hidden_states_scale is None:
control_hidden_states_scale = control_hidden_states.new_ones(len(self.config.vace_layers))
control_hidden_states_scale = torch.unbind(control_hidden_states_scale)
if len(control_hidden_states_scale) != len(self.config.vace_layers):
raise ValueError(
f"Length of `control_hidden_states_scale` {len(control_hidden_states_scale)} should be "
f"equal to {len(self.config.vace_layers)}."
)
# 1. Rotary position embedding
rotary_emb = self.rope(hidden_states)
# 2. Patch embedding
hidden_states = self.patch_embedding(hidden_states)
hidden_states = hidden_states.flatten(2).transpose(1, 2)
control_hidden_states = self.vace_patch_embedding(control_hidden_states)
control_hidden_states = control_hidden_states.flatten(2).transpose(1, 2)
control_hidden_states_padding = control_hidden_states.new_zeros(
batch_size, hidden_states.size(1) - control_hidden_states.size(1), control_hidden_states.size(2)
)
control_hidden_states = torch.cat([control_hidden_states, control_hidden_states_padding], dim=1)
# 3. Time embedding
temb, timestep_proj, encoder_hidden_states, encoder_hidden_states_image = self.condition_embedder(
timestep, encoder_hidden_states, encoder_hidden_states_image
)
timestep_proj = timestep_proj.unflatten(1, (6, -1))
# 4. Image embedding
if encoder_hidden_states_image is not None:
encoder_hidden_states = torch.concat([encoder_hidden_states_image, encoder_hidden_states], dim=1)
# 5. Transformer blocks
if torch.is_grad_enabled() and self.gradient_checkpointing:
# Prepare VACE hints
control_hidden_states_list = []
for i, block in enumerate(self.vace_blocks):
conditioning_states, control_hidden_states = self._gradient_checkpointing_func(
block, hidden_states, encoder_hidden_states, control_hidden_states, timestep_proj, rotary_emb
)
control_hidden_states_list.append((conditioning_states, control_hidden_states_scale[i]))
control_hidden_states_list = control_hidden_states_list[::-1]
for i, block in enumerate(self.blocks):
hidden_states = self._gradient_checkpointing_func(
block, hidden_states, encoder_hidden_states, timestep_proj, rotary_emb
)
if i in self.config.vace_layers:
control_hint, scale = control_hidden_states_list.pop()
hidden_states = hidden_states + control_hint * scale
else:
# Prepare VACE hints
control_hidden_states_list = []
for i, block in enumerate(self.vace_blocks):
conditioning_states, control_hidden_states = block(
hidden_states, encoder_hidden_states, control_hidden_states, timestep_proj, rotary_emb
)
control_hidden_states_list.append((conditioning_states, control_hidden_states_scale[i]))
control_hidden_states_list = control_hidden_states_list[::-1]
for i, block in enumerate(self.blocks):
hidden_states = block(hidden_states, encoder_hidden_states, timestep_proj, rotary_emb)
if i in self.config.vace_layers:
control_hint, scale = control_hidden_states_list.pop()
hidden_states = hidden_states + control_hint * scale
# 6. Output norm, projection & unpatchify
shift, scale = (self.scale_shift_table + temb.unsqueeze(1)).chunk(2, dim=1)
# Move the shift and scale tensors to the same device as hidden_states.
# When using multi-GPU inference via accelerate these will be on the
# first device rather than the last device, which hidden_states ends up
# on.
shift = shift.to(hidden_states.device)
scale = scale.to(hidden_states.device)
hidden_states = (self.norm_out(hidden_states.float()) * (1 + scale) + shift).type_as(hidden_states)
hidden_states = self.proj_out(hidden_states)
hidden_states = hidden_states.reshape(
batch_size, post_patch_num_frames, post_patch_height, post_patch_width, p_t, p_h, p_w, -1
)
hidden_states = hidden_states.permute(0, 7, 1, 4, 2, 5, 3, 6)
output = hidden_states.flatten(6, 7).flatten(4, 5).flatten(2, 3)
if USE_PEFT_BACKEND:
# remove `lora_scale` from each PEFT layer
unscale_lora_layers(self, lora_scale)
if not return_dict:
return (output,)
return Transformer2DModelOutput(sample=output)

View File

@@ -366,7 +366,7 @@ else:
"WuerstchenDecoderPipeline",
"WuerstchenPriorPipeline",
]
_import_structure["wan"] = ["WanPipeline", "WanImageToVideoPipeline", "WanVideoToVideoPipeline"]
_import_structure["wan"] = ["WanPipeline", "WanImageToVideoPipeline", "WanVideoToVideoPipeline", "WanVACEPipeline"]
try:
if not is_onnx_available():
raise OptionalDependencyNotAvailable()
@@ -734,7 +734,7 @@ if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
UniDiffuserTextDecoder,
)
from .visualcloze import VisualClozeGenerationPipeline, VisualClozePipeline
from .wan import WanImageToVideoPipeline, WanPipeline, WanVideoToVideoPipeline
from .wan import WanImageToVideoPipeline, WanPipeline, WanVACEPipeline, WanVideoToVideoPipeline
from .wuerstchen import (
WuerstchenCombinedPipeline,
WuerstchenDecoderPipeline,

View File

@@ -24,6 +24,7 @@ except OptionalDependencyNotAvailable:
else:
_import_structure["pipeline_wan"] = ["WanPipeline"]
_import_structure["pipeline_wan_i2v"] = ["WanImageToVideoPipeline"]
_import_structure["pipeline_wan_vace"] = ["WanVACEPipeline"]
_import_structure["pipeline_wan_video2video"] = ["WanVideoToVideoPipeline"]
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
try:
@@ -35,6 +36,7 @@ if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
else:
from .pipeline_wan import WanPipeline
from .pipeline_wan_i2v import WanImageToVideoPipeline
from .pipeline_wan_vace import WanVACEPipeline
from .pipeline_wan_video2video import WanVideoToVideoPipeline
else:

View File

@@ -0,0 +1,917 @@
# Copyright 2025 The Wan Team and The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import html
from typing import Any, Callable, Dict, List, Optional, Union
import PIL.Image
import regex as re
import torch
from transformers import AutoTokenizer, UMT5EncoderModel
from ...callbacks import MultiPipelineCallbacks, PipelineCallback
from ...image_processor import PipelineImageInput
from ...loaders import WanLoraLoaderMixin
from ...models import AutoencoderKLWan, WanVACETransformer3DModel
from ...schedulers import FlowMatchEulerDiscreteScheduler
from ...utils import is_ftfy_available, is_torch_xla_available, logging, replace_example_docstring
from ...utils.torch_utils import randn_tensor
from ...video_processor import VideoProcessor
from ..pipeline_utils import DiffusionPipeline
from .pipeline_output import WanPipelineOutput
if is_torch_xla_available():
import torch_xla.core.xla_model as xm
XLA_AVAILABLE = True
else:
XLA_AVAILABLE = False
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
if is_ftfy_available():
import ftfy
EXAMPLE_DOC_STRING = """
Examples:
```python
>>> import torch
>>> from diffusers.utils import export_to_video
>>> from diffusers import AutoencoderKLWan, WanPipeline
>>> from diffusers.schedulers.scheduling_unipc_multistep import UniPCMultistepScheduler
>>> # Available models: Wan-AI/Wan2.1-T2V-14B-Diffusers, Wan-AI/Wan2.1-T2V-1.3B-Diffusers
>>> model_id = "Wan-AI/Wan2.1-T2V-14B-Diffusers"
>>> vae = AutoencoderKLWan.from_pretrained(model_id, subfolder="vae", torch_dtype=torch.float32)
>>> pipe = WanPipeline.from_pretrained(model_id, vae=vae, torch_dtype=torch.bfloat16)
>>> flow_shift = 5.0 # 5.0 for 720P, 3.0 for 480P
>>> pipe.scheduler = UniPCMultistepScheduler.from_config(pipe.scheduler.config, flow_shift=flow_shift)
>>> pipe.to("cuda")
>>> prompt = "A cat and a dog baking a cake together in a kitchen. The cat is carefully measuring flour, while the dog is stirring the batter with a wooden spoon. The kitchen is cozy, with sunlight streaming through the window."
>>> negative_prompt = "Bright tones, overexposed, static, blurred details, subtitles, style, works, paintings, images, static, overall gray, worst quality, low quality, JPEG compression residue, ugly, incomplete, extra fingers, poorly drawn hands, poorly drawn faces, deformed, disfigured, misshapen limbs, fused fingers, still picture, messy background, three legs, many people in the background, walking backwards"
>>> output = pipe(
... prompt=prompt,
... negative_prompt=negative_prompt,
... height=720,
... width=1280,
... num_frames=81,
... guidance_scale=5.0,
... ).frames[0]
>>> export_to_video(output, "output.mp4", fps=16)
```
"""
def basic_clean(text):
text = ftfy.fix_text(text)
text = html.unescape(html.unescape(text))
return text.strip()
def whitespace_clean(text):
text = re.sub(r"\s+", " ", text)
text = text.strip()
return text
def prompt_clean(text):
text = whitespace_clean(basic_clean(text))
return text
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.retrieve_latents
def retrieve_latents(
encoder_output: torch.Tensor, generator: Optional[torch.Generator] = None, sample_mode: str = "sample"
):
if hasattr(encoder_output, "latent_dist") and sample_mode == "sample":
return encoder_output.latent_dist.sample(generator)
elif hasattr(encoder_output, "latent_dist") and sample_mode == "argmax":
return encoder_output.latent_dist.mode()
elif hasattr(encoder_output, "latents"):
return encoder_output.latents
else:
raise AttributeError("Could not access latents of provided encoder_output")
class WanVACEPipeline(DiffusionPipeline, WanLoraLoaderMixin):
r"""
Pipeline for controllable generation using Wan.
This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods
implemented for all pipelines (downloading, saving, running on a particular device, etc.).
Args:
tokenizer ([`T5Tokenizer`]):
Tokenizer from [T5](https://huggingface.co/docs/transformers/en/model_doc/t5#transformers.T5Tokenizer),
specifically the [google/umt5-xxl](https://huggingface.co/google/umt5-xxl) variant.
text_encoder ([`T5EncoderModel`]):
[T5](https://huggingface.co/docs/transformers/en/model_doc/t5#transformers.T5EncoderModel), specifically
the [google/umt5-xxl](https://huggingface.co/google/umt5-xxl) variant.
transformer ([`WanTransformer3DModel`]):
Conditional Transformer to denoise the input latents.
scheduler ([`UniPCMultistepScheduler`]):
A scheduler to be used in combination with `transformer` to denoise the encoded image latents.
vae ([`AutoencoderKLWan`]):
Variational Auto-Encoder (VAE) Model to encode and decode videos to and from latent representations.
"""
model_cpu_offload_seq = "text_encoder->transformer->vae"
_callback_tensor_inputs = ["latents", "prompt_embeds", "negative_prompt_embeds"]
def __init__(
self,
tokenizer: AutoTokenizer,
text_encoder: UMT5EncoderModel,
transformer: WanVACETransformer3DModel,
vae: AutoencoderKLWan,
scheduler: FlowMatchEulerDiscreteScheduler,
):
super().__init__()
self.register_modules(
vae=vae,
text_encoder=text_encoder,
tokenizer=tokenizer,
transformer=transformer,
scheduler=scheduler,
)
self.vae_scale_factor_temporal = 2 ** sum(self.vae.temperal_downsample) if getattr(self, "vae", None) else 4
self.vae_scale_factor_spatial = 2 ** len(self.vae.temperal_downsample) if getattr(self, "vae", None) else 8
self.video_processor = VideoProcessor(vae_scale_factor=self.vae_scale_factor_spatial)
# Copied from diffusers.pipelines.wan.pipeline_wan.WanPipeline._get_t5_prompt_embeds
def _get_t5_prompt_embeds(
self,
prompt: Union[str, List[str]] = None,
num_videos_per_prompt: int = 1,
max_sequence_length: int = 226,
device: Optional[torch.device] = None,
dtype: Optional[torch.dtype] = None,
):
device = device or self._execution_device
dtype = dtype or self.text_encoder.dtype
prompt = [prompt] if isinstance(prompt, str) else prompt
prompt = [prompt_clean(u) for u in prompt]
batch_size = len(prompt)
text_inputs = self.tokenizer(
prompt,
padding="max_length",
max_length=max_sequence_length,
truncation=True,
add_special_tokens=True,
return_attention_mask=True,
return_tensors="pt",
)
text_input_ids, mask = text_inputs.input_ids, text_inputs.attention_mask
seq_lens = mask.gt(0).sum(dim=1).long()
prompt_embeds = self.text_encoder(text_input_ids.to(device), mask.to(device)).last_hidden_state
prompt_embeds = prompt_embeds.to(dtype=dtype, device=device)
prompt_embeds = [u[:v] for u, v in zip(prompt_embeds, seq_lens)]
prompt_embeds = torch.stack(
[torch.cat([u, u.new_zeros(max_sequence_length - u.size(0), u.size(1))]) for u in prompt_embeds], dim=0
)
# duplicate text embeddings for each generation per prompt, using mps friendly method
_, seq_len, _ = prompt_embeds.shape
prompt_embeds = prompt_embeds.repeat(1, num_videos_per_prompt, 1)
prompt_embeds = prompt_embeds.view(batch_size * num_videos_per_prompt, seq_len, -1)
return prompt_embeds
# Copied from diffusers.pipelines.wan.pipeline_wan.WanPipeline.encode_prompt
def encode_prompt(
self,
prompt: Union[str, List[str]],
negative_prompt: Optional[Union[str, List[str]]] = None,
do_classifier_free_guidance: bool = True,
num_videos_per_prompt: int = 1,
prompt_embeds: Optional[torch.Tensor] = None,
negative_prompt_embeds: Optional[torch.Tensor] = None,
max_sequence_length: int = 226,
device: Optional[torch.device] = None,
dtype: Optional[torch.dtype] = None,
):
r"""
Encodes the prompt into text encoder hidden states.
Args:
prompt (`str` or `List[str]`, *optional*):
prompt to be encoded
negative_prompt (`str` or `List[str]`, *optional*):
The prompt or prompts not to guide the image generation. If not defined, one has to pass
`negative_prompt_embeds` instead. Ignored when not using guidance (i.e., ignored if `guidance_scale` is
less than `1`).
do_classifier_free_guidance (`bool`, *optional*, defaults to `True`):
Whether to use classifier free guidance or not.
num_videos_per_prompt (`int`, *optional*, defaults to 1):
Number of videos that should be generated per prompt. torch device to place the resulting embeddings on
prompt_embeds (`torch.Tensor`, *optional*):
Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If not
provided, text embeddings will be generated from `prompt` input argument.
negative_prompt_embeds (`torch.Tensor`, *optional*):
Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt
weighting. If not provided, negative_prompt_embeds will be generated from `negative_prompt` input
argument.
device: (`torch.device`, *optional*):
torch device
dtype: (`torch.dtype`, *optional*):
torch dtype
"""
device = device or self._execution_device
prompt = [prompt] if isinstance(prompt, str) else prompt
if prompt is not None:
batch_size = len(prompt)
else:
batch_size = prompt_embeds.shape[0]
if prompt_embeds is None:
prompt_embeds = self._get_t5_prompt_embeds(
prompt=prompt,
num_videos_per_prompt=num_videos_per_prompt,
max_sequence_length=max_sequence_length,
device=device,
dtype=dtype,
)
if do_classifier_free_guidance and negative_prompt_embeds is None:
negative_prompt = negative_prompt or ""
negative_prompt = batch_size * [negative_prompt] if isinstance(negative_prompt, str) else negative_prompt
if prompt is not None and type(prompt) is not type(negative_prompt):
raise TypeError(
f"`negative_prompt` should be the same type to `prompt`, but got {type(negative_prompt)} !="
f" {type(prompt)}."
)
elif batch_size != len(negative_prompt):
raise ValueError(
f"`negative_prompt`: {negative_prompt} has batch size {len(negative_prompt)}, but `prompt`:"
f" {prompt} has batch size {batch_size}. Please make sure that passed `negative_prompt` matches"
" the batch size of `prompt`."
)
negative_prompt_embeds = self._get_t5_prompt_embeds(
prompt=negative_prompt,
num_videos_per_prompt=num_videos_per_prompt,
max_sequence_length=max_sequence_length,
device=device,
dtype=dtype,
)
return prompt_embeds, negative_prompt_embeds
def check_inputs(
self,
prompt,
negative_prompt,
height,
width,
prompt_embeds=None,
negative_prompt_embeds=None,
callback_on_step_end_tensor_inputs=None,
video=None,
mask=None,
reference_images=None,
):
base = self.vae_scale_factor_spatial * self.transformer.config.patch_size[1]
if height % base != 0 or width % base != 0:
raise ValueError(f"`height` and `width` have to be divisible by {base} but are {height} and {width}.")
if callback_on_step_end_tensor_inputs is not None and not all(
k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
):
raise ValueError(
f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
)
if prompt is not None and prompt_embeds is not None:
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
" only forward one of the two."
)
elif negative_prompt is not None and negative_prompt_embeds is not None:
raise ValueError(
f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`: {negative_prompt_embeds}. Please make sure to"
" only forward one of the two."
)
elif prompt is None and prompt_embeds is None:
raise ValueError(
"Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
)
elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
elif negative_prompt is not None and (
not isinstance(negative_prompt, str) and not isinstance(negative_prompt, list)
):
raise ValueError(f"`negative_prompt` has to be of type `str` or `list` but is {type(negative_prompt)}")
if video is not None:
if mask is not None:
if len(video) != len(mask):
raise ValueError(
f"Length of `video` {len(video)} and `mask` {len(mask)} do not match. Please make sure that"
" they have the same length."
)
if reference_images is not None:
is_pil_image = isinstance(reference_images, PIL.Image.Image)
is_list_of_pil_images = isinstance(reference_images, list) and all(
isinstance(ref_img, PIL.Image.Image) for ref_img in reference_images
)
is_list_of_list_of_pil_images = isinstance(reference_images, list) and all(
isinstance(ref_img, list) and all(isinstance(ref_img_, PIL.Image.Image) for ref_img_ in ref_img)
for ref_img in reference_images
)
if not (is_pil_image or is_list_of_pil_images or is_list_of_list_of_pil_images):
raise ValueError(
"`reference_images` has to be of type `PIL.Image.Image` or `list` of `PIL.Image.Image`, or "
"`list` of `list` of `PIL.Image.Image`, but is {type(reference_images)}"
)
if is_list_of_list_of_pil_images and len(reference_images) != 1:
raise ValueError(
"The pipeline only supports generating one video at a time at the moment. When passing a list "
"of list of reference images, where the outer list corresponds to the batch size and the inner "
"list corresponds to list of conditioning images per video, please make sure to only pass "
"one inner list of reference images (i.e., `[[<image1>, <image2>, ...]]`"
)
elif mask is not None:
raise ValueError("`mask` can only be passed if `video` is passed as well.")
def preprocess_conditions(
self,
video: Optional[List[PipelineImageInput]] = None,
mask: Optional[List[PipelineImageInput]] = None,
reference_images: Optional[List[PipelineImageInput]] = None,
batch_size: int = 1,
height: int = 480,
width: int = 832,
num_frames: int = 81,
dtype: Optional[torch.dtype] = None,
device: Optional[torch.device] = None,
):
if video is not None:
base = self.vae_scale_factor_spatial * self.transformer.config.patch_size[1]
video_height, video_width = self.video_processor.get_default_height_width(video[0])
if video_height * video_width > height * width:
scale = min(width / video_width, height / video_height)
video_height, video_width = int(video_height * scale), int(video_width * scale)
if video_height % base != 0 or video_width % base != 0:
logger.warning(
f"Video height and width should be divisible by {base}, but got {video_height} and {video_width}. "
)
video_height = (video_height // base) * base
video_width = (video_width // base) * base
assert video_height * video_width <= height * width
video = self.video_processor.preprocess_video(video, video_height, video_width)
image_size = (video_height, video_width) # Use the height/width of video (with possible rescaling)
else:
video = torch.zeros(batch_size, 3, num_frames, height, width, dtype=dtype, device=device)
image_size = (height, width) # Use the height/width provider by user
if mask is not None:
mask = self.video_processor.preprocess_video(mask, image_size[0], image_size[1])
mask = torch.clamp((mask + 1) / 2, min=0, max=1)
else:
mask = torch.ones_like(video)
video = video.to(dtype=dtype, device=device)
mask = mask.to(dtype=dtype, device=device)
# Make a list of list of images where the outer list corresponds to video batch size and the inner list
# corresponds to list of conditioning images per video
if reference_images is None or isinstance(reference_images, PIL.Image.Image):
reference_images = [[reference_images] for _ in range(video.shape[0])]
elif isinstance(reference_images, (list, tuple)) and isinstance(next(iter(reference_images)), PIL.Image.Image):
reference_images = [reference_images]
elif (
isinstance(reference_images, (list, tuple))
and isinstance(next(iter(reference_images)), list)
and isinstance(next(iter(reference_images[0])), PIL.Image.Image)
):
reference_images = reference_images
else:
raise ValueError(
"`reference_images` has to be of type `PIL.Image.Image` or `list` of `PIL.Image.Image`, or "
"`list` of `list` of `PIL.Image.Image`, but is {type(reference_images)}"
)
if video.shape[0] != len(reference_images):
raise ValueError(
f"Batch size of `video` {video.shape[0]} and length of `reference_images` {len(reference_images)} does not match."
)
ref_images_lengths = [len(reference_images_batch) for reference_images_batch in reference_images]
if any(l != ref_images_lengths[0] for l in ref_images_lengths):
raise ValueError(
f"All batches of `reference_images` should have the same length, but got {ref_images_lengths}. Support for this "
"may be added in the future."
)
reference_images_preprocessed = []
for i, reference_images_batch in enumerate(reference_images):
preprocessed_images = []
for j, image in enumerate(reference_images_batch):
if image is None:
continue
image = self.video_processor.preprocess(image, None, None)
img_height, img_width = image.shape[-2:]
scale = min(image_size[0] / img_height, image_size[1] / img_width)
new_height, new_width = int(img_height * scale), int(img_width * scale)
resized_image = torch.nn.functional.interpolate(
image, size=(new_height, new_width), mode="bilinear", align_corners=False
).squeeze(0) # [C, H, W]
top = (image_size[0] - new_height) // 2
left = (image_size[1] - new_width) // 2
canvas = torch.ones(3, *image_size, device=device, dtype=dtype)
canvas[:, top : top + new_height, left : left + new_width] = resized_image
preprocessed_images.append(canvas)
reference_images_preprocessed.append(preprocessed_images)
return video, mask, reference_images_preprocessed
def prepare_video_latents(
self,
video: torch.Tensor,
mask: torch.Tensor,
reference_images: Optional[List[List[torch.Tensor]]] = None,
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
device: Optional[torch.device] = None,
) -> torch.Tensor:
device = device or self._execution_device
if isinstance(generator, list):
# TODO: support this
raise ValueError("Passing a list of generators is not yet supported. This may be supported in the future.")
if reference_images is None:
# For each batch of video, we set no re
# ference image (as one or more can be passed by user)
reference_images = [[None] for _ in range(video.shape[0])]
else:
if video.shape[0] != len(reference_images):
raise ValueError(
f"Batch size of `video` {video.shape[0]} and length of `reference_images` {len(reference_images)} does not match."
)
if video.shape[0] != 1:
# TODO: support this
raise ValueError(
"Generating with more than one video is not yet supported. This may be supported in the future."
)
vae_dtype = self.vae.dtype
video = video.to(dtype=vae_dtype)
latents_mean = torch.tensor(self.vae.config.latents_mean, device=device, dtype=torch.float32).view(
1, self.vae.config.z_dim, 1, 1, 1
)
latents_std = 1.0 / torch.tensor(self.vae.config.latents_std, device=device, dtype=torch.float32).view(
1, self.vae.config.z_dim, 1, 1, 1
)
if mask is None:
latents = retrieve_latents(self.vae.encode(video), generator, sample_mode="argmax").unbind(0)
latents = ((latents.float() - latents_mean) * latents_std).to(vae_dtype)
else:
mask = mask.to(dtype=vae_dtype)
mask = torch.where(mask > 0.5, 1.0, 0.0)
inactive = video * (1 - mask)
reactive = video * mask
inactive = retrieve_latents(self.vae.encode(inactive), generator, sample_mode="argmax")
reactive = retrieve_latents(self.vae.encode(reactive), generator, sample_mode="argmax")
inactive = ((inactive.float() - latents_mean) * latents_std).to(vae_dtype)
reactive = ((reactive.float() - latents_mean) * latents_std).to(vae_dtype)
latents = torch.cat([inactive, reactive], dim=1)
latent_list = []
for latent, reference_images_batch in zip(latents, reference_images):
for reference_image in reference_images_batch:
assert reference_image.ndim == 3
reference_image = reference_image.to(dtype=vae_dtype)
reference_image = reference_image[None, :, None, :, :] # [1, C, 1, H, W]
reference_latent = retrieve_latents(self.vae.encode(reference_image), generator, sample_mode="argmax")
reference_latent = ((reference_latent.float() - latents_mean) * latents_std).to(vae_dtype)
reference_latent = torch.cat([reference_latent, torch.zeros_like(reference_latent)], dim=1)
latent = torch.cat([reference_latent.squeeze(0), latent], dim=1) # Concat across frame dimension
latent_list.append(latent)
return torch.stack(latent_list)
def prepare_masks(
self,
mask: torch.Tensor,
reference_images: Optional[List[torch.Tensor]] = None,
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
) -> torch.Tensor:
if isinstance(generator, list):
# TODO: support this
raise ValueError("Passing a list of generators is not yet supported. This may be supported in the future.")
if reference_images is None:
# For each batch of video, we set no reference image (as one or more can be passed by user)
reference_images = [[None] for _ in range(mask.shape[0])]
else:
if mask.shape[0] != len(reference_images):
raise ValueError(
f"Batch size of `mask` {mask.shape[0]} and length of `reference_images` {len(reference_images)} does not match."
)
if mask.shape[0] != 1:
# TODO: support this
raise ValueError(
"Generating with more than one video is not yet supported. This may be supported in the future."
)
transformer_patch_size = self.transformer.config.patch_size[1]
mask_list = []
for mask_, reference_images_batch in zip(mask, reference_images):
num_channels, num_frames, height, width = mask_.shape
new_num_frames = (num_frames + self.vae_scale_factor_temporal - 1) // self.vae_scale_factor_temporal
new_height = height // (self.vae_scale_factor_spatial * transformer_patch_size) * transformer_patch_size
new_width = width // (self.vae_scale_factor_spatial * transformer_patch_size) * transformer_patch_size
mask_ = mask_[0, :, :, :]
mask_ = mask_.view(
num_frames, new_height, self.vae_scale_factor_spatial, new_width, self.vae_scale_factor_spatial
)
mask_ = mask_.permute(2, 4, 0, 1, 3).flatten(0, 1) # [8x8, num_frames, new_height, new_width]
mask_ = torch.nn.functional.interpolate(
mask_.unsqueeze(0), size=(new_num_frames, new_height, new_width), mode="nearest-exact"
).squeeze(0)
num_ref_images = len(reference_images_batch)
if num_ref_images > 0:
mask_padding = torch.zeros_like(mask_[:, :num_ref_images, :, :])
mask_ = torch.cat([mask_, mask_padding], dim=1)
mask_list.append(mask_)
return torch.stack(mask_list)
def prepare_latents(
self,
batch_size: int,
num_channels_latents: int = 16,
height: int = 480,
width: int = 832,
num_frames: int = 81,
dtype: Optional[torch.dtype] = None,
device: Optional[torch.device] = None,
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
latents: Optional[torch.Tensor] = None,
) -> torch.Tensor:
if latents is not None:
return latents.to(device=device, dtype=dtype)
num_latent_frames = (num_frames - 1) // self.vae_scale_factor_temporal + 1
shape = (
batch_size,
num_channels_latents,
num_latent_frames,
int(height) // self.vae_scale_factor_spatial,
int(width) // self.vae_scale_factor_spatial,
)
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
return latents
@property
def guidance_scale(self):
return self._guidance_scale
@property
def do_classifier_free_guidance(self):
return self._guidance_scale > 1.0
@property
def num_timesteps(self):
return self._num_timesteps
@property
def current_timestep(self):
return self._current_timestep
@property
def interrupt(self):
return self._interrupt
@property
def attention_kwargs(self):
return self._attention_kwargs
@torch.no_grad()
@replace_example_docstring(EXAMPLE_DOC_STRING)
def __call__(
self,
prompt: Union[str, List[str]] = None,
negative_prompt: Union[str, List[str]] = None,
video: Optional[List[PipelineImageInput]] = None,
mask: Optional[List[PipelineImageInput]] = None,
reference_images: Optional[List[PipelineImageInput]] = None,
conditioning_scale: Union[float, List[float], torch.Tensor] = 1.0,
height: int = 480,
width: int = 832,
num_frames: int = 81,
num_inference_steps: int = 50,
guidance_scale: float = 5.0,
num_videos_per_prompt: Optional[int] = 1,
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
latents: Optional[torch.Tensor] = None,
prompt_embeds: Optional[torch.Tensor] = None,
negative_prompt_embeds: Optional[torch.Tensor] = None,
output_type: Optional[str] = "np",
return_dict: bool = True,
attention_kwargs: Optional[Dict[str, Any]] = None,
callback_on_step_end: Optional[
Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
] = None,
callback_on_step_end_tensor_inputs: List[str] = ["latents"],
max_sequence_length: int = 512,
):
r"""
The call function to the pipeline for generation.
Args:
prompt (`str` or `List[str]`, *optional*):
The prompt or prompts to guide the image generation. If not defined, one has to pass `prompt_embeds`.
instead.
height (`int`, defaults to `480`):
The height in pixels of the generated image.
width (`int`, defaults to `832`):
The width in pixels of the generated image.
num_frames (`int`, defaults to `81`):
The number of frames in the generated video.
num_inference_steps (`int`, defaults to `50`):
The number of denoising steps. More denoising steps usually lead to a higher quality image at the
expense of slower inference.
guidance_scale (`float`, defaults to `5.0`):
Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598).
`guidance_scale` is defined as `w` of equation 2. of [Imagen
Paper](https://arxiv.org/pdf/2205.11487.pdf). Guidance scale is enabled by setting `guidance_scale >
1`. Higher guidance scale encourages to generate images that are closely linked to the text `prompt`,
usually at the expense of lower image quality.
num_videos_per_prompt (`int`, *optional*, defaults to 1):
The number of images to generate per prompt.
generator (`torch.Generator` or `List[torch.Generator]`, *optional*):
A [`torch.Generator`](https://pytorch.org/docs/stable/generated/torch.Generator.html) to make
generation deterministic.
latents (`torch.Tensor`, *optional*):
Pre-generated noisy latents sampled from a Gaussian distribution, to be used as inputs for image
generation. Can be used to tweak the same generation with different prompts. If not provided, a latents
tensor is generated by sampling using the supplied random `generator`.
prompt_embeds (`torch.Tensor`, *optional*):
Pre-generated text embeddings. Can be used to easily tweak text inputs (prompt weighting). If not
provided, text embeddings are generated from the `prompt` input argument.
output_type (`str`, *optional*, defaults to `"np"`):
The output format of the generated image. Choose between `PIL.Image` or `np.array`.
return_dict (`bool`, *optional*, defaults to `True`):
Whether or not to return a [`WanPipelineOutput`] instead of a plain tuple.
attention_kwargs (`dict`, *optional*):
A kwargs dictionary that if specified is passed along to the `AttentionProcessor` as defined under
`self.processor` in
[diffusers.models.attention_processor](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/attention_processor.py).
callback_on_step_end (`Callable`, `PipelineCallback`, `MultiPipelineCallbacks`, *optional*):
A function or a subclass of `PipelineCallback` or `MultiPipelineCallbacks` that is called at the end of
each denoising step during the inference. with the following arguments: `callback_on_step_end(self:
DiffusionPipeline, step: int, timestep: int, callback_kwargs: Dict)`. `callback_kwargs` will include a
list of all tensors as specified by `callback_on_step_end_tensor_inputs`.
callback_on_step_end_tensor_inputs (`List`, *optional*):
The list of tensor inputs for the `callback_on_step_end` function. The tensors specified in the list
will be passed as `callback_kwargs` argument. You will only be able to include variables listed in the
`._callback_tensor_inputs` attribute of your pipeline class.
autocast_dtype (`torch.dtype`, *optional*, defaults to `torch.bfloat16`):
The dtype to use for the torch.amp.autocast.
Examples:
Returns:
[`~WanPipelineOutput`] or `tuple`:
If `return_dict` is `True`, [`WanPipelineOutput`] is returned, otherwise a `tuple` is returned where
the first element is a list with the generated images and the second element is a list of `bool`s
indicating whether the corresponding generated image contains "not-safe-for-work" (nsfw) content.
"""
if isinstance(callback_on_step_end, (PipelineCallback, MultiPipelineCallbacks)):
callback_on_step_end_tensor_inputs = callback_on_step_end.tensor_inputs
# Simplification of implementation for now
if not isinstance(prompt, str):
raise ValueError("Passing a list of prompts is not yet supported. This may be supported in the future.")
if num_videos_per_prompt != 1:
raise ValueError(
"Generating multiple videos per prompt is not yet supported. This may be supported in the future."
)
# 1. Check inputs. Raise error if not correct
self.check_inputs(
prompt,
negative_prompt,
height,
width,
prompt_embeds,
negative_prompt_embeds,
callback_on_step_end_tensor_inputs,
video,
mask,
reference_images,
)
if num_frames % self.vae_scale_factor_temporal != 1:
logger.warning(
f"`num_frames - 1` has to be divisible by {self.vae_scale_factor_temporal}. Rounding to the nearest number."
)
num_frames = num_frames // self.vae_scale_factor_temporal * self.vae_scale_factor_temporal + 1
num_frames = max(num_frames, 1)
self._guidance_scale = guidance_scale
self._attention_kwargs = attention_kwargs
self._current_timestep = None
self._interrupt = False
device = self._execution_device
# 2. Define call parameters
if prompt is not None and isinstance(prompt, str):
batch_size = 1
elif prompt is not None and isinstance(prompt, list):
batch_size = len(prompt)
else:
batch_size = prompt_embeds.shape[0]
vae_dtype = self.vae.dtype
transformer_dtype = self.transformer.dtype
if isinstance(conditioning_scale, (int, float)):
conditioning_scale = [conditioning_scale] * len(self.transformer.config.vace_layers)
if isinstance(conditioning_scale, list):
if len(conditioning_scale) != len(self.transformer.config.vace_layers):
raise ValueError(
f"Length of `conditioning_scale` {len(conditioning_scale)} does not match number of layers {len(self.transformer.config.vace_layers)}."
)
conditioning_scale = torch.tensor(conditioning_scale)
if isinstance(conditioning_scale, torch.Tensor):
if conditioning_scale.size(0) != len(self.transformer.config.vace_layers):
raise ValueError(
f"Length of `conditioning_scale` {conditioning_scale.size(0)} does not match number of layers {len(self.transformer.config.vace_layers)}."
)
conditioning_scale = conditioning_scale.to(device=device, dtype=transformer_dtype)
# 3. Encode input prompt
prompt_embeds, negative_prompt_embeds = self.encode_prompt(
prompt=prompt,
negative_prompt=negative_prompt,
do_classifier_free_guidance=self.do_classifier_free_guidance,
num_videos_per_prompt=num_videos_per_prompt,
prompt_embeds=prompt_embeds,
negative_prompt_embeds=negative_prompt_embeds,
max_sequence_length=max_sequence_length,
device=device,
)
prompt_embeds = prompt_embeds.to(transformer_dtype)
if negative_prompt_embeds is not None:
negative_prompt_embeds = negative_prompt_embeds.to(transformer_dtype)
# 4. Prepare timesteps
self.scheduler.set_timesteps(num_inference_steps, device=device)
timesteps = self.scheduler.timesteps
# 5. Prepare latent variables
video, mask, reference_images = self.preprocess_conditions(
video,
mask,
reference_images,
batch_size,
height,
width,
num_frames,
torch.float32,
device,
)
conditioning_latents = self.prepare_video_latents(video, mask, reference_images, generator, device)
mask = self.prepare_masks(mask, reference_images, generator)
conditioning_latents = torch.cat([conditioning_latents, mask], dim=1)
conditioning_latents = conditioning_latents.to(transformer_dtype)
num_channels_latents = self.transformer.config.in_channels
latents = self.prepare_latents(
batch_size * num_videos_per_prompt,
num_channels_latents,
height,
width,
num_frames,
torch.float32,
device,
generator,
latents,
)
if conditioning_latents.shape[2] != latents.shape[2]:
logger.warning(
"The number of frames in the conditioning latents does not match the number of frames to be generated. Generation quality may be affected."
)
# 6. Denoising loop
num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order
self._num_timesteps = len(timesteps)
with self.progress_bar(total=num_inference_steps) as progress_bar:
for i, t in enumerate(timesteps):
if self.interrupt:
continue
self._current_timestep = t
latent_model_input = latents.to(transformer_dtype)
timestep = t.expand(latents.shape[0])
noise_pred = self.transformer(
hidden_states=latent_model_input,
timestep=timestep,
encoder_hidden_states=prompt_embeds,
control_hidden_states=conditioning_latents,
control_hidden_states_scale=conditioning_scale,
attention_kwargs=attention_kwargs,
return_dict=False,
)[0]
if self.do_classifier_free_guidance:
noise_uncond = self.transformer(
hidden_states=latent_model_input,
timestep=timestep,
encoder_hidden_states=negative_prompt_embeds,
control_hidden_states=conditioning_latents,
control_hidden_states_scale=conditioning_scale,
attention_kwargs=attention_kwargs,
return_dict=False,
)[0]
noise_pred = noise_uncond + guidance_scale * (noise_pred - noise_uncond)
# compute the previous noisy sample x_t -> x_t-1
latents = self.scheduler.step(noise_pred, t, latents, return_dict=False)[0]
if callback_on_step_end is not None:
callback_kwargs = {}
for k in callback_on_step_end_tensor_inputs:
callback_kwargs[k] = locals()[k]
callback_outputs = callback_on_step_end(self, i, t, callback_kwargs)
latents = callback_outputs.pop("latents", latents)
prompt_embeds = callback_outputs.pop("prompt_embeds", prompt_embeds)
negative_prompt_embeds = callback_outputs.pop("negative_prompt_embeds", negative_prompt_embeds)
# call the callback, if provided
if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):
progress_bar.update()
if XLA_AVAILABLE:
xm.mark_step()
self._current_timestep = None
if not output_type == "latent":
latents = latents.to(vae_dtype)
latents_mean = (
torch.tensor(self.vae.config.latents_mean)
.view(1, self.vae.config.z_dim, 1, 1, 1)
.to(latents.device, latents.dtype)
)
latents_std = 1.0 / torch.tensor(self.vae.config.latents_std).view(1, self.vae.config.z_dim, 1, 1, 1).to(
latents.device, latents.dtype
)
latents = latents / latents_std + latents_mean
video = self.vae.decode(latents, return_dict=False)[0]
video = self.video_processor.postprocess_video(video, output_type=output_type)
else:
video = latents
# Offload all models
self.maybe_free_model_hooks()
if not return_dict:
return (video,)
return WanPipelineOutput(frames=video)

View File

@@ -1150,6 +1150,21 @@ class WanTransformer3DModel(metaclass=DummyObject):
requires_backends(cls, ["torch"])
class WanVACETransformer3DModel(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
@classmethod
def from_config(cls, *args, **kwargs):
requires_backends(cls, ["torch"])
@classmethod
def from_pretrained(cls, *args, **kwargs):
requires_backends(cls, ["torch"])
def get_constant_schedule(*args, **kwargs):
requires_backends(get_constant_schedule, ["torch"])

View File

@@ -2882,6 +2882,21 @@ class WanPipeline(metaclass=DummyObject):
requires_backends(cls, ["torch", "transformers"])
class WanVACEPipeline(metaclass=DummyObject):
_backends = ["torch", "transformers"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch", "transformers"])
@classmethod
def from_config(cls, *args, **kwargs):
requires_backends(cls, ["torch", "transformers"])
@classmethod
def from_pretrained(cls, *args, **kwargs):
requires_backends(cls, ["torch", "transformers"])
class WanVideoToVideoPipeline(metaclass=DummyObject):
_backends = ["torch", "transformers"]

View File

@@ -0,0 +1,189 @@
# Copyright 2024 The HuggingFace Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import torch
from PIL import Image
from transformers import AutoTokenizer, T5EncoderModel
from diffusers import AutoencoderKLWan, FlowMatchEulerDiscreteScheduler, WanVACEPipeline, WanVACETransformer3DModel
from diffusers.utils.testing_utils import enable_full_determinism
from ..pipeline_params import TEXT_TO_IMAGE_BATCH_PARAMS, TEXT_TO_IMAGE_IMAGE_PARAMS, TEXT_TO_IMAGE_PARAMS
from ..test_pipelines_common import PipelineTesterMixin
enable_full_determinism()
class WanVACEPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
pipeline_class = WanVACEPipeline
params = TEXT_TO_IMAGE_PARAMS - {"cross_attention_kwargs"}
batch_params = TEXT_TO_IMAGE_BATCH_PARAMS
image_params = TEXT_TO_IMAGE_IMAGE_PARAMS
image_latents_params = TEXT_TO_IMAGE_IMAGE_PARAMS
required_optional_params = frozenset(
[
"num_inference_steps",
"generator",
"latents",
"return_dict",
"callback_on_step_end",
"callback_on_step_end_tensor_inputs",
]
)
test_xformers_attention = False
supports_dduf = False
def get_dummy_components(self):
torch.manual_seed(0)
vae = AutoencoderKLWan(
base_dim=3,
z_dim=16,
dim_mult=[1, 1, 1, 1],
num_res_blocks=1,
temperal_downsample=[False, True, True],
)
torch.manual_seed(0)
scheduler = FlowMatchEulerDiscreteScheduler(shift=7.0)
text_encoder = T5EncoderModel.from_pretrained("hf-internal-testing/tiny-random-t5")
tokenizer = AutoTokenizer.from_pretrained("hf-internal-testing/tiny-random-t5")
torch.manual_seed(0)
transformer = WanVACETransformer3DModel(
patch_size=(1, 2, 2),
num_attention_heads=2,
attention_head_dim=12,
in_channels=16,
out_channels=16,
text_dim=32,
freq_dim=256,
ffn_dim=32,
num_layers=3,
cross_attn_norm=True,
qk_norm="rms_norm_across_heads",
rope_max_seq_len=32,
vace_layers=[0, 2],
vace_in_channels=96,
)
components = {
"transformer": transformer,
"vae": vae,
"scheduler": scheduler,
"text_encoder": text_encoder,
"tokenizer": tokenizer,
}
return components
def get_dummy_inputs(self, device, seed=0):
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
else:
generator = torch.Generator(device=device).manual_seed(seed)
num_frames = 17
height = 16
width = 16
video = [Image.new("RGB", (height, width))] * num_frames
mask = [Image.new("L", (height, width), 0)] * num_frames
inputs = {
"video": video,
"mask": mask,
"prompt": "dance monkey",
"negative_prompt": "negative", # TODO
"generator": generator,
"num_inference_steps": 2,
"guidance_scale": 6.0,
"height": 16,
"width": 16,
"num_frames": num_frames,
"max_sequence_length": 16,
"output_type": "pt",
}
return inputs
def test_inference(self):
device = "cpu"
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe.to(device)
pipe.set_progress_bar_config(disable=None)
inputs = self.get_dummy_inputs(device)
video = pipe(**inputs).frames
generated_video = video[0]
self.assertEqual(generated_video.shape, (17, 3, 16, 16))
expected_video = torch.randn(17, 3, 16, 16)
max_diff = np.abs(generated_video - expected_video).max()
self.assertLessEqual(max_diff, 1e10)
def test_inference_with_single_reference_image(self):
device = "cpu"
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe.to(device)
pipe.set_progress_bar_config(disable=None)
inputs = self.get_dummy_inputs(device)
inputs["reference_images"] = Image.new("RGB", (16, 16))
video = pipe(**inputs).frames
generated_video = video[0]
self.assertEqual(generated_video.shape, (17, 3, 16, 16))
expected_video = torch.randn(17, 3, 16, 16)
max_diff = np.abs(generated_video - expected_video).max()
self.assertLessEqual(max_diff, 1e10)
def test_inference_with_multiple_reference_image(self):
device = "cpu"
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe.to(device)
pipe.set_progress_bar_config(disable=None)
inputs = self.get_dummy_inputs(device)
inputs["reference_images"] = [[Image.new("RGB", (16, 16))] * 2]
video = pipe(**inputs).frames
generated_video = video[0]
self.assertEqual(generated_video.shape, (17, 3, 16, 16))
expected_video = torch.randn(17, 3, 16, 16)
max_diff = np.abs(generated_video - expected_video).max()
self.assertLessEqual(max_diff, 1e10)
@unittest.skip("Test not supported")
def test_attention_slicing_forward_pass(self):
pass
@unittest.skip("Errors out because passing multiple prompts at once is not yet supported by this pipeline.")
def test_encode_prompt_works_in_isolation(self):
pass
@unittest.skip("Batching is not yet supported with this pipeline")
def test_inference_batch_consistent(self):
pass
@unittest.skip("Batching is not yet supported with this pipeline")
def test_inference_batch_single_identical(self):
return super().test_inference_batch_single_identical()