import os import sys import html import threading import time import cProfile from modules import shared, progress, errors, timer queue_lock = threading.Lock() debug = os.environ.get('SD_QUEUE_DEBUG', None) is not None def get_lock(): if debug: fn = f'{sys._getframe(3).f_code.co_name}:{sys._getframe(2).f_code.co_name}:{sys._getframe(1).f_code.co_name}' # pylint: disable=protected-access errors.log.debug(f'Queue: fn={fn} lock={queue_lock.locked()}') return queue_lock def wrap_queued_call(func): def f(*args, **kwargs): with get_lock(): res = func(*args, **kwargs) return res return f def wrap_gradio_gpu_call(func, extra_outputs=None, name=None): name = name or func.__name__ def f(*args, **kwargs): # if the first argument is a string that says "task(...)", it is treated as a job id if len(args) > 0 and type(args[0]) == str and args[0][0:5] == "task(" and args[0][-1] == ")": id_task = args[0] progress.add_task_to_queue(id_task) else: id_task = None with get_lock(): progress.start_task(id_task) try: res = func(*args, **kwargs) progress.record_results(id_task, res) except Exception as e: shared.log.error(f"Exception: {e}") shared.log.error(f"Arguments: args={str(args)[:10240]} kwargs={str(kwargs)[:10240]}") errors.display(e, 'gradio call') res = extra_outputs or [] res.append(f"
{html.escape(str(e))}
") finally: progress.finish_task(id_task) return res return wrap_gradio_call(f, extra_outputs=extra_outputs, add_stats=True, name=name) def wrap_gradio_call(func, extra_outputs=None, add_stats=False, name=None): job_name = name if name is not None else func.__name__ def f(*args, extra_outputs_array=extra_outputs, **kwargs): t = time.perf_counter() shared.mem_mon.reset() if len(args) > 0 and type(args[0]) == str and args[0][0:5] == "task(" and args[0][-1] == ")": task_id = args[0] else: task_id = 0 jobid = shared.state.begin(job_name, task_id=task_id) try: if shared.cmd_opts.profile: pr = cProfile.Profile() pr.enable() res = func(*args, **kwargs) if res is None: msg = "No result returned from function" shared.log.warning(msg) res = extra_outputs_array or [] res.append(f"
{html.escape(msg)}
") else: res = list(res) if shared.cmd_opts.profile: pr.disable() errors.profile(pr, 'Wrap') except Exception as e: errors.display(e, 'gradio call') res = extra_outputs_array or [] res.append(f"
{html.escape(type(e).__name__+': '+str(e))}
") shared.state.end(jobid) if not add_stats: return tuple(res) elapsed = time.perf_counter() - t elapsed_m = int(elapsed // 60) elapsed_s = elapsed % 60 elapsed_text = f"{elapsed_m}m {elapsed_s:.2f}s" if elapsed_m > 0 else f"{elapsed_s:.2f}s" summary = timer.process.summary(min_time=0.25, total=False).replace('=', ' ') memory = shared.mem_mon.summary() if isinstance(res, list) and isinstance(res[-1], str): res[-1] += f"

Time: {elapsed_text} | {summary} {memory}

" return tuple(res) return f