1
0
mirror of https://github.com/vladmandic/sdnext.git synced 2026-01-27 15:02:48 +03:00
Files
sdnext/modules/api/gallery.py
CalamitousFelicitousness 761ea1c327 feat(settings): add base path support for output folders
Change "Images folder" and "Grids folder" settings to act as base paths
that combine with specific folder settings, rather than replacing them.

- Add resolve_output_path() helper function to modules/paths.py
- Update all output path usages to use combined base + specific paths
- Update gallery API to return resolved paths with display labels
- Update gallery UI to show short labels with full path on hover

Example: If base is "C:\Database\" and specific is "outputs/text",
the resolved path becomes "C:\Database\outputs\text"

Edge cases handled:
- Empty base path: uses specific path directly (backward compatible)
- Absolute specific path: ignores base path
- Empty specific path: uses base path only
2026-01-16 16:24:05 +00:00

236 lines
10 KiB
Python

import io
import os
import time
import base64
from typing import List, Union
from urllib.parse import quote, unquote
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from starlette.websockets import WebSocket, WebSocketState
from pydantic import BaseModel, Field # pylint: disable=no-name-in-module
from PIL import Image
from modules import shared, images, files_cache, modelstats
from modules.paths import resolve_output_path
debug = shared.log.debug if os.environ.get('SD_BROWSER_DEBUG', None) is not None else lambda *args, **kwargs: None
OPTS_FOLDERS = [
"outdir_samples",
"outdir_txt2img_samples",
"outdir_img2img_samples",
"outdir_control_samples",
"outdir_extras_samples",
"outdir_save",
"outdir_video",
"outdir_init_images",
"outdir_grids",
"outdir_txt2img_grids",
"outdir_img2img_grids",
"outdir_control_grids",
]
### class definitions
class ReqFiles(BaseModel):
folder: str = Field(title="Folder")
### ws connection manager
class ConnectionManager:
def __init__(self):
self.active: list[WebSocket] = []
async def connect(self, ws: WebSocket):
await ws.accept()
agent = ws._headers.get("user-agent", "") # pylint: disable=protected-access
debug(f'Browser WS connect: client={ws.client.host} agent="{agent}"')
self.active.append(ws)
def disconnect(self, ws: WebSocket):
debug(f'Browser WS disconnect: client={ws.client.host}')
self.active.remove(ws)
async def send(self, ws: WebSocket, data: Union[str, dict, bytes]):
# debug(f'Browser WS send: client={ws.client.host} data={type(data)}')
if ws.client_state != WebSocketState.CONNECTED:
return
if isinstance(data, bytes):
await ws.send_bytes(data)
elif isinstance(data, dict):
await ws.send_json(data)
elif isinstance(data, str):
await ws.send_text(data)
else:
debug(f'Browser WS send: client={ws.client.host} data={type(data)} unknown')
async def broadcast(self, data: Union[str, dict, bytes]):
for ws in self.active:
await self.send(ws, data)
### api definitions
def register_api(app: FastAPI): # register api
manager = ConnectionManager()
def get_video_thumbnail(filepath):
from modules.video import get_video_params
try:
stat_size, stat_mtime = modelstats.stat(filepath)
frames, fps, duration, width, height, codec, frame = get_video_params(filepath, capture=True)
h = shared.opts.extra_networks_card_size
w = shared.opts.extra_networks_card_size if shared.opts.browser_fixed_width else width * h // height
frame = frame.convert('RGB')
frame.thumbnail((w, h), Image.Resampling.HAMMING)
buffered = io.BytesIO()
frame.save(buffered, format='jpeg')
data_url = f'data:image/jpeg;base64,{base64.b64encode(buffered.getvalue()).decode("ascii")}'
frame.close()
content = {
'exif': f'Codec: {codec}, Frames: {frames}, Duration: {duration:.2f} sec, FPS: {fps:.2f}',
'data': data_url,
'width': width,
'height': height,
'size': stat_size,
'mtime': stat_mtime.timestamp() * 1000, # JS timestamps use milliseconds
}
return content
except Exception as e:
shared.log.error(f'Gallery video: file="{filepath}" {e}')
return {}
def get_image_thumbnail(filepath):
try:
stat_size, stat_mtime = modelstats.stat(filepath)
image = Image.open(filepath)
geninfo, _items = images.read_info_from_image(image)
h = shared.opts.extra_networks_card_size
w = shared.opts.extra_networks_card_size if shared.opts.browser_fixed_width else image.width * h // image.height
width, height = image.width, image.height
image = image.convert('RGB')
image.thumbnail((w, h), Image.Resampling.HAMMING)
buffered = io.BytesIO()
image.save(buffered, format='jpeg')
data_url = f'data:image/jpeg;base64,{base64.b64encode(buffered.getvalue()).decode("ascii")}'
image.close()
content = {
'exif': geninfo,
'data': data_url,
'width': width,
'height': height,
'size': stat_size,
'mtime': stat_mtime.timestamp() * 1000, # JS timestamps use milliseconds
}
return content
except Exception as e:
shared.log.error(f'Gallery image: file="{filepath}" {e}')
return {}
# @app.get('/sdapi/v1/browser/folders', response_model=List[str])
def get_folders():
def make_folder(path, label=None):
"""Create folder entry with path and display label."""
if label is None:
label = os.path.basename(path) or path
return {"path": path, "label": label}
reference_dir = os.path.join('models', 'Reference')
base_samples = shared.opts.outdir_samples
base_grids = shared.opts.outdir_grids
# Build list of resolved output paths with labels
folders = []
if base_samples:
folders.append(make_folder(base_samples, os.path.basename(base_samples.rstrip('/\\'))))
if base_grids and base_grids != base_samples:
folders.append(make_folder(base_grids, os.path.basename(base_grids.rstrip('/\\'))))
# Use the specific folder setting values as labels (e.g., "outputs/text" -> "outputs/text")
folders.append(make_folder(resolve_output_path(base_samples, shared.opts.outdir_txt2img_samples), shared.opts.outdir_txt2img_samples))
folders.append(make_folder(resolve_output_path(base_samples, shared.opts.outdir_img2img_samples), shared.opts.outdir_img2img_samples))
folders.append(make_folder(resolve_output_path(base_samples, shared.opts.outdir_control_samples), shared.opts.outdir_control_samples))
folders.append(make_folder(resolve_output_path(base_samples, shared.opts.outdir_extras_samples), shared.opts.outdir_extras_samples))
folders.append(make_folder(resolve_output_path(base_samples, shared.opts.outdir_save), shared.opts.outdir_save))
folders.append(make_folder(resolve_output_path(base_samples, shared.opts.outdir_video), shared.opts.outdir_video))
folders.append(make_folder(resolve_output_path(base_samples, shared.opts.outdir_init_images), shared.opts.outdir_init_images))
folders.append(make_folder(resolve_output_path(base_grids, shared.opts.outdir_txt2img_grids), shared.opts.outdir_txt2img_grids))
folders.append(make_folder(resolve_output_path(base_grids, shared.opts.outdir_img2img_grids), shared.opts.outdir_img2img_grids))
folders.append(make_folder(resolve_output_path(base_grids, shared.opts.outdir_control_grids), shared.opts.outdir_control_grids))
# Custom browser folders and reference dir
for f in shared.opts.browser_folders.split(','):
f = f.strip()
if f:
folders.append(make_folder(f))
folders.append(make_folder(reference_dir, 'Reference'))
# Filter empty and duplicates (by path)
seen_paths = set()
unique_folders = []
for f in folders:
path = f["path"].strip()
if path and path not in seen_paths and os.path.isdir(path):
seen_paths.add(path)
unique_folders.append(f)
if shared.demo is not None and path not in shared.demo.allowed_paths:
debug(f'Browser folders allow: {path}')
shared.demo.allowed_paths.append(quote(path))
debug(f'Browser folders: {unique_folders}')
return JSONResponse(content=unique_folders)
# @app.get("/sdapi/v1/browser/thumb", response_model=dict)
async def get_thumb(file: str):
try:
decoded = unquote(file).replace('%3A', ':')
if decoded.lower().endswith('.mp4'):
return JSONResponse(content=get_video_thumbnail(decoded))
else:
return JSONResponse(content=get_image_thumbnail(decoded))
except Exception as e:
shared.log.error(f'Gallery: {file} {e}')
content = { 'error': str(e) }
return JSONResponse(content=content)
# @app.get("/sdapi/v1/browser/files", response_model=list)
async def ht_files(folder: str):
try:
t0 = time.time()
files = files_cache.directory_files(folder, recursive=True)
lines = []
for f in files:
file = os.path.relpath(f, folder)
msg = quote(folder) + '##F##' + quote(file)
msg = msg[:1] + ":" + msg[4:] if msg[1:4] == "%3A" else msg
lines.append(msg)
t1 = time.time()
shared.log.debug(f'Gallery: type=ht folder="{folder}" files={len(lines)} time={t1-t0:.3f}')
return lines
except Exception as e:
shared.log.error(f'Gallery: {folder} {e}')
return []
shared.api.add_api_route("/sdapi/v1/browser/folders", get_folders, methods=["GET"], response_model=List[str])
shared.api.add_api_route("/sdapi/v1/browser/thumb", get_thumb, methods=["GET"], response_model=dict)
shared.api.add_api_route("/sdapi/v1/browser/files", ht_files, methods=["GET"], response_model=list)
@app.websocket("/sdapi/v1/browser/files")
async def ws_files(ws: WebSocket):
try:
await manager.connect(ws)
folder = await ws.receive_text()
folder = unquote(folder).replace('%3A', ':')
t0 = time.time()
numFiles = 0
files = files_cache.list_files(folder, recursive=True)
# files = list(files_cache.directory_files(folder, recursive=True))
# files.sort(key=os.path.getmtime)
for f in files:
numFiles += 1
file = os.path.relpath(f, folder)
msg = quote(folder) + '##F##' + quote(file)
msg = msg[:1] + ":" + msg[4:] if msg[1:4] == "%3A" else msg
await manager.send(ws, msg)
await manager.send(ws, '#END#')
t1 = time.time()
shared.log.debug(f'Gallery: type=ws folder="{folder}" files={numFiles} time={t1-t0:.3f}')
except Exception as e:
debug(f'Browser WS error: {e}')
manager.disconnect(ws)