mirror of
https://github.com/vladmandic/sdnext.git
synced 2026-01-27 15:02:48 +03:00
110 lines
3.4 KiB
Python
Executable File
110 lines
3.4 KiB
Python
Executable File
#!/usr/bin/env python
|
|
"""
|
|
interrogate image(s) with clip and deepdanbooru through the sdapi interrogate endpoint and log caption/keyword statistics
|
|
"""
|
|
|
|
import io
|
|
import base64
|
|
import sys
|
|
import os
|
|
import asyncio
|
|
import filetype
|
|
from PIL import Image
|
|
from util import log, Map
|
|
import sdapi
|
|
|
|
|
|
# running totals across all processed images: caption word / booru tag -> occurrence count
stats = {
    'captions': {},
    'keywords': {},
}
# filler words that are left out of the caption word count
exclude = [
    'a', 'in', 'on', 'out', 'at',
    'the', 'and', 'with', 'next', 'to',
    'it', 'for', 'of', 'into', 'that',
]
|
|
|
|
|
|
def decode(encoding):
    """
    Turn a base64 string into a PIL image.

    Args:
        encoding: base64 text, either bare or wrapped in a `data:image/...` URI.

    Returns:
        The image object produced by `Image.open` on the decoded bytes.

    Raises:
        IndexError: if a `data:image/` URI contains no comma separating header and payload.
    """
    if encoding.startswith("data:image/"):
        # the payload is everything after the first comma; splitting there (instead of on
        # the first ';') also handles headers with extra parameters such as
        # 'data:image/png;charset=utf-8;base64,....' or with no parameters at all
        encoding = encoding.split(",", 1)[1]
    return Image.open(io.BytesIO(base64.b64decode(encoding)))
|
|
|
|
|
|
def encode(f):
    """
    Load an image and return it re-encoded as a base64 JPEG string.

    Args:
        f: whatever `Image.open` accepts; the caller in this file passes a file path.

    Returns:
        str: base64 text of the JPEG bytes, saved with the exif data read from the source image.
    """
    # modes Pillow's JPEG writer can store directly (per its RAWMODE table); anything else
    # (RGBA, P, LA, ...) is flattened to RGB first so the save does not fail
    jpeg_modes = ('1', 'L', 'RGB', 'RGBX', 'CMYK', 'YCbCr')
    # the context manager releases the source file handle once the JPEG has been produced;
    # saving happens inside it because pixel data may still be read lazily from the file
    with Image.open(f) as image:
        exif = image.getexif()
        if image.mode not in jpeg_modes:
            image = image.convert('RGB')
        with io.BytesIO() as stream:
            image.save(stream, 'JPEG', exif=exif)
            return base64.b64encode(stream.getvalue()).decode()
|
|
|
|
|
|
def print_summary():
    """
    Log the accumulated caption and keyword counters, most frequent entries first.
    """
    def by_count(entry):
        # entry is a (name, count) pair taken from a counter dict
        return entry[1]

    for label, bucket in (('caption stats', 'captions'), ('keyword stats', 'keywords')):
        ranked = dict(sorted(stats[bucket].items(), key=by_count, reverse=True))
        log.info({ label: ranked })
|
|
|
|
|
|
def _bump(counter, key):
    """Increase the occurrence count stored for `key` in the `counter` dict by one."""
    counter[key] = counter.get(key, 0) + 1


def _confidence(item):
    """Sort key for a (tag, score) pair: the score as a number, unparsable scores lowest."""
    try:
        return float(item[1])
    except ValueError:
        return float('-inf')


async def interrogate(f):
    """
    Interrogate one file with the 'clip' and 'deepdanbooru' models and update `stats`.

    The file is encoded once and posted twice to '/sdapi/v1/interrogate'. Caption words
    (minus the `exclude` list) and booru tags are tallied into the module-level `stats`.

    Args:
        f: path of the file to interrogate.

    Returns:
        None when `filetype.is_image` rejects the file (callers must not unpack blindly);
        otherwise a `(caption, keywords, style)` tuple where `caption` is the clip caption
        (or ""), `keywords` maps each booru tag to its score string ordered by descending
        score, and `style` is the text following ', by' in the caption (or "").
    """
    if not filetype.is_image(f):
        log.info({ 'interrogate skip': f })
        return
    request = Map({ 'image': encode(f) })
    log.info({ 'interrogate': f })

    # run clip
    request.model = 'clip'
    res = await sdapi.post('/sdapi/v1/interrogate', request)
    caption = ""
    style = ""
    if 'caption' in res:
        caption = res.caption
        log.info({ 'interrogate caption': caption })
        if ', by' in caption:
            style = caption.split(', by')[1].strip()
            log.info({ 'interrogate style': style })
        # split() without a separator drops the empty tokens that repeated or
        # leading/trailing spaces would otherwise add to the word count
        for word in caption.split():
            if word not in exclude:
                _bump(stats['captions'], word)
    else:
        log.error({ 'interrogate clip error': res })

    # run booru
    request.model = 'deepdanbooru'
    res = await sdapi.post('/sdapi/v1/interrogate', request)
    keywords = {}
    if 'caption' in res:
        for term in res.caption.split(', '):
            cleaned = term.replace('(', '').replace(')', '').replace('\\', '')
            # the score follows the LAST colon, so tags that themselves contain ':' stay intact
            tag, sep, score = cleaned.rpartition(':')
            if not sep:
                continue
            keywords[tag] = score
        # order by numeric score; comparing the raw strings would sort lexicographically
        keywords = dict(sorted(keywords.items(), key=_confidence, reverse=True))
        for tag in keywords:
            _bump(stats['keywords'], tag)
        log.info({ 'interrogate keywords': keywords })
    else:
        log.error({ 'interrogate booru error': res })
    return caption, keywords, style
|
|
|
|
|
|
async def main():
    """
    Interrogate every file or folder named on the command line, then log summary stats.

    Each argument is handled by kind: a file is interrogated directly, a folder is walked
    recursively, and anything else is reported as an error. The sdapi session is closed
    even if processing raises.
    """
    # work on a copy so the process-wide sys.argv is left intact
    paths = sys.argv[1:]
    await sdapi.session()
    try:
        if not paths:
            log.error({ 'interrogate': 'no files specified' })
        for arg in paths:
            if not os.path.exists(arg):
                log.error({ 'interrogate file missing': arg })
            elif os.path.isfile(arg):
                await interrogate(arg)
            elif os.path.isdir(arg):
                for root, _dirs, files in os.walk(arg):
                    for f in files:
                        # the result is not needed here, and interrogate() returns None for
                        # files it skips, so unpacking it would crash on any non-image file
                        await interrogate(os.path.join(root, f))
            else:
                log.error({ 'interrogate unknown file type': arg })
    finally:
        await sdapi.close()
    print_summary()
|
|
|
|
|
|
# script entry point: when executed directly, run the async main() to completion
if __name__ == '__main__':
    asyncio.run(main())
|