diff --git a/src/diffusers/modular_pipelines/modular_pipeline.py b/src/diffusers/modular_pipelines/modular_pipeline.py index eede0b76d1..9252d6a739 100644 --- a/src/diffusers/modular_pipelines/modular_pipeline.py +++ b/src/diffusers/modular_pipelines/modular_pipeline.py @@ -76,139 +76,59 @@ class PipelineState: [`PipelineState`] stores the state of a pipeline. It is used to pass data between pipeline blocks. """ - inputs: Dict[str, Any] = field(default_factory=dict) - intermediates: Dict[str, Any] = field(default_factory=dict) - input_kwargs: Dict[str, List[str]] = field(default_factory=dict) - intermediate_kwargs: Dict[str, List[str]] = field(default_factory=dict) + values: Dict[str, Any] = field(default_factory=dict) + kwargs_mapping: Dict[str, List[str]] = field(default_factory=dict) - def set_input(self, key: str, value: Any, kwargs_type: str = None): + def set(self, key: str, value: Any, kwargs_type: str = None): """ - Add an input to the immutable pipeline state, i.e, pipeline_state.inputs. - - The kwargs_type parameter allows you to associate inputs with specific input types. For example, if you call - set_input(prompt_embeds=..., kwargs_type="guider_kwargs"), this input will be automatically fetched when a - pipeline block has "guider_kwargs" in its expected_inputs list. + Add a value to the pipeline state. Args: - key (str): The key for the input - value (Any): The input value - kwargs_type (str): The kwargs_type with which the input is associated + key (str): The key for the value + value (Any): The value to store + kwargs_type (str): The kwargs_type with which the value is associated """ - self.inputs[key] = value + self.values[key] = value + if kwargs_type is not None: - if kwargs_type not in self.input_kwargs: - self.input_kwargs[kwargs_type] = [key] + if kwargs_type not in self.kwargs_mapping: + self.kwargs_mapping[kwargs_type] = [key] else: - self.input_kwargs[kwargs_type].append(key) + self.kwargs_mapping[kwargs_type].append(key) - def set_intermediate(self, key: str, value: Any, kwargs_type: str = None): + def get(self, keys: Union[str, List[str]], default: Any = None) -> Union[Any, Dict[str, Any]]: """ - Add an intermediate value to the mutable pipeline state, i.e, pipeline_state.intermediates. - - The kwargs_type parameter allows you to associate intermediate values with specific input types. For example, - if you call set_intermediate(latents=..., kwargs_type="latents_kwargs"), this intermediate value will be - automatically fetched when a pipeline block has "latents_kwargs" in its expected_intermediate_inputs list. + Get one or multiple values from the pipeline state. Args: - key (str): The key for the intermediate value - value (Any): The intermediate value - kwargs_type (str): The kwargs_type with which the intermediate value is associated - """ - self.intermediates[key] = value - if kwargs_type is not None: - if kwargs_type not in self.intermediate_kwargs: - self.intermediate_kwargs[kwargs_type] = [key] - else: - self.intermediate_kwargs[kwargs_type].append(key) - - def get_input(self, key: str, default: Any = None) -> Any: - """ - Get an input from the pipeline state. - - Args: - key (str): The key for the input - default (Any): The default value to return if the input is not found + keys (Union[str, List[str]]): Key or list of keys for the values + default (Any): The default value to return if not found Returns: - Any: The input value + Union[Any, Dict[str, Any]]: Single value if keys is str, dictionary of values if keys is list """ - value = self.inputs.get(key, default) - if value is not None: - return deepcopy(value) + if isinstance(keys, str): + return self.values.get(keys, default) + return {key: self.values.get(key, default) for key in keys} - def get_inputs(self, keys: List[str], default: Any = None) -> Dict[str, Any]: + def get_by_kwargs(self, kwargs_type: str) -> Dict[str, Any]: """ - Get multiple inputs from the pipeline state. - - Args: - keys (List[str]): The keys for the inputs - default (Any): The default value to return if the input is not found - - Returns: - Dict[str, Any]: Dictionary of inputs with matching keys - """ - return {key: self.inputs.get(key, default) for key in keys} - - def get_inputs_kwargs(self, kwargs_type: str) -> Dict[str, Any]: - """ - Get all inputs with matching kwargs_type. + Get all values with matching kwargs_type. Args: kwargs_type (str): The kwargs_type to filter by Returns: - Dict[str, Any]: Dictionary of inputs with matching kwargs_type + Dict[str, Any]: Dictionary of values with matching kwargs_type """ - input_names = self.input_kwargs.get(kwargs_type, []) - return self.get_inputs(input_names) - - def get_intermediate_kwargs(self, kwargs_type: str) -> Dict[str, Any]: - """ - Get all intermediates with matching kwargs_type. - - Args: - kwargs_type (str): The kwargs_type to filter by - - Returns: - Dict[str, Any]: Dictionary of intermediates with matching kwargs_type - """ - intermediate_names = self.intermediate_kwargs.get(kwargs_type, []) - return self.get_intermediates(intermediate_names) - - def get_intermediate(self, key: str, default: Any = None) -> Any: - """ - Get an intermediate value from the pipeline state. - - Args: - key (str): The key for the intermediate value - default (Any): The default value to return if the intermediate value is not found - - Returns: - Any: The intermediate value - """ - return self.intermediates.get(key, default) - - def get_intermediates(self, keys: List[str], default: Any = None) -> Dict[str, Any]: - """ - Get multiple intermediate values from the pipeline state. - - Args: - keys (List[str]): The keys for the intermediate values - default (Any): The default value to return if the intermediate value is not found - - Returns: - Dict[str, Any]: Dictionary of intermediate values with matching keys - """ - return {key: self.intermediates.get(key, default) for key in keys} + value_names = self.kwargs_mapping.get(kwargs_type, []) + return self.get(value_names) def to_dict(self) -> Dict[str, Any]: """ Convert PipelineState to a dictionary. - - Returns: - Dict[str, Any]: Dictionary containing all attributes of the PipelineState """ - return {**self.__dict__, "inputs": self.inputs, "intermediates": self.intermediates} + return {**self.__dict__} def __repr__(self): def format_value(v): @@ -219,23 +139,16 @@ class PipelineState: else: return repr(v) - inputs = "\n".join(f" {k}: {format_value(v)}" for k, v in self.inputs.items()) - intermediates = "\n".join(f" {k}: {format_value(v)}" for k, v in self.intermediates.items()) - - # Format input_kwargs and intermediate_kwargs - input_kwargs_str = "\n".join(f" {k}: {v}" for k, v in self.input_kwargs.items()) - intermediate_kwargs_str = "\n".join(f" {k}: {v}" for k, v in self.intermediate_kwargs.items()) + values_str = "\n".join(f" {k}: {format_value(v)}" for k, v in self.values.items()) + kwargs_mapping_str = "\n".join(f" {k}: {v}" for k, v in self.kwargs_mapping.items()) return ( f"PipelineState(\n" - f" inputs={{\n{inputs}\n }},\n" - f" intermediates={{\n{intermediates}\n }},\n" - f" input_kwargs={{\n{input_kwargs_str}\n }},\n" - f" intermediate_kwargs={{\n{intermediate_kwargs_str}\n }}\n" + f" values={{\n{values_str}\n }},\n" + f" kwargs_mapping={{\n{kwargs_mapping_str}\n }}\n" f")" ) - @dataclass class BlockState: """ @@ -322,7 +235,7 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin): """ - config_name = "config.json" + config_name = "modular_config.json" model_name = None @classmethod @@ -334,6 +247,14 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin): return expected_modules, optional_parameters + def __init__(self): + self.sub_blocks = InsertableDict() + + @property + def description(self) -> str: + """Description of the block. Must be implemented by subclasses.""" + return "" + @property def expected_components(self) -> List[ComponentSpec]: return [] @@ -343,8 +264,8 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin): return [] @property - def intermediate_inputs(self) -> List[OutputParam]: - """List of intermediate output parameters. Must be implemented by subclasses.""" + def inputs(self) -> List[InputParam]: + """List of input parameters. Must be implemented by subclasses.""" return [] @property @@ -352,6 +273,13 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin): """List of intermediate output parameters. Must be implemented by subclasses.""" return [] + def _get_outputs(self): + return self.intermediate_outputs + + @property + def outputs(self) -> List[OutputParam]: + return self._get_outputs() + @classmethod def from_pretrained( cls, @@ -436,12 +364,12 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin): def get_block_state(self, state: PipelineState) -> dict: """Get all inputs and intermediates in one dictionary""" data = {} - state_inputs = self.inputs + self.intermediate_inputs + state_inputs = self.inputs # Check inputs for input_param in state_inputs: if input_param.name: - value = state.get_input(input_param.name) or state.get_intermediate(input_param.name) + value = state.get(input_param.name) if input_param.required and value is None: raise ValueError(f"Required input '{input_param.name}' is missing") elif value is not None or (value is None and input_param.name not in data): @@ -451,7 +379,7 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin): # if kwargs_type is provided, get all inputs with matching kwargs_type if input_param.kwargs_type not in data: data[input_param.kwargs_type] = {} - inputs_kwargs = state.get_inputs_kwargs(input_param.kwargs_type) or state.get_intermediate_kwargs( + inputs_kwargs = state.get_by_kwargs(input_param.kwargs_type) or state.get_kwargs( input_param.kwargs_type ) if inputs_kwargs: @@ -467,25 +395,28 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin): if not hasattr(block_state, output_param.name): raise ValueError(f"Intermediate output '{output_param.name}' is missing in block state") param = getattr(block_state, output_param.name) - state.set_intermediate(output_param.name, param, output_param.kwargs_type) + state.set(output_param.name, param, output_param.kwargs_type) - for input_param in self.intermediate_inputs: + for input_param in self.inputs: if input_param.name and hasattr(block_state, input_param.name): param = getattr(block_state, input_param.name) # Only add if the value is different from what's in the state - current_value = state.get_intermediate(input_param.name) + current_value = state.get(input_param.name) if current_value is not param: # Using identity comparison to check if object was modified - state.set_intermediate(input_param.name, param, input_param.kwargs_type) + state.set(input_param.name, param, input_param.kwargs_type) elif input_param.kwargs_type: # if it is a kwargs type, e.g. "guider_input_fields", it is likely to be a list of parameters # we need to first find out which inputs are and loop through them. - intermediate_kwargs = state.get_intermediate_kwargs(input_param.kwargs_type) + intermediate_kwargs = state.get_by_kwargs(input_param.kwargs_type) for param_name, current_value in intermediate_kwargs.items(): - if not hasattr(block_state, param_name): - continue - param = getattr(block_state, param_name) - if current_value is not param: # Using identity comparison to check if object was modified - state.set_intermediate(param_name, param, input_param.kwargs_type) + try: + if not hasattr(block_state, param_name): + continue + param = getattr(block_state, param_name) + if current_value is not param: # Using identity comparison to check if object was modified + state.set(param_name, param, input_param.kwargs_type) + except: + import ipdb; ipdb.set_trace() @staticmethod def combine_inputs(*named_input_lists: List[Tuple[str, List[InputParam]]]) -> List[InputParam]: @@ -557,10 +488,6 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin): def input_names(self) -> List[str]: return [input_param.name for input_param in self.inputs] - @property - def intermediate_input_names(self) -> List[str]: - return [input_param.name for input_param in self.intermediate_inputs] - @property def intermediate_output_names(self) -> List[str]: return [output_param.name for output_param in self.intermediate_outputs] @@ -737,31 +664,31 @@ class PipelineBlock(ModularPipelineBlocks): if not hasattr(block_state, output_param.name): raise ValueError(f"Intermediate output '{output_param.name}' is missing in block state") param = getattr(block_state, output_param.name) - state.set_intermediate(output_param.name, param, output_param.kwargs_type) + state.set(output_param.name, param, output_param.kwargs_type) for input_param in self.intermediate_inputs: if hasattr(block_state, input_param.name): param = getattr(block_state, input_param.name) # Only add if the value is different from what's in the state - current_value = state.get_intermediate(input_param.name) + current_value = state.get(input_param.name) if current_value is not param: # Using identity comparison to check if object was modified - state.set_intermediate(input_param.name, param, input_param.kwargs_type) + state.set(input_param.name, param, input_param.kwargs_type) for input_param in self.intermediate_inputs: if input_param.name and hasattr(block_state, input_param.name): param = getattr(block_state, input_param.name) # Only add if the value is different from what's in the state - current_value = state.get_intermediate(input_param.name) + current_value = state.get(input_param.name) if current_value is not param: # Using identity comparison to check if object was modified - state.set_intermediate(input_param.name, param, input_param.kwargs_type) + state.set(input_param.name, param, input_param.kwargs_type) elif input_param.kwargs_type: # if it is a kwargs type, e.g. "guider_input_fields", it is likely to be a list of parameters # we need to first find out which inputs are and loop through them. - intermediate_kwargs = state.get_intermediate_kwargs(input_param.kwargs_type) + intermediate_kwargs = state.get_kwargs(input_param.kwargs_type) for param_name, current_value in intermediate_kwargs.items(): param = getattr(block_state, param_name) if current_value is not param: # Using identity comparison to check if object was modified - state.set_intermediate(param_name, param, input_param.kwargs_type) + state.set(param_name, param, input_param.kwargs_type) class AutoPipelineBlocks(ModularPipelineBlocks): @@ -852,22 +779,6 @@ class AutoPipelineBlocks(ModularPipelineBlocks): return list(required_by_all) - # YiYi TODO: maybe we do not need this, it is only used in docstring, - # intermediate_inputs is by default required, unless you manually handle it inside the block - @property - def required_intermediate_inputs(self) -> List[str]: - if None not in self.block_trigger_inputs: - return [] - first_block = next(iter(self.sub_blocks.values())) - required_by_all = set(getattr(first_block, "required_intermediate_inputs", set())) - - # Intersect with required inputs from all other blocks - for block in list(self.sub_blocks.values())[1:]: - block_required = set(getattr(block, "required_intermediate_inputs", set())) - required_by_all.intersection_update(block_required) - - return list(required_by_all) - # YiYi TODO: add test for this @property def inputs(self) -> List[Tuple[str, Any]]: @@ -881,18 +792,6 @@ class AutoPipelineBlocks(ModularPipelineBlocks): input_param.required = False return combined_inputs - @property - def intermediate_inputs(self) -> List[str]: - named_inputs = [(name, block.intermediate_inputs) for name, block in self.sub_blocks.items()] - combined_inputs = self.combine_inputs(*named_inputs) - # mark Required inputs only if that input is required by all the blocks - for input_param in combined_inputs: - if input_param.name in self.required_intermediate_inputs: - input_param.required = True - else: - input_param.required = False - return combined_inputs - @property def intermediate_outputs(self) -> List[str]: named_outputs = [(name, block.intermediate_outputs) for name, block in self.sub_blocks.items()] @@ -911,10 +810,10 @@ class AutoPipelineBlocks(ModularPipelineBlocks): block = self.trigger_to_block_map.get(None) for input_name in self.block_trigger_inputs: - if input_name is not None and state.get_input(input_name) is not None: + if input_name is not None and state.get(input_name) is not None: block = self.trigger_to_block_map[input_name] break - elif input_name is not None and state.get_intermediate(input_name) is not None: + elif input_name is not None and state.get(input_name) is not None: block = self.trigger_to_block_map[input_name] break @@ -1133,6 +1032,34 @@ class SequentialPipelineBlocks(ModularPipelineBlocks): sub_blocks[block_name] = block_cls() self.sub_blocks = sub_blocks + def _get_inputs(self): + inputs = [] + outputs = set() + + # Go through all blocks in order + for block in self.sub_blocks.values(): + # Add inputs that aren't in outputs yet + for inp in block.inputs: + if inp.name not in outputs and inp.name not in {input.name for input in inputs}: + inputs.append(inp) + + # Only add outputs if the block cannot be skipped + should_add_outputs = True + if hasattr(block, "block_trigger_inputs") and None not in block.block_trigger_inputs: + should_add_outputs = False + + if should_add_outputs: + # Add this block's outputs + block_intermediate_outputs = [out.name for out in block.intermediate_outputs] + outputs.update(block_intermediate_outputs) + + return inputs + + # YiYi TODO: add test for this + @property + def inputs(self) -> List[Tuple[str, Any]]: + return self._get_inputs() + @property def required_inputs(self) -> List[str]: # Get the first block from the dictionary @@ -1146,65 +1073,11 @@ class SequentialPipelineBlocks(ModularPipelineBlocks): return list(required_by_any) - # YiYi TODO: maybe we do not need this, it is only used in docstring, - # intermediate_inputs is by default required, unless you manually handle it inside the block - @property - def required_intermediate_inputs(self) -> List[str]: - required_intermediate_inputs = [] - for input_param in self.intermediate_inputs: - if input_param.required: - required_intermediate_inputs.append(input_param.name) - return required_intermediate_inputs - - # YiYi TODO: add test for this - @property - def inputs(self) -> List[Tuple[str, Any]]: - return self.get_inputs() - - def get_inputs(self): - named_inputs = [(name, block.inputs) for name, block in self.sub_blocks.items()] - combined_inputs = self.combine_inputs(*named_inputs) - # mark Required inputs only if that input is required any of the blocks - for input_param in combined_inputs: - if input_param.name in self.required_inputs: - input_param.required = True - else: - input_param.required = False - return combined_inputs - - @property - def intermediate_inputs(self) -> List[str]: - return self.get_intermediate_inputs() - - def get_intermediate_inputs(self): - inputs = [] - outputs = set() - added_inputs = set() - - # Go through all blocks in order - for block in self.sub_blocks.values(): - # Add inputs that aren't in outputs yet - for inp in block.intermediate_inputs: - if inp.name not in outputs and inp.name not in added_inputs: - inputs.append(inp) - added_inputs.add(inp.name) - - # Only add outputs if the block cannot be skipped - should_add_outputs = True - if hasattr(block, "block_trigger_inputs") and None not in block.block_trigger_inputs: - should_add_outputs = False - - if should_add_outputs: - # Add this block's outputs - block_intermediate_outputs = [out.name for out in block.intermediate_outputs] - outputs.update(block_intermediate_outputs) - return inputs - @property def intermediate_outputs(self) -> List[str]: named_outputs = [] for name, block in self.sub_blocks.items(): - inp_names = {inp.name for inp in block.intermediate_inputs} + inp_names = {inp.name for inp in block.inputs} # so we only need to list new variables as intermediate_outputs, but if user wants to list these they modified it's still fine (a.k.a we don't enforce) # filter out them here so they do not end up as intermediate_outputs if name not in inp_names: @@ -1422,7 +1295,6 @@ class SequentialPipelineBlocks(ModularPipelineBlocks): def doc(self): return make_doc_string( self.inputs, - self.intermediate_inputs, self.outputs, self.description, class_name=self.__class__.__name__, @@ -1472,16 +1344,6 @@ class LoopSequentialPipelineBlocks(ModularPipelineBlocks): """List of input parameters. Must be implemented by subclasses.""" return [] - @property - def loop_intermediate_inputs(self) -> List[InputParam]: - """List of intermediate input parameters. Must be implemented by subclasses.""" - return [] - - @property - def loop_intermediate_outputs(self) -> List[OutputParam]: - """List of intermediate output parameters. Must be implemented by subclasses.""" - return [] - @property def loop_required_inputs(self) -> List[str]: input_names = [] @@ -1491,12 +1353,9 @@ class LoopSequentialPipelineBlocks(ModularPipelineBlocks): return input_names @property - def loop_required_intermediate_inputs(self) -> List[str]: - input_names = [] - for input_param in self.loop_intermediate_inputs: - if input_param.required: - input_names.append(input_param.name) - return input_names + def loop_intermediate_outputs(self) -> List[OutputParam]: + """List of intermediate output parameters. Must be implemented by subclasses.""" + return [] # modified from SequentialPipelineBlocks to include loop_expected_components @property @@ -1524,43 +1383,16 @@ class LoopSequentialPipelineBlocks(ModularPipelineBlocks): expected_configs.append(config) return expected_configs - # modified from SequentialPipelineBlocks to include loop_inputs - def get_inputs(self): - named_inputs = [(name, block.inputs) for name, block in self.sub_blocks.items()] - named_inputs.append(("loop", self.loop_inputs)) - combined_inputs = self.combine_inputs(*named_inputs) - # mark Required inputs only if that input is required any of the blocks - for input_param in combined_inputs: - if input_param.name in self.required_inputs: - input_param.required = True - else: - input_param.required = False - return combined_inputs - - @property - # Copied from diffusers.modular_pipelines.modular_pipeline.SequentialPipelineBlocks.inputs - def inputs(self): - return self.get_inputs() - - # modified from SequentialPipelineBlocks to include loop_intermediate_inputs - @property - def intermediate_inputs(self): - intermediates = self.get_intermediate_inputs() - intermediate_names = [input.name for input in intermediates] - for loop_intermediate_input in self.loop_intermediate_inputs: - if loop_intermediate_input.name not in intermediate_names: - intermediates.append(loop_intermediate_input) - return intermediates - - # modified from SequentialPipelineBlocks - def get_intermediate_inputs(self): + def _get_inputs(self): inputs = [] + inputs.extend(self.loop_inputs) outputs = set() - # Go through all blocks in order - for block in self.sub_blocks.values(): + for name, block in self.sub_blocks.items(): # Add inputs that aren't in outputs yet - inputs.extend(input_name for input_name in block.intermediate_inputs if input_name.name not in outputs) + for inp in block.inputs: + if inp.name not in outputs and inp not in inputs: + inputs.append(inp) # Only add outputs if the block cannot be skipped should_add_outputs = True @@ -1571,8 +1403,20 @@ class LoopSequentialPipelineBlocks(ModularPipelineBlocks): # Add this block's outputs block_intermediate_outputs = [out.name for out in block.intermediate_outputs] outputs.update(block_intermediate_outputs) + + for input_param in inputs: + if input_param.name in self.required_inputs: + input_param.required = True + else: + input_param.required = False + return inputs + @property + # Copied from diffusers.modular_pipelines.modular_pipeline.SequentialPipelineBlocks.inputs + def inputs(self): + return self._get_inputs() + # modified from SequentialPipelineBlocks, if any additionan input required by the loop is required by the block @property def required_inputs(self) -> List[str]: @@ -1590,19 +1434,6 @@ class LoopSequentialPipelineBlocks(ModularPipelineBlocks): return list(required_by_any) - # YiYi TODO: maybe we do not need this, it is only used in docstring, - # intermediate_inputs is by default required, unless you manually handle it inside the block - @property - def required_intermediate_inputs(self) -> List[str]: - required_intermediate_inputs = [] - for input_param in self.intermediate_inputs: - if input_param.required: - required_intermediate_inputs.append(input_param.name) - for input_param in self.loop_intermediate_inputs: - if input_param.required: - required_intermediate_inputs.append(input_param.name) - return required_intermediate_inputs - # YiYi TODO: this need to be thought about more # modified from SequentialPipelineBlocks to include loop_intermediate_outputs @property @@ -1892,96 +1723,6 @@ class ModularPipeline(ConfigMixin, PushToHubMixin): params[input_param.name] = input_param.default return params - def __call__(self, state: PipelineState = None, output: Union[str, List[str]] = None, **kwargs): - """ - Execute the pipeline by running the pipeline blocks with the given inputs. - - Args: - state (`PipelineState`, optional): - PipelineState instance contains inputs and intermediate values. If None, a new `PipelineState` will be - created based on the user inputs and the pipeline blocks's requirement. - output (`str` or `List[str]`, optional): - Optional specification of what to return: - - None: Returns the complete `PipelineState` with all inputs and intermediates (default) - - str: Returns a specific intermediate value from the state (e.g. `output="image"`) - - List[str]: Returns a dictionary of specific intermediate values (e.g. `output=["image", - "latents"]`) - - - Examples: - ```python - # Get complete pipeline state - state = pipeline(prompt="A beautiful sunset", num_inference_steps=20) - print(state.intermediates) # All intermediate outputs - - # Get specific output - image = pipeline(prompt="A beautiful sunset", output="image") - - # Get multiple specific outputs - results = pipeline(prompt="A beautiful sunset", output=["image", "latents"]) - image, latents = results["image"], results["latents"] - - # Continue from previous state - state = pipeline(prompt="A beautiful sunset") - new_state = pipeline(state=state, output="image") # Continue processing - ``` - - Returns: - - If `output` is None: Complete `PipelineState` containing all inputs and intermediates - - If `output` is str: The specific intermediate value from the state (e.g. `output="image"`) - - If `output` is List[str]: Dictionary mapping output names to their values from the state (e.g. - `output=["image", "latents"]`) - """ - if state is None: - state = PipelineState() - - # Make a copy of the input kwargs - passed_kwargs = kwargs.copy() - - # Add inputs to state, using defaults if not provided in the kwargs or the state - # if same input already in the state, will override it if provided in the kwargs - intermediate_inputs = [inp.name for inp in self.blocks.intermediate_inputs] - for expected_input_param in self.blocks.inputs: - name = expected_input_param.name - default = expected_input_param.default - kwargs_type = expected_input_param.kwargs_type - if name in passed_kwargs: - if name not in intermediate_inputs: - state.set_input(name, passed_kwargs.pop(name), kwargs_type) - else: - state.set_input(name, passed_kwargs[name], kwargs_type) - elif name not in state.inputs: - state.set_input(name, default, kwargs_type) - - for expected_intermediate_param in self.blocks.intermediate_inputs: - name = expected_intermediate_param.name - kwargs_type = expected_intermediate_param.kwargs_type - if name in passed_kwargs: - state.set_intermediate(name, passed_kwargs.pop(name), kwargs_type) - - # Warn about unexpected inputs - if len(passed_kwargs) > 0: - warnings.warn(f"Unexpected input '{passed_kwargs.keys()}' provided. This input will be ignored.") - # Run the pipeline - with torch.no_grad(): - try: - _, state = self.blocks(self, state) - except Exception: - error_msg = f"Error in block: ({self.blocks.__class__.__name__}):\n" - logger.error(error_msg) - raise - - if output is None: - return state - - elif isinstance(output, str): - return state.get_intermediate(output) - - elif isinstance(output, (list, tuple)): - return state.get_intermediates(output) - else: - raise ValueError(f"Output '{output}' is not a valid output type") - def load_default_components(self, **kwargs): """ Load from_pretrained components using the loading specs in the config dict. @@ -2805,3 +2546,87 @@ class ModularPipeline(ConfigMixin, PushToHubMixin): for sub_block_name, sub_block in self.blocks.sub_blocks.items(): if hasattr(sub_block, "set_progress_bar_config"): sub_block.set_progress_bar_config(**kwargs) + + def __call__(self, state: PipelineState = None, output: Union[str, List[str]] = None, **kwargs): + """ + Execute the pipeline by running the pipeline blocks with the given inputs. + + Args: + state (`PipelineState`, optional): + PipelineState instance contains inputs and intermediate values. If None, a new `PipelineState` will be + created based on the user inputs and the pipeline blocks's requirement. + output (`str` or `List[str]`, optional): + Optional specification of what to return: + - None: Returns the complete `PipelineState` with all inputs and intermediates (default) + - str: Returns a specific intermediate value from the state (e.g. `output="image"`) + - List[str]: Returns a dictionary of specific intermediate values (e.g. `output=["image", + "latents"]`) + + + Examples: + ```python + # Get complete pipeline state + state = pipeline(prompt="A beautiful sunset", num_inference_steps=20) + print(state.intermediates) # All intermediate outputs + + # Get specific output + image = pipeline(prompt="A beautiful sunset", output="image") + + # Get multiple specific outputs + results = pipeline(prompt="A beautiful sunset", output=["image", "latents"]) + image, latents = results["image"], results["latents"] + + # Continue from previous state + state = pipeline(prompt="A beautiful sunset") + new_state = pipeline(state=state, output="image") # Continue processing + ``` + + Returns: + - If `output` is None: Complete `PipelineState` containing all inputs and intermediates + - If `output` is str: The specific intermediate value from the state (e.g. `output="image"`) + - If `output` is List[str]: Dictionary mapping output names to their values from the state (e.g. + `output=["image", "latents"]`) + """ + if state is None: + state = PipelineState() + + # Make a copy of the input kwargs + passed_kwargs = kwargs.copy() + + # Add inputs to state, using defaults if not provided in the kwargs or the state + # if same input already in the state, will override it if provided in the kwargs + intermediate_inputs = [inp.name for inp in self.blocks.inputs] + for expected_input_param in self.blocks.inputs: + name = expected_input_param.name + default = expected_input_param.default + kwargs_type = expected_input_param.kwargs_type + if name in passed_kwargs: + if name not in intermediate_inputs: + state.set(name, passed_kwargs.pop(name), kwargs_type) + else: + state.set(name, passed_kwargs[name], kwargs_type) + elif name not in state.values: + state.set(name, default, kwargs_type) + + # Warn about unexpected inputs + if len(passed_kwargs) > 0: + warnings.warn(f"Unexpected input '{passed_kwargs.keys()}' provided. This input will be ignored.") + # Run the pipeline + with torch.no_grad(): + try: + _, state = self.blocks(self, state) + except Exception: + error_msg = f"Error in block: ({self.blocks.__class__.__name__}):\n" + logger.error(error_msg) + raise + + if output is None: + return state + + if isinstance(output, str): + return state.get(output) + + elif isinstance(output, (list, tuple)): + return state.get(output) + else: + raise ValueError(f"Output '{output}' is not a valid output type") diff --git a/src/diffusers/modular_pipelines/stable_diffusion_xl/before_denoise.py b/src/diffusers/modular_pipelines/stable_diffusion_xl/before_denoise.py index 1800a613ec..61487cde15 100644 --- a/src/diffusers/modular_pipelines/stable_diffusion_xl/before_denoise.py +++ b/src/diffusers/modular_pipelines/stable_diffusion_xl/before_denoise.py @@ -27,7 +27,7 @@ from ...schedulers import EulerDiscreteScheduler from ...utils import logging from ...utils.torch_utils import randn_tensor, unwrap_module from ..modular_pipeline import ( - PipelineBlock, + ModularPipelineBlocks, PipelineState, ) from ..modular_pipeline_utils import ComponentSpec, ConfigSpec, InputParam, OutputParam @@ -195,7 +195,7 @@ def prepare_latents_img2img( return latents -class StableDiffusionXLInputStep(PipelineBlock): +class StableDiffusionXLInputStep(ModularPipelineBlocks): model_name = "stable-diffusion-xl" @property @@ -213,11 +213,6 @@ class StableDiffusionXLInputStep(PipelineBlock): def inputs(self) -> List[InputParam]: return [ InputParam("num_images_per_prompt", default=1), - ] - - @property - def intermediate_inputs(self) -> List[str]: - return [ InputParam( "prompt_embeds", required=True, @@ -394,7 +389,7 @@ class StableDiffusionXLInputStep(PipelineBlock): return components, state -class StableDiffusionXLImg2ImgSetTimestepsStep(PipelineBlock): +class StableDiffusionXLImg2ImgSetTimestepsStep(ModularPipelineBlocks): model_name = "stable-diffusion-xl" @property @@ -421,11 +416,6 @@ class StableDiffusionXLImg2ImgSetTimestepsStep(PipelineBlock): InputParam("denoising_start"), # YiYi TODO: do we need num_images_per_prompt here? InputParam("num_images_per_prompt", default=1), - ] - - @property - def intermediate_inputs(self) -> List[str]: - return [ InputParam( "batch_size", required=True, @@ -543,7 +533,7 @@ class StableDiffusionXLImg2ImgSetTimestepsStep(PipelineBlock): return components, state -class StableDiffusionXLSetTimestepsStep(PipelineBlock): +class StableDiffusionXLSetTimestepsStep(ModularPipelineBlocks): model_name = "stable-diffusion-xl" @property @@ -611,7 +601,7 @@ class StableDiffusionXLSetTimestepsStep(PipelineBlock): return components, state -class StableDiffusionXLInpaintPrepareLatentsStep(PipelineBlock): +class StableDiffusionXLInpaintPrepareLatentsStep(ModularPipelineBlocks): model_name = "stable-diffusion-xl" @property @@ -640,11 +630,6 @@ class StableDiffusionXLInpaintPrepareLatentsStep(PipelineBlock): "`num_inference_steps`. A value of 1, therefore, essentially ignores `image`. Note that in the case of " "`denoising_start` being declared as an integer, the value of `strength` will be ignored.", ), - ] - - @property - def intermediate_inputs(self) -> List[str]: - return [ InputParam("generator"), InputParam( "batch_size", @@ -890,7 +875,7 @@ class StableDiffusionXLInpaintPrepareLatentsStep(PipelineBlock): return components, state -class StableDiffusionXLImg2ImgPrepareLatentsStep(PipelineBlock): +class StableDiffusionXLImg2ImgPrepareLatentsStep(ModularPipelineBlocks): model_name = "stable-diffusion-xl" @property @@ -910,11 +895,6 @@ class StableDiffusionXLImg2ImgPrepareLatentsStep(PipelineBlock): InputParam("latents"), InputParam("num_images_per_prompt", default=1), InputParam("denoising_start"), - ] - - @property - def intermediate_inputs(self) -> List[InputParam]: - return [ InputParam("generator"), InputParam( "latent_timestep", @@ -971,7 +951,7 @@ class StableDiffusionXLImg2ImgPrepareLatentsStep(PipelineBlock): return components, state -class StableDiffusionXLPrepareLatentsStep(PipelineBlock): +class StableDiffusionXLPrepareLatentsStep(ModularPipelineBlocks): model_name = "stable-diffusion-xl" @property @@ -992,11 +972,6 @@ class StableDiffusionXLPrepareLatentsStep(PipelineBlock): InputParam("width"), InputParam("latents"), InputParam("num_images_per_prompt", default=1), - ] - - @property - def intermediate_inputs(self) -> List[InputParam]: - return [ InputParam("generator"), InputParam( "batch_size", @@ -1082,7 +1057,7 @@ class StableDiffusionXLPrepareLatentsStep(PipelineBlock): return components, state -class StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep(PipelineBlock): +class StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep(ModularPipelineBlocks): model_name = "stable-diffusion-xl" @property @@ -1119,11 +1094,6 @@ class StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep(PipelineBlock): InputParam("num_images_per_prompt", default=1), InputParam("aesthetic_score", default=6.0), InputParam("negative_aesthetic_score", default=2.0), - ] - - @property - def intermediate_inputs(self) -> List[InputParam]: - return [ InputParam( "latents", required=True, @@ -1306,7 +1276,7 @@ class StableDiffusionXLImg2ImgPrepareAdditionalConditioningStep(PipelineBlock): return components, state -class StableDiffusionXLPrepareAdditionalConditioningStep(PipelineBlock): +class StableDiffusionXLPrepareAdditionalConditioningStep(ModularPipelineBlocks): model_name = "stable-diffusion-xl" @property @@ -1335,11 +1305,6 @@ class StableDiffusionXLPrepareAdditionalConditioningStep(PipelineBlock): InputParam("crops_coords_top_left", default=(0, 0)), InputParam("negative_crops_coords_top_left", default=(0, 0)), InputParam("num_images_per_prompt", default=1), - ] - - @property - def intermediate_inputs(self) -> List[InputParam]: - return [ InputParam( "latents", required=True, @@ -1489,7 +1454,7 @@ class StableDiffusionXLPrepareAdditionalConditioningStep(PipelineBlock): return components, state -class StableDiffusionXLControlNetInputStep(PipelineBlock): +class StableDiffusionXLControlNetInputStep(ModularPipelineBlocks): model_name = "stable-diffusion-xl" @property @@ -1517,11 +1482,6 @@ class StableDiffusionXLControlNetInputStep(PipelineBlock): InputParam("controlnet_conditioning_scale", default=1.0), InputParam("guess_mode", default=False), InputParam("num_images_per_prompt", default=1), - ] - - @property - def intermediate_inputs(self) -> List[str]: - return [ InputParam( "latents", required=True, @@ -1708,7 +1668,7 @@ class StableDiffusionXLControlNetInputStep(PipelineBlock): return components, state -class StableDiffusionXLControlNetUnionInputStep(PipelineBlock): +class StableDiffusionXLControlNetUnionInputStep(ModularPipelineBlocks): model_name = "stable-diffusion-xl" @property diff --git a/src/diffusers/modular_pipelines/stable_diffusion_xl/decoders.py b/src/diffusers/modular_pipelines/stable_diffusion_xl/decoders.py index e9f627636e..44010792cd 100644 --- a/src/diffusers/modular_pipelines/stable_diffusion_xl/decoders.py +++ b/src/diffusers/modular_pipelines/stable_diffusion_xl/decoders.py @@ -24,7 +24,7 @@ from ...models import AutoencoderKL from ...models.attention_processor import AttnProcessor2_0, XFormersAttnProcessor from ...utils import logging from ..modular_pipeline import ( - PipelineBlock, + ModularPipelineBlocks, PipelineState, ) from ..modular_pipeline_utils import ComponentSpec, InputParam, OutputParam @@ -33,7 +33,7 @@ from ..modular_pipeline_utils import ComponentSpec, InputParam, OutputParam logger = logging.get_logger(__name__) # pylint: disable=invalid-name -class StableDiffusionXLDecodeStep(PipelineBlock): +class StableDiffusionXLDecodeStep(ModularPipelineBlocks): model_name = "stable-diffusion-xl" @property @@ -56,11 +56,6 @@ class StableDiffusionXLDecodeStep(PipelineBlock): def inputs(self) -> List[Tuple[str, Any]]: return [ InputParam("output_type", default="pil"), - ] - - @property - def intermediate_inputs(self) -> List[str]: - return [ InputParam( "latents", required=True, @@ -157,7 +152,7 @@ class StableDiffusionXLDecodeStep(PipelineBlock): return components, state -class StableDiffusionXLInpaintOverlayMaskStep(PipelineBlock): +class StableDiffusionXLInpaintOverlayMaskStep(ModularPipelineBlocks): model_name = "stable-diffusion-xl" @property diff --git a/src/diffusers/modular_pipelines/stable_diffusion_xl/denoise.py b/src/diffusers/modular_pipelines/stable_diffusion_xl/denoise.py index 7fe4a472ee..96df9711cc 100644 --- a/src/diffusers/modular_pipelines/stable_diffusion_xl/denoise.py +++ b/src/diffusers/modular_pipelines/stable_diffusion_xl/denoise.py @@ -25,7 +25,7 @@ from ...utils import logging from ..modular_pipeline import ( BlockState, LoopSequentialPipelineBlocks, - PipelineBlock, + ModularPipelineBlocks, PipelineState, ) from ..modular_pipeline_utils import ComponentSpec, InputParam, OutputParam @@ -37,7 +37,7 @@ logger = logging.get_logger(__name__) # pylint: disable=invalid-name # YiYi experimenting composible denoise loop # loop step (1): prepare latent input for denoiser -class StableDiffusionXLLoopBeforeDenoiser(PipelineBlock): +class StableDiffusionXLLoopBeforeDenoiser(ModularPipelineBlocks): model_name = "stable-diffusion-xl" @property @@ -55,7 +55,7 @@ class StableDiffusionXLLoopBeforeDenoiser(PipelineBlock): ) @property - def intermediate_inputs(self) -> List[str]: + def inputs(self) -> List[str]: return [ InputParam( "latents", @@ -73,7 +73,7 @@ class StableDiffusionXLLoopBeforeDenoiser(PipelineBlock): # loop step (1): prepare latent input for denoiser (with inpainting) -class StableDiffusionXLInpaintLoopBeforeDenoiser(PipelineBlock): +class StableDiffusionXLInpaintLoopBeforeDenoiser(ModularPipelineBlocks): model_name = "stable-diffusion-xl" @property @@ -91,7 +91,7 @@ class StableDiffusionXLInpaintLoopBeforeDenoiser(PipelineBlock): ) @property - def intermediate_inputs(self) -> List[str]: + def inputs(self) -> List[str]: return [ InputParam( "latents", @@ -144,7 +144,7 @@ class StableDiffusionXLInpaintLoopBeforeDenoiser(PipelineBlock): # loop step (2): denoise the latents with guidance -class StableDiffusionXLLoopDenoiser(PipelineBlock): +class StableDiffusionXLLoopDenoiser(ModularPipelineBlocks): model_name = "stable-diffusion-xl" @property @@ -171,11 +171,6 @@ class StableDiffusionXLLoopDenoiser(PipelineBlock): def inputs(self) -> List[Tuple[str, Any]]: return [ InputParam("cross_attention_kwargs"), - ] - - @property - def intermediate_inputs(self) -> List[str]: - return [ InputParam( "num_inference_steps", required=True, @@ -249,7 +244,7 @@ class StableDiffusionXLLoopDenoiser(PipelineBlock): # loop step (2): denoise the latents with guidance (with controlnet) -class StableDiffusionXLControlNetLoopDenoiser(PipelineBlock): +class StableDiffusionXLControlNetLoopDenoiser(ModularPipelineBlocks): model_name = "stable-diffusion-xl" @property @@ -277,11 +272,6 @@ class StableDiffusionXLControlNetLoopDenoiser(PipelineBlock): def inputs(self) -> List[Tuple[str, Any]]: return [ InputParam("cross_attention_kwargs"), - ] - - @property - def intermediate_inputs(self) -> List[str]: - return [ InputParam( "controlnet_cond", required=True, @@ -449,7 +439,7 @@ class StableDiffusionXLControlNetLoopDenoiser(PipelineBlock): # loop step (3): scheduler step to update latents -class StableDiffusionXLLoopAfterDenoiser(PipelineBlock): +class StableDiffusionXLLoopAfterDenoiser(ModularPipelineBlocks): model_name = "stable-diffusion-xl" @property @@ -470,11 +460,6 @@ class StableDiffusionXLLoopAfterDenoiser(PipelineBlock): def inputs(self) -> List[Tuple[str, Any]]: return [ InputParam("eta", default=0.0), - ] - - @property - def intermediate_inputs(self) -> List[str]: - return [ InputParam("generator"), ] @@ -520,7 +505,7 @@ class StableDiffusionXLLoopAfterDenoiser(PipelineBlock): # loop step (3): scheduler step to update latents (with inpainting) -class StableDiffusionXLInpaintLoopAfterDenoiser(PipelineBlock): +class StableDiffusionXLInpaintLoopAfterDenoiser(ModularPipelineBlocks): model_name = "stable-diffusion-xl" @property @@ -542,11 +527,6 @@ class StableDiffusionXLInpaintLoopAfterDenoiser(PipelineBlock): def inputs(self) -> List[Tuple[str, Any]]: return [ InputParam("eta", default=0.0), - ] - - @property - def intermediate_inputs(self) -> List[str]: - return [ InputParam("generator"), InputParam( "timesteps", @@ -660,7 +640,7 @@ class StableDiffusionXLDenoiseLoopWrapper(LoopSequentialPipelineBlocks): ] @property - def loop_intermediate_inputs(self) -> List[InputParam]: + def loop_inputs(self) -> List[InputParam]: return [ InputParam( "timesteps", diff --git a/src/diffusers/modular_pipelines/stable_diffusion_xl/encoders.py b/src/diffusers/modular_pipelines/stable_diffusion_xl/encoders.py index bd0e962140..8926d6c1fb 100644 --- a/src/diffusers/modular_pipelines/stable_diffusion_xl/encoders.py +++ b/src/diffusers/modular_pipelines/stable_diffusion_xl/encoders.py @@ -35,7 +35,7 @@ from ...utils import ( scale_lora_layers, unscale_lora_layers, ) -from ..modular_pipeline import PipelineBlock, PipelineState +from ..modular_pipeline import ModularPipelineBlocks, PipelineState from ..modular_pipeline_utils import ComponentSpec, ConfigSpec, InputParam, OutputParam from .modular_pipeline import StableDiffusionXLModularPipeline @@ -57,7 +57,7 @@ def retrieve_latents( raise AttributeError("Could not access latents of provided encoder_output") -class StableDiffusionXLIPAdapterStep(PipelineBlock): +class StableDiffusionXLIPAdapterStep(ModularPipelineBlocks): model_name = "stable-diffusion-xl" @property @@ -215,7 +215,7 @@ class StableDiffusionXLIPAdapterStep(PipelineBlock): return components, state -class StableDiffusionXLTextEncoderStep(PipelineBlock): +class StableDiffusionXLTextEncoderStep(ModularPipelineBlocks): model_name = "stable-diffusion-xl" @property @@ -576,7 +576,7 @@ class StableDiffusionXLTextEncoderStep(PipelineBlock): return components, state -class StableDiffusionXLVaeEncoderStep(PipelineBlock): +class StableDiffusionXLVaeEncoderStep(ModularPipelineBlocks): model_name = "stable-diffusion-xl" @property @@ -601,11 +601,6 @@ class StableDiffusionXLVaeEncoderStep(PipelineBlock): InputParam("image", required=True), InputParam("height"), InputParam("width"), - ] - - @property - def intermediate_inputs(self) -> List[InputParam]: - return [ InputParam("generator"), InputParam("dtype", type_hint=torch.dtype, description="Data type of model tensor inputs"), InputParam( @@ -691,7 +686,7 @@ class StableDiffusionXLVaeEncoderStep(PipelineBlock): return components, state -class StableDiffusionXLInpaintVaeEncoderStep(PipelineBlock): +class StableDiffusionXLInpaintVaeEncoderStep(ModularPipelineBlocks): model_name = "stable-diffusion-xl" @property @@ -726,11 +721,6 @@ class StableDiffusionXLInpaintVaeEncoderStep(PipelineBlock): InputParam("image", required=True), InputParam("mask_image", required=True), InputParam("padding_mask_crop"), - ] - - @property - def intermediate_inputs(self) -> List[InputParam]: - return [ InputParam("dtype", type_hint=torch.dtype, description="The dtype of the model inputs"), InputParam("generator"), ] diff --git a/src/diffusers/modular_pipelines/stable_diffusion_xl/modular_pipeline.py b/src/diffusers/modular_pipelines/stable_diffusion_xl/modular_pipeline.py index fc030fae56..0ee37f5201 100644 --- a/src/diffusers/modular_pipelines/stable_diffusion_xl/modular_pipeline.py +++ b/src/diffusers/modular_pipelines/stable_diffusion_xl/modular_pipeline.py @@ -247,10 +247,6 @@ SDXL_INPUTS_SCHEMA = { "control_mode": InputParam( "control_mode", type_hint=List[int], required=True, description="Control mode for union controlnet" ), -} - - -SDXL_INTERMEDIATE_INPUTS_SCHEMA = { "prompt_embeds": InputParam( "prompt_embeds", type_hint=torch.Tensor, @@ -271,13 +267,6 @@ SDXL_INTERMEDIATE_INPUTS_SCHEMA = { "preprocess_kwargs": InputParam( "preprocess_kwargs", type_hint=Optional[dict], description="Kwargs for ImageProcessor" ), - "latents": InputParam( - "latents", type_hint=torch.Tensor, required=True, description="Initial latents for denoising process" - ), - "timesteps": InputParam("timesteps", type_hint=torch.Tensor, required=True, description="Timesteps for inference"), - "num_inference_steps": InputParam( - "num_inference_steps", type_hint=int, required=True, description="Number of denoising steps" - ), "latent_timestep": InputParam( "latent_timestep", type_hint=torch.Tensor, required=True, description="Initial noise level timestep" ), diff --git a/tests/modular_pipelines/stable_diffusion_xl/test_modular_pipeline_stable_diffusion_xl.py b/tests/modular_pipelines/stable_diffusion_xl/test_modular_pipeline_stable_diffusion_xl.py index 2e7c90d8e4..b199e42f98 100644 --- a/tests/modular_pipelines/stable_diffusion_xl/test_modular_pipeline_stable_diffusion_xl.py +++ b/tests/modular_pipelines/stable_diffusion_xl/test_modular_pipeline_stable_diffusion_xl.py @@ -119,11 +119,10 @@ class SDXLModularIPAdapterTests: _ = blocks.sub_blocks.pop("ip_adapter") parameters = blocks.input_names - intermediate_parameters = blocks.intermediate_input_names assert "ip_adapter_image" not in parameters, ( "`ip_adapter_image` argument must be removed from the `__call__` method" ) - assert "ip_adapter_image_embeds" not in intermediate_parameters, ( + assert "ip_adapter_image_embeds" not in parameters, ( "`ip_adapter_image_embeds` argument must be supported by the `__call__` method" ) diff --git a/tests/modular_pipelines/test_modular_pipelines_common.py b/tests/modular_pipelines/test_modular_pipelines_common.py index 24c36b9fa9..6848e1906d 100644 --- a/tests/modular_pipelines/test_modular_pipelines_common.py +++ b/tests/modular_pipelines/test_modular_pipelines_common.py @@ -139,7 +139,6 @@ class ModularPipelineTesterMixin: def test_pipeline_call_signature(self): pipe = self.get_pipeline() input_parameters = pipe.blocks.input_names - intermediate_parameters = pipe.blocks.intermediate_input_names optional_parameters = pipe.default_call_parameters def _check_for_parameters(parameters, expected_parameters, param_type): @@ -149,7 +148,6 @@ class ModularPipelineTesterMixin: ) _check_for_parameters(self.params, input_parameters, "input") - _check_for_parameters(self.intermediate_params, intermediate_parameters, "intermediate") _check_for_parameters(self.optional_params, optional_parameters, "optional") def test_inference_batch_consistent(self, batch_sizes=[2], batch_generator=True):