Compare commits

...

1 Commits

Author SHA1 Message Date
Patrick von Platen
43806ac143 Revert "[Pipeline] Add TextToVideoZeroSDXLPipeline (#4695)"
This reverts commit d63a498c3b.
2023-12-18 12:06:11 +01:00
8 changed files with 3 additions and 1346 deletions

View File

@@ -92,19 +92,6 @@ imageio.mimsave("video.mp4", result, fps=4)
```
- #### SDXL Support
In order to use the SDXL model when generating a video from prompt, use the `TextToVideoZeroSDXLPipeline` pipeline:
```python
import torch
from diffusers import TextToVideoZeroSDXLPipeline
model_id = "stabilityai/stable-diffusion-xl-base-1.0"
pipe = TextToVideoZeroSDXLPipeline.from_pretrained(
model_id, torch_dtype=torch.float16, variant="fp16", use_safetensors=True
).to("cuda")
```
### Text-To-Video with Pose Control
To generate a video from prompt with additional pose control
@@ -154,33 +141,7 @@ To generate a video from prompt with additional pose control
result = pipe(prompt=[prompt] * len(pose_images), image=pose_images, latents=latents).images
imageio.mimsave("video.mp4", result, fps=4)
```
- #### SDXL Support
Since our attention processor also works with SDXL, it can be utilized to generate a video from prompt using ControlNet models powered by SDXL:
```python
import torch
from diffusers import StableDiffusionXLControlNetPipeline, ControlNetModel
from diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero import CrossFrameAttnProcessor
controlnet_model_id = 'thibaud/controlnet-openpose-sdxl-1.0'
model_id = 'stabilityai/stable-diffusion-xl-base-1.0'
controlnet = ControlNetModel.from_pretrained(controlnet_model_id, torch_dtype=torch.float16)
pipe = StableDiffusionControlNetPipeline.from_pretrained(
model_id, controlnet=controlnet, torch_dtype=torch.float16
).to('cuda')
# Set the attention processor
pipe.unet.set_attn_processor(CrossFrameAttnProcessor(batch_size=2))
pipe.controlnet.set_attn_processor(CrossFrameAttnProcessor(batch_size=2))
# fix latents for all frames
latents = torch.randn((1, 4, 128, 128), device="cuda", dtype=torch.float16).repeat(len(pose_images), 1, 1, 1)
prompt = "Darth Vader dancing in a desert"
result = pipe(prompt=[prompt] * len(pose_images), image=pose_images, latents=latents).images
imageio.mimsave("video.mp4", result, fps=4)
```
### Text-To-Video with Edge Control
@@ -292,10 +253,5 @@ Make sure to check out the Schedulers [guide](../../using-diffusers/schedulers)
- all
- __call__
## TextToVideoZeroSDXLPipeline
[[autodoc]] TextToVideoZeroSDXLPipeline
- all
- __call__
## TextToVideoPipelineOutput
[[autodoc]] pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.TextToVideoPipelineOutput

View File

@@ -285,7 +285,6 @@ else:
"StableVideoDiffusionPipeline",
"TextToVideoSDPipeline",
"TextToVideoZeroPipeline",
"TextToVideoZeroSDXLPipeline",
"UnCLIPImageVariationPipeline",
"UnCLIPPipeline",
"UniDiffuserModel",
@@ -641,7 +640,6 @@ if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
StableVideoDiffusionPipeline,
TextToVideoSDPipeline,
TextToVideoZeroPipeline,
TextToVideoZeroSDXLPipeline,
UnCLIPImageVariationPipeline,
UnCLIPPipeline,
UniDiffuserModel,

View File

@@ -188,7 +188,6 @@ else:
_import_structure["text_to_video_synthesis"] = [
"TextToVideoSDPipeline",
"TextToVideoZeroPipeline",
"TextToVideoZeroSDXLPipeline",
"VideoToVideoSDPipeline",
]
_import_structure["unclip"] = ["UnCLIPImageVariationPipeline", "UnCLIPPipeline"]
@@ -441,7 +440,6 @@ if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
from .text_to_video_synthesis import (
TextToVideoSDPipeline,
TextToVideoZeroPipeline,
TextToVideoZeroSDXLPipeline,
VideoToVideoSDPipeline,
)
from .unclip import UnCLIPImageVariationPipeline, UnCLIPPipeline

View File

@@ -25,7 +25,6 @@ else:
_import_structure["pipeline_text_to_video_synth"] = ["TextToVideoSDPipeline"]
_import_structure["pipeline_text_to_video_synth_img2img"] = ["VideoToVideoSDPipeline"]
_import_structure["pipeline_text_to_video_zero"] = ["TextToVideoZeroPipeline"]
_import_structure["pipeline_text_to_video_zero_sdxl"] = ["TextToVideoZeroSDXLPipeline"]
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
@@ -39,7 +38,6 @@ if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
from .pipeline_text_to_video_synth import TextToVideoSDPipeline
from .pipeline_text_to_video_synth_img2img import VideoToVideoSDPipeline
from .pipeline_text_to_video_zero import TextToVideoZeroPipeline
from .pipeline_text_to_video_zero_sdxl import TextToVideoZeroSDXLPipeline
else:
import sys

View File

