In Modular Diffusers, you build your workflow using `ModularPipelineBlocks`.
In this tutorial, we will focus on how to write a basic `PipelineBlock` and how it interacts with other components in the system. We will also cover how to connect them together using the multi-blocks: `SequentialPipelineBlocks`, `LoopSequentialPipelineBlocks`, and `AutoPipelineBlocks`.
## Understanding the Foundation: `PipelineState`
Before we dive into creating `PipelineBlock`s, we need to have a basic understanding of `PipelineState` - the core data structure that all blocks operate on. This concept is fundamental to understanding how blocks interact with each other and the pipeline system.
A `PipelineState` object maintains two dicts: an immutable `inputs` dict holding values provided by the user, and a mutable `intermediates` dict that blocks read from and write to as the pipeline runs. Printed out, it looks like this:

```
PipelineState(
  inputs={
    image: <PIL.Image.Image image mode=RGB size=512x512>
  },
  intermediates={
    batch_size: 2
  },
)
```
## Creating a `PipelineBlock`
To write a `PipelineBlock` class, you need to define a few properties that determine how your block interacts with the pipeline state. Understanding these properties is crucial - they define what data your block can access and what it can produce.
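The examples below use a small `make_block` convenience helper whose full definition appears earlier in the document. As a rough sketch of what it does - treat the base-class method names (`get_block_state`, `set_block_state`) as assumptions based on the `PipelineBlock` API rather than a canonical implementation - it dynamically builds a `PipelineBlock` subclass from the specs you pass in:

```py
from diffusers.modular_pipelines import PipelineBlock

def make_block(inputs=[], intermediate_inputs=[], intermediate_outputs=[], block_fn=None, description=None):
    # Dynamically build a PipelineBlock subclass from the given specs
    class TestBlock(PipelineBlock):
        model_name = "test"

        @property
        def inputs(self):
            return inputs

        @property
        def intermediate_inputs(self):
            return intermediate_inputs

        @property
        def intermediate_outputs(self):
            return intermediate_outputs

        @property
        def description(self):
            return description if description is not None else ""

        def __call__(self, components, state):
            # Pull this block's view of the pipeline state...
            block_state = self.get_block_state(state)
            # ...run the user-supplied computation...
            if block_fn is not None:
                block_state = block_fn(block_state, state)
            # ...and write declared outputs back to the pipeline state
            self.set_block_state(state, block_state)
            return components, state

    return TestBlock()
```

This way each example block only has to supply its parameter specs and a `block_fn` that transforms the block state.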
Let's create a simple block to see how these definitions interact with the pipeline state. To better understand what's happening, we'll print out the states before and after updates to inspect them:
```py
import torch
from diffusers.modular_pipelines import InputParam, OutputParam

inputs = [
InputParam(name="image", type_hint="PIL.Image", description="raw input image to process")
]
intermediate_inputs = [InputParam(name="batch_size", type_hint=int)]
intermediate_outputs = [
OutputParam(name="image_latents", description="latents representing the image")
]

def image_encoder_block_fn(block_state, pipeline_state):
    print(f"pipeline_state (before update): {pipeline_state}")
    print(f"block_state (before update): {block_state}")

    # Simulate the encoder's work: turn the image into a tensor, expand the
    # batch, and produce latents (a real block would use an image processor
    # and a VAE here)
    block_state.image = torch.randn(1, 3, 512, 512)
    block_state.batch_size = block_state.batch_size * 2
    block_state.processed_image = [block_state.image] * block_state.batch_size
    block_state.image_latents = torch.randn(1, 4, 64, 64)

    print(f"block_state (after update): {block_state}")
    return block_state

# Create a block with our definitions
image_encoder_block = make_block(
    inputs=inputs,
    intermediate_inputs=intermediate_inputs,
    intermediate_outputs=intermediate_outputs,
    block_fn=image_encoder_block_fn,
    description="Encode raw image into its latent representation"
)
pipe = image_encoder_block.init_pipeline()
```
Let's check the pipeline's docstring to see what inputs it expects:
```py
>>> print(pipe.doc)
class TestBlock

  Encode raw image into its latent representation

  Inputs:

      image (`PIL.Image`, *optional*):
          raw input image to process
```

Now let's run the pipeline and inspect the pipeline state before and after the update:

```py
from PIL import Image

image = Image.new("RGB", (512, 512))  # any RGB image works for this toy example
state = pipe(image=image, batch_size=2)
print(f"pipeline_state (after update): {state}")
```
```out
pipeline_state (before update): PipelineState(
  inputs={
    image: <PIL.Image.Image image mode=RGB size=512x512 at 0x7F226024EB90>
  },
  intermediates={
    batch_size: 2
  },
)
block_state (before update): BlockState(
  image: <PIL.Image.Image image mode=RGB size=512x512 at 0x7F2260260220>
  batch_size: 2
)
block_state (after update): BlockState(
  image: Tensor(dtype=torch.float32, shape=torch.Size([1, 3, 512, 512]))
  batch_size: 4
  processed_image: List[4] of Tensors with shapes [torch.Size([1, 3, 512, 512]), torch.Size([1, 3, 512, 512]), torch.Size([1, 3, 512, 512]), torch.Size([1, 3, 512, 512])]
  image_latents: Tensor(dtype=torch.float32, shape=torch.Size([1, 4, 64, 64]))
)
pipeline_state (after update): PipelineState(
  inputs={
    image: <PIL.Image.Image image mode=RGB size=512x512 at 0x7F226024EB90>
  },
  intermediates={
    batch_size: 4
    image_latents: Tensor(dtype=torch.float32, shape=torch.Size([1, 4, 64, 64]))
  },
)
```
**Key Observations:**

1. **Before the update**: `image` (the input) goes to the immutable inputs dict, while `batch_size` (the intermediate input) goes to the mutable intermediates dict, and both are available in `block_state`.
2. **After the update**: only declared changes make it back into `pipeline_state`:
   - **`batch_size`** was updated from 2 to 4 in the intermediates dict because it is a declared intermediate input, which is mutable
   - **`image_latents`** was added to the intermediates dict because it is a declared intermediate output
   - **`image`** was converted to a tensor inside the block, but the `inputs` dict still holds the original PIL image because inputs are immutable
   - **`processed_image`** was not added to `pipeline_state` because it wasn't declared as an intermediate output
I hope by now you have a basic idea about how `PipelineBlock` manages state through inputs, intermediate inputs, and intermediate outputs. The real power comes when we connect multiple blocks together - their intermediate outputs become intermediate inputs for subsequent blocks, creating modular workflows. Let's explore how to build these connections using multi-blocks like `SequentialPipelineBlocks`.
## Creating a `SequentialPipelineBlocks`
I think by this point, you're already familiar with `SequentialPipelineBlocks` and how to create them with the `from_blocks_dict` API. It's one of the most common ways to use Modular Diffusers, and we've covered it pretty well in the [quicktour](https://moon-ci-docs.huggingface.co/docs/diffusers/pr_9672/en/modular_diffusers/quicktour#modularpipelineblocks).
But how do blocks actually connect and work together? Understanding this is crucial for building effective modular workflows. Let's explore this through an example.
**How Blocks Connect in SequentialPipelineBlocks:**
The key insight is that blocks connect through their intermediate inputs and outputs - the "studs and anti-studs" we discussed earlier. Let's expand on our example to create a new block that produces `batch_size`, which we'll call "input_block":
```py
def input_block_fn(block_state, pipeline_state):
    # Determine the batch size from the number of prompts
    prompt = block_state.prompt
    if not isinstance(prompt, list):
        prompt = [prompt]
    block_state.batch_size = len(prompt) * block_state.num_images_per_prompt
    return block_state

input_block = make_block(
    inputs=[
        InputParam(name="prompt", type_hint=list, description="list of text prompts"),
        InputParam(name="num_images_per_prompt", type_hint=int, description="number of images per prompt")
    ],
    intermediate_outputs=[
        OutputParam(name="batch_size", description="calculated batch size")
    ],
    block_fn=input_block_fn,
    description="A block that determines batch_size based on the number of prompts and the num_images_per_prompt argument."
)
```
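
Before wiring it up, you can sanity-check this block on its own - a quick sketch, assuming the definitions above:

```py
input_pipe = input_block.init_pipeline()
state = input_pipe(prompt=["a cat", "a dog"], num_images_per_prompt=2)
print(state)  # the intermediates dict should now contain batch_size: 4
```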
Now let's connect these blocks to create a pipeline:
```py
from diffusers.modular_pipelines import SequentialPipelineBlocks, InsertableDict
blocks_dict = InsertableDict()
blocks_dict["input"] = input_block
blocks_dict["image_encoder"] = image_encoder_block
blocks = SequentialPipelineBlocks.from_blocks_dict(blocks_dict)
pipeline = blocks.init_pipeline()
```
Now you have a pipeline with 2 blocks. When you inspect `pipeline.doc`, you can see that `batch_size` is not listed as an input. The pipeline automatically detects that the `input_block` can produce `batch_size` for the `image_encoder_block`, so it doesn't ask the user to provide it.
```py
>>> print(pipeline.doc)
class SequentialPipelineBlocks

  Inputs:

      prompt (`None`, *optional*):

      num_images_per_prompt (`None`, *optional*):

      image (`PIL.Image`, *optional*):
          raw input image to process

  Outputs:

      batch_size (`None`):

      image_latents (`None`):
          latents representing the image
```
At runtime, the data flows through the pipeline like this:
![Data Flow Diagram](https://huggingface.co/datasets/YiYiXu/testing-images/resolve/main/modular_quicktour/sequential_mermaid.png)
**How SequentialPipelineBlocks Works:**
1. **Execution Order**: Blocks are executed in the order they're registered in the `blocks_dict`
2. **Data Flow**: Outputs from one block become available as intermediate inputs to all subsequent blocks
3. **Smart Input Resolution**: The pipeline automatically figures out which values need to be provided by the user and which will be generated by previous blocks
4. **Consistent Interface**: Each block maintains its own behavior and operates through its defined interface, while collectively these interfaces determine what the entire pipeline accepts and produces
What happens within each block follows the same pattern we described earlier: each block gets its own `block_state` with the relevant inputs and intermediate inputs, performs its computation, and updates the pipeline state with its intermediate outputs.
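
To make this concrete, here is a sketch of running the assembled two-block pipeline end to end (assuming the blocks defined above; the placeholder image stands in for a real input):

```py
from PIL import Image

image = Image.new("RGB", (512, 512))  # placeholder input image
state = pipeline(prompt=["a cat", "a dog"], num_images_per_prompt=2, image=image)

# input_block writes batch_size (2 prompts x 2 images per prompt = 4) into the
# intermediates; image_encoder_block then reads it and adds image_latents
print(state)
```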