1
0
mirror of https://github.com/huggingface/diffusers.git synced 2026-01-27 17:22:53 +03:00
This commit is contained in:
yiyixuxu
2026-01-25 12:11:37 +01:00
parent 6a549f5f55
commit 20c35da75c
4 changed files with 122 additions and 92 deletions

View File

@@ -39,11 +39,12 @@ from .modular_pipeline_utils import (
InputParam,
InsertableDict,
OutputParam,
format_components,
format_configs,
make_doc_string,
combine_inputs,
combine_outputs,
format_components,
format_configs,
format_workflow,
make_doc_string,
)
@@ -303,9 +304,9 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin):
# currentlyonly ConditionalPipelineBlocks and SequentialPipelineBlocks support `get_execution_blocks`
def get_execution_blocks(self, **kwargs):
"""
Get the block(s) that would execute given the inputs.
Must be implemented by subclasses that support conditional block selection.
Get the block(s) that would execute given the inputs. Must be implemented by subclasses that support
conditional block selection.
Args:
**kwargs: Input names and values. Only trigger inputs affect block selection.
"""
@@ -315,16 +316,15 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin):
@property
def workflow_names(self):
"""
Returns a list of available workflow names.
Must be implemented by subclasses that define `_workflow_map`.
Returns a list of available workflow names. Must be implemented by subclasses that define `_workflow_map`.
"""
raise NotImplementedError(f"`workflow_names` is not implemented for {self.__class__.__name__}")
def get_workflow(self, workflow_name: str):
"""
Get the execution blocks for a specific workflow.
Must be implemented by subclasses that define `_workflow_map`.
Get the execution blocks for a specific workflow. Must be implemented by subclasses that define
`_workflow_map`.
Args:
workflow_name: Name of the workflow to retrieve.
"""
@@ -498,8 +498,8 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin):
class ConditionalPipelineBlocks(ModularPipelineBlocks):
"""
A Pipeline Blocks that conditionally selects a block to run based on the inputs. Subclasses must implement the
`select_block` method to define the logic for selecting the block. Currently, we only support selection logic
based on the presence or absence of inputs (i.e., whether they are `None` or not)
`select_block` method to define the logic for selecting the block. Currently, we only support selection logic based
on the presence or absence of inputs (i.e., whether they are `None` or not)
This class inherits from [`ModularPipelineBlocks`]. Check the superclass documentation for the generic methods the
library implements for all the pipeline blocks (such as loading or saving etc.)
@@ -510,9 +510,9 @@ class ConditionalPipelineBlocks(ModularPipelineBlocks):
block_classes: List of block classes to be used. Must have the same length as `block_names`.
block_names: List of names for each block. Must have the same length as `block_classes`.
block_trigger_inputs: List of input names that `select_block()` uses to determine which block to run.
For `ConditionalPipelineBlocks`, this does not need to correspond to `block_names` and `block_classes`.
For `AutoPipelineBlocks`, this must have the same length as `block_names` and `block_classes`,
where each element specifies the trigger input for the corresponding block.
For `ConditionalPipelineBlocks`, this does not need to correspond to `block_names` and `block_classes`. For
`AutoPipelineBlocks`, this must have the same length as `block_names` and `block_classes`, where each
element specifies the trigger input for the corresponding block.
default_block_name: Name of the default block to run when no trigger inputs match.
If None, this block can be skipped entirely when no trigger inputs are provided.
"""
@@ -676,34 +676,34 @@ class ConditionalPipelineBlocks(ModularPipelineBlocks):
def get_execution_blocks(self, **kwargs) -> Optional["ModularPipelineBlocks"]:
"""
Get the block(s) that would execute given the inputs.
Recursively resolves nested ConditionalPipelineBlocks until reaching either:
- A leaf block (no sub_blocks) → returns single `ModularPipelineBlocks`
- A `SequentialPipelineBlocks` → delegates to its `get_execution_blocks()` which returns
- A `SequentialPipelineBlocks` → delegates to its `get_execution_blocks()` which returns
a `SequentialPipelineBlocks` containing the resolved execution blocks
Args:
**kwargs: Input names and values. Only trigger inputs affect block selection.
Returns:
- `ModularPipelineBlocks`: A leaf block or resolved `SequentialPipelineBlocks`
- `None`: If this block would be skipped (no trigger matched and no default)
"""
trigger_kwargs = {name: kwargs.get(name) for name in self.block_trigger_inputs if name is not None}
block_name = self.select_block(**trigger_kwargs)
if block_name is None:
block_name = self.default_block_name
if block_name is None:
return None
block = self.sub_blocks[block_name]
# Recursively resolve until we hit a leaf block or a SequentialPipelineBlocks
if block.sub_blocks:
return block.get_execution_blocks(**kwargs)
return block
def __repr__(self):
@@ -784,32 +784,37 @@ class ConditionalPipelineBlocks(ModularPipelineBlocks):
class AutoPipelineBlocks(ConditionalPipelineBlocks):
"""
A Pipeline Blocks that automatically selects a block to run based on the presence of trigger inputs.
This is a specialized version of `ConditionalPipelineBlocks` where:
- Each block has one corresponding trigger input (1:1 mapping)
- Block selection is automatic: the first block whose trigger input is present gets selected
- `block_trigger_inputs` must have the same length as `block_names` and `block_classes`
- Use `None` in `block_trigger_inputs` to specify the default block, i.e the block that will run if no trigger inputs are present
Attributes:
block_classes: List of block classes to be used. Must have the same length as `block_names` and `block_trigger_inputs`.
block_names: List of names for each block. Must have the same length as `block_classes` and `block_trigger_inputs`.
block_trigger_inputs: List of input names where each element specifies the trigger input for the corresponding block.
Use `None` to mark the default block.
Example:
```python
class MyAutoBlock(AutoPipelineBlocks):
block_classes = [InpaintEncoderBlock, ImageEncoderBlock, TextEncoderBlock]
block_names = ["inpaint", "img2img", "text2img"]
block_trigger_inputs = ["mask_image", "image", None] # text2img is the default
```
With this definition:
- As long as `mask_image` is provided, "inpaint" block runs (regardless of `image` being provided or not)
- If `mask_image` is not provided but `image` is provided, "img2img" block runs
- Otherwise, "text2img" block runs (default, trigger is `None`)
A Pipeline Blocks that automatically selects a block to run based on the presence of trigger inputs.
This is a specialized version of `ConditionalPipelineBlocks` where:
- Each block has one corresponding trigger input (1:1 mapping)
- Block selection is automatic: the first block whose trigger input is present gets selected
- `block_trigger_inputs` must have the same length as `block_names` and `block_classes`
- Use `None` in `block_trigger_inputs` to specify the default block, i.e the block that will run if no trigger
inputs are present
Attributes:
block_classes:
List of block classes to be used. Must have the same length as `block_names` and
`block_trigger_inputs`.
block_names:
List of names for each block. Must have the same length as `block_classes` and `block_trigger_inputs`.
block_trigger_inputs:
List of input names where each element specifies the trigger input for the corresponding block. Use
`None` to mark the default block.
Example:
```python
class MyAutoBlock(AutoPipelineBlocks):
block_classes = [InpaintEncoderBlock, ImageEncoderBlock, TextEncoderBlock]
block_names = ["inpaint", "img2img", "text2img"]
block_trigger_inputs = ["mask_image", "image", None] # text2img is the default
```
With this definition:
- As long as `mask_image` is provided, "inpaint" block runs (regardless of `image` being provided or not)
- If `mask_image` is not provided but `image` is provided, "img2img" block runs
- Otherwise, "text2img" block runs (default, trigger is `None`)
"""
def __init__(self):
@@ -830,7 +835,6 @@ class AutoPipelineBlocks(ConditionalPipelineBlocks):
idx = self.block_trigger_inputs.index(None)
self.default_block_name = self.block_names[idx]
def select_block(self, **kwargs) -> Optional[str]:
"""Select block based on which trigger input is present (not None)."""
for trigger_input, block_name in zip(self.block_trigger_inputs, self.block_names):
@@ -883,21 +887,24 @@ class SequentialPipelineBlocks(ModularPipelineBlocks):
expected_configs.append(config)
return expected_configs
@property
def workflow_names(self):
if self._workflow_map is None:
raise NotImplementedError(f"workflows is not supported because _workflow_map is not set for {self.__class__.__name__}")
raise NotImplementedError(
f"workflows is not supported because _workflow_map is not set for {self.__class__.__name__}"
)
return list(self._workflow_map.keys())
def get_workflow(self, workflow_name: str):
if self._workflow_map is None:
raise NotImplementedError(f"workflows is not supported because _workflow_map is not set for {self.__class__.__name__}")
raise NotImplementedError(
f"workflows is not supported because _workflow_map is not set for {self.__class__.__name__}"
)
if workflow_name not in self._workflow_map:
raise ValueError(f"Workflow {workflow_name} not found in {self.__class__.__name__}")
trigger_inputs = self._workflow_map[workflow_name]
workflow_blocks = self.get_execution_blocks(**trigger_inputs)
@@ -1058,7 +1065,7 @@ class SequentialPipelineBlocks(ModularPipelineBlocks):
"""
# Copy kwargs so we can add outputs as we traverse
active_inputs = dict(kwargs)
def fn_recursive_traverse(block, block_name, active_inputs):
result_blocks = OrderedDict()
@@ -1088,7 +1095,7 @@ class SequentialPipelineBlocks(ModularPipelineBlocks):
for block_name, block in self.sub_blocks.items():
nested_blocks = fn_recursive_traverse(block, block_name, active_inputs)
all_blocks.update(nested_blocks)
return SequentialPipelineBlocks.from_blocks_dict(all_blocks)
def __repr__(self):
@@ -1098,7 +1105,7 @@ class SequentialPipelineBlocks(ModularPipelineBlocks):
f"{class_name}(\n Class: {base_class}\n" if base_class and base_class != "object" else f"{class_name}(\n"
)
if self._get_trigger_inputs():
if self._workflow_map is None and self._get_trigger_inputs():
header += "\n"
header += " " + "=" * 100 + "\n"
header += " This pipeline contains blocks that are selected at runtime based on inputs.\n"
@@ -1108,8 +1115,13 @@ class SequentialPipelineBlocks(ModularPipelineBlocks):
header += f" Use `get_execution_blocks()` to see selected blocks (e.g. `get_execution_blocks({example_input}=...)`).\n"
header += " " + "=" * 100 + "\n\n"
description = self.description
if self._workflow_map is not None:
workflow_str = format_workflow(self._workflow_map)
description = f"{self.description}\n\n{workflow_str}"
# Format description with proper indentation
desc_lines = self.description.split("\n")
desc_lines = description.split("\n")
desc = []
# First line with "Description:" label
desc.append(f" Description: {desc_lines[0]}")
@@ -1157,10 +1169,15 @@ class SequentialPipelineBlocks(ModularPipelineBlocks):
@property
def doc(self):
description = self.description
if self._workflow_map is not None:
workflow_str = format_workflow(self._workflow_map)
description = f"{self.description}\n\n{workflow_str}"
return make_doc_string(
self.inputs,
self.outputs,
self.description,
description=description,
class_name=self.__class__.__name__,
expected_components=self.expected_components,
expected_configs=self.expected_configs,

View File

@@ -14,11 +14,10 @@
import inspect
import re
import numpy as np
import warnings
from collections import OrderedDict
from dataclasses import dataclass, field, fields
from typing import Any, Dict, List, Literal, Optional, Type, Union, Set, Tuple
from typing import Any, Dict, List, Literal, Optional, Tuple, Type, Union
import PIL.Image
import torch
@@ -862,6 +861,30 @@ def format_configs(configs, indent_level=4, max_line_length=115, add_empty_lines
return "\n".join(formatted_configs)
def format_workflow(workflow_map):
"""Format a workflow map into a readable string representation.
Args:
workflow_map: Dictionary mapping workflow names to trigger inputs
Returns:
A formatted string representing all workflows
"""
if workflow_map is None:
return ""
lines = ["Supported workflows:"]
for workflow_name, trigger_inputs in workflow_map.items():
required_inputs = [k for k, v in trigger_inputs.items() if v]
if required_inputs:
inputs_str = ", ".join(f"`{t}`" for t in required_inputs)
lines.append(f" - `{workflow_name}`: requires {inputs_str}")
else:
lines.append(f" - `{workflow_name}`: default (no additional inputs required)")
return "\n".join(lines)
def make_doc_string(
inputs,
outputs,
@@ -920,9 +943,9 @@ def make_doc_string(
def combine_inputs(*named_input_lists: List[Tuple[str, List[InputParam]]]) -> List[InputParam]:
"""
Combines multiple lists of InputParam objects from different blocks. For duplicate inputs, updates only if
current default value is None and new default value is not None. Warns if multiple non-None default values
exist for the same input.
Combines multiple lists of InputParam objects from different blocks. For duplicate inputs, updates only if current
default value is None and new default value is not None. Warns if multiple non-None default values exist for the
same input.
Args:
named_input_lists: List of tuples containing (block_name, input_param_list) pairs
@@ -960,6 +983,7 @@ def combine_inputs(*named_input_lists: List[Tuple[str, List[InputParam]]]) -> Li
return list(combined_dict.values())
def combine_outputs(*named_output_lists: List[Tuple[str, List[OutputParam]]]) -> List[OutputParam]:
"""
Combines multiple lists of OutputParam objects from different blocks. For duplicate outputs, keeps the first
@@ -980,4 +1004,4 @@ def combine_outputs(*named_output_lists: List[Tuple[str, List[OutputParam]]]) ->
):
combined_dict[output_param.name] = output_param
return list(combined_dict.values())
return list(combined_dict.values())

View File

@@ -551,8 +551,7 @@ class QwenImageCreateMaskLatentsStep(ModularPipelineBlocks):
# auto_docstring
class QwenImageSetTimestepsStep(ModularPipelineBlocks):
"""
Step that sets the the scheduler's timesteps for text-to-image generation. Should be run after prepare latents
step.
Step that sets the scheduler's timesteps for text-to-image generation. Should be run after prepare latents step.
Components:
scheduler (`FlowMatchEulerDiscreteScheduler`)
@@ -718,8 +717,8 @@ class QwenImageLayeredSetTimestepsStep(ModularPipelineBlocks):
# auto_docstring
class QwenImageSetTimestepsWithStrengthStep(ModularPipelineBlocks):
"""
Step that sets the the scheduler's timesteps for image-to-image generation, and inpainting. Should be run after
prepare latents step.
Step that sets the scheduler's timesteps for image-to-image generation, and inpainting. Should be run after prepare
latents step.
Components:
scheduler (`FlowMatchEulerDiscreteScheduler`)
@@ -846,10 +845,6 @@ class QwenImageRoPEInputsStep(ModularPipelineBlocks):
Outputs:
img_shapes (`List`):
The shapes of the images latents, used for RoPE calculation
txt_seq_lens (`List`):
The sequence lengths of the prompt embeds, used for RoPE calculation
negative_txt_seq_lens (`List`):
The sequence lengths of the negative prompt embeds, used for RoPE calculation
"""
model_name = "qwenimage"
@@ -925,10 +920,6 @@ class QwenImageEditRoPEInputsStep(ModularPipelineBlocks):
Outputs:
img_shapes (`List`):
The shapes of the images latents, used for RoPE calculation
txt_seq_lens (`List`):
The sequence lengths of the prompt embeds, used for RoPE calculation
negative_txt_seq_lens (`List`):
The sequence lengths of the negative prompt embeds, used for RoPE calculation
"""
model_name = "qwenimage"

View File

@@ -1113,10 +1113,14 @@ AUTO_BLOCKS = InsertableDict(
class QwenImageAutoBlocks(SequentialPipelineBlocks):
"""
Auto Modular pipeline for text-to-image, image-to-image, inpainting, and controlnet tasks using QwenImage.
- for image-to-image generation, you need to provide `image`
- for inpainting, you need to provide `mask_image` and `image`, optionally you can provide `padding_mask_crop`.
- to run the controlnet workflow, you need to provide `control_image`
- for text-to-image generation, all you need to provide is `prompt`
Supported workflows:
- `text2image`: requires `prompt`
- `image2image`: requires `prompt`, `image`
- `inpainting`: requires `prompt`, `mask_image`, `image`
- `controlnet_text2image`: requires `prompt`, `control_image`
- `controlnet_image2image`: requires `prompt`, `image`, `control_image`
- `controlnet_inpainting`: requires `prompt`, `mask_image`, `image`, `control_image`
Components:
text_encoder (`Qwen2_5_VLForConditionalGeneration`): The text encoder to use tokenizer (`Qwen2Tokenizer`):
@@ -1214,13 +1218,7 @@ class QwenImageAutoBlocks(SequentialPipelineBlocks):
@property
def description(self):
return (
"Auto Modular pipeline for text-to-image, image-to-image, inpainting, and controlnet tasks using QwenImage.\n"
+ "- for image-to-image generation, you need to provide `image`\n"
+ "- for inpainting, you need to provide `mask_image` and `image`, optionally you can provide `padding_mask_crop`.\n"
+ "- to run the controlnet workflow, you need to provide `control_image`\n"
+ "- for text-to-image generation, all you need to provide is `prompt`"
)
return "Auto Modular pipeline for text-to-image, image-to-image, inpainting, and controlnet tasks using QwenImage."
@property
def outputs(self):