1
0
mirror of https://github.com/vladmandic/sdnext.git synced 2026-01-27 15:02:48 +03:00
Files
sdnext/modules/api/server.py
2026-01-13 02:08:11 -08:00

183 lines
7.2 KiB
Python

import os
import time
from typing import Any
from fastapi import Request, Depends
from fastapi.exceptions import HTTPException
from fastapi.responses import FileResponse
from modules import shared
from modules.api import models, helpers
def post_shutdown():
shared.log.info('Shutdown request received')
import sys
sys.exit(0)
def get_js(request: Request):
file = request.query_params.get("file", None)
if (file is None) or (len(file) == 0):
raise HTTPException(status_code=400, detail="file parameter is required")
ext = file.split('.')[-1]
if ext not in ['js', 'css', 'map', 'html', 'wasm', 'ttf', 'mjs', 'json']:
raise HTTPException(status_code=400, detail=f"invalid file extension: {ext}")
if not os.path.exists(file):
shared.log.error(f"API: file not found: {file}")
raise HTTPException(status_code=404, detail=f"file not found: {file}")
if ext in ['js', 'mjs']:
media_type = 'application/javascript'
elif ext in ['map', 'json']:
media_type = 'application/json'
elif ext in ['css']:
media_type = 'text/css'
elif ext in ['html']:
media_type = 'text/html'
elif ext in ['wasm']:
media_type = 'application/wasm'
elif ext in ['ttf']:
media_type = 'font/ttf'
else:
media_type = 'application/octet-stream'
return FileResponse(file, media_type=media_type)
def get_motd():
import requests
motd = ''
ver = shared.get_version()
if ver.get('updated', None) is not None:
motd = f"version <b>{ver['commit']} {ver['updated']}</b> <span style='color: var(--primary-500)'>{ver['url'].split('/')[-1]}</span><br>" # pylint: disable=use-maxsplit-arg
if shared.opts.motd:
try:
res = requests.get('https://vladmandic.github.io/sdnext/motd', timeout=3)
if res.status_code == 200:
msg = (res.text or '').strip()
shared.log.info(f'MOTD: {msg if len(msg) > 0 else "N/A"}')
motd += res.text
else:
shared.log.error(f'MOTD: {res.status_code}')
except Exception as err:
shared.log.error(f'MOTD: {err}')
return motd
def get_version():
return shared.get_version()
def get_platform():
from installer import get_platform as installer_get_platform
from modules.loader import get_packages as loader_get_packages
return { **installer_get_platform(), **loader_get_packages() }
def get_log(req: models.ReqGetLog = Depends()):
lines = shared.log.buffer[:req.lines] if req.lines > 0 else shared.log.buffer.copy()
if req.clear:
shared.log.buffer.clear()
return lines
def post_log(req: models.ReqPostLog):
if req.message is not None:
shared.log.info(f'UI: {req.message}')
if req.debug is not None:
shared.log.debug(f'UI: {req.debug}')
if req.error is not None:
shared.log.error(f'UI: {req.error}')
return {}
def get_config():
options = {}
for k in shared.opts.data.keys():
if shared.opts.data_labels.get(k) is not None:
options.update({k: shared.opts.data.get(k, shared.opts.data_labels.get(k).default)})
else:
options.update({k: shared.opts.data.get(k, None)})
if 'sd_lyco' in options:
del options['sd_lyco']
if 'sd_lora' in options:
del options['sd_lora']
return options
def set_config(req: dict[str, Any]):
updated = []
for k, v in req.items():
updated.append({ k: shared.opts.set(k, v) })
shared.opts.save()
return { "updated": updated }
def get_cmd_flags():
return vars(shared.cmd_opts)
def get_history(req: models.ReqHistory = Depends()):
if req.id is not None and len(req.id) > 0:
res = [item for item in shared.state.state_history if item['id'] == req.id]
else:
res = shared.state.state_history
res = [models.ResHistory(**item) for item in res]
return res
def get_progress(req: models.ReqProgress = Depends()):
if shared.state.job_count == 0: # idle state
return models.ResProgress(id=shared.state.id, progress=0, eta_relative=0, state=shared.state.dict(), textinfo=shared.state.textinfo)
shared.state.do_set_current_image()
current_image = None
if shared.state.current_image and not req.skip_current_image:
current_image = helpers.encode_pil_to_base64(shared.state.current_image)
batch_x = max(shared.state.job_no, 0)
batch_y = max(shared.state.job_count, 1)
step_x = max(shared.state.sampling_step, 0)
prev_steps = max(shared.state.sampling_steps, 1)
while step_x > shared.state.sampling_steps:
shared.state.sampling_steps += prev_steps
step_y = max(shared.state.sampling_steps, 1)
current = step_y * batch_x + step_x
total = step_y * batch_y
progress = min((current / total) if current > 0 and total > 0 else 0, 1)
time_since_start = time.time() - shared.state.time_start
eta_relative = (time_since_start / progress) - time_since_start if progress > 0 else 0
# shared.log.critical(f'get_progress: batch {batch_x}/{batch_y} step {step_x}/{step_y} current {current}/{total} time={time_since_start} eta={eta_relative}')
# shared.log.critical(shared.state)
res = models.ResProgress(id=shared.state.id, progress=round(progress, 2), eta_relative=round(eta_relative, 2), current_image=current_image, textinfo=shared.state.textinfo, state=shared.state.dict(), )
return res
def get_status():
return shared.state.status()
def post_interrupt():
shared.state.interrupt()
return {}
def post_skip():
shared.state.skip()
def get_memory():
try:
import psutil
process = psutil.Process(os.getpid())
res = process.memory_info() # only rss is cross-platform guaranteed so we dont rely on other values
ram_total = 100 * res.rss / process.memory_percent() # and total memory is calculated as actual value is not cross-platform safe
ram = { 'free': ram_total - res.rss, 'used': res.rss, 'total': ram_total }
except Exception as err:
ram = { 'error': f'{err}' }
try:
import torch
if torch.cuda.is_available():
s = torch.cuda.mem_get_info()
system = { 'free': s[0], 'used': s[1] - s[0], 'total': s[1] }
s = dict(torch.cuda.memory_stats(shared.device))
allocated = { 'current': s['allocated_bytes.all.current'], 'peak': s['allocated_bytes.all.peak'] }
reserved = { 'current': s['reserved_bytes.all.current'], 'peak': s['reserved_bytes.all.peak'] }
active = { 'current': s['active_bytes.all.current'], 'peak': s['active_bytes.all.peak'] }
inactive = { 'current': s['inactive_split_bytes.all.current'], 'peak': s['inactive_split_bytes.all.peak'] }
warnings = { 'retries': s['num_alloc_retries'], 'oom': s['num_ooms'] }
cuda = {
'system': system,
'active': active,
'allocated': allocated,
'reserved': reserved,
'inactive': inactive,
'events': warnings,
}
else:
cuda = { 'error': 'unavailable' }
except Exception as err:
cuda = { 'error': f'{err}' }
return models.ResMemory(ram = ram, cuda = cuda)