1
0
mirror of https://github.com/vladmandic/sdnext.git synced 2026-01-27 15:02:48 +03:00
Files
sdnext/cli/api-control.py
Vladimir Mandic 3ae10dd0e1 add nvidia-chronoedit
Signed-off-by: Vladimir Mandic <mandic00@live.com>
2025-10-30 19:52:29 -04:00

176 lines
6.9 KiB
Python
Executable File

#!/usr/bin/env python
# example: api-control.py --prompt "anime girl" --control "Canny:Canny:1.0:0.1:0.9:/home/vlado/generative/Samples/anime1.jpg,None:Depth:0.9:0.0:1.0:/home/vlado/generative/Samples/anime1.jpg" --hires --detailer --output /tmp/anime.jpg
import os
import io
import time
import base64
import logging
import argparse
import requests
import urllib3
from PIL import Image
# server base url and optional basic-auth credentials, overridable via environment variables
sd_url = os.environ.get('SDAPI_URL', "http://127.0.0.1:7860")
sd_username = os.environ.get('SDAPI_USR', None)
sd_password = os.environ.get('SDAPI_PWD', None)
# configure root logging at INFO level with a timestamped format and create the module logger
logging.basicConfig(level = logging.INFO, format = '%(asctime)s %(levelname)s: %(message)s')
log = logging.getLogger(__name__)
# post() sends requests with verify=False, so silence the insecure-request warning urllib3 would emit
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# base request payload; generate() fills in the remaining fields before posting it
# NOTE(review): presumably asks the server to return images in the response without saving them - confirm against the api
options = {
"save_images": False,
"send_images": True,
}
def auth():
    """Return basic-auth credentials for the server, or None when not configured.

    Credentials are only produced when both the username and the password
    are set; a partially configured pair is treated as no authentication.
    """
    credentials_missing = sd_username is None or sd_password is None
    if credentials_missing:
        return None
    return requests.auth.HTTPBasicAuth(sd_username, sd_password)
def post(endpoint: str, dct: dict = None):
    """POST ``dct`` as a json body to ``endpoint`` on the configured server.

    Returns the decoded json response on http 200. Any other status is
    reported as a dict with ``error``, ``reason`` and ``url`` keys rather
    than raised.
    """
    url = f'{sd_url}{endpoint}'
    response = requests.post(url, json=dct, timeout=300, verify=False, auth=auth())
    if response.status_code == 200:
        return response.json()
    return {
        'error': response.status_code,
        'reason': response.reason,
        'url': response.url,
    }
def encode(f):
    """Load an image and return it as a base64-encoded jpeg string.

    Args:
        f: filename or file object accepted by ``PIL.Image.open``.

    Returns:
        str: base64 text of the image re-encoded as jpeg.
    """
    # modes the jpeg writer accepts directly; everything else (RGBA, P, LA, ...) is converted to RGB
    jpeg_modes = ('1', 'L', 'RGB', 'RGBX', 'CMYK', 'YCbCr')
    # the context manager closes the opened file even when convert() returns a new image object
    with Image.open(f) as image, io.BytesIO() as stream:
        writable = image if image.mode in jpeg_modes else image.convert('RGB')
        writable.save(stream, 'JPEG')
        values = stream.getvalue()
    return base64.b64encode(values).decode()
def _image_size(path):
    """Return ``(width, height)`` of the image at ``path``; the file is closed afterwards."""
    with Image.open(path) as image:
        return image.width, image.height


def _control_unit(spec):
    """Build one control unit dict from ``process:model[:strength[:start[:end[:image]]]]``.

    Returns None after logging an error when fewer than two fields are given.
    Fields are split on ``:``, so a value that itself contains a colon cannot be expressed.
    """
    u = spec.split(':')
    if len(u) < 2:
        log.error(f'invalid control: {spec}')
        return None
    return {
        'process': u[0].strip(),
        'model': u[1].strip(),
        'strength': float(u[2].strip()) if len(u) > 2 else 1.0,
        'start': float(u[3].strip()) if len(u) > 3 else 0.0,
        'end': float(u[4].strip()) if len(u) > 4 else 1.0,
        'override': encode(u[5].strip()) if len(u) > 5 else None,
    }


def _ipadapter_unit(spec):
    """Build one ip-adapter unit dict from ``adapter:image[:scale[:start[:end]]]``.

    Returns None after logging an error when fewer than two fields are given
    or when the image file does not exist.
    """
    u = spec.split(':')
    if len(u) < 2:
        log.error(f'invalid ipadapter: {spec}')
        return None
    if not os.path.exists(u[1].strip()):
        log.error(f'invalid ipadapter image: {u[1]}')
        return None
    return {
        'adapter': u[0].strip(),
        'images': [encode(u[1].strip())],
        'scale': float(u[2].strip()) if len(u) > 2 else 1.0,
        'start': float(u[3].strip()) if len(u) > 3 else 0.1,
        'end': float(u[4].strip()) if len(u) > 4 else 1.0,
    }


