1
0
mirror of https://github.com/vladmandic/sdnext.git synced 2026-01-27 15:02:48 +03:00
Files
sdnext/cli/image-watermark.py
Vladimir Mandic ad87d1eb07 watermark alphablending
Signed-off-by: Vladimir Mandic <mandic00@live.com>
2024-11-01 14:50:36 -04:00

131 lines
4.3 KiB
Python
Executable File

#!/usr/bin/env python
import os
import io
import pathlib
import argparse
import filetype
import numpy as np
from imwatermark import WatermarkEncoder, WatermarkDecoder
from PIL import Image
from PIL.ExifTags import TAGS
from PIL.TiffImagePlugin import ImageFileDirectory_v2
from util import log, Map
import piexif
import piexif.helper
options = Map({ 'method': 'dwtDctSvd', 'type': 'bytes' })
def get_exif(image):
# using piexif
res1 = {}
try:
exif = piexif.load(image.info["exif"])
exif = exif.get("Exif", {})
for k, v in exif.items():
key = list(vars(piexif.ExifIFD).keys())[list(vars(piexif.ExifIFD).values()).index(k)]
res1[key] = piexif.helper.UserComment.load(v)
except Exception:
pass
# using pillow
res2 = {}
try:
res2 = { TAGS[k]: v for k, v in image.getexif().items() if k in TAGS }
except Exception:
pass
return {**res1, **res2}
def set_exif(d: dict):
ifd = ImageFileDirectory_v2()
_TAGS = {v: k for k, v in TAGS.items()} # enumerate possible exif tags
for k, v in d.items():
ifd[_TAGS[k]] = v
exif_stream = io.BytesIO()
ifd.save(exif_stream)
encoded = b'Exif\x00\x00' + exif_stream.getvalue()
return encoded
def get_watermark(image, params):
data = np.asarray(image)
decoder = WatermarkDecoder(options.type, params.length)
decoded = decoder.decode(data, options.method)
wm = decoded.decode(encoding='ascii', errors='ignore')
return wm
def set_watermark(image, params):
data = np.asarray(image)
encoder = WatermarkEncoder()
length = params.length // 8
text = f"{params.wm:<{length}}"[:length]
bytearr = text.encode(encoding='ascii', errors='ignore')
encoder.set_watermark(options.type, bytearr)
encoded = encoder.encode(data, options.method)
image = Image.fromarray(encoded)
return image
def watermark(params, file):
if not os.path.exists(file):
log.error({ 'watermark': 'file not found' })
return
if not filetype.is_image(file):
log.error({ 'watermark': 'file is not an image' })
return
image = Image.open(file)
if image.width * image.height < 256 * 256:
log.error({ 'watermark': 'image too small' })
return
exif = get_exif(image)
wm = None
if params.command == 'read':
fn = params.input
wm = get_watermark(image, params)
elif params.command == 'write':
metadata = b'' if params.strip else set_exif(exif)
if params.output != '':
pathlib.Path(params.output).mkdir(parents = True, exist_ok = True)
image=set_watermark(image, params)
fn = os.path.join(params.output, file)
image.save(fn, exif=metadata)
if params.verify:
image = Image.open(fn)
data = np.asarray(image)
decoder = WatermarkDecoder(options.type, params.length)
decoded = decoder.decode(data, options.method)
wm = decoded.decode(encoding='ascii', errors='ignore')
else:
wm = params.wm
log.info({ 'file': fn })
log.info({ 'resolution': f'{image.width}x{image.height}' })
log.info({ 'watermark': wm })
log.info({ 'exif': None if params.strip else exif })
if __name__ == '__main__':
parser = argparse.ArgumentParser(description = 'image watermarking')
parser.add_argument('command', choices = ['read', 'write'])
parser.add_argument('--wm', type=str, required=False, default='sdnext', help='watermark string')
parser.add_argument('--strip', default=False, action='store_true', help = "strip existing exif data")
parser.add_argument('--verify', default=False, action='store_true', help = "verify watermark during write")
parser.add_argument('--length', type=int, default=32, help="watermark length in bits")
parser.add_argument('--output', type=str, required=False, default='', help='folder to store images, default is overwrite in-place')
parser.add_argument('input', type=str, nargs='*')
args = parser.parse_args()
# log.info({ 'watermark args': vars(args), 'options': options })
for arg in args.input:
if os.path.isfile(arg):
watermark(args, arg)
elif os.path.isdir(arg):
for root, _dirs, files in os.walk(arg):
for f in files:
watermark(args, os.path.join(root, f))