# Mirror of https://github.com/vladmandic/sdnext.git
# Synced 2026-01-27 15:02:48 +03:00
# 221 lines, 9.1 KiB, Python
import numpy as np
|
|
import torch
|
|
from PIL import Image
|
|
from rich.progress import Progress, TextColumn, BarColumn, TaskProgressColumn, TimeRemainingColumn, TimeElapsedColumn
|
|
import modules.postprocess.esrgan_model_arch as arch
|
|
from modules import images, devices, shared
|
|
from modules.upscaler import Upscaler, UpscalerData, compile_upscaler
|
|
|
|
|
|
def mod2normal(state_dict, nb=23):
    """Rename the keys of a ``conv_first``-style ESRGAN state dict to the ``model.N`` layout.

    Adapted from https://github.com/victorca25/iNNfer.

    A state dict is converted only when it contains ``conv_first.weight``;
    anything else is returned unchanged (the very same object).

    Args:
        state_dict: Mapping from parameter names to values. Values are copied
            by reference and never inspected.
        nb: Index used for the trunk convolution key ``model.1.sub.{nb}``.
            Defaults to 23, the value that used to be hard-coded, and mirrors
            the ``nb`` parameter of ``resrgan2normal``.

    Returns:
        A new dict with the renamed keys, or ``state_dict`` itself when no
        conversion applies.

    Raises:
        KeyError: If a convertible state dict lacks one of the fixed layer keys.
    """
    if 'conv_first.weight' not in state_dict:
        return state_dict

    crt_net = {
        'model.0.weight': state_dict['conv_first.weight'],
        'model.0.bias': state_dict['conv_first.bias'],
    }

    # Residual dense block parameters: move under ``model.1.sub.`` and insert
    # the extra ``.0`` level in front of the weight/bias suffix.
    for k in state_dict:
        if 'RDB' not in k:
            continue
        ori_k = k.replace('RRDB_trunk.', 'model.1.sub.')
        if '.weight' in k:
            ori_k = ori_k.replace('.weight', '.0.weight')
        elif '.bias' in k:
            ori_k = ori_k.replace('.bias', '.0.bias')
        crt_net[ori_k] = state_dict[k]

    # Fixed layers, in the same insertion order as before.
    fixed_layers = (
        (f'model.1.sub.{nb}', 'trunk_conv'),
        ('model.3', 'upconv1'),
        ('model.6', 'upconv2'),
        ('model.8', 'HRconv'),
        ('model.10', 'conv_last'),
    )
    for new_prefix, old_prefix in fixed_layers:
        crt_net[f'{new_prefix}.weight'] = state_dict[f'{old_prefix}.weight']
        crt_net[f'{new_prefix}.bias'] = state_dict[f'{old_prefix}.bias']

    return crt_net
|
|
|
|
|
|
def resrgan2normal(state_dict, nb=23):
    """Rename the keys of a Real-ESRGAN style state dict to the ``model.N`` layout.

    Adapted from https://github.com/victorca25/iNNfer.

    Conversion happens only when both ``conv_first.weight`` and
    ``body.0.rdb1.conv1.weight`` are present; otherwise the input object is
    returned unchanged.

    Args:
        state_dict: Mapping from parameter names to values. Values are copied
            by reference and never inspected.
        nb: Index used for the ``conv_body`` key ``model.1.sub.{nb}``.

    Returns:
        A new dict with the renamed keys, or ``state_dict`` itself when no
        conversion applies.

    Raises:
        KeyError: If a convertible state dict lacks one of the fixed layer keys.
    """
    if "conv_first.weight" not in state_dict or "body.0.rdb1.conv1.weight" not in state_dict:
        return state_dict

    crt_net = {
        'model.0.weight': state_dict['conv_first.weight'],
        'model.0.bias': state_dict['conv_first.bias'],
    }

    # Residual dense block parameters: move under ``model.1.sub.``, upper-case
    # the block name and insert ``.0`` in front of the weight/bias suffix.
    for k in state_dict:
        if "rdb" not in k:
            continue
        ori_k = k.replace('body.', 'model.1.sub.').replace('.rdb', '.RDB')
        if '.weight' in k:
            ori_k = ori_k.replace('.weight', '.0.weight')
        elif '.bias' in k:
            ori_k = ori_k.replace('.bias', '.0.bias')
        crt_net[ori_k] = state_dict[k]

    crt_net[f'model.1.sub.{nb}.weight'] = state_dict['conv_body.weight']
    crt_net[f'model.1.sub.{nb}.bias'] = state_dict['conv_body.bias']
    crt_net['model.3.weight'] = state_dict['conv_up1.weight']
    crt_net['model.3.bias'] = state_dict['conv_up1.bias']
    crt_net['model.6.weight'] = state_dict['conv_up2.weight']
    crt_net['model.6.bias'] = state_dict['conv_up2.bias']

    # A third upsampling conv shifts the indices of the layers after it by 3.
    re8x = 0
    if 'conv_up3.weight' in state_dict:
        # modification supporting: https://github.com/ai-forever/Real-ESRGAN/blob/main/RealESRGAN/rrdbnet_arch.py
        re8x = 3
        crt_net['model.9.weight'] = state_dict['conv_up3.weight']
        crt_net['model.9.bias'] = state_dict['conv_up3.bias']

    crt_net[f'model.{8+re8x}.weight'] = state_dict['conv_hr.weight']
    crt_net[f'model.{8+re8x}.bias'] = state_dict['conv_hr.bias']
    crt_net[f'model.{10+re8x}.weight'] = state_dict['conv_last.weight']
    crt_net[f'model.{10+re8x}.bias'] = state_dict['conv_last.bias']

    return crt_net