def _receive_images(encoded, output, info, elapsed):
    """Decode each base64 image in ``encoded``, log it and optionally save it to ``output``.

    Anything that is not a list is ignored. Every image is written to the same
    ``output`` path, so with several images the last one is what remains on disk.
    """
    if not isinstance(encoded, list):
        return
    for item in encoded:
        # accept bare base64 as well as "data:...;base64,<payload>": the payload is always the last part
        b64 = item.split(',', 1)[-1]
        image = Image.open(io.BytesIO(base64.b64decode(b64)))
        log.info(f'received image: size={image.size} time={elapsed:.2f} info="{info}"')
        if output:
            image.save(output)
            log.info(f'image saved: size={image.size} filename={output}')


def generate(args): # pylint: disable=redefined-outer-name
    """Build the control request from parsed cli arguments, post it and store the results.

    Reads ``model``, ``init``, ``input``, ``prompt``, ``negative``, ``steps``, ``seed``,
    ``sampler``, ``control``, ``type``, ``ipadapter``, ``mask``, ``detailer``, ``hires``,
    ``upscaler``, ``output`` and ``processed`` from ``args``.

    Fills the module-level ``options`` payload in place, posts it to
    ``/sdapi/v1/control`` and hands returned ``images`` / ``processed`` lists to
    ``_receive_images`` with ``args.output`` / ``args.processed`` as target files.
    """
    t0 = time.time()
    if args.model is not None:
        post('/sdapi/v1/options', { 'sd_model_checkpoint': args.model })
        post('/sdapi/v1/reload-checkpoint') # needed if running in api-only to trigger new model load
    # request size follows the supplied image; when both are given the input image wins as it is applied last
    if args.init is not None:
        options['inits'] = [encode(args.init)]
        options['width'], options['height'] = _image_size(args.init)
    if args.input is not None:
        options['inputs'] = [encode(args.input)]
        options['width'], options['height'] = _image_size(args.input)
    options['prompt'] = args.prompt
    options['negative_prompt'] = args.negative
    options['steps'] = int(args.steps)
    options['seed'] = int(args.seed)
    if args.sampler is not None:
        options['sampler_name'] = args.sampler
    if args.control is not None:
        if args.type is not None:
            options['unit_type'] = args.type
        options['control'] = []
        for control in args.control.split(','):
            unit = _control_unit(control)
            if unit is None:
                continue
            options['control'].append(unit)
            log.info(f'added control: {options["control"]}')
    if args.ipadapter is not None:
        options['ip_adapter'] = []
        for ipadapter in args.ipadapter.split(','):
            unit = _ipadapter_unit(ipadapter)
            if unit is None:
                continue
            options['ip_adapter'].append(unit)
    if args.mask is not None:
        options['mask'] = encode(args.mask)
    if args.detailer:
        options['detailer_enabled'] = True
    if args.hires:
        options['enable_hr'] = True
        options['hr_force'] = True
    if args.upscaler is not None:
        options['enable_hr'] = True
        options['hr_force'] = True
        options['hr_scale'] = 2
        options['hr_resize_mode'] = 1
        options['hr_upscaler'] = args.upscaler
    data = post('/sdapi/v1/control', options)
    t1 = time.time()
    if 'info' in data:
        log.info(f'info: {data["info"]}')
    # post() reports http failures as a plain dict; surface it instead of finishing silently
    if 'images' not in data and 'processed' not in data:
        log.error(f'no images in response: {data}')
    # info is optional in the response, so do not assume the key exists when images are present
    info = data['info'] if 'info' in data else None
    if 'images' in data:
        _receive_images(data['images'], args.output, info, t1 - t0)
    if 'processed' in data:
        _receive_images(data['processed'], args.processed, info, t1 - t0)
if __name__ == "__main__":
    # command line entry point: declare the options in display order, parse them and run one generation
    parser = argparse.ArgumentParser(description = 'api-control')
    arguments = (
        ('--init', {'default': None, 'help': 'init image'}),
        ('--input', {'default': None, 'help': 'input image'}),
        ('--mask', {'help': 'mask image'}),
        ('--prompt', {'default': '', 'help': 'prompt text'}),
        ('--negative', {'default': '', 'help': 'negative prompt text'}),
        ('--steps', {'default': 20, 'help': 'number of steps'}),
        ('--seed', {'default': -1, 'help': 'initial seed'}),
        ('--sampler', {'default': None, 'help': 'sampler name'}),
        ('--output', {'default': None, 'help': 'output image file'}),
        ('--processed', {'default': None, 'help': 'processed output file'}),
        ('--model', {'help': 'model name'}),
        ('--type', {'default': "controlnet", 'help': 'control type'}),
        ('--control', {'help': 'control units'}),
        ('--ipadapter', {'help': 'ipadapter units'}),
        ('--detailer', {'default': False, 'action': 'store_true', 'help': 'force detailer'}),
        ('--hires', {'default': False, 'action': 'store_true', 'help': 'force hires'}),
        ('--upscaler', {'default': None, 'help': 'upscaler name'}),
    )
    for flag, spec in arguments:
        parser.add_argument(flag, required=False, **spec)
    args = parser.parse_args()
    log.info(f'api-control: {args}')
    generate(args)