mirror of https://github.com/huggingface/diffusers.git synced 2026-01-29 07:22:12 +03:00

fix auto denoise so all tests pass

This commit is contained in:
yiyixuxu
2025-05-09 08:19:24 +02:00
parent 2b361a2413
commit 2017ae5624
2 changed files with 700 additions and 683 deletions

View File

@@ -2134,268 +2134,6 @@ class StableDiffusionXLPrepareAdditionalConditioningStep(PipelineBlock):
self.add_block_state(state, block_state)
return components, state
from .pipeline_stable_diffusion_xl_modular_denoise_loop import StableDiffusionXLDenoiseStep
# class StableDiffusionXLDenoiseStep(PipelineBlock):
# model_name = "stable-diffusion-xl"
# @property
# def expected_components(self) -> List[ComponentSpec]:
# return [
# ComponentSpec(
# "guider",
# ClassifierFreeGuidance,
# config=FrozenDict({"guidance_scale": 7.5}),
# default_creation_method="from_config"),
# ComponentSpec("scheduler", EulerDiscreteScheduler),
# ComponentSpec("unet", UNet2DConditionModel),
# ]
# @property
# def description(self) -> str:
# return (
# "Step that iteratively denoise the latents for the text-to-image/image-to-image/inpainting generation process"
# )
# @property
# def inputs(self) -> List[Tuple[str, Any]]:
# return [
# InputParam("cross_attention_kwargs"),
# InputParam("generator"),
# InputParam("eta", default=0.0),
# InputParam("num_images_per_prompt", default=1),
# ]
# @property
# def intermediates_inputs(self) -> List[str]:
# return [
# InputParam(
# "latents",
# required=True,
# type_hint=torch.Tensor,
# description="The initial latents to use for the denoising process. Can be generated in prepare_latent step."
# ),
# InputParam(
# "batch_size",
# required=True,
# type_hint=int,
# description="Number of prompts, the final batch size of model inputs should be batch_size * num_images_per_prompt. Can be generated in input step."
# ),
# InputParam(
# "timesteps",
# required=True,
# type_hint=torch.Tensor,
# description="The timesteps to use for the denoising process. Can be generated in set_timesteps step."
# ),
# InputParam(
# "num_inference_steps",
# required=True,
# type_hint=int,
# description="The number of inference steps to use for the denoising process. Can be generated in set_timesteps step."
# ),
# InputParam(
# "pooled_prompt_embeds",
# required=True,
# type_hint=torch.Tensor,
# description="The pooled prompt embeddings to use to condition the denoising process. Can be generated in text_encoder step."
# ),
# InputParam(
# "negative_pooled_prompt_embeds",
# type_hint=Optional[torch.Tensor],
# description="The negative pooled prompt embeddings to use to condition the denoising process. Can be generated in text_encoder step. "
# ),
# InputParam(
# "add_time_ids",
# required=True,
# type_hint=torch.Tensor,
# description="The time ids to use as additional conditioning for the denoising process. Can be generated in prepare_additional_conditioning step."
# ),
# InputParam(
# "negative_add_time_ids",
# type_hint=Optional[torch.Tensor],
# description="The negative time ids to use as additional conditioning for the denoising process. Can be generated in prepare_additional_conditioning step."
# ),
# InputParam(
# "prompt_embeds",
# required=True,
# type_hint=torch.Tensor,
# description="The prompt embeddings to use to condition the denoising process. Can be generated in text_encoder step."
# ),
# InputParam(
# "negative_prompt_embeds",
# type_hint=Optional[torch.Tensor],
# description="The negative prompt embeddings to use to condition the denoising process. Can be generated in text_encoder step. "
# ),
# InputParam(
# "timestep_cond",
# type_hint=Optional[torch.Tensor],
# description="The guidance scale embedding to use for Latent Consistency Models(LCMs). Can be generated in prepare_additional_conditioning step."
# ),
# InputParam(
# "mask",
# type_hint=Optional[torch.Tensor],
# description="The mask to use for the denoising process, for inpainting task only. Can be generated in vae_encode or prepare_latent step."
# ),
# InputParam(
# "masked_image_latents",
# type_hint=Optional[torch.Tensor],
# description="The masked image latents to use for the denoising process, for inpainting task only. Can be generated in vae_encode or prepare_latent step."
# ),
# InputParam(
# "noise",
# type_hint=Optional[torch.Tensor],
# description="The noise added to the image latents, for inpainting task only. Can be generated in prepare_latent step."
# ),
# InputParam(
# "image_latents",
# type_hint=Optional[torch.Tensor],
# description="The image latents to use for the denoising process, for inpainting/image-to-image task only. Can be generated in vae_encode or prepare_latent step."
# ),
# InputParam(
# "ip_adapter_embeds",
# type_hint=Optional[torch.Tensor],
# description="The ip adapter embeddings to use to condition the denoising process, need to have ip adapter model loaded. Can be generated in ip_adapter step."
# ),
# InputParam(
# "negative_ip_adapter_embeds",
# type_hint=Optional[torch.Tensor],
# description="The negative ip adapter embeddings to use to condition the denoising process, need to have ip adapter model loaded. Can be generated in ip_adapter step."
# ),
# ]
# @property
# def intermediates_outputs(self) -> List[OutputParam]:
# return [OutputParam("latents", type_hint=torch.Tensor, description="The denoised latents")]
# @staticmethod
# def check_inputs(components, block_state):
# num_channels_unet = components.unet.config.in_channels
# if num_channels_unet == 9:
# # default case for runwayml/stable-diffusion-inpainting
# if block_state.mask is None or block_state.masked_image_latents is None:
# raise ValueError("mask and masked_image_latents must be provided for inpainting-specific Unet")
# num_channels_latents = block_state.latents.shape[1]
# num_channels_mask = block_state.mask.shape[1]
# num_channels_masked_image = block_state.masked_image_latents.shape[1]
# if num_channels_latents + num_channels_mask + num_channels_masked_image != num_channels_unet:
# raise ValueError(
# f"Incorrect configuration settings! The config of `components.unet`: {components.unet.config} expects"
# f" {components.unet.config.in_channels} but received `num_channels_latents`: {num_channels_latents} +"
# f" `num_channels_mask`: {num_channels_mask} + `num_channels_masked_image`: {num_channels_masked_image}"
# f" = {num_channels_latents+num_channels_masked_image+num_channels_mask}. Please verify the config of"
# " `components.unet` or your `mask_image` or `image` input."
# )
# # Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs with self -> components
# @staticmethod
# def prepare_extra_step_kwargs(components, generator, eta):
# # prepare extra kwargs for the scheduler step, since not all schedulers have the same signature
# # eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers.
# # eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502
# # and should be between [0, 1]
# accepts_eta = "eta" in set(inspect.signature(components.scheduler.step).parameters.keys())
# extra_step_kwargs = {}
# if accepts_eta:
# extra_step_kwargs["eta"] = eta
# # check if the scheduler accepts generator
# accepts_generator = "generator" in set(inspect.signature(components.scheduler.step).parameters.keys())
# if accepts_generator:
# extra_step_kwargs["generator"] = generator
# return extra_step_kwargs
# @torch.no_grad()
# def __call__(self, components: StableDiffusionXLModularLoader, state: PipelineState) -> PipelineState:
# block_state = self.get_block_state(state)
# self.check_inputs(components, block_state)
# block_state.num_channels_unet = components.unet.config.in_channels
# block_state.disable_guidance = True if components.unet.config.time_cond_proj_dim is not None else False
# if block_state.disable_guidance:
# components.guider.disable()
# else:
# components.guider.enable()
# # Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline
# block_state.extra_step_kwargs = self.prepare_extra_step_kwargs(components, block_state.generator, block_state.eta)
# block_state.num_warmup_steps = max(len(block_state.timesteps) - block_state.num_inference_steps * components.scheduler.order, 0)
# components.guider.set_input_fields(
# prompt_embeds=("prompt_embeds", "negative_prompt_embeds"),
# add_time_ids=("add_time_ids", "negative_add_time_ids"),
# pooled_prompt_embeds=("pooled_prompt_embeds", "negative_pooled_prompt_embeds"),
# ip_adapter_embeds=("ip_adapter_embeds", "negative_ip_adapter_embeds"),
# )
# with self.progress_bar(total=block_state.num_inference_steps) as progress_bar:
# for i, t in enumerate(block_state.timesteps):
# components.guider.set_state(step=i, num_inference_steps=block_state.num_inference_steps, timestep=t)
# guider_data = components.guider.prepare_inputs(block_state)
# block_state.scaled_latents = components.scheduler.scale_model_input(block_state.latents, t)
# # Prepare for inpainting
# if block_state.num_channels_unet == 9:
# block_state.scaled_latents = torch.cat([block_state.scaled_latents, block_state.mask, block_state.masked_image_latents], dim=1)
# for batch in guider_data:
# components.guider.prepare_models(components.unet)
# # Prepare additional conditionings
# batch.added_cond_kwargs = {
# "text_embeds": batch.pooled_prompt_embeds,
# "time_ids": batch.add_time_ids,
# }
# if batch.ip_adapter_embeds is not None:
# batch.added_cond_kwargs["image_embeds"] = batch.ip_adapter_embeds
# # Predict the noise residual
# batch.noise_pred = components.unet(
# block_state.scaled_latents,
# t,
# encoder_hidden_states=batch.prompt_embeds,
# timestep_cond=block_state.timestep_cond,
# cross_attention_kwargs=block_state.cross_attention_kwargs,
# added_cond_kwargs=batch.added_cond_kwargs,
# return_dict=False,
# )[0]
# components.guider.cleanup_models(components.unet)
# # Perform guidance
# block_state.noise_pred, scheduler_step_kwargs = components.guider(guider_data)
# # Perform scheduler step using the predicted output
# block_state.latents_dtype = block_state.latents.dtype
# block_state.latents = components.scheduler.step(block_state.noise_pred, t, block_state.latents, **block_state.extra_step_kwargs, **scheduler_step_kwargs, return_dict=False)[0]
# if block_state.latents.dtype != block_state.latents_dtype:
# if torch.backends.mps.is_available():
# # some platforms (eg. apple mps) misbehave due to a pytorch bug: https://github.com/pytorch/pytorch/pull/99272
# block_state.latents = block_state.latents.to(block_state.latents_dtype)
# if block_state.num_channels_unet == 4 and block_state.mask is not None and block_state.image_latents is not None:
# block_state.init_latents_proper = block_state.image_latents
# if i < len(block_state.timesteps) - 1:
# block_state.noise_timestep = block_state.timesteps[i + 1]
# block_state.init_latents_proper = components.scheduler.add_noise(
# block_state.init_latents_proper, block_state.noise, torch.tensor([block_state.noise_timestep])
# )
# block_state.latents = (1 - block_state.mask) * block_state.init_latents_proper + block_state.mask * block_state.latents
# if i == len(block_state.timesteps) - 1 or ((i + 1) > block_state.num_warmup_steps and (i + 1) % components.scheduler.order == 0):
# progress_bar.update()
# self.add_block_state(state, block_state)
# return components, state
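# A minimal standalone sketch of the signature-inspection pattern used by the
# commented-out prepare_extra_step_kwargs above (helper names here are
# hypothetical, not diffusers API): only kwargs the callee accepts are
# forwarded, since not every scheduler.step takes `eta` or `generator`.
import inspect
def filter_step_kwargs(func, **kwargs):
    # keep only the kwargs that appear in func's signature
    accepted = set(inspect.signature(func).parameters)
    return {k: v for k, v in kwargs.items() if k in accepted}
def euler_step(model_output, timestep, sample, generator=None):
    # stand-in for a scheduler.step that has no `eta` parameter
    return sample
# `eta` is dropped silently, `generator` is forwarded
assert filter_step_kwargs(euler_step, eta=0.0, generator="rng") == {"generator": "rng"}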
class StableDiffusionXLControlNetInputStep(PipelineBlock):
model_name = "stable-diffusion-xl"
@@ -2593,355 +2331,6 @@ class StableDiffusionXLControlNetInputStep(PipelineBlock):
return components, state
from diffusers.pipelines.stable_diffusion_xl.pipeline_stable_diffusion_xl_modular_denoise_loop import StableDiffusionXLControlNetDenoiseStep
# class StableDiffusionXLControlNetDenoiseStep(PipelineBlock):
# model_name = "stable-diffusion-xl"
# @property
# def expected_components(self) -> List[ComponentSpec]:
# return [
# ComponentSpec(
# "guider",
# ClassifierFreeGuidance,
# config=FrozenDict({"guidance_scale": 7.5}),
# default_creation_method="from_config"),
# ComponentSpec("scheduler", EulerDiscreteScheduler),
# ComponentSpec("unet", UNet2DConditionModel),
# ComponentSpec("controlnet", ControlNetModel),
# ]
# @property
# def description(self) -> str:
# return "step that iteratively denoise the latents for the text-to-image/image-to-image/inpainting generation process. Using ControlNet to condition the denoising process"
# @property
# def inputs(self) -> List[Tuple[str, Any]]:
# return [
# InputParam("num_images_per_prompt", default=1),
# InputParam("cross_attention_kwargs"),
# InputParam("generator"),
# InputParam("eta", default=0.0),
# InputParam("controlnet_conditioning_scale", type_hint=float, default=1.0), # can expect either input or intermediate input, (intermediate input if both are passed)
# ]
# @property
# def intermediates_inputs(self) -> List[str]:
# return [
# InputParam(
# "controlnet_cond",
# required=True,
# type_hint=torch.Tensor,
# description="The control image to use for the denoising process. Can be generated in prepare_controlnet_inputs step."
# ),
# InputParam(
# "control_guidance_start",
# required=True,
# type_hint=float,
# description="The control guidance start value to use for the denoising process. Can be generated in prepare_controlnet_inputs step."
# ),
# InputParam(
# "control_guidance_end",
# required=True,
# type_hint=float,
# description="The control guidance end value to use for the denoising process. Can be generated in prepare_controlnet_inputs step."
# ),
# InputParam(
# "conditioning_scale",
# type_hint=float,
# description="The controlnet conditioning scale value to use for the denoising process. Can be generated in prepare_controlnet_inputs step."
# ),
# InputParam(
# "guess_mode",
# required=True,
# type_hint=bool,
# description="The guess mode value to use for the denoising process. Can be generated in prepare_controlnet_inputs step."
# ),
# InputParam(
# "controlnet_keep",
# required=True,
# type_hint=List[float],
# description="The controlnet keep values to use for the denoising process. Can be generated in prepare_controlnet_inputs step."
# ),
# InputParam(
# "latents",
# required=True,
# type_hint=torch.Tensor,
# description="The initial latents to use for the denoising process. Can be generated in prepare_latent step."
# ),
# InputParam(
# "batch_size",
# required=True,
# type_hint=int,
# description="Number of prompts, the final batch size of model inputs should be batch_size * num_images_per_prompt. Can be generated in input step."
# ),
# InputParam(
# "timesteps",
# required=True,
# type_hint=torch.Tensor,
# description="The timesteps to use for the denoising process. Can be generated in set_timesteps step."
# ),
# InputParam(
# "prompt_embeds",
# required=True,
# type_hint=torch.Tensor,
# description="The prompt embeddings used to condition the denoising process. Can be generated in text_encoder step."
# ),
# InputParam(
# "negative_prompt_embeds",
# type_hint=Optional[torch.Tensor],
# description="The negative prompt embeddings used to condition the denoising process. Can be generated in text_encoder step."
# ),
# InputParam(
# "add_time_ids",
# required=True,
# type_hint=torch.Tensor,
# description="The time ids used to condition the denoising process. Can be generated in parepare_additional_conditioning step."
# ),
# InputParam(
# "negative_add_time_ids",
# type_hint=Optional[torch.Tensor],
# description="The negative time ids used to condition the denoising process. Can be generated in parepare_additional_conditioning step."
# ),
# InputParam(
# "pooled_prompt_embeds",
# required=True,
# type_hint=torch.Tensor,
# description="The pooled prompt embeddings used to condition the denoising process. Can be generated in text_encoder step."
# ),
# InputParam(
# "negative_pooled_prompt_embeds",
# type_hint=Optional[torch.Tensor],
# description="The negative pooled prompt embeddings to use to condition the denoising process. Can be generated in text_encoder step."
# ),
# InputParam(
# "timestep_cond",
# type_hint=Optional[torch.Tensor],
# description="The guidance scale embedding to use for Latent Consistency Models(LCMs), can be generated by prepare_additional_conditioning step"
# ),
# InputParam(
# "mask",
# type_hint=Optional[torch.Tensor],
# description="The mask to use for the denoising process, for inpainting task only. Can be generated in vae_encode or prepare_latent step."
# ),
# InputParam(
# "masked_image_latents",
# type_hint=Optional[torch.Tensor],
# description="The masked image latents to use for the denoising process, for inpainting task only. Can be generated in vae_encode or prepare_latent step."
# ),
# InputParam(
# "noise",
# type_hint=Optional[torch.Tensor],
# description="The noise added to the image latents, for inpainting task only. Can be generated in prepare_latent step."
# ),
# InputParam(
# "image_latents",
# type_hint=Optional[torch.Tensor],
# description="The image latents to use for the denoising process, for inpainting/image-to-image task only. Can be generated in vae_encode or prepare_latent step."
# ),
# InputParam(
# "crops_coords",
# type_hint=Optional[Tuple[int]],
# description="The crop coordinates to use for preprocess/postprocess the image and mask, for inpainting task only. Can be generated in vae_encode step."
# ),
# InputParam(
# "ip_adapter_embeds",
# type_hint=Optional[torch.Tensor],
# description="The ip adapter embeddings to use to condition the denoising process, need to have ip adapter model loaded. Can be generated in ip_adapter step."
# ),
# InputParam(
# "negative_ip_adapter_embeds",
# type_hint=Optional[torch.Tensor],
# description="The negative ip adapter embeddings to use to condition the denoising process, need to have ip adapter model loaded. Can be generated in ip_adapter step."
# ),
# InputParam(
# "num_inference_steps",
# required=True,
# type_hint=int,
# description="The number of inference steps to use for the denoising process. Can be generated in set_timesteps step."
# ),
# InputParam(kwargs_type="controlnet_kwargs", description="additional kwargs for controlnet")
# ]
# @property
# def intermediates_outputs(self) -> List[OutputParam]:
# return [OutputParam("latents", type_hint=torch.Tensor, description="The denoised latents")]
# @staticmethod
# def check_inputs(components, block_state):
# num_channels_unet = components.unet.config.in_channels
# if num_channels_unet == 9:
# # default case for runwayml/stable-diffusion-inpainting
# if block_state.mask is None or block_state.masked_image_latents is None:
# raise ValueError("mask and masked_image_latents must be provided for inpainting-specific Unet")
# num_channels_latents = block_state.latents.shape[1]
# num_channels_mask = block_state.mask.shape[1]
# num_channels_masked_image = block_state.masked_image_latents.shape[1]
# if num_channels_latents + num_channels_mask + num_channels_masked_image != num_channels_unet:
# raise ValueError(
# f"Incorrect configuration settings! The config of `components.unet`: {components.unet.config} expects"
# f" {components.unet.config.in_channels} but received `num_channels_latents`: {num_channels_latents} +"
# f" `num_channels_mask`: {num_channels_mask} + `num_channels_masked_image`: {num_channels_masked_image}"
# f" = {num_channels_latents+num_channels_masked_image+num_channels_mask}. Please verify the config of"
# " `components.unet` or your `mask_image` or `image` input."
# )
# @staticmethod
# def prepare_extra_kwargs(func, exclude_kwargs=[], **kwargs):
# accepted_kwargs = set(inspect.signature(func).parameters.keys())
# extra_kwargs = {}
# for key, value in kwargs.items():
# if key in accepted_kwargs and key not in exclude_kwargs:
# extra_kwargs[key] = value
# return extra_kwargs
# @torch.no_grad()
# def __call__(self, components: StableDiffusionXLModularLoader, state: PipelineState) -> PipelineState:
# block_state = self.get_block_state(state)
# self.check_inputs(components, block_state)
# block_state.device = components._execution_device
# print(f" block_state: {block_state}")
# controlnet = unwrap_module(components.controlnet)
# # Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline
# block_state.extra_step_kwargs = self.prepare_extra_kwargs(components.scheduler.step, generator=block_state.generator, eta=block_state.eta)
# block_state.extra_controlnet_kwargs = self.prepare_extra_kwargs(controlnet.forward, exclude_kwargs=["controlnet_cond", "conditioning_scale", "guess_mode"], **block_state.controlnet_kwargs)
# block_state.num_warmup_steps = max(len(block_state.timesteps) - block_state.num_inference_steps * components.scheduler.order, 0)
# # (1) setup guider
# # disable for LCMs
# block_state.disable_guidance = True if components.unet.config.time_cond_proj_dim is not None else False
# if block_state.disable_guidance:
# components.guider.disable()
# else:
# components.guider.enable()
# components.guider.set_input_fields(
# prompt_embeds=("prompt_embeds", "negative_prompt_embeds"),
# add_time_ids=("add_time_ids", "negative_add_time_ids"),
# pooled_prompt_embeds=("pooled_prompt_embeds", "negative_pooled_prompt_embeds"),
# ip_adapter_embeds=("ip_adapter_embeds", "negative_ip_adapter_embeds"),
# )
# # (5) Denoise loop
# with self.progress_bar(total=block_state.num_inference_steps) as progress_bar:
# for i, t in enumerate(block_state.timesteps):
# # prepare latent input for unet
# block_state.scaled_latents = components.scheduler.scale_model_input(block_state.latents, t)
# # adjust latent input for inpainting
# block_state.num_channels_unet = components.unet.config.in_channels
# if block_state.num_channels_unet == 9:
# block_state.scaled_latents = torch.cat([block_state.scaled_latents, block_state.mask, block_state.masked_image_latents], dim=1)
# # cond_scale (controlnet input)
# if isinstance(block_state.controlnet_keep[i], list):
# block_state.cond_scale = [c * s for c, s in zip(block_state.conditioning_scale, block_state.controlnet_keep[i])]
# else:
# block_state.controlnet_cond_scale = block_state.conditioning_scale
# if isinstance(block_state.controlnet_cond_scale, list):
# block_state.controlnet_cond_scale = block_state.controlnet_cond_scale[0]
# block_state.cond_scale = block_state.controlnet_cond_scale * block_state.controlnet_keep[i]
# # default controlnet output/unet input for guess mode + conditional path
# block_state.down_block_res_samples_zeros = None
# block_state.mid_block_res_sample_zeros = None
# # guided denoiser step
# components.guider.set_state(step=i, num_inference_steps=block_state.num_inference_steps, timestep=t)
# guider_state = components.guider.prepare_inputs(block_state)
# for guider_state_batch in guider_state:
# components.guider.prepare_models(components.unet)
# # Prepare additional conditionings
# guider_state_batch.added_cond_kwargs = {
# "text_embeds": guider_state_batch.pooled_prompt_embeds,
# "time_ids": guider_state_batch.add_time_ids,
# }
# if guider_state_batch.ip_adapter_embeds is not None:
# guider_state_batch.added_cond_kwargs["image_embeds"] = guider_state_batch.ip_adapter_embeds
# # Prepare controlnet additional conditionings
# guider_state_batch.controlnet_added_cond_kwargs = {
# "text_embeds": guider_state_batch.pooled_prompt_embeds,
# "time_ids": guider_state_batch.add_time_ids,
# }
# if block_state.guess_mode and not components.guider.is_conditional:
# # guider always run uncond batch first, so these tensors should be set already
# guider_state_batch.down_block_res_samples = block_state.down_block_res_samples_zeros
# guider_state_batch.mid_block_res_sample = block_state.mid_block_res_sample_zeros
# else:
# guider_state_batch.down_block_res_samples, guider_state_batch.mid_block_res_sample = components.controlnet(
# block_state.scaled_latents,
# t,
# encoder_hidden_states=guider_state_batch.prompt_embeds,
# controlnet_cond=block_state.controlnet_cond,
# conditioning_scale=block_state.conditioning_scale,
# guess_mode=block_state.guess_mode,
# added_cond_kwargs=guider_state_batch.controlnet_added_cond_kwargs,
# return_dict=False,
# **block_state.extra_controlnet_kwargs,
# )
# if block_state.down_block_res_samples_zeros is None:
# block_state.down_block_res_samples_zeros = [torch.zeros_like(d) for d in guider_state_batch.down_block_res_samples]
# if block_state.mid_block_res_sample_zeros is None:
# block_state.mid_block_res_sample_zeros = torch.zeros_like(guider_state_batch.mid_block_res_sample)
# guider_state_batch.noise_pred = components.unet(
# block_state.scaled_latents,
# t,
# encoder_hidden_states=guider_state_batch.prompt_embeds,
# timestep_cond=block_state.timestep_cond,
# cross_attention_kwargs=block_state.cross_attention_kwargs,
# added_cond_kwargs=guider_state_batch.added_cond_kwargs,
# down_block_additional_residuals=guider_state_batch.down_block_res_samples,
# mid_block_additional_residual=guider_state_batch.mid_block_res_sample,
# return_dict=False,
# )[0]
# components.guider.cleanup_models(components.unet)
# # Perform guidance
# block_state.noise_pred, scheduler_step_kwargs = components.guider(guider_state)
# # Perform scheduler step using the predicted output
# block_state.latents_dtype = block_state.latents.dtype
# block_state.latents = components.scheduler.step(block_state.noise_pred, t, block_state.latents, **block_state.extra_step_kwargs, **scheduler_step_kwargs, return_dict=False)[0]
# if block_state.latents.dtype != block_state.latents_dtype:
# if torch.backends.mps.is_available():
# # some platforms (eg. apple mps) misbehave due to a pytorch bug: https://github.com/pytorch/pytorch/pull/99272
# block_state.latents = block_state.latents.to(block_state.latents_dtype)
# # adjust latent for inpainting
# if block_state.num_channels_unet == 4 and block_state.mask is not None and block_state.image_latents is not None:
# block_state.init_latents_proper = block_state.image_latents
# if i < len(block_state.timesteps) - 1:
# block_state.noise_timestep = block_state.timesteps[i + 1]
# block_state.init_latents_proper = components.scheduler.add_noise(
# block_state.init_latents_proper, block_state.noise, torch.tensor([block_state.noise_timestep])
# )
# block_state.latents = (1 - block_state.mask) * block_state.init_latents_proper + block_state.mask * block_state.latents
# if i == len(block_state.timesteps) - 1 or ((i + 1) > block_state.num_warmup_steps and (i + 1) % components.scheduler.order == 0):
# progress_bar.update()
# self.add_block_state(state, block_state)
# return components, state
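# A minimal sketch of the controlnet_keep / cond_scale logic in the commented
# loop above (assumed semantics, based on the standard controlnet pipelines):
# each step gets a keep factor of 1.0 inside the control_guidance_start/end
# window and 0.0 outside it, which then scales the controlnet conditioning.
def make_controlnet_keep(num_steps, start, end):
    # 1.0 while the normalized step index falls inside [start, end], else 0.0
    return [
        1.0 - float(i / num_steps < start or (i + 1) / num_steps > end)
        for i in range(num_steps)
    ]
keeps = make_controlnet_keep(num_steps=4, start=0.0, end=0.5)
cond_scales = [keep * 1.0 for keep in keeps]  # with conditioning_scale = 1.0
assert cond_scales == [1.0, 1.0, 0.0, 0.0]  # controlnet active for the first half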
class StableDiffusionXLControlNetUnionInputStep(PipelineBlock):
model_name = "stable-diffusion-xl"
@@ -3123,6 +2512,13 @@ class StableDiffusionXLControlNetUnionInputStep(PipelineBlock):
return components, state
class StableDiffusionXLControlNetAutoInput(AutoPipelineBlocks):
block_classes = [StableDiffusionXLControlNetUnionInputStep, StableDiffusionXLControlNetInputStep]
block_names = ["controlnet_union", "controlnet"]
block_trigger_inputs = ["control_mode", "control_image"]
class StableDiffusionXLDecodeLatentsStep(PipelineBlock):
model_name = "stable-diffusion-xl"
@@ -3316,8 +2712,8 @@ class StableDiffusionXLAutoVaeEncoderStep(AutoPipelineBlocks):
# Before denoise
class StableDiffusionXLBeforeDenoiseStep(SequentialPipelineBlocks):
block_classes = [StableDiffusionXLInputStep, StableDiffusionXLSetTimestepsStep, StableDiffusionXLPrepareLatentsStep, StableDiffusionXLPrepareAdditionalConditioningStep]
block_names = ["input", "set_timesteps", "prepare_latents", "prepare_add_cond"]
block_classes = [StableDiffusionXLInputStep, StableDiffusionXLSetTimestepsStep, StableDiffusionXLPrepareLatentsStep, StableDiffusionXLPrepareAdditionalConditioningStep, StableDiffusionXLControlNetAutoInput]
block_names = ["input", "set_timesteps", "prepare_latents", "prepare_add_cond", "controlnet_input"]
@property
def description(self):
@@ -3326,12 +2722,13 @@ class StableDiffusionXLBeforeDenoiseStep(SequentialPipelineBlocks):
" - `StableDiffusionXLInputStep` is used to adjust the batch size of the model inputs\n" + \
" - `StableDiffusionXLSetTimestepsStep` is used to set the timesteps\n" + \
" - `StableDiffusionXLPrepareLatentsStep` is used to prepare the latents\n" + \
" - `StableDiffusionXLPrepareAdditionalConditioningStep` is used to prepare the additional conditioning"
" - `StableDiffusionXLPrepareAdditionalConditioningStep` is used to prepare the additional conditioning\n" + \
" - `StableDiffusionXLControlNetAutoInput` is used to prepare the controlnet input"
class StableDiffusionXLImg2ImgBeforeDenoiseStep(SequentialPipelineBlocks):
block_classes = [StableDiffusionXLInputStep, StableDiffusionXLImg2ImgSetTimestepsStep, StableDiffusionXLImg2ImgPrepareLatentsStep, StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep]
block_names = ["input", "set_timesteps", "prepare_latents", "prepare_add_cond"]
block_classes = [StableDiffusionXLInputStep, StableDiffusionXLImg2ImgSetTimestepsStep, StableDiffusionXLImg2ImgPrepareLatentsStep, StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep, StableDiffusionXLControlNetAutoInput]
block_names = ["input", "set_timesteps", "prepare_latents", "prepare_add_cond", "controlnet_input"]
@property
def description(self):
@@ -3340,12 +2737,13 @@ class StableDiffusionXLImg2ImgBeforeDenoiseStep(SequentialPipelineBlocks):
" - `StableDiffusionXLInputStep` is used to adjust the batch size of the model inputs\n" + \
" - `StableDiffusionXLImg2ImgSetTimestepsStep` is used to set the timesteps\n" + \
" - `StableDiffusionXLImg2ImgPrepareLatentsStep` is used to prepare the latents\n" + \
" - `StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep` is used to prepare the additional conditioning"
" - `StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep` is used to prepare the additional conditioning\n" + \
" - `StableDiffusionXLControlNetAutoInput` is used to prepare the controlnet input"
class StableDiffusionXLInpaintBeforeDenoiseStep(SequentialPipelineBlocks):
block_classes = [StableDiffusionXLInputStep, StableDiffusionXLImg2ImgSetTimestepsStep, StableDiffusionXLInpaintPrepareLatentsStep, StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep]
block_names = ["input", "set_timesteps", "prepare_latents", "prepare_add_cond"]
block_classes = [StableDiffusionXLInputStep, StableDiffusionXLImg2ImgSetTimestepsStep, StableDiffusionXLInpaintPrepareLatentsStep, StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep, StableDiffusionXLControlNetAutoInput]
block_names = ["input", "set_timesteps", "prepare_latents", "prepare_add_cond", "controlnet_input"]
@property
def description(self):
@@ -3354,29 +2752,8 @@ class StableDiffusionXLInpaintBeforeDenoiseStep(SequentialPipelineBlocks):
" - `StableDiffusionXLInputStep` is used to adjust the batch size of the model inputs\n" + \
" - `StableDiffusionXLImg2ImgSetTimestepsStep` is used to set the timesteps\n" + \
" - `StableDiffusionXLInpaintPrepareLatentsStep` is used to prepare the latents\n" + \
" - `StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep` is used to prepare the additional conditioning"
class StableDiffusionXLControlNetStep(SequentialPipelineBlocks):
block_classes = [StableDiffusionXLControlNetInputStep, StableDiffusionXLControlNetDenoiseStep]
block_names = ["prepare_input", "denoise"]
@property
def description(self):
return "Controlnet step that denoise the latents.\n" + \
"This is a sequential pipeline blocks:\n" + \
" - `StableDiffusionXLControlNetInputStep` is used to prepare the inputs for the denoise step.\n" + \
" - `StableDiffusionXLControlNetDenoiseStep` is used to denoise the latents."
class StableDiffusionXLControlNetUnionStep(SequentialPipelineBlocks):
block_classes = [StableDiffusionXLControlNetUnionInputStep, StableDiffusionXLControlNetDenoiseStep]
block_names = ["prepare_input", "denoise"]
@property
def description(self):
return "ControlNetUnion step that denoises the latents.\n" + \
"This is a sequential pipeline blocks:\n" + \
" - `StableDiffusionXLControlNetUnionInputStep` is used to prepare the inputs for the denoise step.\n" + \
" - `StableDiffusionXLControlNetDenoiseStep` is used to denoise the latents using the ControlNetUnion model."
" - `StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep` is used to prepare the additional conditioning\n" + \
" - `StableDiffusionXLControlNetAutoInput` is used to prepare the controlnet input"
class StableDiffusionXLAutoBeforeDenoiseStep(AutoPipelineBlocks):
@@ -3387,24 +2764,27 @@ class StableDiffusionXLAutoBeforeDenoiseStep(AutoPipelineBlocks):
@property
def description(self):
return "Before denoise step that prepare the inputs for the denoise step.\n" + \
"This is an auto pipeline block that works for text2img, img2img and inpainting tasks.\n" + \
"This is an auto pipeline block that works for text2img, img2img and inpainting tasks as well as controlnet, controlnet_union.\n" + \
" - `StableDiffusionXLInpaintBeforeDenoiseStep` (inpaint) is used when both `mask` and `image_latents` are provided.\n" + \
" - `StableDiffusionXLImg2ImgBeforeDenoiseStep` (img2img) is used when only `image_latents` is provided.\n" + \
" - `StableDiffusionXLBeforeDenoiseStep` (text2img) is used when both `image_latents` and `mask` are not provided."
" - `StableDiffusionXLBeforeDenoiseStep` (text2img) is used when both `image_latents` and `mask` are not provided.\n" + \
" - `StableDiffusionXLControlNetUnionInputStep` is called to prepare the controlnet input when `control_mode` and `control_image` are provided.\n" + \
" - `StableDiffusionXLControlNetInputStep` is called to prepare the controlnet input when `control_image` is provided."
# Denoise
class StableDiffusionXLAutoDenoiseStep(AutoPipelineBlocks):
block_classes = [StableDiffusionXLControlNetUnionStep, StableDiffusionXLControlNetStep, StableDiffusionXLDenoiseStep]
block_names = ["controlnet_union", "controlnet", "unet"]
block_trigger_inputs = ["control_mode", "control_image", None]
# # Denoise
from .pipeline_stable_diffusion_xl_modular_denoise_loop import StableDiffusionXLDenoiseStep, StableDiffusionXLControlNetDenoiseStep, StableDiffusionXLAutoDenoiseStep
# class StableDiffusionXLAutoDenoiseStep(AutoPipelineBlocks):
# block_classes = [StableDiffusionXLControlNetUnionStep, StableDiffusionXLControlNetStep, StableDiffusionXLDenoiseStep]
# block_names = ["controlnet_union", "controlnet", "unet"]
# block_trigger_inputs = ["control_mode", "control_image", None]
@property
def description(self):
return "Denoise step that denoise the latents.\n" + \
"This is an auto pipeline block that works for controlnet, controlnet_union and no controlnet.\n" + \
" - `StableDiffusionXLControlNetUnionStep` (controlnet_union) is used when both `control_mode` and `control_image` are provided.\n" + \
" - `StableDiffusionXLControlNetStep` (controlnet) is used when `control_image` is provided.\n" + \
" - `StableDiffusionXLDenoiseStep` (unet only) is used when both `control_mode` and `control_image` are not provided."
# @property
# def description(self):
# return "Denoise step that denoise the latents.\n" + \
# "This is an auto pipeline block that works for controlnet, controlnet_union and no controlnet.\n" + \
# " - `StableDiffusionXLControlNetUnionStep` (controlnet_union) is used when both `control_mode` and `control_image` are provided.\n" + \
# " - `StableDiffusionXLControlNetStep` (controlnet) is used when `control_image` is provided.\n" + \
# " - `StableDiffusionXLDenoiseStep` (unet only) is used when both `control_mode` and `control_image` are not provided."
# After denoise
class StableDiffusionXLDecodeStep(SequentialPipelineBlocks):
@@ -3474,6 +2854,7 @@ class StableDiffusionXLAutoPipeline(SequentialPipelineBlocks):
# always assuming you want to do guidance in the Guiders. So, negative embeddings are prepared regardless of what the
# configuration of guider is.
# block mapping
TEXT2IMAGE_BLOCKS = OrderedDict([
("text_encoder", StableDiffusionXLTextEncoderStep),
@@ -3511,11 +2892,13 @@ INPAINT_BLOCKS = OrderedDict([
])
CONTROLNET_BLOCKS = OrderedDict([
("denoise", StableDiffusionXLControlNetStep),
("controlnet_input", StableDiffusionXLControlNetInputStep),
("denoise", StableDiffusionXLControlNetDenoiseStep),
])
CONTROLNET_UNION_BLOCKS = OrderedDict([
("denoise", StableDiffusionXLControlNetUnionStep),
("controlnet_input", StableDiffusionXLControlNetUnionInputStep),
("denoise", StableDiffusionXLControlNetDenoiseStep),
])
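# A toy sketch of how these preset block mappings can be consumed (hypothetical
# runner, not the SequentialPipelineBlocks API): each named step is called in
# order and the pipeline state is threaded through.
from collections import OrderedDict
def run_blocks(blocks, state):
    # call every step in mapping order, passing the evolving state along
    for _name, step in blocks.items():
        state = step(state)
    return state
demo_blocks = OrderedDict([
    ("controlnet_input", lambda s: {**s, "controlnet_cond": "cond"}),
    ("denoise", lambda s: {**s, "latents": "denoised"}),
])
assert run_blocks(demo_blocks, {})["latents"] == "denoised"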
IP_ADAPTER_BLOCKS = OrderedDict([

View File

@@ -22,10 +22,11 @@ from ...configuration_utils import FrozenDict
from ...models import ControlNetModel, UNet2DConditionModel
from ...schedulers import EulerDiscreteScheduler
from ...utils import logging
from ...utils.torch_utils import unwrap_module
from ..modular_pipeline import (
PipelineBlock,
PipelineState,
AutoPipelineBlocks,
LoopSequentialPipelineBlocks,
InputParam,
OutputParam,
@@ -42,7 +43,7 @@ logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# YiYi experimenting composible denoise loop
# loop step (1): prepare latent input for denoiser
class StableDiffusionXLDenoiseLoopLatentsStep(PipelineBlock):
class StableDiffusionXLDenoiseLoopBeforeDenoiser(PipelineBlock):
model_name = "stable-diffusion-xl"
@@ -83,7 +84,7 @@ class StableDiffusionXLDenoiseLoopLatentsStep(PipelineBlock):
return components, block_state
# loop step (1): prepare latent input for denoiser (with inpainting)
class StableDiffusionXLDenoiseLoopInpaintLatentsStep(PipelineBlock):
class StableDiffusionXLInpaintDenoiseLoopBeforeDenoiser(PipelineBlock):
model_name = "stable-diffusion-xl"
@@ -145,7 +146,7 @@ class StableDiffusionXLDenoiseLoopInpaintLatentsStep(PipelineBlock):
)
@torch.no_grad()
def __call__(self, components: StableDiffusionXLModularLoader, block_state: BlockState, loop_idx: int, t: int):
def __call__(self, components: StableDiffusionXLModularLoader, block_state: BlockState, i: int, t: int):
self.check_inputs(components, block_state)
@@ -157,7 +158,7 @@ class StableDiffusionXLDenoiseLoopInpaintLatentsStep(PipelineBlock):
return components, block_state
# loop step (2): denoise the latents with guidance
class StableDiffusionXLDenoiseLoopDenoiserStep(PipelineBlock):
class StableDiffusionXLDenoiseLoopDenoiser(PipelineBlock):
model_name = "stable-diffusion-xl"
@@ -267,7 +268,7 @@ class StableDiffusionXLDenoiseLoopDenoiserStep(PipelineBlock):
return components, block_state
# loop step (2): denoise the latents with guidance (with controlnet)
class StableDiffusionXLDenoiseLoopControlNetDenoiserStep(PipelineBlock):
class StableDiffusionXLControlNetDenoiseLoopDenoiser(PipelineBlock):
model_name = "stable-diffusion-xl"
@@ -468,7 +469,7 @@ class StableDiffusionXLDenoiseLoopControlNetDenoiserStep(PipelineBlock):
return components, block_state
# loop step (3): scheduler step to update latents
class StableDiffusionXLDenoiseLoopUpdateLatentsStep(PipelineBlock):
class StableDiffusionXLDenoiseLoopAfterDenoiser(PipelineBlock):
model_name = "stable-diffusion-xl"
@@ -535,8 +536,8 @@ class StableDiffusionXLDenoiseLoopUpdateLatentsStep(PipelineBlock):
return components, block_state
class StableDiffusionXLDenoiseLoopInpaintUpdateLatentsStep(PipelineBlock):
# loop step (3): scheduler step to update latents (with inpainting)
class StableDiffusionXLInpaintDenoiseLoopAfterDenoiser(PipelineBlock):
model_name = "stable-diffusion-xl"
@@ -643,7 +644,7 @@ class StableDiffusionXLDenoiseLoopInpaintUpdateLatentsStep(PipelineBlock):
# the loop wrapper that iterates over the timesteps
class StableDiffusionXLDenoiseLoop(LoopSequentialPipelineBlocks):
class StableDiffusionXLDenoiseLoopWrapper(LoopSequentialPipelineBlocks):
model_name = "stable-diffusion-xl"
@@ -706,24 +707,657 @@ class StableDiffusionXLDenoiseLoop(LoopSequentialPipelineBlocks):
return components, state
# composing the denoising loops
class StableDiffusionXLDenoiseLoop(StableDiffusionXLDenoiseLoopWrapper):
block_classes = [StableDiffusionXLDenoiseLoopBeforeDenoiser, StableDiffusionXLDenoiseLoopDenoiser, StableDiffusionXLDenoiseLoopAfterDenoiser]
block_names = ["before_denoiser", "denoiser", "after_denoiser"]
# StableDiffusionXLControlNetDenoiseStep
# control_cond
class StableDiffusionXLControlNetDenoiseLoop(StableDiffusionXLDenoiseLoopWrapper):
block_classes = [StableDiffusionXLDenoiseLoopBeforeDenoiser, StableDiffusionXLControlNetDenoiseLoopDenoiser, StableDiffusionXLDenoiseLoopAfterDenoiser]
block_names = ["before_denoiser", "denoiser", "after_denoiser"]
class StableDiffusionXLDenoiseStep(StableDiffusionXLDenoiseLoop):
block_classes = [StableDiffusionXLDenoiseLoopLatentsStep, StableDiffusionXLDenoiseLoopDenoiserStep, StableDiffusionXLDenoiseLoopUpdateLatentsStep]
block_names = ["prepare_latents", "denoiser", "update_latents"]
# mask
class StableDiffusionXLInpaintDenoiseLoop(StableDiffusionXLDenoiseLoopWrapper):
block_classes = [StableDiffusionXLInpaintDenoiseLoopBeforeDenoiser, StableDiffusionXLDenoiseLoopDenoiser, StableDiffusionXLInpaintDenoiseLoopAfterDenoiser]
block_names = ["before_denoiser", "denoiser", "after_denoiser"]
class StableDiffusionXLControlNetDenoiseStep(StableDiffusionXLDenoiseLoop):
block_classes = [StableDiffusionXLDenoiseLoopLatentsStep, StableDiffusionXLDenoiseLoopControlNetDenoiserStep, StableDiffusionXLDenoiseLoopUpdateLatentsStep]
block_names = ["prepare_latents", "denoiser", "update_latents"]
class StableDiffusionXLInpaintDenoiseStep(StableDiffusionXLDenoiseLoop):
block_classes = [StableDiffusionXLDenoiseLoopInpaintLatentsStep, StableDiffusionXLDenoiseLoopDenoiserStep, StableDiffusionXLDenoiseLoopInpaintUpdateLatentsStep]
block_names = ["prepare_latents", "denoiser", "update_latents"]
class StableDiffusionXLInpaintControlNetDenoiseStep(StableDiffusionXLDenoiseLoop):
block_classes = [StableDiffusionXLDenoiseLoopInpaintLatentsStep, StableDiffusionXLDenoiseLoopControlNetDenoiserStep, StableDiffusionXLDenoiseLoopInpaintUpdateLatentsStep]
block_names = ["prepare_latents", "denoiser", "update_latents"]
# control_cond + mask
class StableDiffusionXLInpaintControlNetDenoiseLoop(StableDiffusionXLDenoiseLoopWrapper):
block_classes = [StableDiffusionXLInpaintDenoiseLoopBeforeDenoiser, StableDiffusionXLControlNetDenoiseLoopDenoiser, StableDiffusionXLInpaintDenoiseLoopAfterDenoiser]
block_names = ["before_denoiser", "denoiser", "after_denoiser"]
# all tasks without controlnet
class StableDiffusionXLDenoiseStep(AutoPipelineBlocks):
block_classes = [StableDiffusionXLInpaintDenoiseLoop, StableDiffusionXLDenoiseLoop]
block_names = ["inpaint_denoise", "denoise"]
block_trigger_inputs = ["mask", None]
# all tasks with controlnet
class StableDiffusionXLControlNetDenoiseStep(AutoPipelineBlocks):
block_classes = [StableDiffusionXLInpaintControlNetDenoiseLoop, StableDiffusionXLControlNetDenoiseLoop]
block_names = ["inpaint_controlnet_denoise", "controlnet_denoise"]
block_trigger_inputs = ["mask", None]
# all tasks with or without controlnet
class StableDiffusionXLAutoDenoiseStep(AutoPipelineBlocks):
block_classes = [StableDiffusionXLControlNetDenoiseStep, StableDiffusionXLDenoiseStep]
block_names = ["controlnet_denoise", "denoise"]
block_trigger_inputs = ["controlnet_cond", None]
# YiYi Notes: alternatively, you can just write the denoise loop as a single pipeline block; easier, but not composable
# class StableDiffusionXLDenoiseStep(PipelineBlock):
# model_name = "stable-diffusion-xl"
# @property
# def expected_components(self) -> List[ComponentSpec]:
# return [
# ComponentSpec(
# "guider",
# ClassifierFreeGuidance,
# config=FrozenDict({"guidance_scale": 7.5}),
# default_creation_method="from_config"),
# ComponentSpec("scheduler", EulerDiscreteScheduler),
# ComponentSpec("unet", UNet2DConditionModel),
# ]
# @property
# def description(self) -> str:
# return (
# "Step that iteratively denoise the latents for the text-to-image/image-to-image/inpainting generation process"
# )
# @property
# def inputs(self) -> List[Tuple[str, Any]]:
# return [
# InputParam("cross_attention_kwargs"),
# InputParam("generator"),
# InputParam("eta", default=0.0),
# InputParam("num_images_per_prompt", default=1),
# ]
# @property
# def intermediates_inputs(self) -> List[str]:
# return [
# InputParam(
# "latents",
# required=True,
# type_hint=torch.Tensor,
# description="The initial latents to use for the denoising process. Can be generated in prepare_latent step."
# ),
# InputParam(
# "batch_size",
# required=True,
# type_hint=int,
# description="Number of prompts, the final batch size of model inputs should be batch_size * num_images_per_prompt. Can be generated in input step."
# ),
# InputParam(
# "timesteps",
# required=True,
# type_hint=torch.Tensor,
# description="The timesteps to use for the denoising process. Can be generated in set_timesteps step."
# ),
# InputParam(
# "num_inference_steps",
# required=True,
# type_hint=int,
# description="The number of inference steps to use for the denoising process. Can be generated in set_timesteps step."
# ),
# InputParam(
# "pooled_prompt_embeds",
# required=True,
# type_hint=torch.Tensor,
# description="The pooled prompt embeddings to use to condition the denoising process. Can be generated in text_encoder step."
# ),
# InputParam(
# "negative_pooled_prompt_embeds",
# type_hint=Optional[torch.Tensor],
# description="The negative pooled prompt embeddings to use to condition the denoising process. Can be generated in text_encoder step. "
# ),
# InputParam(
# "add_time_ids",
# required=True,
# type_hint=torch.Tensor,
# description="The time ids to use as additional conditioning for the denoising process. Can be generated in prepare_additional_conditioning step."
# ),
# InputParam(
# "negative_add_time_ids",
# type_hint=Optional[torch.Tensor],
# description="The negative time ids to use as additional conditioning for the denoising process. Can be generated in prepare_additional_conditioning step."
# ),
# InputParam(
# "prompt_embeds",
# required=True,
# type_hint=torch.Tensor,
# description="The prompt embeddings to use to condition the denoising process. Can be generated in text_encoder step."
# ),
# InputParam(
# "negative_prompt_embeds",
# type_hint=Optional[torch.Tensor],
# description="The negative prompt embeddings to use to condition the denoising process. Can be generated in text_encoder step. "
# ),
# InputParam(
# "timestep_cond",
# type_hint=Optional[torch.Tensor],
# description="The guidance scale embedding to use for Latent Consistency Models(LCMs). Can be generated in prepare_additional_conditioning step."
# ),
# InputParam(
# "mask",
# type_hint=Optional[torch.Tensor],
# description="The mask to use for the denoising process, for inpainting task only. Can be generated in vae_encode or prepare_latent step."
# ),
# InputParam(
# "masked_image_latents",
# type_hint=Optional[torch.Tensor],
# description="The masked image latents to use for the denoising process, for inpainting task only. Can be generated in vae_encode or prepare_latent step."
# ),
# InputParam(
# "noise",
# type_hint=Optional[torch.Tensor],
# description="The noise added to the image latents, for inpainting task only. Can be generated in prepare_latent step."
# ),
# InputParam(
# "image_latents",
# type_hint=Optional[torch.Tensor],
# description="The image latents to use for the denoising process, for inpainting/image-to-image task only. Can be generated in vae_encode or prepare_latent step."
# ),
# InputParam(
# "ip_adapter_embeds",
# type_hint=Optional[torch.Tensor],
# description="The ip adapter embeddings to use to condition the denoising process, need to have ip adapter model loaded. Can be generated in ip_adapter step."
# ),
# InputParam(
# "negative_ip_adapter_embeds",
# type_hint=Optional[torch.Tensor],
# description="The negative ip adapter embeddings to use to condition the denoising process, need to have ip adapter model loaded. Can be generated in ip_adapter step."
# ),
# ]
# @property
# def intermediates_outputs(self) -> List[OutputParam]:
# return [OutputParam("latents", type_hint=torch.Tensor, description="The denoised latents")]
# @staticmethod
# def check_inputs(components, block_state):
# num_channels_unet = components.unet.config.in_channels
# if num_channels_unet == 9:
# # default case for runwayml/stable-diffusion-inpainting
# if block_state.mask is None or block_state.masked_image_latents is None:
# raise ValueError("mask and masked_image_latents must be provided for inpainting-specific Unet")
# num_channels_latents = block_state.latents.shape[1]
# num_channels_mask = block_state.mask.shape[1]
# num_channels_masked_image = block_state.masked_image_latents.shape[1]
# if num_channels_latents + num_channels_mask + num_channels_masked_image != num_channels_unet:
# raise ValueError(
# f"Incorrect configuration settings! The config of `components.unet`: {components.unet.config} expects"
# f" {components.unet.config.in_channels} but received `num_channels_latents`: {num_channels_latents} +"
# f" `num_channels_mask`: {num_channels_mask} + `num_channels_masked_image`: {num_channels_masked_image}"
# f" = {num_channels_latents+num_channels_masked_image+num_channels_mask}. Please verify the config of"
# " `components.unet` or your `mask_image` or `image` input."
# )
# # Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs with self -> components
# @staticmethod
# def prepare_extra_step_kwargs(components, generator, eta):
# # prepare extra kwargs for the scheduler step, since not all schedulers have the same signature
# # eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers.
# # eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502
# # and should be between [0, 1]
# accepts_eta = "eta" in set(inspect.signature(components.scheduler.step).parameters.keys())
# extra_step_kwargs = {}
# if accepts_eta:
# extra_step_kwargs["eta"] = eta
# # check if the scheduler accepts generator
# accepts_generator = "generator" in set(inspect.signature(components.scheduler.step).parameters.keys())
# if accepts_generator:
# extra_step_kwargs["generator"] = generator
# return extra_step_kwargs
# @torch.no_grad()
# def __call__(self, components: StableDiffusionXLModularLoader, state: PipelineState) -> PipelineState:
# block_state = self.get_block_state(state)
# self.check_inputs(components, block_state)
# block_state.num_channels_unet = components.unet.config.in_channels
# block_state.disable_guidance = True if components.unet.config.time_cond_proj_dim is not None else False
# if block_state.disable_guidance:
# components.guider.disable()
# else:
# components.guider.enable()
# # Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline
# block_state.extra_step_kwargs = self.prepare_extra_step_kwargs(components, block_state.generator, block_state.eta)
# block_state.num_warmup_steps = max(len(block_state.timesteps) - block_state.num_inference_steps * components.scheduler.order, 0)
# components.guider.set_input_fields(
# prompt_embeds=("prompt_embeds", "negative_prompt_embeds"),
# add_time_ids=("add_time_ids", "negative_add_time_ids"),
# pooled_prompt_embeds=("pooled_prompt_embeds", "negative_pooled_prompt_embeds"),
# ip_adapter_embeds=("ip_adapter_embeds", "negative_ip_adapter_embeds"),
# )
# with self.progress_bar(total=block_state.num_inference_steps) as progress_bar:
# for i, t in enumerate(block_state.timesteps):
# components.guider.set_state(step=i, num_inference_steps=block_state.num_inference_steps, timestep=t)
# guider_data = components.guider.prepare_inputs(block_state)
# block_state.scaled_latents = components.scheduler.scale_model_input(block_state.latents, t)
# # Prepare for inpainting
# if block_state.num_channels_unet == 9:
# block_state.scaled_latents = torch.cat([block_state.scaled_latents, block_state.mask, block_state.masked_image_latents], dim=1)
# for batch in guider_data:
# components.guider.prepare_models(components.unet)
# # Prepare additional conditionings
# batch.added_cond_kwargs = {
# "text_embeds": batch.pooled_prompt_embeds,
# "time_ids": batch.add_time_ids,
# }
# if batch.ip_adapter_embeds is not None:
# batch.added_cond_kwargs["image_embeds"] = batch.ip_adapter_embeds
# # Predict the noise residual
# batch.noise_pred = components.unet(
# block_state.scaled_latents,
# t,
# encoder_hidden_states=batch.prompt_embeds,
# timestep_cond=block_state.timestep_cond,
# cross_attention_kwargs=block_state.cross_attention_kwargs,
# added_cond_kwargs=batch.added_cond_kwargs,
# return_dict=False,
# )[0]
# components.guider.cleanup_models(components.unet)
# # Perform guidance
# block_state.noise_pred, scheduler_step_kwargs = components.guider(guider_data)
# # Perform scheduler step using the predicted output
# block_state.latents_dtype = block_state.latents.dtype
# block_state.latents = components.scheduler.step(block_state.noise_pred, t, block_state.latents, **block_state.extra_step_kwargs, **scheduler_step_kwargs, return_dict=False)[0]
# if block_state.latents.dtype != block_state.latents_dtype:
# if torch.backends.mps.is_available():
# # some platforms (eg. apple mps) misbehave due to a pytorch bug: https://github.com/pytorch/pytorch/pull/99272
# block_state.latents = block_state.latents.to(block_state.latents_dtype)
# if block_state.num_channels_unet == 4 and block_state.mask is not None and block_state.image_latents is not None:
# block_state.init_latents_proper = block_state.image_latents
# if i < len(block_state.timesteps) - 1:
# block_state.noise_timestep = block_state.timesteps[i + 1]
# block_state.init_latents_proper = components.scheduler.add_noise(
# block_state.init_latents_proper, block_state.noise, torch.tensor([block_state.noise_timestep])
# )
# block_state.latents = (1 - block_state.mask) * block_state.init_latents_proper + block_state.mask * block_state.latents
# if i == len(block_state.timesteps) - 1 or ((i + 1) > block_state.num_warmup_steps and (i + 1) % components.scheduler.order == 0):
# progress_bar.update()
# self.add_block_state(state, block_state)
# return components, state
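# A minimal sketch of the guidance math the guider call above ultimately
# applies for classifier-free guidance (standard CFG formula; the actual
# Guider API batches conditional/unconditional passes and combines them):
def cfg_combine(noise_uncond, noise_cond, guidance_scale=7.5):
    # uncond + scale * (cond - uncond); scale 7.5 matches the ComponentSpec default
    return noise_uncond + guidance_scale * (noise_cond - noise_uncond)
assert cfg_combine(0.0, 1.0, guidance_scale=7.5) == 7.5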
# class StableDiffusionXLControlNetDenoiseStep(PipelineBlock):
# model_name = "stable-diffusion-xl"
# @property
# def expected_components(self) -> List[ComponentSpec]:
# return [
# ComponentSpec(
# "guider",
# ClassifierFreeGuidance,
# config=FrozenDict({"guidance_scale": 7.5}),
# default_creation_method="from_config"),
# ComponentSpec("scheduler", EulerDiscreteScheduler),
# ComponentSpec("unet", UNet2DConditionModel),
# ComponentSpec("controlnet", ControlNetModel),
# ]
# @property
# def description(self) -> str:
# return "step that iteratively denoise the latents for the text-to-image/image-to-image/inpainting generation process. Using ControlNet to condition the denoising process"
# @property
# def inputs(self) -> List[Tuple[str, Any]]:
# return [
# InputParam("num_images_per_prompt", default=1),
# InputParam("cross_attention_kwargs"),
# InputParam("generator"),
# InputParam("eta", default=0.0),
# InputParam("controlnet_conditioning_scale", type_hint=float, default=1.0), # can expect either input or intermediate input, (intermediate input if both are passed)
# ]
# @property
# def intermediates_inputs(self) -> List[str]:
# return [
# InputParam(
# "controlnet_cond",
# required=True,
# type_hint=torch.Tensor,
# description="The control image to use for the denoising process. Can be generated in prepare_controlnet_inputs step."
# ),
# InputParam(
# "control_guidance_start",
# required=True,
# type_hint=float,
# description="The control guidance start value to use for the denoising process. Can be generated in prepare_controlnet_inputs step."
# ),
# InputParam(
# "control_guidance_end",
# required=True,
# type_hint=float,
# description="The control guidance end value to use for the denoising process. Can be generated in prepare_controlnet_inputs step."
# ),
# InputParam(
# "conditioning_scale",
# type_hint=float,
# description="The controlnet conditioning scale value to use for the denoising process. Can be generated in prepare_controlnet_inputs step."
# ),
# InputParam(
# "guess_mode",
# required=True,
# type_hint=bool,
# description="The guess mode value to use for the denoising process. Can be generated in prepare_controlnet_inputs step."
# ),
# InputParam(
# "controlnet_keep",
# required=True,
# type_hint=List[float],
# description="The controlnet keep values to use for the denoising process. Can be generated in prepare_controlnet_inputs step."
# ),
# InputParam(
# "latents",
# required=True,
# type_hint=torch.Tensor,
# description="The initial latents to use for the denoising process. Can be generated in prepare_latent step."
# ),
# InputParam(
# "batch_size",
# required=True,
# type_hint=int,
# description="Number of prompts, the final batch size of model inputs should be batch_size * num_images_per_prompt. Can be generated in input step."
# ),
# InputParam(
# "timesteps",
# required=True,
# type_hint=torch.Tensor,
# description="The timesteps to use for the denoising process. Can be generated in set_timesteps step."
# ),
# InputParam(
# "prompt_embeds",
# required=True,
# type_hint=torch.Tensor,
# description="The prompt embeddings used to condition the denoising process. Can be generated in text_encoder step."
# ),
# InputParam(
# "negative_prompt_embeds",
# type_hint=Optional[torch.Tensor],
# description="The negative prompt embeddings used to condition the denoising process. Can be generated in text_encoder step."
# ),
# InputParam(
# "add_time_ids",
# required=True,
# type_hint=torch.Tensor,
# description="The time ids used to condition the denoising process. Can be generated in parepare_additional_conditioning step."
# ),
# InputParam(
# "negative_add_time_ids",
# type_hint=Optional[torch.Tensor],
# description="The negative time ids used to condition the denoising process. Can be generated in parepare_additional_conditioning step."
# ),
# InputParam(
# "pooled_prompt_embeds",
# required=True,
# type_hint=torch.Tensor,
# description="The pooled prompt embeddings used to condition the denoising process. Can be generated in text_encoder step."
# ),
# InputParam(
# "negative_pooled_prompt_embeds",
# type_hint=Optional[torch.Tensor],
# description="The negative pooled prompt embeddings to use to condition the denoising process. Can be generated in text_encoder step."
# ),
# InputParam(
# "timestep_cond",
# type_hint=Optional[torch.Tensor],
# description="The guidance scale embedding to use for Latent Consistency Models(LCMs), can be generated by prepare_additional_conditioning step"
# ),
# InputParam(
# "mask",
# type_hint=Optional[torch.Tensor],
# description="The mask to use for the denoising process, for inpainting task only. Can be generated in vae_encode or prepare_latent step."
# ),
# InputParam(
# "masked_image_latents",
# type_hint=Optional[torch.Tensor],
# description="The masked image latents to use for the denoising process, for inpainting task only. Can be generated in vae_encode or prepare_latent step."
# ),
# InputParam(
# "noise",
# type_hint=Optional[torch.Tensor],
# description="The noise added to the image latents, for inpainting task only. Can be generated in prepare_latent step."
# ),
# InputParam(
# "image_latents",
# type_hint=Optional[torch.Tensor],
# description="The image latents to use for the denoising process, for inpainting/image-to-image task only. Can be generated in vae_encode or prepare_latent step."
# ),
# InputParam(
# "crops_coords",
# type_hint=Optional[Tuple[int]],
# description="The crop coordinates to use for preprocess/postprocess the image and mask, for inpainting task only. Can be generated in vae_encode step."
# ),
# InputParam(
# "ip_adapter_embeds",
# type_hint=Optional[torch.Tensor],
# description="The ip adapter embeddings to use to condition the denoising process, need to have ip adapter model loaded. Can be generated in ip_adapter step."
# ),
# InputParam(
# "negative_ip_adapter_embeds",
# type_hint=Optional[torch.Tensor],
# description="The negative ip adapter embeddings to use to condition the denoising process, need to have ip adapter model loaded. Can be generated in ip_adapter step."
# ),
# InputParam(
# "num_inference_steps",
# required=True,
# type_hint=int,
# description="The number of inference steps to use for the denoising process. Can be generated in set_timesteps step."
# ),
# InputParam(kwargs_type="controlnet_kwargs", description="additional kwargs for controlnet")
# ]
# @property
# def intermediates_outputs(self) -> List[OutputParam]:
# return [OutputParam("latents", type_hint=torch.Tensor, description="The denoised latents")]
# @staticmethod
# def check_inputs(components, block_state):
# num_channels_unet = components.unet.config.in_channels
# if num_channels_unet == 9:
# # default case for runwayml/stable-diffusion-inpainting
# if block_state.mask is None or block_state.masked_image_latents is None:
# raise ValueError("mask and masked_image_latents must be provided for inpainting-specific Unet")
# num_channels_latents = block_state.latents.shape[1]
# num_channels_mask = block_state.mask.shape[1]
# num_channels_masked_image = block_state.masked_image_latents.shape[1]
# if num_channels_latents + num_channels_mask + num_channels_masked_image != num_channels_unet:
# raise ValueError(
# f"Incorrect configuration settings! The config of `components.unet`: {components.unet.config} expects"
# f" {components.unet.config.in_channels} but received `num_channels_latents`: {num_channels_latents} +"
# f" `num_channels_mask`: {num_channels_mask} + `num_channels_masked_image`: {num_channels_masked_image}"
# f" = {num_channels_latents+num_channels_masked_image+num_channels_mask}. Please verify the config of"
# " `components.unet` or your `mask_image` or `image` input."
# )
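# # For reference, the channel check above: the 9-channel inpainting UNet input
# # decomposes as 4 (latents) + 1 (mask) + 4 (masked_image_latents) = 9 channels.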
# @staticmethod
# def prepare_extra_kwargs(func, exclude_kwargs=(), **kwargs):
# accepted_kwargs = set(inspect.signature(func).parameters.keys())
# extra_kwargs = {}
# for key, value in kwargs.items():
# if key in accepted_kwargs and key not in exclude_kwargs:
# extra_kwargs[key] = value
# return extra_kwargs
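# # Illustrative sketch (hypothetical `step` signature): the helper keeps only the
# # kwargs that `func` actually accepts, so schedulers that do not take `eta` are
# # simply never passed it:
# #     def step(model_output, timestep, sample, generator=None): ...
# #     prepare_extra_kwargs(step, generator=g, eta=0.0)  # -> {"generator": g}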
# @torch.no_grad()
# def __call__(self, components: StableDiffusionXLModularLoader, state: PipelineState) -> PipelineState:
# block_state = self.get_block_state(state)
# self.check_inputs(components, block_state)
# block_state.device = components._execution_device
# print(f" block_state: {block_state}")
# controlnet = unwrap_module(components.controlnet)
# # Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline
# block_state.extra_step_kwargs = self.prepare_extra_kwargs(components.scheduler.step, generator=block_state.generator, eta=block_state.eta)
# block_state.extra_controlnet_kwargs = self.prepare_extra_kwargs(controlnet.forward, exclude_kwargs=["controlnet_cond", "conditioning_scale", "guess_mode"], **block_state.controlnet_kwargs)
# block_state.num_warmup_steps = max(len(block_state.timesteps) - block_state.num_inference_steps * components.scheduler.order, 0)
# # (1) Setup guider
# # guidance is disabled for LCMs, which receive the guidance-scale embedding via `timestep_cond` instead
# block_state.disable_guidance = components.unet.config.time_cond_proj_dim is not None
# if block_state.disable_guidance:
# components.guider.disable()
# else:
# components.guider.enable()
# components.guider.set_input_fields(
# prompt_embeds=("prompt_embeds", "negative_prompt_embeds"),
# add_time_ids=("add_time_ids", "negative_add_time_ids"),
# pooled_prompt_embeds=("pooled_prompt_embeds", "negative_pooled_prompt_embeds"),
# ip_adapter_embeds=("ip_adapter_embeds", "negative_ip_adapter_embeds"),
# )
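# # Note: `set_input_fields` maps each guider input to a (conditional, unconditional)
# # pair of block-state attributes, e.g. the conditional batch reads
# # `block_state.prompt_embeds` while the unconditional batch reads
# # `block_state.negative_prompt_embeds`.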
# # (2) Denoise loop
# with self.progress_bar(total=block_state.num_inference_steps) as progress_bar:
# for i, t in enumerate(block_state.timesteps):
# # prepare latent input for unet
# block_state.scaled_latents = components.scheduler.scale_model_input(block_state.latents, t)
# # adjust latent input for inpainting
# block_state.num_channels_unet = components.unet.config.in_channels
# if block_state.num_channels_unet == 9:
# block_state.scaled_latents = torch.cat([block_state.scaled_latents, block_state.mask, block_state.masked_image_latents], dim=1)
# # cond_scale (controlnet input)
# if isinstance(block_state.controlnet_keep[i], list):
# block_state.cond_scale = [c * s for c, s in zip(block_state.conditioning_scale, block_state.controlnet_keep[i])]
# else:
# block_state.controlnet_cond_scale = block_state.conditioning_scale
# if isinstance(block_state.controlnet_cond_scale, list):
# block_state.controlnet_cond_scale = block_state.controlnet_cond_scale[0]
# block_state.cond_scale = block_state.controlnet_cond_scale * block_state.controlnet_keep[i]
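# # Worked example (hypothetical values): with controlnet_keep = [1.0, 1.0, 0.0]
# # and conditioning_scale = 0.8, cond_scale is 0.8 for the first two steps and
# # 0.0 afterwards, i.e. ControlNet conditioning is dropped outside the
# # [control_guidance_start, control_guidance_end] window.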
# # zeroed ControlNet residuals (created lazily) for the unconditional batch in guess mode
# block_state.down_block_res_samples_zeros = None
# block_state.mid_block_res_sample_zeros = None
# # guided denoiser step
# components.guider.set_state(step=i, num_inference_steps=block_state.num_inference_steps, timestep=t)
# guider_state = components.guider.prepare_inputs(block_state)
# for guider_state_batch in guider_state:
# components.guider.prepare_models(components.unet)
# # Prepare additional conditionings
# guider_state_batch.added_cond_kwargs = {
# "text_embeds": guider_state_batch.pooled_prompt_embeds,
# "time_ids": guider_state_batch.add_time_ids,
# }
# if guider_state_batch.ip_adapter_embeds is not None:
# guider_state_batch.added_cond_kwargs["image_embeds"] = guider_state_batch.ip_adapter_embeds
# # Prepare controlnet additional conditionings
# guider_state_batch.controlnet_added_cond_kwargs = {
# "text_embeds": guider_state_batch.pooled_prompt_embeds,
# "time_ids": guider_state_batch.add_time_ids,
# }
# if block_state.guess_mode and not components.guider.is_conditional:
# # in guess mode the ControlNet runs only on the conditional batch; the guider runs that batch first, so the cached zero residuals are already populated
# guider_state_batch.down_block_res_samples = block_state.down_block_res_samples_zeros
# guider_state_batch.mid_block_res_sample = block_state.mid_block_res_sample_zeros
# else:
# guider_state_batch.down_block_res_samples, guider_state_batch.mid_block_res_sample = components.controlnet(
# block_state.scaled_latents,
# t,
# encoder_hidden_states=guider_state_batch.prompt_embeds,
# controlnet_cond=block_state.controlnet_cond,
# conditioning_scale=block_state.cond_scale,
# guess_mode=block_state.guess_mode,
# added_cond_kwargs=guider_state_batch.controlnet_added_cond_kwargs,
# return_dict=False,
# **block_state.extra_controlnet_kwargs,
# )
# if block_state.down_block_res_samples_zeros is None:
# block_state.down_block_res_samples_zeros = [torch.zeros_like(d) for d in guider_state_batch.down_block_res_samples]
# if block_state.mid_block_res_sample_zeros is None:
# block_state.mid_block_res_sample_zeros = torch.zeros_like(guider_state_batch.mid_block_res_sample)
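# # The zero residuals are created lazily because their shapes are only known
# # after the first conditional ControlNet call; they are then reused for every
# # unconditional pass in guess mode.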
# guider_state_batch.noise_pred = components.unet(
# block_state.scaled_latents,
# t,
# encoder_hidden_states=guider_state_batch.prompt_embeds,
# timestep_cond=block_state.timestep_cond,
# cross_attention_kwargs=block_state.cross_attention_kwargs,
# added_cond_kwargs=guider_state_batch.added_cond_kwargs,
# down_block_additional_residuals=guider_state_batch.down_block_res_samples,
# mid_block_additional_residual=guider_state_batch.mid_block_res_sample,
# return_dict=False,
# )[0]
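# # The ControlNet residuals are injected additively: `down_block_additional_residuals`
# # are added to the UNet's down-block skip connections and
# # `mid_block_additional_residual` to its mid-block output.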
# components.guider.cleanup_models(components.unet)
# # Perform guidance
# block_state.noise_pred, scheduler_step_kwargs = components.guider(guider_state)
# # Perform scheduler step using the predicted output
# block_state.latents_dtype = block_state.latents.dtype
# block_state.latents = components.scheduler.step(block_state.noise_pred, t, block_state.latents, **block_state.extra_step_kwargs, **scheduler_step_kwargs, return_dict=False)[0]
# if block_state.latents.dtype != block_state.latents_dtype:
# if torch.backends.mps.is_available():
# # some platforms (e.g. Apple MPS) misbehave due to a pytorch bug: https://github.com/pytorch/pytorch/pull/99272
# block_state.latents = block_state.latents.to(block_state.latents_dtype)
# # adjust latent for inpainting
# if block_state.num_channels_unet == 4 and block_state.mask is not None and block_state.image_latents is not None:
# block_state.init_latents_proper = block_state.image_latents
# if i < len(block_state.timesteps) - 1:
# block_state.noise_timestep = block_state.timesteps[i + 1]
# block_state.init_latents_proper = components.scheduler.add_noise(
# block_state.init_latents_proper, block_state.noise, torch.tensor([block_state.noise_timestep])
# )
# block_state.latents = (1 - block_state.mask) * block_state.init_latents_proper + block_state.mask * block_state.latents
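# # With mask values in [0, 1] (1 = region to repaint), this blend keeps unmasked
# # areas anchored to the original image latents, re-noised to the next timestep
# # so both terms sit at the same noise level.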
# if i == len(block_state.timesteps) - 1 or ((i + 1) > block_state.num_warmup_steps and (i + 1) % components.scheduler.order == 0):
# progress_bar.update()
# self.add_block_state(state, block_state)
# return components, state