sdnext/modules/control/processor.py
Vladimir Mandic <mandic00@live.com> e31fff9354: fix flex2 img2img denoise strength (2025-08-31 11:31:28 -04:00)


import os
import time
import hashlib
import numpy as np
from PIL import Image
from modules.processing_class import StableDiffusionProcessingControl
from modules import shared, images, masking, sd_models
from modules.timer import process as process_timer
from modules.control import util
from modules.control import processors as control_processors
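
# set the SD_CONTROL_DEBUG environment variable to enable trace-level logging in this module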
debug = os.environ.get('SD_CONTROL_DEBUG', None) is not None
debug_log = shared.log.trace if debug else lambda *args, **kwargs: None
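
# available processor names; the matching implementations are resolved in modules.control.processors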
processors = [
    'None',
    'OpenPose',
    'DWPose',
    'MediaPipe Face',
    'Canny',
    'Edge',
    'LineArt Realistic',
    'LineArt Anime',
    'HED',
    'PidiNet',
    'Midas Depth Hybrid',
    'Leres Depth',
    'Zoe Depth',
    'Marigold Depth',
    'Normal Bae',
    'SegmentAnything',
    'MLSD',
    'Shuffle',
    'DPT Depth Hybrid',
    'GLPN Depth',
    'Depth Anything',
]


def preprocess_image(
        p:StableDiffusionProcessingControl,
        pipe,
        input_image:Image.Image = None,
        init_image:Image.Image = None,
        input_mask:Image.Image = None,
        input_type:int = 0,
        unit_type:str = 'controlnet',
        active_process:list = [],
        active_model:list = [],
        selected_models:list = [],
        has_models:bool = False,
    ):
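    """Prepare control inputs for processing.

    Resizes and masks the input image, runs each active processor, blends the
    processed outputs where required, then routes images into `p.task_args` and
    `p.init_images` based on unit type, input type and the pipeline signature.
    Returns a `(processed_image, blended_image)` tuple.
    """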
    t0 = time.time()
    # run resize before
    if p.resize_mode_before != 0 and p.resize_name_before != 'None':
        if p.selected_scale_tab_before == 1 and input_image is not None:
            p.width_before, p.height_before = int(input_image.width * p.scale_by_before), int(input_image.height * p.scale_by_before)
        if input_image is not None:
            debug_log(f'Control resize: op=before image={input_image} width={p.width_before} height={p.height_before} mode={p.resize_mode_before} name={p.resize_name_before} context="{p.resize_context_before}"')
            # record original image hash and size once so they survive the resize below
            p.init_img_hash = getattr(p, 'init_img_hash', hashlib.sha256(input_image.tobytes()).hexdigest()[0:8]) # pylint: disable=attribute-defined-outside-init
            p.init_img_width = getattr(p, 'init_img_width', input_image.width) # pylint: disable=attribute-defined-outside-init
            p.init_img_height = getattr(p, 'init_img_height', input_image.height) # pylint: disable=attribute-defined-outside-init
            input_image = images.resize_image(p.resize_mode_before, input_image, p.width_before, p.height_before, p.resize_name_before, context=p.resize_context_before)
    if input_image is not None and init_image is not None and init_image.size != input_image.size:
        debug_log(f'Control resize init: image={init_image} target={input_image}')
        init_image = images.resize_image(resize_mode=1, im=init_image, width=input_image.width, height=input_image.height)
    if input_image is not None and p.override is not None and p.override.size != input_image.size:
        debug_log(f'Control resize override: image={p.override} target={input_image}')
        p.override = images.resize_image(resize_mode=1, im=p.override, width=input_image.width, height=input_image.height)
    if input_image is not None:
        p.width = input_image.width
        p.height = input_image.height
        debug_log(f'Control: input image={input_image}')
    # run masking
    if input_mask is not None:
        p.extra_generation_params["Mask only"] = masking.opts.mask_only if masking.opts.mask_only else None
        p.extra_generation_params["Mask auto"] = masking.opts.auto_mask if masking.opts.auto_mask != 'None' else None
        p.extra_generation_params["Mask invert"] = masking.opts.invert if masking.opts.invert else None
        p.extra_generation_params["Mask blur"] = masking.opts.mask_blur if masking.opts.mask_blur > 0 else None
        p.extra_generation_params["Mask erode"] = masking.opts.mask_erode if masking.opts.mask_erode > 0 else None
        p.extra_generation_params["Mask dilate"] = masking.opts.mask_dilate if masking.opts.mask_dilate > 0 else None
        p.extra_generation_params["Mask model"] = masking.opts.model if masking.opts.model is not None else None
        masked_image = masking.run_mask(input_image=input_image, input_mask=input_mask, return_type='Masked', invert=p.inpainting_mask_invert==1)
    else:
        masked_image = input_image
    # resize mask
    if input_mask is not None and p.resize_mode_mask != 0 and p.resize_name_mask != 'None':
        if p.selected_scale_tab_mask == 1:
            p.width_mask, p.height_mask = int(input_image.width * p.scale_by_mask), int(input_image.height * p.scale_by_mask)
        p.width, p.height = p.width_mask, p.height_mask
        debug_log(f'Control resize: op=mask image={input_mask} width={p.width_mask} height={p.height_mask} mode={p.resize_mode_mask} name={p.resize_name_mask} context="{p.resize_context_mask}"')
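    # each entry in active_process is a callable processor wrapper that receives the masked
    # input and returns a processed PIL image (or None on failure)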
    # run image processors
    processed_images = []
    blended_image = None
    for i, process in enumerate(active_process): # list[image]
        debug_log(f'Control: i={i+1} process="{process.processor_id}" input={masked_image} override={process.override}')
        processed_image = process(
            image_input=masked_image,
            mode='RGB',
            resize_mode=p.resize_mode_before,
            resize_name=p.resize_name_before,
            scale_tab=p.selected_scale_tab_before,
            scale_by=p.scale_by_before,
        )
        if processed_image is not None:
            processed_images.append(processed_image)
        if shared.opts.control_unload_processor and process.processor_id is not None:
            control_processors.config[process.processor_id]['dirty'] = True # to force reload
            process.model = None
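    # blending rules: with more processed images than models, all outputs are blended into a
    # single control image; with a 1:1 processor-to-model match, each model receives its own
    # image and the blend is kept only as the preview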
    # blend processed images
    debug_log(f'Control processed: {len(processed_images)}')
    if len(processed_images) > 0:
        try:
            if len(p.extra_generation_params["Control process"]) == 0:
                p.extra_generation_params["Control process"] = None
            else:
                p.extra_generation_params["Control process"] = ';'.join([proc.processor_id for proc in active_process if proc.processor_id is not None])
        except Exception:
            pass
        if any(img is None for img in processed_images):
            shared.log.error('Control: one or more processed images are None')
            processed_images = [img for img in processed_images if img is not None]
        if len(processed_images) > 1 and len(active_process) != len(active_model):
            processed_image = [np.array(i) for i in processed_images]
            processed_image = util.blend(processed_image) # blend all processed images into one
            processed_image = Image.fromarray(processed_image)
            blended_image = processed_image
        elif len(processed_images) == 1:
            processed_image = processed_images
            blended_image = processed_image[0]
        else:
            blended_image = [np.array(i) for i in processed_images]
            blended_image = util.blend(blended_image) # blend all processed images into one
            blended_image = Image.fromarray(blended_image)
        if isinstance(selected_models, list) and len(processed_images) == len(selected_models) and len(processed_images) > 0:
            debug_log(f'Control: inputs match: input={len(processed_images)} models={len(selected_models)}')
            p.init_images = processed_images
        elif isinstance(selected_models, list) and len(processed_images) != len(selected_models):
            shared.log.error(f'Control: number of inputs does not match: input={len(processed_images)} models={len(selected_models)}')
        elif selected_models is not None:
            p.init_images = processed_image
    else:
        debug_log('Control processed: using input direct')
        processed_image = input_image
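    # sd_models.get_call exposes the pipeline call parameters, so task arguments are only
    # set when the loaded pipeline actually accepts them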
    # conditional assignment
    possible = sd_models.get_call(pipe).keys()
    if unit_type == 'reference' and has_models:
        p.ref_image = p.override or input_image
        p.task_args.pop('image', None)
        p.task_args['ref_image'] = p.ref_image
        debug_log(f'Control: process=None image={p.ref_image}')
        if p.ref_image is None:
            shared.log.error('Control: reference mode without image')
    elif unit_type == 'controlnet' and has_models:
        if input_type == 0: # Control only
            if 'control_image' in possible:
                p.task_args['control_image'] = [p.init_images] if isinstance(p.init_images, Image.Image) else p.init_images
            elif 'image' in possible:
                p.task_args['image'] = [p.init_images] if isinstance(p.init_images, Image.Image) else p.init_images
            if 'control_mode' in possible:
                p.task_args['control_mode'] = getattr(p, 'control_mode', None)
            if 'strength' in possible:
                p.task_args['strength'] = p.denoising_strength
            p.init_images = None
        elif input_type == 1: # Init image same as control
            p.init_images = [p.override or input_image] * max(1, len(active_model))
            if 'inpaint_image' in possible: # flex
                p.task_args['inpaint_image'] = p.init_images[0] if isinstance(p.init_images, list) else p.init_images
                p.task_args['inpaint_mask'] = Image.new('L', p.task_args['inpaint_image'].size, int(p.denoising_strength * 255)) # uniform mask level encodes denoise strength
                p.task_args['control_image'] = p.init_images[0] if isinstance(p.init_images, list) else p.init_images
                p.task_args['width'] = p.width
                p.task_args['height'] = p.height
            elif 'control_image' in possible:
                p.task_args['control_image'] = p.init_images # switch image and control_image
            if 'control_mode' in possible:
                p.task_args['control_mode'] = getattr(p, 'control_mode', None)
            if 'strength' in possible:
                p.task_args['strength'] = p.denoising_strength
        elif input_type == 2: # Separate init image
            if init_image is None:
                shared.log.warning('Control: separate init image not provided')
                init_image = input_image
            if 'inpaint_image' in possible: # flex
                p.task_args['inpaint_image'] = p.init_images[0] if isinstance(p.init_images, list) else p.init_images
                p.task_args['inpaint_mask'] = Image.new('L', p.task_args['inpaint_image'].size, int(p.denoising_strength * 255)) # uniform mask level encodes denoise strength
                p.task_args['control_image'] = p.init_images[0] if isinstance(p.init_images, list) else p.init_images
                p.task_args['width'] = p.width
                p.task_args['height'] = p.height
            elif 'control_image' in possible:
                p.task_args['control_image'] = p.init_images # switch image and control_image
            if 'control_mode' in possible:
                p.task_args['control_mode'] = getattr(p, 'control_mode', None)
            if 'strength' in possible:
                p.task_args['strength'] = p.denoising_strength
            p.init_images = [init_image] * len(active_model)
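        # the union pipeline appears to expect control images and modes as nested lists
        # (one inner list per control); the special case below wraps each entry accordingly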
        if hasattr(shared.sd_model, 'controlnet') and isinstance(p.task_args.get('control_image'), list) and len(p.task_args['control_image']) > 1 and (shared.sd_model.__class__.__name__ == 'StableDiffusionXLControlNetUnionPipeline'): # special case for controlnet-union
            p.task_args['control_image'] = [[x] for x in p.task_args['control_image']]
            p.task_args['control_mode'] = [[x] for x in p.task_args['control_mode']]
    # determine txt2img, img2img, inpaint pipeline
    if unit_type == 'reference' and has_models: # special case
        p.is_control = True
        shared.sd_model = sd_models.set_diffuser_pipe(shared.sd_model, sd_models.DiffusersTaskType.TEXT_2_IMAGE)
    elif not has_models: # run in txt2img/img2img/inpaint mode
        if input_mask is not None:
            p.task_args['strength'] = p.denoising_strength
            p.image_mask = input_mask
            p.init_images = input_image if isinstance(input_image, list) else [input_image]
            shared.sd_model = sd_models.set_diffuser_pipe(shared.sd_model, sd_models.DiffusersTaskType.INPAINTING)
        elif processed_image is not None:
            p.init_images = processed_image if isinstance(processed_image, list) else [processed_image]
            shared.sd_model = sd_models.set_diffuser_pipe(shared.sd_model, sd_models.DiffusersTaskType.IMAGE_2_IMAGE)
        else:
            p.init_hr(p.scale_by, p.resize_name, force=True)
            shared.sd_model = sd_models.set_diffuser_pipe(shared.sd_model, sd_models.DiffusersTaskType.TEXT_2_IMAGE)
    elif has_models: # actual control
        p.is_control = True
        if input_mask is not None:
            p.task_args['strength'] = p.denoising_strength
            p.image_mask = input_mask
            shared.sd_model = sd_models.set_diffuser_pipe(shared.sd_model, sd_models.DiffusersTaskType.INPAINTING) # only controlnet supports inpaint
        if hasattr(p, 'init_images') and p.init_images is not None:
            shared.sd_model = sd_models.set_diffuser_pipe(shared.sd_model, sd_models.DiffusersTaskType.IMAGE_2_IMAGE) # only controlnet supports img2img
        else:
            shared.sd_model = sd_models.set_diffuser_pipe(shared.sd_model, sd_models.DiffusersTaskType.TEXT_2_IMAGE)
        if hasattr(p, 'init_images') and p.init_images is not None and 'image' in possible:
            p.task_args['image'] = p.init_images # need to set explicitly for txt2img
            p.init_images = None
    if unit_type == 'lite':
        if input_type == 0:
            shared.sd_model = sd_models.set_diffuser_pipe(shared.sd_model, sd_models.DiffusersTaskType.TEXT_2_IMAGE)
            shared.sd_model.no_task_switch = True
        elif input_type == 1:
            p.init_images = [input_image]
        elif input_type == 2:
            if init_image is None:
                shared.log.warning('Control: separate init image not provided')
                init_image = input_image
            p.init_images = [init_image]
    t1 = time.time()
    process_timer.add('proc', t1-t0)
    return processed_image, blended_image
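

# Minimal usage sketch (illustrative, not part of the module): assumes `p` is a configured
# StableDiffusionProcessingControl, `processor` is a processor wrapper from
# modules.control.processors and `model` is a loaded ControlNet; all names are placeholders.
#
#   processed, blended = preprocess_image(
#       p, pipe=shared.sd_model,
#       input_image=image, init_image=None, input_mask=None,
#       input_type=0, unit_type='controlnet',
#       active_process=[processor], active_model=[model],
#       selected_models=[model], has_models=True,
#   )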