@@ -13,7 +13,6 @@ from diffusers.models import AutoencoderKL, UNet2DConditionModel
from diffusers.pipelines.stable_diffusion import StableDiffusionPipeline, StableDiffusionSafetyChecker
from diffusers.schedulers import KarrasDiffusionSchedulers
from diffusers.utils import BaseOutput
from diffusers.utils.torch_utils import randn_tensor
def rearrange_0(tensor, f):
@@ -136,7 +135,7 @@ class CrossFrameAttnProcessor2_0:
# Cross Frame Attention
if not is_cross_attention:
video_length = max(1, key.size()[0] // self.batch_size)
video_length = key.size()[0] // self.batch_size
first_frame_index = [0] * video_length
# rearrange keys to have batch and frames in the 1st and 2nd dims respectively
@@ -340,7 +339,7 @@ class TextToVideoZeroPipeline(StableDiffusionPipeline):
x_t1:
Forward process applied to x_t0 from time t0 to t1.
"""
eps = randn_tensor(x_t0.size(), generator=generator, dtype=x_t0.dtype, device=x_t0.device)
eps = torch.randn(x_t0.size(), generator=generator, dtype=x_t0.dtype, device=x_t0.device)
alpha_vec = torch.prod(self.scheduler.alphas[t0:t1])
x_t1 = torch.sqrt(alpha_vec) * x_t0 + torch.sqrt(1 - alpha_vec) * eps
return x_t1

View File

@@ -1,872 +0,0 @@
import copy
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import numpy as np
import PIL
import torch
import torch.nn.functional as F
from torch.nn.functional import grid_sample
from transformers import (
CLIPImageProcessor,
CLIPTextModel,
CLIPTextModelWithProjection,
CLIPTokenizer,
CLIPVisionModelWithProjection,
)
from diffusers.models import AutoencoderKL, UNet2DConditionModel
from diffusers.pipelines.stable_diffusion_xl import StableDiffusionXLPipeline
from diffusers.schedulers import KarrasDiffusionSchedulers
from diffusers.utils import BaseOutput
from diffusers.utils.torch_utils import randn_tensor
# Copied from diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.rearrange_0
def rearrange_0(tensor, f):
F, C, H, W = tensor.size()
tensor = torch.permute(torch.reshape(tensor, (F // f, f, C, H, W)), (0, 2, 1, 3, 4))
return tensor
# Copied from diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.rearrange_1
def rearrange_1(tensor):
B, C, F, H, W = tensor.size()
return torch.reshape(torch.permute(tensor, (0, 2, 1, 3, 4)), (B * F, C, H, W))
# Copied from diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.rearrange_3
def rearrange_3(tensor, f):
F, D, C = tensor.size()
return torch.reshape(tensor, (F // f, f, D, C))
# Copied from diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.rearrange_4
def rearrange_4(tensor):
B, F, D, C = tensor.size()
return torch.reshape(tensor, (B * F, D, C))
# Copied from diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.CrossFrameAttnProcessor
class CrossFrameAttnProcessor:
"""
Cross frame attention processor. Each frame attends the first frame.
Args:
batch_size: The number that represents actual batch size, other than the frames.
For example, calling unet with a single prompt and num_images_per_prompt=1, batch_size should be equal to
2, due to classifier-free guidance.
"""
def __init__(self, batch_size=2):
self.batch_size = batch_size
def __call__(self, attn, hidden_states, encoder_hidden_states=None, attention_mask=None):
batch_size, sequence_length, _ = hidden_states.shape
attention_mask = attn.prepare_attention_mask(attention_mask, sequence_length, batch_size)
query = attn.to_q(hidden_states)
is_cross_attention = encoder_hidden_states is not None
if encoder_hidden_states is None:
encoder_hidden_states = hidden_states
elif attn.norm_cross:
encoder_hidden_states = attn.norm_encoder_hidden_states(encoder_hidden_states)
key = attn.to_k(encoder_hidden_states)
value = attn.to_v(encoder_hidden_states)
# Cross Frame Attention
if not is_cross_attention:
video_length = key.size()[0] // self.batch_size
first_frame_index = [0] * video_length
# rearrange keys to have batch and frames in the 1st and 2nd dims respectively
key = rearrange_3(key, video_length)
key = key[:, first_frame_index]
# rearrange values to have batch and frames in the 1st and 2nd dims respectively
value = rearrange_3(value, video_length)
value = value[:, first_frame_index]
# rearrange back to original shape
key = rearrange_4(key)
value = rearrange_4(value)
query = attn.head_to_batch_dim(query)
key = attn.head_to_batch_dim(key)
value = attn.head_to_batch_dim(value)
attention_probs = attn.get_attention_scores(query, key, attention_mask)
hidden_states = torch.bmm(attention_probs, value)
hidden_states = attn.batch_to_head_dim(hidden_states)
# linear proj
hidden_states = attn.to_out[0](hidden_states)
# dropout
hidden_states = attn.to_out[1](hidden_states)
return hidden_states
# Copied from diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.CrossFrameAttnProcessor2_0
class CrossFrameAttnProcessor2_0:
"""
Cross frame attention processor with scaled_dot_product attention of Pytorch 2.0.
Args:
batch_size: The number that represents actual batch size, other than the frames.
For example, calling unet with a single prompt and num_images_per_prompt=1, batch_size should be equal to
2, due to classifier-free guidance.
"""
def __init__(self, batch_size=2):
if not hasattr(F, "scaled_dot_product_attention"):
raise ImportError("AttnProcessor2_0 requires PyTorch 2.0, to use it, please upgrade PyTorch to 2.0.")
self.batch_size = batch_size
def __call__(self, attn, hidden_states, encoder_hidden_states=None, attention_mask=None):
batch_size, sequence_length, _ = (
hidden_states.shape if encoder_hidden_states is None else encoder_hidden_states.shape
)
inner_dim = hidden_states.shape[-1]
if attention_mask is not None:
attention_mask = attn.prepare_attention_mask(attention_mask, sequence_length, batch_size)
# scaled_dot_product_attention expects attention_mask shape to be
# (batch, heads, source_length, target_length)
attention_mask = attention_mask.view(batch_size, attn.heads, -1, attention_mask.shape[-1])
query = attn.to_q(hidden_states)
is_cross_attention = encoder_hidden_states is not None
if encoder_hidden_states is None:
encoder_hidden_states = hidden_states
elif attn.norm_cross:
encoder_hidden_states = attn.norm_encoder_hidden_states(encoder_hidden_states)
key = attn.to_k(encoder_hidden_states)
value = attn.to_v(encoder_hidden_states)
# Cross Frame Attention
if not is_cross_attention:
video_length = max(1, key.size()[0] // self.batch_size)
first_frame_index = [0] * video_length
# rearrange keys to have batch and frames in the 1st and 2nd dims respectively
key = rearrange_3(key, video_length)
key = key[:, first_frame_index]
# rearrange values to have batch and frames in the 1st and 2nd dims respectively
value = rearrange_3(value, video_length)
value = value[:, first_frame_index]
# rearrange back to original shape
key = rearrange_4(key)
value = rearrange_4(value)
head_dim = inner_dim // attn.heads
query = query.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
key = key.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
value = value.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
# the output of sdp = (batch, num_heads, seq_len, head_dim)
# TODO: add support for attn.scale when we move to Torch 2.1
hidden_states = F.scaled_dot_product_attention(
query, key, value, attn_mask=attention_mask, dropout_p=0.0, is_causal=False
)
hidden_states = hidden_states.transpose(1, 2).reshape(batch_size, -1, attn.heads * head_dim)
hidden_states = hidden_states.to(query.dtype)
# linear proj
hidden_states = attn.to_out[0](hidden_states)
# dropout
hidden_states = attn.to_out[1](hidden_states)
return hidden_states
@dataclass
class TextToVideoSDXLPipelineOutput(BaseOutput):
"""
Output class for zero-shot text-to-video pipeline.
Args:
images (`List[PIL.Image.Image]` or `np.ndarray`)
List of denoised PIL images of length `batch_size` or numpy array of shape `(batch_size, height, width,
num_channels)`. PIL images or numpy array present the denoised images of the diffusion pipeline.
"""
images: Union[List[PIL.Image.Image], np.ndarray]
# Copied from diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.coords_grid
def coords_grid(batch, ht, wd, device):
# Adapted from https://github.com/princeton-vl/RAFT/blob/master/core/utils/utils.py
coords = torch.meshgrid(torch.arange(ht, device=device), torch.arange(wd, device=device))
coords = torch.stack(coords[::-1], dim=0).float()
return coords[None].repeat(batch, 1, 1, 1)
# Copied from diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.warp_single_latent
def warp_single_latent(latent, reference_flow):
"""
Warp latent of a single frame with given flow
Args:
latent: latent code of a single frame
reference_flow: flow which to warp the latent with
Returns:
warped: warped latent
"""
_, _, H, W = reference_flow.size()
_, _, h, w = latent.size()
coords0 = coords_grid(1, H, W, device=latent.device).to(latent.dtype)
coords_t0 = coords0 + reference_flow
coords_t0[:, 0] /= W
coords_t0[:, 1] /= H
coords_t0 = coords_t0 * 2.0 - 1.0
coords_t0 = F.interpolate(coords_t0, size=(h, w), mode="bilinear")
coords_t0 = torch.permute(coords_t0, (0, 2, 3, 1))
warped = grid_sample(latent, coords_t0, mode="nearest", padding_mode="reflection")
return warped
# Copied from diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.create_motion_field
def create_motion_field(motion_field_strength_x, motion_field_strength_y, frame_ids, device, dtype):
"""
Create translation motion field
Args:
motion_field_strength_x: motion strength along x-axis
motion_field_strength_y: motion strength along y-axis
frame_ids: indexes of the frames the latents of which are being processed.
This is needed when we perform chunk-by-chunk inference
device: device
dtype: dtype
Returns:
"""
seq_length = len(frame_ids)
reference_flow = torch.zeros((seq_length, 2, 512, 512), device=device, dtype=dtype)
for fr_idx in range(seq_length):
reference_flow[fr_idx, 0, :, :] = motion_field_strength_x * (frame_ids[fr_idx])
reference_flow[fr_idx, 1, :, :] = motion_field_strength_y * (frame_ids[fr_idx])
return reference_flow
# Copied from diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.create_motion_field_and_warp_latents
def create_motion_field_and_warp_latents(motion_field_strength_x, motion_field_strength_y, frame_ids, latents):
"""
Creates translation motion and warps the latents accordingly
Args:
motion_field_strength_x: motion strength along x-axis
motion_field_strength_y: motion strength along y-axis
frame_ids: indexes of the frames the latents of which are being processed.
This is needed when we perform chunk-by-chunk inference
latents: latent codes of frames
Returns:
warped_latents: warped latents
"""
motion_field = create_motion_field(
motion_field_strength_x=motion_field_strength_x,
motion_field_strength_y=motion_field_strength_y,
frame_ids=frame_ids,
device=latents.device,
dtype=latents.dtype,
)
warped_latents = latents.clone().detach()
for i in range(len(warped_latents)):
warped_latents[i] = warp_single_latent(latents[i][None], motion_field[i][None])
return warped_latents
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.rescale_noise_cfg
def rescale_noise_cfg(noise_cfg, noise_pred_text, guidance_rescale=0.0):
"""
Rescale `noise_cfg` according to `guidance_rescale`. Based on findings of [Common Diffusion Noise Schedules and
Sample Steps are Flawed](https://arxiv.org/pdf/2305.08891.pdf). See Section 3.4
"""
std_text = noise_pred_text.std(dim=list(range(1, noise_pred_text.ndim)), keepdim=True)
std_cfg = noise_cfg.std(dim=list(range(1, noise_cfg.ndim)), keepdim=True)
# rescale the results from guidance (fixes overexposure)
noise_pred_rescaled = noise_cfg * (std_text / std_cfg)
# mix with the original results from guidance by factor guidance_rescale to avoid "plain looking" images
noise_cfg = guidance_rescale * noise_pred_rescaled + (1 - guidance_rescale) * noise_cfg
return noise_cfg
class TextToVideoZeroSDXLPipeline(StableDiffusionXLPipeline):
r"""
Pipeline for zero-shot text-to-video generation using Stable Diffusion XL.
This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods
implemented for all pipelines (downloading, saving, running on a particular device, etc.).
Args:
vae ([`AutoencoderKL`]):
Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.
text_encoder ([`CLIPTextModel`]):
Frozen text-encoder. Stable Diffusion XL uses the text portion of
[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel), specifically
the [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant.
text_encoder_2 ([` CLIPTextModelWithProjection`]):
Second frozen text-encoder. Stable Diffusion XL uses the text and pool portion of
[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModelWithProjection),
specifically the
[laion/CLIP-ViT-bigG-14-laion2B-39B-b160k](https://huggingface.co/laion/CLIP-ViT-bigG-14-laion2B-39B-b160k)
variant.
tokenizer (`CLIPTokenizer`):
Tokenizer of class
[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
tokenizer_2 (`CLIPTokenizer`):
Second Tokenizer of class
[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
unet ([`UNet2DConditionModel`]): Conditional U-Net architecture to denoise the encoded image latents.
scheduler ([`SchedulerMixin`]):
A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
"""
def __init__(
self,
vae: AutoencoderKL,
text_encoder: CLIPTextModel,
text_encoder_2: CLIPTextModelWithProjection,
tokenizer: CLIPTokenizer,
tokenizer_2: CLIPTokenizer,
unet: UNet2DConditionModel,
scheduler: KarrasDiffusionSchedulers,
image_encoder: CLIPVisionModelWithProjection = None,
feature_extractor: CLIPImageProcessor = None,
force_zeros_for_empty_prompt: bool = True,
add_watermarker: Optional[bool] = None,
):
super().__init__(
vae=vae,
text_encoder=text_encoder,
text_encoder_2=text_encoder_2,
tokenizer=tokenizer,
tokenizer_2=tokenizer_2,
unet=unet,
scheduler=scheduler,
image_encoder=image_encoder,
feature_extractor=feature_extractor,
force_zeros_for_empty_prompt=force_zeros_for_empty_prompt,
add_watermarker=add_watermarker,
)
processor = (
CrossFrameAttnProcessor2_0(batch_size=2)
if hasattr(F, "scaled_dot_product_attention")
else CrossFrameAttnProcessor(batch_size=2)
)
self.unet.set_attn_processor(processor)
# Copied from diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.TextToVideoZeroPipeline.forward_loop
def forward_loop(self, x_t0, t0, t1, generator):
"""
Perform DDPM forward process from time t0 to t1. This is the same as adding noise with corresponding variance.
Args:
x_t0:
Latent code at time t0.
t0:
Timestep at t0.
t1:
Timestamp at t1.
generator (`torch.Generator` or `List[torch.Generator]`, *optional*):
A [`torch.Generator`](https://pytorch.org/docs/stable/generated/torch.Generator.html) to make
generation deterministic.
Returns:
x_t1:
Forward process applied to x_t0 from time t0 to t1.
"""
eps = randn_tensor(x_t0.size(), generator=generator, dtype=x_t0.dtype, device=x_t0.device)
alpha_vec = torch.prod(self.scheduler.alphas[t0:t1])
x_t1 = torch.sqrt(alpha_vec) * x_t0 + torch.sqrt(1 - alpha_vec) * eps
return x_t1
def backward_loop(
self,
latents,
timesteps,
prompt_embeds,
guidance_scale,
callback,
callback_steps,
num_warmup_steps,
extra_step_kwargs,
add_text_embeds,
add_time_ids,
cross_attention_kwargs=None,
guidance_rescale: float = 0.0,
):
"""
Perform backward process given list of time steps
Args:
latents:
Latents at time timesteps[0].
timesteps:
Time steps along which to perform backward process.
prompt_embeds:
Pre-generated text embeddings.
guidance_scale:
A higher guidance scale value encourages the model to generate images closely linked to the text
`prompt` at the expense of lower image quality. Guidance scale is enabled when `guidance_scale > 1`.
callback (`Callable`, *optional*):
A function that calls every `callback_steps` steps during inference. The function is called with the
following arguments: `callback(step: int, timestep: int, latents: torch.FloatTensor)`.
callback_steps (`int`, *optional*, defaults to 1):
The frequency at which the `callback` function is called. If not specified, the callback is called at
every step.
extra_step_kwargs:
Extra_step_kwargs.
cross_attention_kwargs:
A kwargs dictionary that if specified is passed along to the [`AttentionProcessor`] as defined in
[`self.processor`](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/attention_processor.py).
num_warmup_steps:
number of warmup steps.
Returns:
latents: latents of backward process output at time timesteps[-1]
"""
do_classifier_free_guidance = guidance_scale > 1.0
num_steps = (len(timesteps) - num_warmup_steps) // self.scheduler.order
with self.progress_bar(total=num_steps) as progress_bar:
for i, t in enumerate(timesteps):
# expand the latents if we are doing classifier free guidance
latent_model_input = torch.cat([latents] * 2) if do_classifier_free_guidance else latents
latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)
# predict the noise residual
added_cond_kwargs = {"text_embeds": add_text_embeds, "time_ids": add_time_ids}
noise_pred = self.unet(
latent_model_input,
t,
encoder_hidden_states=prompt_embeds,
cross_attention_kwargs=cross_attention_kwargs,
added_cond_kwargs=added_cond_kwargs,
return_dict=False,
)[0]
# perform guidance
if do_classifier_free_guidance:
noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)
if do_classifier_free_guidance and guidance_rescale > 0.0:
# Based on 3.4. in https://arxiv.org/pdf/2305.08891.pdf
noise_pred = rescale_noise_cfg(noise_pred, noise_pred_text, guidance_rescale=guidance_rescale)
# compute the previous noisy sample x_t -> x_t-1
latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs).prev_sample
# call the callback, if provided
if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):
progress_bar.update()
if callback is not None and i % callback_steps == 0:
callback(i, t, latents)
return latents.clone().detach()
@torch.no_grad()
def __call__(
self,
prompt: Union[str, List[str]],
prompt_2: Optional[Union[str, List[str]]] = None,
video_length: Optional[int] = 8,
height: Optional[int] = None,
width: Optional[int] = None,
num_inference_steps: int = 50,
denoising_end: Optional[float] = None,
guidance_scale: float = 7.5,
negative_prompt: Optional[Union[str, List[str]]] = None,
negative_prompt_2: Optional[Union[str, List[str]]] = None,
num_videos_per_prompt: Optional[int] = 1,
eta: float = 0.0,
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
frame_ids: Optional[List[int]] = None,
prompt_embeds: Optional[torch.FloatTensor] = None,
negative_prompt_embeds: Optional[torch.FloatTensor] = None,
pooled_prompt_embeds: Optional[torch.FloatTensor] = None,
negative_pooled_prompt_embeds: Optional[torch.FloatTensor] = None,
latents: Optional[torch.FloatTensor] = None,
motion_field_strength_x: float = 12,
motion_field_strength_y: float = 12,
output_type: Optional[str] = "tensor",
return_dict: bool = True,
callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None,
callback_steps: int = 1,
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
guidance_rescale: float = 0.0,
original_size: Optional[Tuple[int, int]] = None,
crops_coords_top_left: Tuple[int, int] = (0, 0),
target_size: Optional[Tuple[int, int]] = None,
t0: int = 44,
t1: int = 47,
):
"""
Function invoked when calling the pipeline for generation.
Args:
prompt (`str` or `List[str]`, *optional*):
The prompt or prompts to guide the image generation. If not defined, one has to pass `prompt_embeds`.
instead.
prompt_2 (`str` or `List[str]`, *optional*):
The prompt or prompts to be sent to the `tokenizer_2` and `text_encoder_2`. If not defined, `prompt` is
used in both text-encoders
video_length (`int`, *optional*, defaults to 8):
The number of generated video frames.
height (`int`, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):
The height in pixels of the generated image.
width (`int`, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):
The width in pixels of the generated image.
num_inference_steps (`int`, *optional*, defaults to 50):
The number of denoising steps. More denoising steps usually lead to a higher quality image at the
expense of slower inference.
denoising_end (`float`, *optional*):
When specified, determines the fraction (between 0.0 and 1.0) of the total denoising process to be
completed before it is intentionally prematurely terminated. As a result, the returned sample will
still retain a substantial amount of noise as determined by the discrete timesteps selected by the
scheduler. The denoising_end parameter should ideally be utilized when this pipeline forms a part of a
"Mixture of Denoisers" multi-pipeline setup, as elaborated in [**Refining the Image
Output**](https://huggingface.co/docs/diffusers/api/pipelines/stable_diffusion/stable_diffusion_xl#refining-the-image-output)
guidance_scale (`float`, *optional*, defaults to 7.5):
Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598).
`guidance_scale` is defined as `w` of equation 2. of [Imagen
Paper](https://arxiv.org/pdf/2205.11487.pdf). Guidance scale is enabled by setting `guidance_scale >
1`. Higher guidance scale encourages to generate images that are closely linked to the text `prompt`,
usually at the expense of lower image quality.
negative_prompt (`str` or `List[str]`, *optional*):
The prompt or prompts not to guide the image generation. If not defined, one has to pass
`negative_prompt_embeds` instead. Ignored when not using guidance (i.e., ignored if `guidance_scale` is
less than `1`).
negative_prompt_2 (`str` or `List[str]`, *optional*):
The prompt or prompts not to guide the image generation to be sent to `tokenizer_2` and
`text_encoder_2`. If not defined, `negative_prompt` is used in both text-encoders
num_videos_per_prompt (`int`, *optional*, defaults to 1):
The number of videos to generate per prompt.
eta (`float`, *optional*, defaults to 0.0):
Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to
[`schedulers.DDIMScheduler`], will be ignored for others.
generator (`torch.Generator` or `List[torch.Generator]`, *optional*):
One or a list of [torch generator(s)](https://pytorch.org/docs/stable/generated/torch.Generator.html)
to make generation deterministic.
frame_ids (`List[int]`, *optional*):
Indexes of the frames that are being generated. This is used when generating longer videos
chunk-by-chunk.
prompt_embeds (`torch.FloatTensor`, *optional*):
Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If not
provided, text embeddings will be generated from `prompt` input argument.
negative_prompt_embeds (`torch.FloatTensor`, *optional*):
Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt
weighting. If not provided, negative_prompt_embeds will be generated from `negative_prompt` input
argument.
pooled_prompt_embeds (`torch.FloatTensor`, *optional*):
Pre-generated pooled text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting.
If not provided, pooled text embeddings will be generated from `prompt` input argument.
negative_pooled_prompt_embeds (`torch.FloatTensor`, *optional*):
Pre-generated negative pooled text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt
weighting. If not provided, pooled negative_prompt_embeds will be generated from `negative_prompt`
input argument.
latents (`torch.FloatTensor`, *optional*):
Pre-generated noisy latents, sampled from a Gaussian distribution, to be used as inputs for image
generation. Can be used to tweak the same generation with different prompts. If not provided, a latents
tensor will ge generated by sampling using the supplied random `generator`.
motion_field_strength_x (`float`, *optional*, defaults to 12):
Strength of motion in generated video along x-axis. See the [paper](https://arxiv.org/abs/2303.13439),
Sect. 3.3.1.
motion_field_strength_y (`float`, *optional*, defaults to 12):
Strength of motion in generated video along y-axis. See the [paper](https://arxiv.org/abs/2303.13439),
Sect. 3.3.1.
output_type (`str`, *optional*, defaults to `"pil"`):
The output format of the generate image. Choose between
[PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `np.array`.
return_dict (`bool`, *optional*, defaults to `True`):
Whether or not to return a [`~pipelines.stable_diffusion_xl.StableDiffusionXLPipelineOutput`] instead
of a plain tuple.
callback (`Callable`, *optional*):
A function that will be called every `callback_steps` steps during inference. The function will be
called with the following arguments: `callback(step: int, timestep: int, latents: torch.FloatTensor)`.
callback_steps (`int`, *optional*, defaults to 1):
The frequency at which the `callback` function will be called. If not specified, the callback will be
called at every step.
cross_attention_kwargs (`dict`, *optional*):
A kwargs dictionary that if specified is passed along to the `AttentionProcessor` as defined under
`self.processor` in
[diffusers.cross_attention](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/cross_attention.py).
guidance_rescale (`float`, *optional*, defaults to 0.7):
Guidance rescale factor proposed by [Common Diffusion Noise Schedules and Sample Steps are
Flawed](https://arxiv.org/pdf/2305.08891.pdf) `guidance_scale` is defined as `φ` in equation 16. of
[Common Diffusion Noise Schedules and Sample Steps are Flawed](https://arxiv.org/pdf/2305.08891.pdf).
Guidance rescale factor should fix overexposure when using zero terminal SNR.
original_size (`Tuple[int]`, *optional*, defaults to (1024, 1024)):
If `original_size` is not the same as `target_size` the image will appear to be down- or upsampled.
`original_size` defaults to `(width, height)` if not specified. Part of SDXL's micro-conditioning as
explained in section 2.2 of
[https://huggingface.co/papers/2307.01952](https://huggingface.co/papers/2307.01952).
crops_coords_top_left (`Tuple[int]`, *optional*, defaults to (0, 0)):
`crops_coords_top_left` can be used to generate an image that appears to be "cropped" from the position
`crops_coords_top_left` downwards. Favorable, well-centered images are usually achieved by setting
`crops_coords_top_left` to (0, 0). Part of SDXL's micro-conditioning as explained in section 2.2 of
[https://huggingface.co/papers/2307.01952](https://huggingface.co/papers/2307.01952).
target_size (`Tuple[int]`, *optional*, defaults to (1024, 1024)):
For most cases, `target_size` should be set to the desired height and width of the generated image. If
not specified it will default to `(width, height)`. Part of SDXL's micro-conditioning as explained in
section 2.2 of [https://huggingface.co/papers/2307.01952](https://huggingface.co/papers/2307.01952).
t0 (`int`, *optional*, defaults to 44):
Timestep t0. Should be in the range [0, num_inference_steps - 1]. See the
[paper](https://arxiv.org/abs/2303.13439), Sect. 3.3.1.
t1 (`int`, *optional*, defaults to 47):
Timestep t0. Should be in the range [t0 + 1, num_inference_steps - 1]. See the
[paper](https://arxiv.org/abs/2303.13439), Sect. 3.3.1.
Returns:
[`~pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.TextToVideoSDXLPipelineOutput`] or
`tuple`: [`~pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.TextToVideoSDXLPipelineOutput`]
if `return_dict` is True, otherwise a `tuple`. When returning a tuple, the first element is a list with the
generated images.
"""
assert video_length > 0
if frame_ids is None:
frame_ids = list(range(video_length))
assert len(frame_ids) == video_length
assert num_videos_per_prompt == 1
if isinstance(prompt, str):
prompt = [prompt]
if isinstance(negative_prompt, str):
negative_prompt = [negative_prompt]
# 0. Default height and width to unet
height = height or self.default_sample_size * self.vae_scale_factor
width = width or self.default_sample_size * self.vae_scale_factor
original_size = original_size or (height, width)
target_size = target_size or (height, width)
# 1. Check inputs. Raise error if not correct
self.check_inputs(
prompt,
prompt_2,
height,
width,
callback_steps,
negative_prompt,
negative_prompt_2,
prompt_embeds,
negative_prompt_embeds,
pooled_prompt_embeds,
negative_pooled_prompt_embeds,
)
# 2. Define call parameters
batch_size = (
1 if isinstance(prompt, str) else len(prompt) if isinstance(prompt, list) else prompt_embeds.shape[0]
)
device = self._execution_device
# here `guidance_scale` is defined analog to the guidance weight `w` of equation (2)
# of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1`
# corresponds to doing no classifier free guidance.
do_classifier_free_guidance = guidance_scale > 1.0
# 3. Encode input prompt
text_encoder_lora_scale = (
cross_attention_kwargs.get("scale", None) if cross_attention_kwargs is not None else None
)
(
prompt_embeds,
negative_prompt_embeds,
pooled_prompt_embeds,
negative_pooled_prompt_embeds,
) = self.encode_prompt(
prompt=prompt,
prompt_2=prompt_2,
device=device,
num_images_per_prompt=num_videos_per_prompt,
do_classifier_free_guidance=do_classifier_free_guidance,
negative_prompt=negative_prompt,
negative_prompt_2=negative_prompt_2,
prompt_embeds=prompt_embeds,
negative_prompt_embeds=negative_prompt_embeds,
pooled_prompt_embeds=pooled_prompt_embeds,
negative_pooled_prompt_embeds=negative_pooled_prompt_embeds,
lora_scale=text_encoder_lora_scale,
)
# 4. Prepare timesteps
self.scheduler.set_timesteps(num_inference_steps, device=device)
timesteps = self.scheduler.timesteps
# 5. Prepare latent variables
num_channels_latents = self.unet.config.in_channels
latents = self.prepare_latents(
batch_size * num_videos_per_prompt,
num_channels_latents,
height,
width,
prompt_embeds.dtype,
device,
generator,
latents,
)
# 6. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline
extra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)
# 7. Prepare added time ids & embeddings
add_text_embeds = pooled_prompt_embeds
if self.text_encoder_2 is None:
text_encoder_projection_dim = int(pooled_prompt_embeds.shape[-1])
else:
text_encoder_projection_dim = self.text_encoder_2.config.projection_dim
add_time_ids = self._get_add_time_ids(
original_size,
crops_coords_top_left,
target_size,
dtype=prompt_embeds.dtype,
text_encoder_projection_dim=text_encoder_projection_dim,
)
if do_classifier_free_guidance:
prompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds], dim=0)
add_text_embeds = torch.cat([negative_pooled_prompt_embeds, add_text_embeds], dim=0)
add_time_ids = torch.cat([add_time_ids, add_time_ids], dim=0)
prompt_embeds = prompt_embeds.to(device)
add_text_embeds = add_text_embeds.to(device)
add_time_ids = add_time_ids.to(device).repeat(batch_size * num_videos_per_prompt, 1)
num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order
# Perform the first backward process up to time T_1
x_1_t1 = self.backward_loop(
timesteps=timesteps[: -t1 - 1],
prompt_embeds=prompt_embeds,
latents=latents,
guidance_scale=guidance_scale,
callback=callback,
callback_steps=callback_steps,
extra_step_kwargs=extra_step_kwargs,
num_warmup_steps=num_warmup_steps,
add_text_embeds=add_text_embeds,
add_time_ids=add_time_ids,
)
scheduler_copy = copy.deepcopy(self.scheduler)
# Perform the second backward process up to time T_0
x_1_t0 = self.backward_loop(
timesteps=timesteps[-t1 - 1 : -t0 - 1],
prompt_embeds=prompt_embeds,
latents=x_1_t1,
guidance_scale=guidance_scale,
callback=callback,
callback_steps=callback_steps,
extra_step_kwargs=extra_step_kwargs,
num_warmup_steps=0,
add_text_embeds=add_text_embeds,
add_time_ids=add_time_ids,
)
# Propagate first frame latents at time T_0 to remaining frames
x_2k_t0 = x_1_t0.repeat(video_length - 1, 1, 1, 1)
# Add motion in latents at time T_0
x_2k_t0 = create_motion_field_and_warp_latents(
motion_field_strength_x=motion_field_strength_x,
motion_field_strength_y=motion_field_strength_y,
latents=x_2k_t0,
frame_ids=frame_ids[1:],
)
# Perform forward process up to time T_1
x_2k_t1 = self.forward_loop(
x_t0=x_2k_t0,
t0=timesteps[-t0 - 1].to(torch.long),
t1=timesteps[-t1 - 1].to(torch.long),
generator=generator,
)
# Perform backward process from time T_1 to 0
latents = torch.cat([x_1_t1, x_2k_t1])
self.scheduler = scheduler_copy
timesteps = timesteps[-t1 - 1 :]
b, l, d = prompt_embeds.size()
prompt_embeds = prompt_embeds[:, None].repeat(1, video_length, 1, 1).reshape(b * video_length, l, d)
b, k = add_text_embeds.size()
add_text_embeds = add_text_embeds[:, None].repeat(1, video_length, 1).reshape(b * video_length, k)
b, k = add_time_ids.size()
add_time_ids = add_time_ids[:, None].repeat(1, video_length, 1).reshape(b * video_length, k)
# 7.1 Apply denoising_end
if denoising_end is not None and isinstance(denoising_end, float) and denoising_end > 0 and denoising_end < 1:
discrete_timestep_cutoff = int(
round(
self.scheduler.config.num_train_timesteps
- (denoising_end * self.scheduler.config.num_train_timesteps)
)
)
num_inference_steps = len(list(filter(lambda ts: ts >= discrete_timestep_cutoff, timesteps)))
timesteps = timesteps[:num_inference_steps]
x_1k_0 = self.backward_loop(
timesteps=timesteps,
prompt_embeds=prompt_embeds,
latents=latents,
guidance_scale=guidance_scale,
callback=callback,
callback_steps=callback_steps,
extra_step_kwargs=extra_step_kwargs,
num_warmup_steps=0,
add_text_embeds=add_text_embeds,
add_time_ids=add_time_ids,
)
latents = x_1k_0
if not output_type == "latent":
# make sure the VAE is in float32 mode, as it overflows in float16
needs_upcasting = self.vae.dtype == torch.float16 and self.vae.config.force_upcast
if needs_upcasting:
self.upcast_vae()
latents = latents.to(next(iter(self.vae.post_quant_conv.parameters())).dtype)
image = self.vae.decode(latents / self.vae.config.scaling_factor, return_dict=False)[0]
# cast back to fp16 if needed
if needs_upcasting:
self.vae.to(dtype=torch.float16)
else:
image = latents
return TextToVideoSDXLPipelineOutput(images=image)
# apply watermark if available
if self.watermark is not None:
image = self.watermark.apply_watermark(image)
image = self.image_processor.postprocess(image, output_type=output_type)
# Offload last model to CPU manually for max memory savings
if hasattr(self, "final_offload_hook") and self.final_offload_hook is not None:
self.final_offload_hook.offload()
if not return_dict:
return (image,)
return TextToVideoSDXLPipelineOutput(images=image)

View File

@@ -1247,21 +1247,6 @@ class TextToVideoZeroPipeline(metaclass=DummyObject):
requires_backends(cls, ["torch", "transformers"])
class TextToVideoZeroSDXLPipeline(metaclass=DummyObject):
_backends = ["torch", "transformers"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch", "transformers"])
@classmethod
def from_config(cls, *args, **kwargs):
requires_backends(cls, ["torch", "transformers"])
@classmethod
def from_pretrained(cls, *args, **kwargs):
requires_backends(cls, ["torch", "transformers"])
class UnCLIPImageVariationPipeline(metaclass=DummyObject):
_backends = ["torch", "transformers"]

View File

@@ -1,405 +0,0 @@
# coding=utf-8
# Copyright 2023 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import inspect
import io
import re
import tempfile
import unittest
import numpy as np
import torch
from transformers import CLIPTextConfig, CLIPTextModel, CLIPTextModelWithProjection, CLIPTokenizer
from diffusers import AutoencoderKL, DDIMScheduler, TextToVideoZeroSDXLPipeline, UNet2DConditionModel
from diffusers.utils.import_utils import is_accelerate_available, is_accelerate_version
from diffusers.utils.testing_utils import enable_full_determinism, nightly, require_torch_gpu, torch_device
from ..pipeline_params import TEXT_TO_IMAGE_BATCH_PARAMS, TEXT_TO_IMAGE_IMAGE_PARAMS, TEXT_TO_IMAGE_PARAMS
from ..test_pipelines_common import PipelineTesterMixin
enable_full_determinism()
def to_np(tensor):
if isinstance(tensor, torch.Tensor):
tensor = tensor.detach().cpu().numpy()
return tensor
class TextToVideoZeroSDXLPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
pipeline_class = TextToVideoZeroSDXLPipeline
params = TEXT_TO_IMAGE_PARAMS
batch_params = TEXT_TO_IMAGE_BATCH_PARAMS
image_params = TEXT_TO_IMAGE_IMAGE_PARAMS
image_latents_params = TEXT_TO_IMAGE_IMAGE_PARAMS
generator_device = "cpu"
def get_dummy_components(self, seed=0):
torch.manual_seed(seed)
unet = UNet2DConditionModel(
block_out_channels=(2, 4),
layers_per_block=2,
sample_size=2,
norm_num_groups=2,
in_channels=4,
out_channels=4,
down_block_types=("DownBlock2D", "CrossAttnDownBlock2D"),
up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"),
# SD2-specific config below
attention_head_dim=(2, 4),
use_linear_projection=True,
addition_embed_type="text_time",
addition_time_embed_dim=8,
transformer_layers_per_block=(1, 2),
projection_class_embeddings_input_dim=80, # 6 * 8 + 32
cross_attention_dim=64,
)
scheduler = DDIMScheduler(
num_train_timesteps=1000,
beta_start=0.0001,
beta_end=0.02,
beta_schedule="linear",
trained_betas=None,
clip_sample=True,
set_alpha_to_one=True,
steps_offset=0,
prediction_type="epsilon",
thresholding=False,
dynamic_thresholding_ratio=0.995,
clip_sample_range=1.0,
sample_max_value=1.0,
timestep_spacing="leading",
rescale_betas_zero_snr=False,
)
torch.manual_seed(seed)
vae = AutoencoderKL(
block_out_channels=[32, 64],
in_channels=3,
out_channels=3,
down_block_types=["DownEncoderBlock2D", "DownEncoderBlock2D"],
up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"],
latent_channels=4,
sample_size=128,
)
torch.manual_seed(seed)
text_encoder_config = CLIPTextConfig(
bos_token_id=0,
eos_token_id=2,
hidden_size=32,
intermediate_size=37,
layer_norm_eps=1e-05,
num_attention_heads=4,
num_hidden_layers=5,
pad_token_id=1,
vocab_size=1000,
# SD2-specific config below
hidden_act="gelu",
projection_dim=32,
)
text_encoder = CLIPTextModel(text_encoder_config)
tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
text_encoder_2 = CLIPTextModelWithProjection(text_encoder_config)
tokenizer_2 = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
components = {
"unet": unet,
"scheduler": scheduler,
"vae": vae,
"text_encoder": text_encoder,
"tokenizer": tokenizer,
"text_encoder_2": text_encoder_2,
"tokenizer_2": tokenizer_2,
"image_encoder": None,
"feature_extractor": None,
}
return components
def get_dummy_inputs(self, device, seed=0):
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
else:
generator = torch.Generator(device=device).manual_seed(seed)
inputs = {
"prompt": "A panda dancing in Antarctica",
"generator": generator,
"num_inference_steps": 5,
"t0": 1,
"t1": 3,
"height": 64,
"width": 64,
"video_length": 3,
"output_type": "np",
}
return inputs
def get_generator(self, device, seed=0):
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
else:
generator = torch.Generator(device=device).manual_seed(seed)
return generator
def test_text_to_video_zero_sdxl(self):
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe = pipe.to(torch_device)
inputs = self.get_dummy_inputs(self.generator_device)
result = pipe(**inputs).images
first_frame_slice = result[0, -3:, -3:, -1]
last_frame_slice = result[-1, -3:, -3:, 0]
expected_slice1 = np.array([0.48, 0.58, 0.53, 0.59, 0.50, 0.44, 0.60, 0.65, 0.52])
expected_slice2 = np.array([0.66, 0.49, 0.40, 0.70, 0.47, 0.51, 0.73, 0.65, 0.52])
assert np.abs(first_frame_slice.flatten() - expected_slice1).max() < 1e-2
assert np.abs(last_frame_slice.flatten() - expected_slice2).max() < 1e-2
@unittest.skip(
reason="Cannot call `set_default_attn_processor` as this pipeline uses a specific attention processor."
)
def test_attention_slicing_forward_pass(self):
pass
def test_cfg(self):
sig = inspect.signature(self.pipeline_class.__call__)
if "guidance_scale" not in sig.parameters:
return
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe = pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
inputs = self.get_dummy_inputs(self.generator_device)
inputs["guidance_scale"] = 1.0
out_no_cfg = pipe(**inputs)[0]
inputs["guidance_scale"] = 7.5
out_cfg = pipe(**inputs)[0]
assert out_cfg.shape == out_no_cfg.shape
def test_dict_tuple_outputs_equivalent(self, expected_max_difference=1e-4):
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
output = pipe(**self.get_dummy_inputs(self.generator_device))[0]
output_tuple = pipe(**self.get_dummy_inputs(self.generator_device), return_dict=False)[0]
max_diff = np.abs(to_np(output) - to_np(output_tuple)).max()
self.assertLess(max_diff, expected_max_difference)
@unittest.skipIf(torch_device != "cuda", reason="float16 requires CUDA")
def test_float16_inference(self, expected_max_diff=5e-2):
components = self.get_dummy_components()
for name, module in components.items():
if hasattr(module, "half"):
components[name] = module.to(torch_device).half()
pipe = self.pipeline_class(**components)
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
components = self.get_dummy_components()
pipe_fp16 = self.pipeline_class(**components)
pipe_fp16.to(torch_device, torch.float16)
pipe_fp16.set_progress_bar_config(disable=None)
inputs = self.get_dummy_inputs(self.generator_device)
# # Reset generator in case it is used inside dummy inputs
if "generator" in inputs:
inputs["generator"] = self.get_generator(self.generator_device)
output = pipe(**inputs)[0]
fp16_inputs = self.get_dummy_inputs(self.generator_device)
# Reset generator in case it is used inside dummy inputs
if "generator" in fp16_inputs:
fp16_inputs["generator"] = self.get_generator(self.generator_device)
output_fp16 = pipe_fp16(**fp16_inputs)[0]
max_diff = np.abs(to_np(output) - to_np(output_fp16)).max()
self.assertLess(max_diff, expected_max_diff, "The outputs of the fp16 and fp32 pipelines are too different.")
@unittest.skip(reason="Batching needs to be properly figured out first for this pipeline.")
def test_inference_batch_consistent(self):
pass
@unittest.skip(
reason="Cannot call `set_default_attn_processor` as this pipeline uses a specific attention processor."
)
def test_inference_batch_single_identical(self):
pass
@unittest.skipIf(
torch_device != "cuda" or not is_accelerate_available() or is_accelerate_version("<", "0.17.0"),
reason="CPU offload is only available with CUDA and `accelerate v0.17.0` or higher",
)
def test_model_cpu_offload_forward_pass(self, expected_max_diff=2e-4):
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe = pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
inputs = self.get_dummy_inputs(self.generator_device)
output_without_offload = pipe(**inputs)[0]
pipe.enable_model_cpu_offload()
inputs = self.get_dummy_inputs(self.generator_device)
output_with_offload = pipe(**inputs)[0]
max_diff = np.abs(to_np(output_with_offload) - to_np(output_without_offload)).max()
self.assertLess(max_diff, expected_max_diff, "CPU offloading should not affect the inference results")
@unittest.skip(reason="`num_images_per_prompt` argument is not supported for this pipeline.")
def test_pipeline_call_signature(self):
pass
def test_progress_bar(self):
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe.to(torch_device)
inputs = self.get_dummy_inputs(self.generator_device)
with io.StringIO() as stderr, contextlib.redirect_stderr(stderr):
_ = pipe(**inputs)
stderr = stderr.getvalue()
# we can't calculate the number of progress steps beforehand e.g. for strength-dependent img2img,
# so we just match "5" in "#####| 1/5 [00:01<00:00]"
max_steps = re.search("/(.*?) ", stderr).group(1)
self.assertTrue(max_steps is not None and len(max_steps) > 0)
self.assertTrue(
f"{max_steps}/{max_steps}" in stderr, "Progress bar should be enabled and stopped at the max step"
)
pipe.set_progress_bar_config(disable=True)
with io.StringIO() as stderr, contextlib.redirect_stderr(stderr):
_ = pipe(**inputs)
self.assertTrue(stderr.getvalue() == "", "Progress bar should be disabled")
@unittest.skipIf(torch_device != "cuda", reason="float16 requires CUDA")
def test_save_load_float16(self, expected_max_diff=1e-2):
components = self.get_dummy_components()
for name, module in components.items():
if hasattr(module, "half"):
components[name] = module.to(torch_device).half()
pipe = self.pipeline_class(**components)
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
inputs = self.get_dummy_inputs(self.generator_device)
output = pipe(**inputs)[0]
with tempfile.TemporaryDirectory() as tmpdir:
pipe.save_pretrained(tmpdir)
pipe_loaded = self.pipeline_class.from_pretrained(tmpdir, torch_dtype=torch.float16)
pipe_loaded.to(torch_device)
pipe_loaded.set_progress_bar_config(disable=None)
for name, component in pipe_loaded.components.items():
if hasattr(component, "dtype"):
self.assertTrue(
component.dtype == torch.float16,
f"`{name}.dtype` switched from `float16` to {component.dtype} after loading.",
)
inputs = self.get_dummy_inputs(self.generator_device)
output_loaded = pipe_loaded(**inputs)[0]
max_diff = np.abs(to_np(output) - to_np(output_loaded)).max()
self.assertLess(
max_diff, expected_max_diff, "The output of the fp16 pipeline changed after saving and loading."
)
@unittest.skip(
reason="Cannot call `set_default_attn_processor` as this pipeline uses a specific attention processor."
)
def test_save_load_local(self):
pass
@unittest.skip(
reason="Cannot call `set_default_attn_processor` as this pipeline uses a specific attention processor."
)
def test_save_load_optional_components(self):
pass
@unittest.skip(
reason="Cannot call `set_default_attn_processor` as this pipeline uses a specific attention processor."
)
def test_sequential_cpu_offload_forward_pass(self):
pass
@unittest.skipIf(torch_device != "cuda", reason="CUDA and CPU are required to switch devices")
def test_to_device(self):
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe.set_progress_bar_config(disable=None)
pipe.to("cpu")
model_devices = [component.device.type for component in components.values() if hasattr(component, "device")]
self.assertTrue(all(device == "cpu" for device in model_devices))
output_cpu = pipe(**self.get_dummy_inputs("cpu"))[0] # generator set to cpu
self.assertTrue(np.isnan(output_cpu).sum() == 0)
pipe.to("cuda")
model_devices = [component.device.type for component in components.values() if hasattr(component, "device")]
self.assertTrue(all(device == "cuda" for device in model_devices))
output_cuda = pipe(**self.get_dummy_inputs("cpu"))[0] # generator set to cpu
self.assertTrue(np.isnan(to_np(output_cuda)).sum() == 0)
@unittest.skip(
reason="Cannot call `set_default_attn_processor` as this pipeline uses a specific attention processor."
)
def test_xformers_attention_forwardGenerator_pass(self):
pass
@nightly
@require_torch_gpu
class TextToVideoZeroSDXLPipelineSlowTests(unittest.TestCase):
def test_full_model(self):
model_id = "stabilityai/stable-diffusion-xl-base-1.0"
pipe = self.pipeline_class.from_pretrained(
model_id, torch_dtype=torch.float16, variant="fp16", use_safetensors=True
)
pipe.enable_model_cpu_offload()
pipe.enable_vae_slicing()
pipe.scheduler = DDIMScheduler.from_config(pipe.scheduler.config)
generator = torch.Generator(device="cpu").manual_seed(0)
prompt = "A panda dancing in Antarctica"
result = pipe(prompt=prompt, generator=generator).images
first_frame_slice = result[0, -3:, -3:, -1]
last_frame_slice = result[-1, -3:, -3:, 0]
expected_slice1 = np.array([0.57, 0.57, 0.57, 0.57, 0.57, 0.56, 0.55, 0.56, 0.56])
expected_slice2 = np.array([0.54, 0.53, 0.53, 0.53, 0.53, 0.52, 0.53, 0.53, 0.53])
assert np.abs(first_frame_slice.flatten() - expected_slice1).max() < 1e-2
assert np.abs(last_frame_slice.flatten() - expected_slice2).max() < 1e-2