mirror of
https://github.com/vladmandic/sdnext.git
synced 2026-01-27 15:02:48 +03:00
131 lines
4.3 KiB
Python
Executable File
131 lines
4.3 KiB
Python
Executable File
#!/usr/bin/env python
|
|
import os
|
|
import io
|
|
import pathlib
|
|
import argparse
|
|
import filetype
|
|
import numpy as np
|
|
from imwatermark import WatermarkEncoder, WatermarkDecoder
|
|
from PIL import Image
|
|
from PIL.ExifTags import TAGS
|
|
from PIL.TiffImagePlugin import ImageFileDirectory_v2
|
|
from util import log, Map
|
|
import piexif
|
|
import piexif.helper
|
|
|
|
|
|
# shared watermark settings used by every encode/decode call in this file:
# 'method' is passed as the algorithm name to encoder.encode / decoder.decode,
# 'type' is passed as the payload type to WatermarkEncoder.set_watermark / WatermarkDecoder
options = Map({ 'method': 'dwtDctSvd', 'type': 'bytes' })
|
|
|
|
|
|
def get_exif(image):
    """Collect EXIF metadata from an image as a ``{tag name: value}`` dict.

    Two readers are combined, both strictly best-effort (a failure in either
    one yields fewer entries, never an exception):

    * piexif parses the raw ``image.info["exif"]`` payload; every entry of its
      ``"Exif"`` IFD is decoded with ``piexif.helper.UserComment.load``.
      Entries that cannot be named or decoded are skipped individually, so a
      single bad entry no longer discards the remaining ones.
    * Pillow's ``image.getexif()`` supplies every tag whose id is in ``TAGS``.

    On a name clash the Pillow value wins because it is merged last.

    Args:
        image: object exposing an ``info`` mapping and a ``getexif()`` method
            (the caller in this file passes the result of ``Image.open``).

    Returns:
        dict mapping tag names to values; empty when nothing could be read.
    """
    # using piexif
    res1 = {}
    raw = image.info.get("exif")
    if raw:  # no embedded exif payload -> nothing for piexif to parse
        try:
            exif = piexif.load(raw).get("Exif", {})
            # build the tag-code -> name table once instead of once per entry;
            # setdefault keeps the first name for a code, matching the
            # list.index() lookup this replaces
            names = {}
            for name, code in vars(piexif.ExifIFD).items():
                if isinstance(code, int):
                    names.setdefault(code, name)
        except Exception:
            # unparsable payload: fall back to whatever pillow can read
            exif = {}
            names = {}
        for k, v in exif.items():
            try:
                res1[names[k]] = piexif.helper.UserComment.load(v)
            except Exception:
                # unknown tag code or value that is not a user-comment blob
                continue
    # using pillow
    res2 = {}
    try:
        res2 = {TAGS[k]: v for k, v in image.getexif().items() if k in TAGS}
    except Exception:
        # best-effort: metadata is optional, keep going without pillow tags
        res2 = {}
    return {**res1, **res2}
|
|
|
|
|
|
def set_exif(d: dict):
    """Serialize a ``{tag name: value}`` dict into an EXIF byte payload.

    Tag names are translated to numeric ids through the inverse of Pillow's
    ``TAGS`` table and stored in an ``ImageFileDirectory_v2``, which is then
    written to an in-memory stream.

    Args:
        d: mapping of EXIF tag names to values.

    Returns:
        bytes: the serialized directory prefixed with ``b'Exif\\x00\\x00'``.

    Raises:
        KeyError: if a key of ``d`` is not a tag name present in ``TAGS``.
    """
    code_by_name = {name: code for code, name in TAGS.items()}  # invert id -> name table
    directory = ImageFileDirectory_v2()
    for name, value in d.items():
        directory[code_by_name[name]] = value
    buffer = io.BytesIO()
    directory.save(buffer)
    return b'Exif\x00\x00' + buffer.getvalue()
|
|
|
|
|
|
def get_watermark(image, params):
    """Decode the invisible watermark carried by ``image``.

    Args:
        image: image object convertible with ``np.asarray``.
        params: object whose ``length`` attribute is handed to
            ``WatermarkDecoder`` as the watermark length.

    Returns:
        str: decoded payload interpreted as ASCII; bytes that are not valid
        ASCII are dropped.
    """
    pixels = np.asarray(image)
    payload = WatermarkDecoder(options.type, params.length).decode(pixels, options.method)
    return payload.decode(encoding='ascii', errors='ignore')
