diff --git a/src/diffusers/modular_pipelines/modular_pipeline.py b/src/diffusers/modular_pipelines/modular_pipeline.py index f9bf257c36..cbc6ee2470 100644 --- a/src/diffusers/modular_pipelines/modular_pipeline.py +++ b/src/diffusers/modular_pipelines/modular_pipeline.py @@ -39,11 +39,12 @@ from .modular_pipeline_utils import ( InputParam, InsertableDict, OutputParam, - format_components, - format_configs, - make_doc_string, combine_inputs, combine_outputs, + format_components, + format_configs, + format_workflow, + make_doc_string, ) @@ -303,9 +304,9 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin): # currentlyonly ConditionalPipelineBlocks and SequentialPipelineBlocks support `get_execution_blocks` def get_execution_blocks(self, **kwargs): """ - Get the block(s) that would execute given the inputs. - Must be implemented by subclasses that support conditional block selection. - + Get the block(s) that would execute given the inputs. Must be implemented by subclasses that support + conditional block selection. + Args: **kwargs: Input names and values. Only trigger inputs affect block selection. """ @@ -315,16 +316,15 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin): @property def workflow_names(self): """ - Returns a list of available workflow names. - Must be implemented by subclasses that define `_workflow_map`. + Returns a list of available workflow names. Must be implemented by subclasses that define `_workflow_map`. """ raise NotImplementedError(f"`workflow_names` is not implemented for {self.__class__.__name__}") def get_workflow(self, workflow_name: str): """ - Get the execution blocks for a specific workflow. - Must be implemented by subclasses that define `_workflow_map`. - + Get the execution blocks for a specific workflow. Must be implemented by subclasses that define + `_workflow_map`. + Args: workflow_name: Name of the workflow to retrieve. 
""" @@ -498,8 +498,8 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin): class ConditionalPipelineBlocks(ModularPipelineBlocks): """ A Pipeline Blocks that conditionally selects a block to run based on the inputs. Subclasses must implement the - `select_block` method to define the logic for selecting the block. Currently, we only support selection logic - based on the presence or absence of inputs (i.e., whether they are `None` or not) + `select_block` method to define the logic for selecting the block. Currently, we only support selection logic based + on the presence or absence of inputs (i.e., whether they are `None` or not) This class inherits from [`ModularPipelineBlocks`]. Check the superclass documentation for the generic methods the library implements for all the pipeline blocks (such as loading or saving etc.) @@ -510,9 +510,9 @@ class ConditionalPipelineBlocks(ModularPipelineBlocks): block_classes: List of block classes to be used. Must have the same length as `block_names`. block_names: List of names for each block. Must have the same length as `block_classes`. block_trigger_inputs: List of input names that `select_block()` uses to determine which block to run. - For `ConditionalPipelineBlocks`, this does not need to correspond to `block_names` and `block_classes`. - For `AutoPipelineBlocks`, this must have the same length as `block_names` and `block_classes`, - where each element specifies the trigger input for the corresponding block. + For `ConditionalPipelineBlocks`, this does not need to correspond to `block_names` and `block_classes`. For + `AutoPipelineBlocks`, this must have the same length as `block_names` and `block_classes`, where each + element specifies the trigger input for the corresponding block. default_block_name: Name of the default block to run when no trigger inputs match. If None, this block can be skipped entirely when no trigger inputs are provided. 
""" @@ -676,34 +676,34 @@ class ConditionalPipelineBlocks(ModularPipelineBlocks): def get_execution_blocks(self, **kwargs) -> Optional["ModularPipelineBlocks"]: """ Get the block(s) that would execute given the inputs. - + Recursively resolves nested ConditionalPipelineBlocks until reaching either: - A leaf block (no sub_blocks) → returns single `ModularPipelineBlocks` - - A `SequentialPipelineBlocks` → delegates to its `get_execution_blocks()` which returns + - A `SequentialPipelineBlocks` → delegates to its `get_execution_blocks()` which returns a `SequentialPipelineBlocks` containing the resolved execution blocks - + Args: **kwargs: Input names and values. Only trigger inputs affect block selection. - + Returns: - `ModularPipelineBlocks`: A leaf block or resolved `SequentialPipelineBlocks` - `None`: If this block would be skipped (no trigger matched and no default) """ trigger_kwargs = {name: kwargs.get(name) for name in self.block_trigger_inputs if name is not None} block_name = self.select_block(**trigger_kwargs) - + if block_name is None: block_name = self.default_block_name - + if block_name is None: return None - + block = self.sub_blocks[block_name] - + # Recursively resolve until we hit a leaf block or a SequentialPipelineBlocks if block.sub_blocks: return block.get_execution_blocks(**kwargs) - + return block def __repr__(self): @@ -784,32 +784,37 @@ class ConditionalPipelineBlocks(ModularPipelineBlocks): class AutoPipelineBlocks(ConditionalPipelineBlocks): """ - A Pipeline Blocks that automatically selects a block to run based on the presence of trigger inputs. 
- - This is a specialized version of `ConditionalPipelineBlocks` where: - - Each block has one corresponding trigger input (1:1 mapping) - - Block selection is automatic: the first block whose trigger input is present gets selected - - `block_trigger_inputs` must have the same length as `block_names` and `block_classes` - - Use `None` in `block_trigger_inputs` to specify the default block, i.e the block that will run if no trigger inputs are present - - Attributes: - block_classes: List of block classes to be used. Must have the same length as `block_names` and `block_trigger_inputs`. - block_names: List of names for each block. Must have the same length as `block_classes` and `block_trigger_inputs`. - block_trigger_inputs: List of input names where each element specifies the trigger input for the corresponding block. - Use `None` to mark the default block. - - Example: -```python - class MyAutoBlock(AutoPipelineBlocks): - block_classes = [InpaintEncoderBlock, ImageEncoderBlock, TextEncoderBlock] - block_names = ["inpaint", "img2img", "text2img"] - block_trigger_inputs = ["mask_image", "image", None] # text2img is the default -``` - - With this definition: - - As long as `mask_image` is provided, "inpaint" block runs (regardless of `image` being provided or not) - - If `mask_image` is not provided but `image` is provided, "img2img" block runs - - Otherwise, "text2img" block runs (default, trigger is `None`) + A Pipeline Blocks that automatically selects a block to run based on the presence of trigger inputs. 
+ + This is a specialized version of `ConditionalPipelineBlocks` where: + - Each block has one corresponding trigger input (1:1 mapping) + - Block selection is automatic: the first block whose trigger input is present gets selected + - `block_trigger_inputs` must have the same length as `block_names` and `block_classes` + - Use `None` in `block_trigger_inputs` to specify the default block, i.e., the block that will run if no trigger + inputs are present + + Attributes: + block_classes: + List of block classes to be used. Must have the same length as `block_names` and + `block_trigger_inputs`. + block_names: + List of names for each block. Must have the same length as `block_classes` and `block_trigger_inputs`. + block_trigger_inputs: + List of input names where each element specifies the trigger input for the corresponding block. Use + `None` to mark the default block. + + Example: + ```python + class MyAutoBlock(AutoPipelineBlocks): + block_classes = [InpaintEncoderBlock, ImageEncoderBlock, TextEncoderBlock] + block_names = ["inpaint", "img2img", "text2img"] + block_trigger_inputs = ["mask_image", "image", None] # text2img is the default + ``` + + With this definition: + - As long as `mask_image` is provided, "inpaint" block runs (regardless of `image` being provided or not) + - If `mask_image` is not provided but `image` is provided, "img2img" block runs + - Otherwise, "text2img" block runs (default, trigger is `None`) """ def __init__(self): @@ -830,7 +835,6 @@ class AutoPipelineBlocks(ConditionalPipelineBlocks): idx = self.block_trigger_inputs.index(None) self.default_block_name = self.block_names[idx] - def select_block(self, **kwargs) -> Optional[str]: """Select block based on which trigger input is present (not None).""" for trigger_input, block_name in zip(self.block_trigger_inputs, self.block_names): @@ -883,21 +887,24 @@ class SequentialPipelineBlocks(ModularPipelineBlocks): expected_configs.append(config) return expected_configs - @property def
workflow_names(self): if self._workflow_map is None: - raise NotImplementedError(f"workflows is not supported because _workflow_map is not set for {self.__class__.__name__}") - + raise NotImplementedError( + f"workflows is not supported because _workflow_map is not set for {self.__class__.__name__}" + ) + return list(self._workflow_map.keys()) def get_workflow(self, workflow_name: str): if self._workflow_map is None: - raise NotImplementedError(f"workflows is not supported because _workflow_map is not set for {self.__class__.__name__}") - + raise NotImplementedError( + f"workflows is not supported because _workflow_map is not set for {self.__class__.__name__}" + ) + if workflow_name not in self._workflow_map: raise ValueError(f"Workflow {workflow_name} not found in {self.__class__.__name__}") - + trigger_inputs = self._workflow_map[workflow_name] workflow_blocks = self.get_execution_blocks(**trigger_inputs) @@ -1058,7 +1065,7 @@ class SequentialPipelineBlocks(ModularPipelineBlocks): """ # Copy kwargs so we can add outputs as we traverse active_inputs = dict(kwargs) - + def fn_recursive_traverse(block, block_name, active_inputs): result_blocks = OrderedDict() @@ -1088,7 +1095,7 @@ class SequentialPipelineBlocks(ModularPipelineBlocks): for block_name, block in self.sub_blocks.items(): nested_blocks = fn_recursive_traverse(block, block_name, active_inputs) all_blocks.update(nested_blocks) - + return SequentialPipelineBlocks.from_blocks_dict(all_blocks) def __repr__(self): @@ -1098,7 +1105,7 @@ class SequentialPipelineBlocks(ModularPipelineBlocks): f"{class_name}(\n Class: {base_class}\n" if base_class and base_class != "object" else f"{class_name}(\n" ) - if self._get_trigger_inputs(): + if self._workflow_map is None and self._get_trigger_inputs(): header += "\n" header += " " + "=" * 100 + "\n" header += " This pipeline contains blocks that are selected at runtime based on inputs.\n" @@ -1108,8 +1115,13 @@ class SequentialPipelineBlocks(ModularPipelineBlocks): header 
+= f" Use `get_execution_blocks()` to see selected blocks (e.g. `get_execution_blocks({example_input}=...)`).\n" header += " " + "=" * 100 + "\n\n" + description = self.description + if self._workflow_map is not None: + workflow_str = format_workflow(self._workflow_map) + description = f"{self.description}\n\n{workflow_str}" + # Format description with proper indentation - desc_lines = self.description.split("\n") + desc_lines = description.split("\n") desc = [] # First line with "Description:" label desc.append(f" Description: {desc_lines[0]}") @@ -1157,10 +1169,15 @@ class SequentialPipelineBlocks(ModularPipelineBlocks): @property def doc(self): + description = self.description + if self._workflow_map is not None: + workflow_str = format_workflow(self._workflow_map) + description = f"{self.description}\n\n{workflow_str}" + return make_doc_string( self.inputs, self.outputs, - self.description, + description=description, class_name=self.__class__.__name__, expected_components=self.expected_components, expected_configs=self.expected_configs, diff --git a/src/diffusers/modular_pipelines/modular_pipeline_utils.py b/src/diffusers/modular_pipelines/modular_pipeline_utils.py index e075f88a0b..6792d7db42 100644 --- a/src/diffusers/modular_pipelines/modular_pipeline_utils.py +++ b/src/diffusers/modular_pipelines/modular_pipeline_utils.py @@ -14,11 +14,10 @@ import inspect import re -import numpy as np import warnings from collections import OrderedDict from dataclasses import dataclass, field, fields -from typing import Any, Dict, List, Literal, Optional, Type, Union, Set, Tuple +from typing import Any, Dict, List, Literal, Optional, Tuple, Type, Union import PIL.Image import torch @@ -862,6 +861,30 @@ def format_configs(configs, indent_level=4, max_line_length=115, add_empty_lines return "\n".join(formatted_configs) +def format_workflow(workflow_map): + """Format a workflow map into a readable string representation. 
+ + Args: + workflow_map: Dictionary mapping workflow names to trigger inputs + + Returns: + A formatted string representing all workflows + """ + if workflow_map is None: + return "" + + lines = ["Supported workflows:"] + for workflow_name, trigger_inputs in workflow_map.items(): + required_inputs = [k for k, v in trigger_inputs.items() if v] + if required_inputs: + inputs_str = ", ".join(f"`{t}`" for t in required_inputs) + lines.append(f" - `{workflow_name}`: requires {inputs_str}") + else: + lines.append(f" - `{workflow_name}`: default (no additional inputs required)") + + return "\n".join(lines) + + def make_doc_string( inputs, outputs, @@ -920,9 +943,9 @@ def make_doc_string( def combine_inputs(*named_input_lists: List[Tuple[str, List[InputParam]]]) -> List[InputParam]: """ - Combines multiple lists of InputParam objects from different blocks. For duplicate inputs, updates only if - current default value is None and new default value is not None. Warns if multiple non-None default values - exist for the same input. + Combines multiple lists of InputParam objects from different blocks. For duplicate inputs, updates only if current + default value is None and new default value is not None. Warns if multiple non-None default values exist for the + same input. Args: named_input_lists: List of tuples containing (block_name, input_param_list) pairs @@ -960,6 +983,7 @@ def combine_inputs(*named_input_lists: List[Tuple[str, List[InputParam]]]) -> Li return list(combined_dict.values()) + def combine_outputs(*named_output_lists: List[Tuple[str, List[OutputParam]]]) -> List[OutputParam]: """ Combines multiple lists of OutputParam objects from different blocks. 
For duplicate outputs, keeps the first @@ -980,4 +1004,4 @@ def combine_outputs(*named_output_lists: List[Tuple[str, List[OutputParam]]]) -> ): combined_dict[output_param.name] = output_param - return list(combined_dict.values()) \ No newline at end of file + return list(combined_dict.values()) diff --git a/src/diffusers/modular_pipelines/qwenimage/before_denoise.py b/src/diffusers/modular_pipelines/qwenimage/before_denoise.py index 338caf514b..80a379da6b 100644 --- a/src/diffusers/modular_pipelines/qwenimage/before_denoise.py +++ b/src/diffusers/modular_pipelines/qwenimage/before_denoise.py @@ -551,8 +551,7 @@ class QwenImageCreateMaskLatentsStep(ModularPipelineBlocks): # auto_docstring class QwenImageSetTimestepsStep(ModularPipelineBlocks): """ - Step that sets the the scheduler's timesteps for text-to-image generation. Should be run after prepare latents - step. + Step that sets the scheduler's timesteps for text-to-image generation. Should be run after prepare latents step. Components: scheduler (`FlowMatchEulerDiscreteScheduler`) @@ -718,8 +717,8 @@ class QwenImageLayeredSetTimestepsStep(ModularPipelineBlocks): # auto_docstring class QwenImageSetTimestepsWithStrengthStep(ModularPipelineBlocks): """ - Step that sets the the scheduler's timesteps for image-to-image generation, and inpainting. Should be run after - prepare latents step. + Step that sets the scheduler's timesteps for image-to-image generation, and inpainting. Should be run after prepare + latents step. 
Components: scheduler (`FlowMatchEulerDiscreteScheduler`) @@ -846,10 +845,6 @@ class QwenImageRoPEInputsStep(ModularPipelineBlocks): Outputs: img_shapes (`List`): The shapes of the images latents, used for RoPE calculation - txt_seq_lens (`List`): - The sequence lengths of the prompt embeds, used for RoPE calculation - negative_txt_seq_lens (`List`): - The sequence lengths of the negative prompt embeds, used for RoPE calculation """ model_name = "qwenimage" @@ -925,10 +920,6 @@ class QwenImageEditRoPEInputsStep(ModularPipelineBlocks): Outputs: img_shapes (`List`): The shapes of the images latents, used for RoPE calculation - txt_seq_lens (`List`): - The sequence lengths of the prompt embeds, used for RoPE calculation - negative_txt_seq_lens (`List`): - The sequence lengths of the negative prompt embeds, used for RoPE calculation """ model_name = "qwenimage" diff --git a/src/diffusers/modular_pipelines/qwenimage/modular_blocks_qwenimage.py b/src/diffusers/modular_pipelines/qwenimage/modular_blocks_qwenimage.py index 66f861da65..9bdc49ff91 100644 --- a/src/diffusers/modular_pipelines/qwenimage/modular_blocks_qwenimage.py +++ b/src/diffusers/modular_pipelines/qwenimage/modular_blocks_qwenimage.py @@ -1113,10 +1113,14 @@ AUTO_BLOCKS = InsertableDict( class QwenImageAutoBlocks(SequentialPipelineBlocks): """ Auto Modular pipeline for text-to-image, image-to-image, inpainting, and controlnet tasks using QwenImage. - - for image-to-image generation, you need to provide `image` - - for inpainting, you need to provide `mask_image` and `image`, optionally you can provide `padding_mask_crop`. 
- - to run the controlnet workflow, you need to provide `control_image` - - for text-to-image generation, all you need to provide is `prompt` + + Supported workflows: + - `text2image`: requires `prompt` + - `image2image`: requires `prompt`, `image` + - `inpainting`: requires `prompt`, `mask_image`, `image` + - `controlnet_text2image`: requires `prompt`, `control_image` + - `controlnet_image2image`: requires `prompt`, `image`, `control_image` + - `controlnet_inpainting`: requires `prompt`, `mask_image`, `image`, `control_image` Components: text_encoder (`Qwen2_5_VLForConditionalGeneration`): The text encoder to use tokenizer (`Qwen2Tokenizer`): @@ -1214,13 +1218,7 @@ class QwenImageAutoBlocks(SequentialPipelineBlocks): @property def description(self): - return ( - "Auto Modular pipeline for text-to-image, image-to-image, inpainting, and controlnet tasks using QwenImage.\n" - + "- for image-to-image generation, you need to provide `image`\n" - + "- for inpainting, you need to provide `mask_image` and `image`, optionally you can provide `padding_mask_crop`.\n" - + "- to run the controlnet workflow, you need to provide `control_image`\n" - + "- for text-to-image generation, all you need to provide is `prompt`" - ) + return "Auto Modular pipeline for text-to-image, image-to-image, inpainting, and controlnet tasks using QwenImage." @property def outputs(self):