|
|
|
|
|
|
def infer_params(state_dict):
    """Derive network construction parameters from the keys and shapes of a state dict.

    Adapted from https://github.com/victorca25/iNNfer.

    Keys are split on ``"."`` and interpreted as follows:

    * five parts with ``"sub"`` as the third part: the fourth part is taken as ``nb``;
    * three parts: the middle part is parsed as an integer layer index. Every
      ``model.<index>.weight`` key with an index above 6 doubles the scale, and
      the entry with the highest index seen supplies ``out_nc`` (its ``shape[0]``);
    * any key containing ``"conv1x1"`` sets ``plus``.

    Args:
        state_dict: Mapping from parameter names to objects exposing ``.shape``.

    Returns:
        Tuple ``(in_nc, out_nc, nf, nb, plus, scale)``.

    Raises:
        KeyError: If ``model.0.weight`` is missing.
        ValueError: If a three-part key has a non-integer middle part.
        TypeError: If no key yields ``nb`` or ``out_nc``. This used to surface
            as an ``UnboundLocalError`` at the return statement.
    """
    scale2x = 0
    scalemin = 6
    n_uplayer = 0
    plus = False
    nb = None
    out_nc = None

    for block in state_dict:
        parts = block.split(".")
        n_parts = len(parts)
        if n_parts == 5 and parts[2] == "sub":
            nb = int(parts[3])
        elif n_parts == 3:
            part_num = int(parts[1])
            if part_num > scalemin and parts[0] == "model" and parts[2] == "weight":
                scale2x += 1
            if part_num > n_uplayer:
                n_uplayer = part_num
                out_nc = state_dict[block].shape[0]
        if not plus and "conv1x1" in block:
            plus = True

    # Looked up before validating so a missing first layer still raises KeyError.
    first_weight = state_dict["model.0.weight"]
    nf = first_weight.shape[0]
    in_nc = first_weight.shape[1]

    if nb is None or out_nc is None:
        raise TypeError("The state dict is not a recognized ESRGAN model: block count or output channels could not be inferred.")

    scale = 2 ** scale2x
    return in_nc, out_nc, nf, nb, plus, scale
|
|
|
|
|
|
class UpscalerESRGAN(Upscaler):
    """Upscaler named "ESRGAN" that caches loaded networks in ``self.models``."""

    def __init__(self, dirname):
        """Set the name and user path, initialise the base class, then build the scaler list.

        ``name`` and ``user_path`` are assigned before ``super().__init__()``
        is called; the scaler list and the empty model cache are created after.
        """
        self.name = "ESRGAN"
        self.user_path = dirname
        super().__init__()
        self.scalers = self.find_scalers()
        self.models = {}

    def do_upscale(self, img, selected_model):
        """Upscale ``img`` with ``selected_model``.

        Returns ``img`` unchanged when ``load_model`` yields ``None``. When
        ``shared.opts.upscaler_unload`` is set and ``selected_model`` is a key
        of the cache, the cached entry is dropped and a forced GC is requested.
        """
        net = self.load_model(selected_model)
        if net is None:
            return img

        net.to(devices.device)
        result = esrgan_upscale(net, img)

        # NOTE(review): load_model caches under ``info.local_data_path`` while
        # this lookup uses ``selected_model`` - confirm the two always match.
        should_unload = shared.opts.upscaler_unload and selected_model in self.models
        if should_unload:
            del self.models[selected_model]
            shared.log.debug(f"Upscaler unloaded: type={self.name} model={selected_model}")
            devices.torch_gc(force=True)
        return result

    def load_model(self, path: str):
        """Return the network for ``path``, loading and caching it on first use.

        Returns ``None`` when ``find_model`` has no entry for ``path``. A
        checkpoint holding a ``params`` entry (and no ``params_ema``) is loaded
        as ``SRVGGNetCompact``; everything else goes through the RRDBNet path
        after key conversion.

        Raises:
            TypeError: If the state dict matches none of the recognised key layouts.
        """
        info: UpscalerData = self.find_model(path)
        if info is None:
            return

        cached = self.models.get(info.local_data_path, None)
        if cached is not None:
            shared.log.debug(f"Upscaler cached: type={self.name} model={info.local_data_path}")
            return cached

        def finalize(net, weights):
            # Shared tail of both architectures: load weights, switch to eval,
            # optionally compile, then store in the cache.
            net.load_state_dict(weights)
            net.eval()
            net = compile_upscaler(net)
            self.models[info.local_data_path] = net
            return self.models[info.local_data_path]

        # NOTE(review): torch.load unpickles the file - only safe for trusted checkpoints.
        cpu_only = devices.device.type in {'mps', 'cpu'}
        state_dict = torch.load(info.local_data_path, map_location='cpu' if cpu_only else None)
        shared.log.info(f"Upscaler loaded: type={self.name} model={info.local_data_path}")

        if "params_ema" in state_dict:
            state_dict = state_dict["params_ema"]
        elif "params" in state_dict:
            state_dict = state_dict["params"]
            num_conv = 32
            if "realesr-animevideov3" in info.local_data_path:
                num_conv = 16
            compact = arch.SRVGGNetCompact(num_in_ch=3, num_out_ch=3, num_feat=64, num_conv=num_conv, upscale=4, act_type='prelu')
            return finalize(compact, state_dict)

        has_conv_first = "conv_first.weight" in state_dict
        if has_conv_first and "body.0.rdb1.conv1.weight" in state_dict:
            nb = 6 if "RealESRGAN_x4plus_anime_6B" in info.local_data_path else 23
            state_dict = resrgan2normal(state_dict, nb)
        elif has_conv_first:
            state_dict = mod2normal(state_dict)
        elif "model.0.weight" not in state_dict:
            raise TypeError("The file is not a recognized ESRGAN model.")

        in_nc, out_nc, nf, nb, plus, mscale = infer_params(state_dict)
        rrdb = arch.RRDBNet(in_nc=in_nc, out_nc=out_nc, nf=nf, nb=nb, upscale=mscale, plus=plus)
        return finalize(rrdb, state_dict)