|
|
|
|
|
|
def set_watermark(image, params):
    """Embed ``params.wm`` into ``image`` as an invisible watermark.

    The payload is always exactly ``params.length // 8`` bytes: the text is
    converted to ASCII first (characters that cannot be encoded are dropped)
    and only then right-padded with spaces or truncated. Padding before
    encoding would let dropped non-ASCII characters shorten the payload, so
    the embedded bit length would no longer match the ``params.length`` that
    the decoder in this file is constructed with.

    Args:
        image: image object convertible with ``np.asarray``.
        params: object with ``wm`` (watermark text) and ``length``
            (watermark length in bits).

    Returns:
        A new ``PIL.Image.Image`` built from the encoder output.
    """
    data = np.asarray(image)
    length = params.length // 8
    # encode first, pad/truncate afterwards so the byte count is guaranteed
    payload = str(params.wm).encode(encoding='ascii', errors='ignore')
    payload = payload.ljust(length, b' ')[:length]
    encoder = WatermarkEncoder()
    encoder.set_watermark(options.type, payload)
    encoded = encoder.encode(data, options.method)
    return Image.fromarray(encoded)
|
|
|
|
|
|
def watermark(params, file):
    """Read or write the invisible watermark of a single image file.

    The file is skipped (with an error log entry) when it does not exist, is
    not recognised as an image, or has fewer than 256*256 pixels.

    * ``params.command == 'read'``: decode and log the watermark.
    * ``params.command == 'write'``: embed ``params.wm`` and save the result.
      With an empty ``params.output`` the input file is overwritten in place.
      Otherwise the input path is mirrored below ``params.output``; missing
      directories are created, and the drive/root of an absolute input path is
      stripped so the result always stays inside the output folder. Existing
      EXIF data is re-attached unless ``params.strip`` is set. With
      ``params.verify`` the saved file is re-opened and the watermark decoded
      from it is logged instead of the requested text.

    Args:
        params: parsed command line namespace; reads ``command``, ``wm``,
            ``length``, ``strip``, ``verify`` and ``output``.
        file: path of the image to process.

    Returns:
        None. Results are reported through ``log``.
    """
    if not os.path.exists(file):
        log.error({ 'watermark': 'file not found' })
        return
    if not filetype.is_image(file):
        log.error({ 'watermark': 'file is not an image' })
        return

    fn = file  # path reported in the log; replaced by the output path on write
    wm = None
    marked = None
    metadata = b''
    # everything that needs pixel data or metadata happens while the source is
    # open; the handle is released before the (possibly in-place) save below
    with Image.open(file) as image:
        if image.width * image.height < 256 * 256:
            log.error({ 'watermark': 'image too small' })
            return
        resolution = f'{image.width}x{image.height}'
        exif = get_exif(image)
        if params.command == 'read':
            wm = get_watermark(image, params)
        elif params.command == 'write':
            metadata = b'' if params.strip else set_exif(exif)
            marked = set_watermark(image, params)

    if marked is not None:
        if params.output != '':
            # os.path.join discards the output folder when the second part is
            # absolute, which would silently overwrite the original file
            separators = os.sep + (os.altsep or '')
            relative = os.path.splitdrive(file)[1].lstrip(separators)
            fn = os.path.join(params.output, relative)
            # create the output folder plus any mirrored sub-folders
            pathlib.Path(fn).parent.mkdir(parents=True, exist_ok=True)
        marked.save(fn, exif=metadata)
        if params.verify:
            with Image.open(fn) as saved:
                wm = get_watermark(saved, params)
        else:
            wm = params.wm

    log.info({ 'file': fn })
    log.info({ 'resolution': resolution })
    log.info({ 'watermark': wm })
    log.info({ 'exif': None if params.strip else exif })
|
|
|
|
|
|
if __name__ == '__main__':
    # command line entry point: parse options, then process every input,
    # descending into directories recursively
    cli = argparse.ArgumentParser(description='image watermarking')
    cli.add_argument('command', choices=['read', 'write'])
    cli.add_argument('--wm', type=str, required=False, default='sdnext', help='watermark string')
    cli.add_argument('--strip', default=False, action='store_true', help="strip existing exif data")
    cli.add_argument('--verify', default=False, action='store_true', help="verify watermark during write")
    cli.add_argument('--length', type=int, default=32, help="watermark length in bits")
    cli.add_argument('--output', type=str, required=False, default='', help='folder to store images, default is overwrite in-place')
    cli.add_argument('input', type=str, nargs='*')
    args = cli.parse_args()

    for target in args.input:
        if os.path.isfile(target):
            candidates = (target,)
        elif os.path.isdir(target):
            # lazy generator: files are processed while the tree is being walked
            candidates = (
                os.path.join(folder, name)
                for folder, _subdirs, names in os.walk(target)
                for name in names
            )
        else:
            candidates = ()  # neither a file nor a directory: nothing to do
        for candidate in candidates:
            watermark(args, candidate)
|