From f09b1ccfaebb3af7221997855a77dd2e1ffdbe77 Mon Sep 17 00:00:00 2001 From: yiyixuxu Date: Mon, 30 Jun 2025 07:48:44 +0200 Subject: [PATCH] start the section on sequential pipelines --- .../write_own_pipeline_block.md | 140 ++++++++++++------ 1 file changed, 96 insertions(+), 44 deletions(-) diff --git a/docs/source/en/modular_diffusers/write_own_pipeline_block.md b/docs/source/en/modular_diffusers/write_own_pipeline_block.md index 656bc59e40..62a55467cd 100644 --- a/docs/source/en/modular_diffusers/write_own_pipeline_block.md +++ b/docs/source/en/modular_diffusers/write_own_pipeline_block.md @@ -17,7 +17,7 @@ In Modular Diffusers, you build your workflow using `ModularPipelineBlocks`. We In this tutorial, we will focus on how to write a basic `PipelineBlock` and how it interacts with other components in the system. We will also cover how to connect them together using the multi-blocks: `SequentialPipelineBlocks`, `LoopSequentialPipelineBlocks`, and `AutoPipelineBlocks`. -### Understanding the Foundation: `PipelineState` +## Understanding the Foundation: `PipelineState` Before we dive into creating `PipelineBlock`s, we need to have a basic understanding of `PipelineState` - the core data structure that all blocks operate on. This concept is fundamental to understanding how blocks interact with each other and the pipeline system. @@ -44,7 +44,7 @@ PipelineState( }, ``` -### Creating a `PipelineBlock` +## Creating a `PipelineBlock` To write a `PipelineBlock` class, you need to define a few properties that determine how your block interacts with the pipeline state. Understanding these properties is crucial - they define what data your block can access and what it can produce. @@ -182,17 +182,17 @@ def make_block(inputs=[], intermediate_inputs=[], intermediate_outputs=[], block Let's create a simple block to see how these definitions interact with the pipeline state. 
To better understand what's happening, we'll print out the states before and after updates to inspect them:

 ```py
-user_inputs = [
+inputs = [
     InputParam(name="image", type_hint="PIL.Image", description="raw input image to process")
 ]

-user_intermediate_inputs = [InputParam(name="batch_size", type_hint=int)]
+intermediate_inputs = [InputParam(name="batch_size", type_hint=int)]

-user_intermediate_outputs = [
+intermediate_outputs = [
     OutputParam(name="image_latents", description="latents representing the image")
 ]

-def user_block_fn(block_state, pipeline_state):
+def image_encoder_block_fn(block_state, pipeline_state):
     print(f"pipeline_state (before update): {pipeline_state}")
     print(f"block_state (before update): {block_state}")
@@ -206,21 +206,23 @@ def user_block_fn(block_state, pipeline_state):
     return block_state

 # Create a block with our definitions
-block = make_block(
-    inputs=user_inputs,
-    intermediate_inputs=user_intermediate_inputs,
-    intermediate_outputs=user_intermediate_outputs,
-    block_fn=user_block_fn
+image_encoder_block = make_block(
+    inputs=inputs,
+    intermediate_inputs=intermediate_inputs,
+    intermediate_outputs=intermediate_outputs,
+    block_fn=image_encoder_block_fn,
+    description="Encode a raw image into its latent representation"
 )

-pipe = block.init_pipeline()
+pipe = image_encoder_block.init_pipeline()
 ```

 Let's check the pipeline's docstring to see what inputs it expects:
-
 ```py
 >>> print(pipe.doc)
 class TestBlock
+
+  Encode a raw image into its latent representation
+
   Inputs:

       image (`PIL.Image`, *optional*):
@@ -246,37 +248,6 @@ state = pipe(image=image, batch_size=2)
 print(f"pipeline_state (after update): {state}")
 ```

-```out
-pipeline_state (before update): PipelineState(
-    inputs={
-        image: 
-    },
-    intermediates={
-        batch_size: 2
-    },
-)
-block_state (before update): BlockState(
-    image: 
-    batch_size: 2
-)
-
-block_state (after update): BlockState(
-    image: Tensor(dtype=torch.float32, shape=torch.Size([1, 3, 512, 512]))
-    batch_size: 4
-    processed_image: List[4] of Tensors with shapes [torch.Size([1, 3, 512, 512]), torch.Size([1, 3, 512, 512]), torch.Size([1, 3, 512, 512]), torch.Size([1, 3, 512, 512])]
-    image_latents: Tensor(dtype=torch.float32, shape=torch.Size([1, 4, 64, 64]))
-)
-pipeline_state (after update): PipelineState(
-    inputs={
-        image: 
-    },
-    intermediates={
-        batch_size: 4
-        image_latents: Tensor(dtype=torch.float32, shape=torch.Size([1, 4, 64, 64]))
-    },
-)
-```
-
 **Key Observations:**

 1. **Before the update**: `image` (the input) goes to the immutable inputs dict, while `batch_size` (the intermediate_input) goes to the mutable intermediates dict, and both are available in `block_state`.
@@ -288,3 +259,84 @@ pipeline_state (after update): PipelineState(
 - **`processed_image`** was not added to `pipeline_state` because it wasn't declared as an intermediate output

 I hope by now you have a basic idea about how `PipelineBlock` manages state through inputs, intermediate inputs, and intermediate outputs. The real power comes when we connect multiple blocks together - their intermediate outputs become intermediate inputs for subsequent blocks, creating modular workflows. Let's explore how to build these connections using multi-blocks like `SequentialPipelineBlocks`.
+
+## Creating a `SequentialPipelineBlocks`
+
+By this point, you're probably already familiar with `SequentialPipelineBlocks` and how to create them with the `from_blocks_dict` API. 
It's one of the most common ways to use Modular Diffusers, and we've covered it pretty well in the [quicktour](https://moon-ci-docs.huggingface.co/docs/diffusers/pr_9672/en/modular_diffusers/quicktour#modularpipelineblocks).
+
+But how do blocks actually connect and work together? Understanding this is crucial for building effective modular workflows. Let's explore this through an example.
+
+**How Blocks Connect in SequentialPipelineBlocks:**
+
+The key insight is that blocks connect through their intermediate inputs and outputs - the "studs and anti-studs" we discussed earlier. Let's expand on our example to create a new block that produces `batch_size`, which we'll call `input_block`:
+
+```py
+def input_block_fn(block_state, pipeline_state):
+    # Determine the batch size from the number of prompts and num_images_per_prompt
+    prompt = block_state.prompt if isinstance(block_state.prompt, list) else [block_state.prompt]
+    batch_size = len(prompt)
+    block_state.batch_size = batch_size * block_state.num_images_per_prompt
+
+    return block_state
+
+input_block = make_block(
+    inputs=[
+        InputParam(name="prompt", type_hint=list, description="list of text prompts"),
+        InputParam(name="num_images_per_prompt", type_hint=int, description="number of images per prompt")
+    ],
+    intermediate_outputs=[
+        OutputParam(name="batch_size", description="calculated batch size")
+    ],
+    block_fn=input_block_fn,
+    description="A block that determines batch_size based on the number of prompts and the num_images_per_prompt argument."
+)
+```
+
+Now let's connect these blocks to create a pipeline:
+
+```py
+from diffusers.modular_pipelines import SequentialPipelineBlocks, InsertableDict
+blocks_dict = InsertableDict()
+blocks_dict["input"] = input_block
+blocks_dict["image_encoder"] = image_encoder_block
+blocks = SequentialPipelineBlocks.from_blocks_dict(blocks_dict)
+pipeline = blocks.init_pipeline()
+```
+
+Now you have a pipeline with two blocks. When you inspect `pipeline.doc`, you can see that `batch_size` is not listed as an input. The pipeline automatically detects that the `input_block` can produce `batch_size` for the `image_encoder_block`, so it doesn't ask the user to provide it.
+
+```py
+>>> print(pipeline.doc)
+class SequentialPipelineBlocks
+
+  Inputs:
+
+      prompt (`None`, *optional*):
+
+      num_images_per_prompt (`None`, *optional*):
+
+      image (`PIL.Image`, *optional*):
+          raw input image to process
+
+  Outputs:
+
+      batch_size (`None`):
+
+      image_latents (`None`):
+          latents representing the image
+```
+
+At runtime, the data flows like this:
+
+![Data Flow Diagram](https://huggingface.co/datasets/YiYiXu/testing-images/resolve/main/modular_quicktour/sequential_mermaid.png)
+
+**How SequentialPipelineBlocks Works:**
+
+1. **Execution Order**: Blocks are executed in the order they're registered in the `blocks_dict`.
+2. **Data Flow**: Outputs from one block become available as intermediate inputs to all subsequent blocks.
+3. **Smart Input Resolution**: The pipeline automatically figures out which values need to be provided by the user and which will be generated by previous blocks.
+4. **Consistent Interface**: Each block maintains its own behavior and operates through its defined interface, while collectively these interfaces determine what the entire pipeline accepts and produces.
+
+What happens within each block follows the same pattern we described earlier: each block gets its own `block_state` with the relevant inputs and intermediate inputs, performs its computation, and updates the pipeline state with its intermediate outputs.
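+
+To see this flow end to end, here is a minimal sketch that runs the two-block pipeline we just built, following the same calling pattern as the single-block example earlier in this tutorial. The blank `PIL` image and the prompts are placeholder values for illustration only:
+
+```py
+from PIL import Image
+
+# placeholder input for the toy blocks defined above
+image = Image.new("RGB", (512, 512))
+
+# input_block runs first: it writes batch_size = len(prompt) * num_images_per_prompt = 2 * 2 = 4
+# to the intermediates dict; image_encoder_block then reads batch_size as an intermediate input
+# and adds image_latents to the intermediates.
+state = pipeline(prompt=["a cat", "a dog"], num_images_per_prompt=2, image=image)
+
+# the returned PipelineState should now contain both batch_size and image_latents
+print(state)
+```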