|
|
|
|
|
|
def upscale_without_tiling(model, img):
    """Run ``model`` on a whole image in one forward pass and return an RGB PIL image.

    The input is converted to an array, its last axis is reversed, values are
    divided by 255, and the array is rearranged to channel-first layout and
    sent to ``devices.device`` as a float batch of one. The model output is
    clamped to [0, 1], multiplied by 255, cast to ``uint8`` (truncating) and
    its last axis reversed again.

    Args:
        model: Callable applied to the 4-D input tensor.
        img: Object accepted by ``np.array``; the indexing below needs it to
            produce a 3-D array.

    Returns:
        ``PIL.Image.Image`` created with mode ``'RGB'``.
    """
    pixels = np.array(img)
    pixels = pixels[:, :, ::-1]
    pixels = np.ascontiguousarray(np.transpose(pixels, (2, 0, 1))) / 255
    batch = torch.from_numpy(pixels).float()
    batch = batch.unsqueeze(0).to(devices.device)
    with devices.inference_context():
        output = model(batch)
    # Remove only the batch axis added above. A bare squeeze() would also drop
    # a height or width of 1 and break the axis shuffle below.
    output = output.squeeze(0).float().cpu().clamp_(0, 1).detach().numpy()
    output = 255. * np.moveaxis(output, 0, 2)
    output = output.astype(np.uint8)
    output = output[:, :, ::-1]
    return Image.fromarray(output, 'RGB')
|
|
|
|
|
|
def esrgan_upscale(model, img):
    """Upscale ``img`` with ``model``, tile by tile unless tiling is disabled.

    With ``shared.opts.upscaler_tile_size`` equal to 0 the whole image is
    processed at once. Otherwise the image is split into a grid, each tile is
    upscaled individually while a progress bar advances, and the results are
    recombined. Processing stops early once ``shared.state.interrupted`` is
    set; whatever tiles were finished are still combined.
    """
    tile_size = shared.opts.upscaler_tile_size
    if tile_size == 0:
        return upscale_without_tiling(model, img)

    grid = images.split_grid(img, tile_size, tile_size, shared.opts.upscaler_tile_overlap)
    upscaled_rows = []
    factor = 1  # stays 1 if no tile is processed before an interrupt

    columns = (TextColumn('[cyan]{task.description}'), BarColumn(), TaskProgressColumn(), TimeRemainingColumn(), TimeElapsedColumn())
    with Progress(*columns, console=shared.console) as progress:
        tile_count = sum(len(row) for _y, _h, row in grid.tiles)
        task = progress.add_task(description="Upscaling", total=tile_count)
        for y, h, row in grid.tiles:
            if shared.state.interrupted:
                break
            upscaled_row = []
            for x, w, tile in row:
                if shared.state.interrupted:
                    break
                result = upscale_without_tiling(model, tile)
                # Scale is measured from the actual output instead of being assumed.
                factor = result.width // tile.width
                upscaled_row.append([x * factor, w * factor, result])
                progress.update(task, advance=1, description="Upscaling")
            upscaled_rows.append([y * factor, h * factor, upscaled_row])

    scaled_grid = images.Grid(
        upscaled_rows,
        grid.tile_w * factor,
        grid.tile_h * factor,
        grid.image_w * factor,
        grid.image_h * factor,
        grid.overlap * factor,
    )
    return images.combine_grid(scaled_grid)
|