mirror of
https://github.com/huggingface/diffusers.git
synced 2026-01-27 17:22:53 +03:00
469 lines
23 KiB
Python
469 lines
23 KiB
Python
# Copyright 2025 The HuggingFace Team. All rights reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from typing import Any, Dict, List, Optional, Tuple, Union
|
|
|
|
import torch
|
|
import torch.nn as nn
|
|
import torch.utils.checkpoint
|
|
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer, CLIPVisionModelWithProjection
|
|
|
|
from diffusers.configuration_utils import register_to_config
|
|
from diffusers.image_processor import VaeImageProcessor
|
|
from diffusers.models.autoencoders import AutoencoderKL
|
|
from diffusers.models.unets.unet_2d_condition import UNet2DConditionModel, UNet2DConditionOutput
|
|
from diffusers.pipelines.stable_diffusion import StableDiffusionPipeline
|
|
from diffusers.pipelines.stable_diffusion.safety_checker import StableDiffusionSafetyChecker
|
|
from diffusers.schedulers import KarrasDiffusionSchedulers
|
|
from diffusers.utils import USE_PEFT_BACKEND, deprecate, logging, scale_lora_layers, unscale_lora_layers
|
|
|
|
|
|
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
|
|
|
|
|
|
class UNet2DConditionModelHighResFix(UNet2DConditionModel):
|
|
r"""
|
|
A conditional 2D UNet model that applies Kohya fix proposed for high resolution image generation.
|
|
|
|
This model inherits from [`UNet2DConditionModel`]. Check the superclass documentation for learning about all the parameters.
|
|
|
|
Parameters:
|
|
high_res_fix (`List[Dict]`, *optional*, defaults to `[{'timestep': 600, 'scale_factor': 0.5, 'block_num': 1}]`):
|
|
Enables Kohya fix for high resolution generation. The activation maps are scaled based on the scale_factor up to the timestep at specified block_num.
|
|
"""
|
|
|
|
_supports_gradient_checkpointing = True
|
|
|
|
@register_to_config
|
|
def __init__(self, high_res_fix: List[Dict] = [{"timestep": 600, "scale_factor": 0.5, "block_num": 1}], **kwargs):
|
|
super().__init__(**kwargs)
|
|
if high_res_fix:
|
|
self.config.high_res_fix = sorted(high_res_fix, key=lambda x: x["timestep"], reverse=True)
|
|
|
|
@classmethod
|
|
def _resize(cls, sample, target=None, scale_factor=1, mode="bicubic"):
|
|
dtype = sample.dtype
|
|
if dtype == torch.bfloat16:
|
|
sample = sample.to(torch.float32)
|
|
|
|
if target is not None:
|
|
if sample.shape[-2:] != target.shape[-2:]:
|
|
sample = nn.functional.interpolate(sample, size=target.shape[-2:], mode=mode, align_corners=False)
|
|
elif scale_factor != 1:
|
|
sample = nn.functional.interpolate(sample, scale_factor=scale_factor, mode=mode, align_corners=False)
|
|
|
|
return sample.to(dtype)
|
|
|
|
def forward(
|
|
self,
|
|
sample: torch.FloatTensor,
|
|
timestep: Union[torch.Tensor, float, int],
|
|
encoder_hidden_states: torch.Tensor,
|
|
class_labels: Optional[torch.Tensor] = None,
|
|
timestep_cond: Optional[torch.Tensor] = None,
|
|
attention_mask: Optional[torch.Tensor] = None,
|
|
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
|
|
added_cond_kwargs: Optional[Dict[str, torch.Tensor]] = None,
|
|
down_block_additional_residuals: Optional[Tuple[torch.Tensor]] = None,
|
|
mid_block_additional_residual: Optional[torch.Tensor] = None,
|
|
down_intrablock_additional_residuals: Optional[Tuple[torch.Tensor]] = None,
|
|
encoder_attention_mask: Optional[torch.Tensor] = None,
|
|
return_dict: bool = True,
|
|
) -> Union[UNet2DConditionOutput, Tuple]:
|
|
r"""
|
|
The [`UNet2DConditionModel`] forward method.
|
|
|
|
Args:
|
|
sample (`torch.FloatTensor`):
|
|
The noisy input tensor with the following shape `(batch, channel, height, width)`.
|
|
timestep (`torch.FloatTensor` or `float` or `int`): The number of timesteps to denoise an input.
|
|
encoder_hidden_states (`torch.FloatTensor`):
|
|
The encoder hidden states with shape `(batch, sequence_length, feature_dim)`.
|
|
class_labels (`torch.Tensor`, *optional*, defaults to `None`):
|
|
Optional class labels for conditioning. Their embeddings will be summed with the timestep embeddings.
|
|
timestep_cond: (`torch.Tensor`, *optional*, defaults to `None`):
|
|
Conditional embeddings for timestep. If provided, the embeddings will be summed with the samples passed
|
|
through the `self.time_embedding` layer to obtain the timestep embeddings.
|
|
attention_mask (`torch.Tensor`, *optional*, defaults to `None`):
|
|
An attention mask of shape `(batch, key_tokens)` is applied to `encoder_hidden_states`. If `1` the mask
|
|
is kept, otherwise if `0` it is discarded. Mask will be converted into a bias, which adds large
|
|
negative values to the attention scores corresponding to "discard" tokens.
|
|
cross_attention_kwargs (`dict`, *optional*):
|
|
A kwargs dictionary that if specified is passed along to the `AttentionProcessor` as defined under
|
|
`self.processor` in
|
|
[diffusers.models.attention_processor](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/attention_processor.py).
|
|
added_cond_kwargs: (`dict`, *optional*):
|
|
A kwargs dictionary containing additional embeddings that if specified are added to the embeddings that
|
|
are passed along to the UNet blocks.
|
|
down_block_additional_residuals: (`tuple` of `torch.Tensor`, *optional*):
|
|
A tuple of tensors that if specified are added to the residuals of down unet blocks.
|
|
mid_block_additional_residual: (`torch.Tensor`, *optional*):
|
|
A tensor that if specified is added to the residual of the middle unet block.
|
|
down_intrablock_additional_residuals (`tuple` of `torch.Tensor`, *optional*):
|
|
additional residuals to be added within UNet down blocks, for example from T2I-Adapter side model(s)
|
|
encoder_attention_mask (`torch.Tensor`):
|
|
A cross-attention mask of shape `(batch, sequence_length)` is applied to `encoder_hidden_states`. If
|
|
`True` the mask is kept, otherwise if `False` it is discarded. Mask will be converted into a bias,
|
|
which adds large negative values to the attention scores corresponding to "discard" tokens.
|
|
return_dict (`bool`, *optional*, defaults to `True`):
|
|
Whether or not to return a [`~models.unets.unet_2d_condition.UNet2DConditionOutput`] instead of a plain
|
|
tuple.
|
|
|
|
Returns:
|
|
[`~models.unets.unet_2d_condition.UNet2DConditionOutput`] or `tuple`:
|
|
If `return_dict` is True, an [`~models.unets.unet_2d_condition.UNet2DConditionOutput`] is returned,
|
|
otherwise a `tuple` is returned where the first element is the sample tensor.
|
|
"""
|
|
# By default samples have to be AT least a multiple of the overall upsampling factor.
|
|
# The overall upsampling factor is equal to 2 ** (# num of upsampling layers).
|
|
# However, the upsampling interpolation output size can be forced to fit any upsampling size
|
|
# on the fly if necessary.
|
|
default_overall_up_factor = 2**self.num_upsamplers
|
|
|
|
# upsample size should be forwarded when sample is not a multiple of `default_overall_up_factor`
|
|
forward_upsample_size = False
|
|
upsample_size = None
|
|
|
|
for dim in sample.shape[-2:]:
|
|
if dim % default_overall_up_factor != 0:
|
|
# Forward upsample size to force interpolation output size.
|
|
forward_upsample_size = True
|
|
break
|
|
|
|
# ensure attention_mask is a bias, and give it a singleton query_tokens dimension
|
|
# expects mask of shape:
|
|
# [batch, key_tokens]
|
|
# adds singleton query_tokens dimension:
|
|
# [batch, 1, key_tokens]
|
|
# this helps to broadcast it as a bias over attention scores, which will be in one of the following shapes:
|
|
# [batch, heads, query_tokens, key_tokens] (e.g. torch sdp attn)
|
|
# [batch * heads, query_tokens, key_tokens] (e.g. xformers or classic attn)
|
|
if attention_mask is not None:
|
|
# assume that mask is expressed as:
|
|
# (1 = keep, 0 = discard)
|
|
# convert mask into a bias that can be added to attention scores:
|
|
# (keep = +0, discard = -10000.0)
|
|
attention_mask = (1 - attention_mask.to(sample.dtype)) * -10000.0
|
|
attention_mask = attention_mask.unsqueeze(1)
|
|
|
|
# convert encoder_attention_mask to a bias the same way we do for attention_mask
|
|
if encoder_attention_mask is not None:
|
|
encoder_attention_mask = (1 - encoder_attention_mask.to(sample.dtype)) * -10000.0
|
|
encoder_attention_mask = encoder_attention_mask.unsqueeze(1)
|
|
|
|
# 0. center input if necessary
|
|
if self.config.center_input_sample:
|
|
sample = 2 * sample - 1.0
|
|
|
|
# 1. time
|
|
t_emb = self.get_time_embed(sample=sample, timestep=timestep)
|
|
emb = self.time_embedding(t_emb, timestep_cond)
|
|
aug_emb = None
|
|
|
|
class_emb = self.get_class_embed(sample=sample, class_labels=class_labels)
|
|
if class_emb is not None:
|
|
if self.config.class_embeddings_concat:
|
|
emb = torch.cat([emb, class_emb], dim=-1)
|
|
else:
|
|
emb = emb + class_emb
|
|
|
|
aug_emb = self.get_aug_embed(
|
|
emb=emb, encoder_hidden_states=encoder_hidden_states, added_cond_kwargs=added_cond_kwargs
|
|
)
|
|
if self.config.addition_embed_type == "image_hint":
|
|
aug_emb, hint = aug_emb
|
|
sample = torch.cat([sample, hint], dim=1)
|
|
|
|
emb = emb + aug_emb if aug_emb is not None else emb
|
|
|
|
if self.time_embed_act is not None:
|
|
emb = self.time_embed_act(emb)
|
|
|
|
encoder_hidden_states = self.process_encoder_hidden_states(
|
|
encoder_hidden_states=encoder_hidden_states, added_cond_kwargs=added_cond_kwargs
|
|
)
|
|
|
|
# 2. pre-process
|
|
sample = self.conv_in(sample)
|
|
|
|
# 2.5 GLIGEN position net
|
|
if cross_attention_kwargs is not None and cross_attention_kwargs.get("gligen", None) is not None:
|
|
cross_attention_kwargs = cross_attention_kwargs.copy()
|
|
gligen_args = cross_attention_kwargs.pop("gligen")
|
|
cross_attention_kwargs["gligen"] = {"objs": self.position_net(**gligen_args)}
|
|
|
|
# 3. down
|
|
# we're popping the `scale` instead of getting it because otherwise `scale` will be propagated
|
|
# to the internal blocks and will raise deprecation warnings. this will be confusing for our users.
|
|
if cross_attention_kwargs is not None:
|
|
cross_attention_kwargs = cross_attention_kwargs.copy()
|
|
lora_scale = cross_attention_kwargs.pop("scale", 1.0)
|
|
else:
|
|
lora_scale = 1.0
|
|
|
|
if USE_PEFT_BACKEND:
|
|
# weight the lora layers by setting `lora_scale` for each PEFT layer
|
|
scale_lora_layers(self, lora_scale)
|
|
|
|
is_controlnet = mid_block_additional_residual is not None and down_block_additional_residuals is not None
|
|
# using new arg down_intrablock_additional_residuals for T2I-Adapters, to distinguish from controlnets
|
|
is_adapter = down_intrablock_additional_residuals is not None
|
|
# maintain backward compatibility for legacy usage, where
|
|
# T2I-Adapter and ControlNet both use down_block_additional_residuals arg
|
|
# but can only use one or the other
|
|
if not is_adapter and mid_block_additional_residual is None and down_block_additional_residuals is not None:
|
|
deprecate(
|
|
"T2I should not use down_block_additional_residuals",
|
|
"1.3.0",
|
|
"Passing intrablock residual connections with `down_block_additional_residuals` is deprecated \
|
|
and will be removed in diffusers 1.3.0. `down_block_additional_residuals` should only be used \
|
|
for ControlNet. Please make sure use `down_intrablock_additional_residuals` instead. ",
|
|
standard_warn=False,
|
|
)
|
|
down_intrablock_additional_residuals = down_block_additional_residuals
|
|
is_adapter = True
|
|
|
|
down_block_res_samples = (sample,)
|
|
for down_i, downsample_block in enumerate(self.down_blocks):
|
|
if hasattr(downsample_block, "has_cross_attention") and downsample_block.has_cross_attention:
|
|
# For t2i-adapter CrossAttnDownBlock2D
|
|
additional_residuals = {}
|
|
if is_adapter and len(down_intrablock_additional_residuals) > 0:
|
|
additional_residuals["additional_residuals"] = down_intrablock_additional_residuals.pop(0)
|
|
|
|
sample, res_samples = downsample_block(
|
|
hidden_states=sample,
|
|
temb=emb,
|
|
encoder_hidden_states=encoder_hidden_states,
|
|
attention_mask=attention_mask,
|
|
cross_attention_kwargs=cross_attention_kwargs,
|
|
encoder_attention_mask=encoder_attention_mask,
|
|
**additional_residuals,
|
|
)
|
|
|
|
else:
|
|
sample, res_samples = downsample_block(hidden_states=sample, temb=emb)
|
|
if is_adapter and len(down_intrablock_additional_residuals) > 0:
|
|
sample += down_intrablock_additional_residuals.pop(0)
|
|
|
|
down_block_res_samples += res_samples
|
|
|
|
# kohya high res fix
|
|
if self.config.high_res_fix:
|
|
for high_res_fix in self.config.high_res_fix:
|
|
if timestep > high_res_fix["timestep"] and down_i == high_res_fix["block_num"]:
|
|
sample = self.__class__._resize(sample, scale_factor=high_res_fix["scale_factor"])
|
|
break
|
|
|
|
if is_controlnet:
|
|
new_down_block_res_samples = ()
|
|
|
|
for down_block_res_sample, down_block_additional_residual in zip(
|
|
down_block_res_samples, down_block_additional_residuals
|
|
):
|
|
down_block_res_sample = down_block_res_sample + down_block_additional_residual
|
|
new_down_block_res_samples = new_down_block_res_samples + (down_block_res_sample,)
|
|
|
|
down_block_res_samples = new_down_block_res_samples
|
|
|
|
# 4. mid
|
|
if self.mid_block is not None:
|
|
if hasattr(self.mid_block, "has_cross_attention") and self.mid_block.has_cross_attention:
|
|
sample = self.mid_block(
|
|
sample,
|
|
emb,
|
|
encoder_hidden_states=encoder_hidden_states,
|
|
attention_mask=attention_mask,
|
|
cross_attention_kwargs=cross_attention_kwargs,
|
|
encoder_attention_mask=encoder_attention_mask,
|
|
)
|
|
else:
|
|
sample = self.mid_block(sample, emb)
|
|
|
|
# To support T2I-Adapter-XL
|
|
if (
|
|
is_adapter
|
|
and len(down_intrablock_additional_residuals) > 0
|
|
and sample.shape == down_intrablock_additional_residuals[0].shape
|
|
):
|
|
sample += down_intrablock_additional_residuals.pop(0)
|
|
|
|
if is_controlnet:
|
|
sample = sample + mid_block_additional_residual
|
|
|
|
# 5. up
|
|
for i, upsample_block in enumerate(self.up_blocks):
|
|
is_final_block = i == len(self.up_blocks) - 1
|
|
|
|
res_samples = down_block_res_samples[-len(upsample_block.resnets) :]
|
|
down_block_res_samples = down_block_res_samples[: -len(upsample_block.resnets)]
|
|
|
|
# up scaling of kohya high res fix
|
|
if self.config.high_res_fix is not None:
|
|
if res_samples[0].shape[-2:] != sample.shape[-2:]:
|
|
sample = self.__class__._resize(sample, target=res_samples[0])
|
|
res_samples_up_sampled = (res_samples[0],)
|
|
for res_sample in res_samples[1:]:
|
|
res_samples_up_sampled += (self.__class__._resize(res_sample, target=res_samples[0]),)
|
|
res_samples = res_samples_up_sampled
|
|
|
|
# if we have not reached the final block and need to forward the
|
|
# upsample size, we do it here
|
|
if not is_final_block and forward_upsample_size:
|
|
upsample_size = down_block_res_samples[-1].shape[2:]
|
|
|
|
if hasattr(upsample_block, "has_cross_attention") and upsample_block.has_cross_attention:
|
|
sample = upsample_block(
|
|
hidden_states=sample,
|
|
temb=emb,
|
|
res_hidden_states_tuple=res_samples,
|
|
encoder_hidden_states=encoder_hidden_states,
|
|
cross_attention_kwargs=cross_attention_kwargs,
|
|
upsample_size=upsample_size,
|
|
attention_mask=attention_mask,
|
|
encoder_attention_mask=encoder_attention_mask,
|
|
)
|
|
else:
|
|
sample = upsample_block(
|
|
hidden_states=sample,
|
|
temb=emb,
|
|
res_hidden_states_tuple=res_samples,
|
|
upsample_size=upsample_size,
|
|
)
|
|
|
|
# 6. post-process
|
|
if self.conv_norm_out:
|
|
sample = self.conv_norm_out(sample)
|
|
sample = self.conv_act(sample)
|
|
sample = self.conv_out(sample)
|
|
|
|
if USE_PEFT_BACKEND:
|
|
# remove `lora_scale` from each PEFT layer
|
|
unscale_lora_layers(self, lora_scale)
|
|
|
|
if not return_dict:
|
|
return (sample,)
|
|
|
|
return UNet2DConditionOutput(sample=sample)
|
|
|
|
@classmethod
|
|
def from_unet(cls, unet: UNet2DConditionModel, high_res_fix: list):
|
|
config = dict((unet.config))
|
|
config["high_res_fix"] = high_res_fix
|
|
unet_high_res = cls(**config)
|
|
unet_high_res.load_state_dict(unet.state_dict())
|
|
unet_high_res.to(unet.dtype)
|
|
return unet_high_res
|
|
|
|
|
|
EXAMPLE_DOC_STRING = """
|
|
Examples:
|
|
```py
|
|
>>> import torch
|
|
>>> from diffusers import DiffusionPipeline
|
|
|
|
>>> pipe = DiffusionPipeline.from_pretrained("CompVis/stable-diffusion-v1-4",
|
|
custom_pipeline="kohya_hires_fix",
|
|
torch_dtype=torch.float16,
|
|
high_res_fix=[{'timestep': 600,
|
|
'scale_factor': 0.5,
|
|
'block_num': 1}])
|
|
>>> pipe = pipe.to("cuda")
|
|
|
|
>>> prompt = "a photo of an astronaut riding a horse on mars"
|
|
>>> image = pipe(prompt, height=1000, width=1600).images[0]
|
|
```
|
|
"""
|
|
|
|
|
|
class StableDiffusionHighResFixPipeline(StableDiffusionPipeline):
|
|
r"""
|
|
Pipeline for text-to-image generation using Stable Diffusion with Kohya fix for high resolution generation.
|
|
|
|
This model inherits from [`StableDiffusionPipeline`]. Check the superclass documentation for the generic methods.
|
|
|
|
The pipeline also inherits the following loading methods:
|
|
- [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] for loading textual inversion embeddings
|
|
- [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] for loading LoRA weights
|
|
- [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] for saving LoRA weights
|
|
- [`~loaders.FromSingleFileMixin.from_single_file`] for loading `.ckpt` files
|
|
- [`~loaders.IPAdapterMixin.load_ip_adapter`] for loading IP Adapters
|
|
|
|
Args:
|
|
vae ([`AutoencoderKL`]):
|
|
Variational Auto-Encoder (VAE) model to encode and decode images to and from latent representations.
|
|
text_encoder ([`~transformers.CLIPTextModel`]):
|
|
Frozen text-encoder ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14)).
|
|
tokenizer ([`~transformers.CLIPTokenizer`]):
|
|
A `CLIPTokenizer` to tokenize text.
|
|
unet ([`UNet2DConditionModel`]):
|
|
A `UNet2DConditionModel` to denoise the encoded image latents.
|
|
scheduler ([`SchedulerMixin`]):
|
|
A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
|
|
[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
|
|
safety_checker ([`StableDiffusionSafetyChecker`]):
|
|
Classification module that estimates whether generated images could be considered offensive or harmful.
|
|
Please refer to the [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) for more details
|
|
about a model's potential harms.
|
|
feature_extractor ([`~transformers.CLIPImageProcessor`]):
|
|
A `CLIPImageProcessor` to extract features from generated images; used as inputs to the `safety_checker`.
|
|
high_res_fix (`List[Dict]`, *optional*, defaults to `[{'timestep': 600, 'scale_factor': 0.5, 'block_num': 1}]`):
|
|
Enables Kohya fix for high resolution generation. The activation maps are scaled based on the scale_factor up to the timestep at specified block_num.
|
|
"""
|
|
|
|
model_cpu_offload_seq = "text_encoder->image_encoder->unet->vae"
|
|
_optional_components = ["safety_checker", "feature_extractor", "image_encoder"]
|
|
_exclude_from_cpu_offload = ["safety_checker"]
|
|
_callback_tensor_inputs = ["latents", "prompt_embeds", "negative_prompt_embeds"]
|
|
|
|
def __init__(
|
|
self,
|
|
vae: AutoencoderKL,
|
|
text_encoder: CLIPTextModel,
|
|
tokenizer: CLIPTokenizer,
|
|
unet: UNet2DConditionModel,
|
|
scheduler: KarrasDiffusionSchedulers,
|
|
safety_checker: StableDiffusionSafetyChecker,
|
|
feature_extractor: CLIPImageProcessor,
|
|
image_encoder: CLIPVisionModelWithProjection = None,
|
|
requires_safety_checker: bool = True,
|
|
high_res_fix: List[Dict] = [{"timestep": 600, "scale_factor": 0.5, "block_num": 1}],
|
|
):
|
|
super().__init__(
|
|
vae=vae,
|
|
text_encoder=text_encoder,
|
|
tokenizer=tokenizer,
|
|
unet=unet,
|
|
scheduler=scheduler,
|
|
safety_checker=safety_checker,
|
|
feature_extractor=feature_extractor,
|
|
image_encoder=image_encoder,
|
|
requires_safety_checker=requires_safety_checker,
|
|
)
|
|
|
|
unet = UNet2DConditionModelHighResFix.from_unet(unet=unet, high_res_fix=high_res_fix)
|
|
self.register_modules(
|
|
vae=vae,
|
|
text_encoder=text_encoder,
|
|
tokenizer=tokenizer,
|
|
unet=unet,
|
|
scheduler=scheduler,
|
|
safety_checker=safety_checker,
|
|
feature_extractor=feature_extractor,
|
|
image_encoder=image_encoder,
|
|
)
|
|
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1) if getattr(self, "vae", None) else 8
|
|
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
|
|
self.register_to_config(requires_safety_checker=requires_safety_checker)
|