mirror of https://github.com/vladmandic/sdnext.git synced 2026-01-27 15:02:48 +03:00
sdnext/modules/video_models/video_run.py
vladmandic 84ad7bf20b improve wrap_gradio_call
Signed-off-by: vladmandic <mandic00@live.com>
2026-01-20 18:56:49 +01:00


import os
import copy
import time
from modules import shared, errors, sd_models, processing, devices, images, ui_common
from modules.video_models import models_def, video_utils, video_load, video_vae, video_overrides, video_save, video_prompt
from modules.paths import resolve_output_path
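
# trace-level debug logging is enabled only when the SD_VIDEO_DEBUG environment variable is set; otherwise debug is a no-op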
debug = shared.log.trace if os.environ.get('SD_VIDEO_DEBUG', None) is not None else lambda *args, **kwargs: None


def generate(*args, **kwargs):
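    """Run video generation for the selected engine/model; positional args mirror the video ui inputs."""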
    task_id, ui_state, engine, model, prompt, negative, styles, width, height, frames, steps, sampler_index, sampler_shift, dynamic_shift, seed, guidance_scale, guidance_true, init_image, init_strength, last_image, vae_type, vae_tile_frames, mp4_fps, mp4_interpolate, mp4_codec, mp4_ext, mp4_opt, mp4_video, mp4_frames, mp4_sf, vlm_enhance, vlm_model, vlm_system_prompt, override_settings = args
    if engine is None or model is None or engine == 'None' or model == 'None':
        return video_utils.queue_err('model not selected')
    # videojob = shared.state.begin('Video')
    found = [m.name for m in models_def.models.get(engine, [])]  # use m, not model, to avoid shadowing the selected model name
    selected: models_def.Model = next((m for m in models_def.models[engine] if m.name == model), None) if len(found) > 0 else None
    if selected is None:
        return video_utils.queue_err(f'model not found: {model}')
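    # load the model on first use and force a reload if the loaded model differs from the selection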
    if not shared.sd_loaded:
        debug('Video: model not yet loaded')
        video_load.load_model(selected)
    if selected.name != video_load.loaded_model:
        debug('Video: force reload')
        video_load.load_model(selected)
    if not shared.sd_loaded:
        debug('Video: model still not loaded')
        return video_utils.queue_err('model not loaded')
    debug(f'Video generate: task={task_id} args={args} kwargs={kwargs}')
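    # build the video processing object; width and height are snapped down to multiples of 16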
    p = processing.StableDiffusionProcessingVideo(
        sd_model=shared.sd_model,
        video_engine=engine,
        video_model=model,
        prompt=prompt,
        negative_prompt=negative,
        styles=styles,
        seed=int(seed),
        sampler_name=processing.get_sampler_name(sampler_index),
        sampler_shift=float(sampler_shift),
        steps=int(steps),
        width=16 * int(width // 16),
        height=16 * int(height // 16),
        frames=int(frames),
        denoising_strength=float(init_strength),
        init_image=init_image,
        cfg_scale=float(guidance_scale),
        pag_scale=float(guidance_true),
        vae_type=vae_type,
        vae_tile_frames=int(vae_tile_frames),
        override_settings=override_settings,
    )
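    # remote vae decode is only available for models that declare vae_remote support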
    if p.vae_type == 'Remote' and not selected.vae_remote:
        shared.log.warning(f'Video: model={selected.name} remote vae not supported')
        p.vae_type = 'Default'
    p.scripts = None
    p.script_args = None
    p.state = ui_state
    p.do_not_save_grid = True
    p.do_not_save_samples = not mp4_frames
    p.outpath_samples = resolve_output_path(shared.opts.outdir_samples, shared.opts.outdir_video)
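    # per-operation input handling based on the operation encoded in the model name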
    if 'T2V' in model:
        if init_image is not None:
            shared.log.warning('Video: op=T2V init image not supported')
    elif 'I2V' in model:
        if init_image is None:
            return video_utils.queue_err('init image not set')
        p.task_args['image'] = images.resize_image(resize_mode=2, im=init_image, width=p.width, height=p.height, upscaler_name=None, output_type='pil')
        shared.log.debug(f'Video: op=I2V init={init_image} resized={p.task_args["image"]}')
    elif 'FLF2V' in model:
        if init_image is None:
            return video_utils.queue_err('init image not set')
        if last_image is None:
            return video_utils.queue_err('last image not set')
        p.task_args['image'] = images.resize_image(resize_mode=2, im=init_image, width=p.width, height=p.height, upscaler_name=None, output_type='pil')
        p.task_args['last_image'] = images.resize_image(resize_mode=2, im=last_image, width=p.width, height=p.height, upscaler_name=None, output_type='pil')
        shared.log.debug(f'Video: op=FLF2V init={init_image} last={last_image} resized={p.task_args["image"]}')
    elif 'VACE' in model:
        if init_image is not None:
            p.task_args['reference_images'] = [images.resize_image(resize_mode=2, im=init_image, width=p.width, height=p.height, upscaler_name=None, output_type='pil')]
            shared.log.debug(f'Video: op=VACE reference={init_image} resized={p.task_args["reference_images"]}')
    elif 'Animate' in model:
        if init_image is None:
            return video_utils.queue_err('init image not set')
        p.task_args['image'] = images.resize_image(resize_mode=2, im=init_image, width=p.width, height=p.height, upscaler_name=None, output_type='pil')
        p.task_args['mode'] = 'animate'
        p.task_args['pose_video'] = [] # input pose video to condition the generation on. must be a list of PIL images.
        p.task_args['face_video'] = [] # input face video to condition the generation on. must be a list of PIL images.
        shared.log.debug(f'Video: op=Animate init={p.task_args["image"]} pose={p.task_args["pose_video"]} face={p.task_args["face_video"]}')
    else:
        shared.log.warning(f'Video: unknown model type "{model}"')
    # cleanup memory
    shared.sd_model = sd_models.apply_balanced_offload(shared.sd_model)
    devices.torch_gc(force=True, reason='video')
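    # optionally enhance the prompt using the selected vlm before processing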
    prompt = video_prompt.prepare_prompt(p, init_image, prompt, vlm_enhance, vlm_model, vlm_system_prompt)
    # set args
    processing.fix_seed(p)
    video_vae.set_vae_params(p)
    video_utils.set_prompt(p)
    p.task_args['num_inference_steps'] = p.steps
    p.task_args['width'] = p.width
    p.task_args['height'] = p.height
    p.task_args['output_type'] = 'latent' if (p.vae_type == 'Remote') else 'pil'
    p.ops.append('video')
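    # temporarily override the global scheduler shift settings; originals are restored after processing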
    # set scheduler params
    orig_dynamic_shift = shared.opts.schedulers_dynamic_shift
    orig_sampler_shift = shared.opts.schedulers_shift
    shared.opts.data['schedulers_dynamic_shift'] = dynamic_shift
    shared.opts.data['schedulers_shift'] = sampler_shift
    if hasattr(shared.sd_model, 'scheduler') and hasattr(shared.sd_model.scheduler, 'config') and hasattr(shared.sd_model.scheduler, 'register_to_config'):
        if hasattr(shared.sd_model.scheduler.config, 'use_dynamic_shifting'):
            shared.sd_model.scheduler.config.use_dynamic_shifting = dynamic_shift
            shared.sd_model.scheduler.register_to_config(use_dynamic_shifting=dynamic_shift)
        if hasattr(shared.sd_model.scheduler.config, 'flow_shift') and sampler_shift >= 0:
            shared.sd_model.scheduler.config.flow_shift = sampler_shift
            shared.sd_model.scheduler.register_to_config(flow_shift=sampler_shift)
        shared.sd_model.default_scheduler = copy.deepcopy(shared.sd_model.scheduler)
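    # apply model-specific argument overrides before running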
    video_overrides.set_overrides(p, selected)
    debug(f'Video: task_args={p.task_args}')
    if p.vae_type == 'Upscale':
        video_load.load_upscale_vae()
    elif hasattr(shared.sd_model, 'orig_vae'):
        shared.sd_model.vae = shared.sd_model.orig_vae
    # run processing
    shared.state.disable_preview = True
    shared.log.debug(f'Video: cls={shared.sd_model.__class__.__name__} width={p.width} height={p.height} frames={p.frames} steps={p.steps}')
    err = None
    t0 = time.time()
    processed = None
    try:
        processed = processing.process_images(p)
    except Exception as e:
        err = str(e)
        errors.display(e, 'video')
    t1 = time.time()
    shared.state.disable_preview = False
    shared.opts.data['schedulers_dynamic_shift'] = orig_dynamic_shift
    shared.opts.data['schedulers_shift'] = orig_sampler_shift
    p.close()
    # done
    if err:
        return video_utils.queue_err(err)
    if processed is None or (len(processed.images) == 0 and processed.bytes is None):
        return video_utils.queue_err('processing failed')
    shared.log.info(f'Video: name="{selected.name}" cls={shared.sd_model.__class__.__name__} frames={len(processed.images)} time={t1-t0:.2f}')
    if hasattr(processed, 'images') and processed.images is not None:
        pixels = video_save.images_to_tensor(processed.images)
    else:
        pixels = None
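    # take the first audio track as a float cpu tensor when the pipeline returns audio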
    if hasattr(processed, 'audio') and processed.audio is not None:
        audio = processed.audio[0].float().cpu()
    else:
        audio = None
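    # encode frames plus optional audio into the target video file; individual frames are saved only when mp4_frames is set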
    _num_frames, video_file = video_save.save_video(
        p=p,
        pixels=pixels,
        audio=audio,
        binary=processed.bytes,
        mp4_fps=mp4_fps,
        mp4_codec=mp4_codec,
        mp4_opt=mp4_opt,
        mp4_ext=mp4_ext,
        mp4_sf=mp4_sf,
        mp4_video=mp4_video,
        mp4_frames=mp4_frames,
        mp4_interpolate=mp4_interpolate,
        metadata={},
    )
    if not mp4_frames:
        processed.images = []
    generation_info_js = processed.js() if processed is not None else ''
    # shared.state.end(videojob)
    return processed.images, video_file, generation_info_js, processed.info, ui_common.plaintext_to_html(processed.comments)