1
0
mirror of https://github.com/huggingface/diffusers.git synced 2026-01-29 07:22:12 +03:00
Files
diffusers/examples/research_projects/lpl/lpl_loss.py
Kashif Rasul 2bb640f8ea [Research] Latent Perceptual Loss (LPL) for Stable Diffusion XL (#11573)
* initial

* added readme

* fix formatting

* added logging

* formatting

* use config

* debug

* better

* handle SNR

* floats have no item()

* remove debug

* formatting

* add paper link

* acknowledge reference source

* rename script

---------

Co-authored-by: Sayak Paul <spsayakpaul@gmail.com>
2026-01-09 10:24:21 -10:00

216 lines
8.0 KiB
Python

# Copyright 2025 Berrada et al.
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
def normalize_tensor(in_feat, eps=1e-10):
    """Divide ``in_feat`` by its L2 norm taken along dimension 1.

    Args:
        in_feat: Tensor with at least two dimensions.
        eps: Small constant added to the norm to avoid division by zero.

    Returns:
        Tensor with the same shape as ``in_feat``.
    """
    l2_norm = in_feat.pow(2).sum(dim=1, keepdim=True).sqrt()
    denominator = l2_norm + eps
    return in_feat / denominator
def cross_normalize(input, target, eps=1e-10):
    """Scale both tensors by the L2 norm of ``target`` along dimension 1.

    The norm is computed from ``target`` only, so ``input`` keeps its
    magnitude relative to ``target``.

    Args:
        input: Tensor to rescale.
        target: Tensor that supplies the norm and is rescaled as well.
        eps: Small constant added to the norm to avoid division by zero.

    Returns:
        Tuple ``(scaled_input, scaled_target)``.
    """
    target_norm = target.pow(2).sum(dim=1, keepdim=True).sqrt()
    denominator = target_norm + eps
    scaled_input = input / denominator
    scaled_target = target / denominator
    return scaled_input, scaled_target
def remove_outliers(feat, down_f=1, opening=5, closing=3, m=100, quant=0.02):
    """Mask out extreme activations in ``feat`` and zero them.

    Per slice over the last two dimensions, a lower and an upper order
    statistic are taken (ranks derived from ``quant``). Values are kept when
    they lie within two standard deviations beyond those statistics. The
    resulting mask is then passed through a max-pool followed by a min-pool
    (implemented as a negated max-pool) before being applied.

    Args:
        feat: Feature tensor; the pooling layers operate on its last two
            dimensions.
        down_f: Factor by which the ``opening`` / ``closing`` kernel sizes are
            divided (rounded up).
        opening: Base kernel size of the min-pool step.
        closing: Base kernel size of the max-pool step.
        m: Ignored. The value is overwritten with ``2 * std`` below; the
            parameter is kept only so existing callers keep working.
        quant: Fraction used to pick the lower/upper order statistics.

    Returns:
        Tuple ``(mask, feat)`` where ``mask`` is a boolean tensor and ``feat``
        is the input multiplied by that mask.
    """
    opening = int(np.ceil(opening / down_f))
    closing = int(np.ceil(closing / down_f))
    # Kernel size 2 would change the spatial size with the padding used below.
    if opening == 2:
        opening = 3
    if closing == 2:
        closing = 1

    # Order statistics via kthvalue instead of a quantile call.
    feat_flat = feat.flatten(-2, -1)
    num_elems = feat_flat.shape[-1]
    # kthvalue needs 1 <= k <= num_elems; without the clamp, small feature
    # maps (num_elems * quant < 1) produced k == 0 and raised.
    k1 = min(max(int(num_elems * quant), 1), num_elems)
    k2 = min(max(int(num_elems * (1 - quant)), 1), num_elems)
    q1 = feat_flat.kthvalue(k1, dim=-1).values[..., None, None]
    q2 = feat_flat.kthvalue(k2, dim=-1).values[..., None, None]

    m = 2 * feat_flat.std(-1)[..., None, None].detach()
    mask = (q1 - m < feat) * (feat < q2 + m)

    # Max-pool the keep-mask, then min-pool it (negated max-pool).
    mask = nn.MaxPool2d(kernel_size=closing, stride=1, padding=(closing - 1) // 2)(mask.float())  # closing
    mask = (-nn.MaxPool2d(kernel_size=opening, stride=1, padding=(opening - 1) // 2)(-mask)).bool()  # opening
    feat = feat * mask
    return mask, feat
class LatentPerceptualLoss(nn.Module):
def __init__(
self,
vae,
loss_type="mse",
grad_ckpt=True,
pow_law=False,
norm_type="default",
num_mid_blocks=4,
feature_type="feature",
remove_outliers=True,
):
super().__init__()
self.vae = vae
self.decoder = self.vae.decoder
# Store scaling factors as tensors on the correct device
device = next(self.vae.parameters()).device
# Get scaling factors with proper defaults and handle None values
scale_factor = getattr(self.vae.config, "scaling_factor", None)
shift_factor = getattr(self.vae.config, "shift_factor", None)
# Convert to tensors with proper defaults
self.scale = torch.tensor(1.0 if scale_factor is None else scale_factor, device=device)
self.shift = torch.tensor(0.0 if shift_factor is None else shift_factor, device=device)
self.gradient_checkpointing = grad_ckpt
self.pow_law = pow_law
self.norm_type = norm_type.lower()
self.outlier_mask = remove_outliers
self.last_feature_stats = [] # Store feature statistics for logging
assert feature_type in ["feature", "image"]
self.feature_type = feature_type
assert self.norm_type in ["default", "shared", "batch"]
assert num_mid_blocks >= 0 and num_mid_blocks <= 4
self.n_blocks = num_mid_blocks
assert loss_type in ["mse", "l1"]
if loss_type == "mse":
self.loss_fn = nn.MSELoss(reduction="none")
elif loss_type == "l1":
self.loss_fn = nn.L1Loss(reduction="none")
def get_features(self, z, latent_embeds=None, disable_grads=False):
with torch.set_grad_enabled(not disable_grads):
if self.gradient_checkpointing and not disable_grads:
def create_custom_forward(module):
def custom_forward(*inputs):
return module(*inputs)
return custom_forward
features = []
upscale_dtype = next(iter(self.decoder.up_blocks.parameters())).dtype
sample = z
sample = self.decoder.conv_in(sample)
# middle
sample = torch.utils.checkpoint.checkpoint(
create_custom_forward(self.decoder.mid_block),
sample,
latent_embeds,
use_reentrant=False,
)
sample = sample.to(upscale_dtype)
features.append(sample)
# up
for up_block in self.decoder.up_blocks[: self.n_blocks]:
sample = torch.utils.checkpoint.checkpoint(
create_custom_forward(up_block),
sample,
latent_embeds,
use_reentrant=False,
)
features.append(sample)
return features
else:
features = []
upscale_dtype = next(iter(self.decoder.up_blocks.parameters())).dtype
sample = z
sample = self.decoder.conv_in(sample)
# middle
sample = self.decoder.mid_block(sample, latent_embeds)
sample = sample.to(upscale_dtype)
features.append(sample)
# up
for up_block in self.decoder.up_blocks[: self.n_blocks]:
sample = up_block(sample, latent_embeds)
features.append(sample)
return features
def get_loss(self, input, target, get_hist=False):
if self.feature_type == "feature":
inp_f = self.get_features(self.shift + input / self.scale)
tar_f = self.get_features(self.shift + target / self.scale, disable_grads=True)
losses = []
self.last_feature_stats = [] # Reset feature stats
for i, (x, y) in enumerate(zip(inp_f, tar_f, strict=False)):
my = torch.ones_like(y).bool()
outlier_ratio = 0.0
if self.outlier_mask:
with torch.no_grad():
if i == 2:
my, y = remove_outliers(y, down_f=2)
outlier_ratio = 1.0 - my.float().mean().item()
elif i in [3, 4, 5]:
my, y = remove_outliers(y, down_f=1)
outlier_ratio = 1.0 - my.float().mean().item()
# Store feature statistics before normalization
with torch.no_grad():
stats = {
"mean": y.mean().item(),
"std": y.std().item(),
"outlier_ratio": outlier_ratio,
}
self.last_feature_stats.append(stats)
# normalize feature tensors
if self.norm_type == "default":
x = normalize_tensor(x)
y = normalize_tensor(y)
elif self.norm_type == "shared":
x, y = cross_normalize(x, y, eps=1e-6)
term_loss = self.loss_fn(x, y) * my
# reduce loss term
loss_f = 2 ** (-min(i, 3)) if self.pow_law else 1.0
term_loss = term_loss.sum((2, 3)) * loss_f / my.sum((2, 3))
losses.append(term_loss.mean((1,)))
if get_hist:
return losses
else:
loss = sum(losses)
return loss / len(inp_f)
elif self.feature_type == "image":
inp_f = self.vae.decode(input / self.scale).sample
tar_f = self.vae.decode(target / self.scale).sample
return F.mse_loss(inp_f, tar_f)
def get_first_conv(self, z):
sample = self.decoder.conv_in(z)
return sample
def get_first_block(self, z):
sample = self.decoder.conv_in(z)
sample = self.decoder.mid_block(sample)
for resnet in self.decoder.up_blocks[0].resnets:
sample = resnet(sample, None)
return sample
def get_first_layer(self, input, target, target_layer="conv"):
if target_layer == "conv":
feat_in = self.get_first_conv(input)
with torch.no_grad():
feat_tar = self.get_first_conv(target)
else:
feat_in = self.get_first_block(input)
with torch.no_grad():
feat_tar = self.get_first_block(target)
feat_in, feat_tar = cross_normalize(feat_in, feat_tar)
return F.mse_loss(feat_in, feat_tar, reduction="mean")