mirror of
https://github.com/huggingface/diffusers.git
synced 2026-01-27 17:22:53 +03:00
Merge branch 'main' of https://github.com/huggingface/diffusers into main
This commit is contained in:
@@ -11,47 +11,75 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import math
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
from torch import nn
|
||||
|
||||
|
||||
# unet.py
|
||||
def get_timestep_embedding(timesteps, embedding_dim):
|
||||
def get_timestep_embedding(
|
||||
timesteps, embedding_dim, flip_sin_to_cos=False, downscale_freq_shift=1, scale=1, max_period=10000
|
||||
):
|
||||
"""
|
||||
This matches the implementation in Denoising Diffusion Probabilistic Models:
|
||||
From Fairseq.
|
||||
Build sinusoidal embeddings.
|
||||
This matches the implementation in tensor2tensor, but differs slightly
|
||||
from the description in Section 3.5 of "Attention Is All You Need".
|
||||
"""
|
||||
assert len(timesteps.shape) == 1
|
||||
|
||||
half_dim = embedding_dim // 2
|
||||
emb = math.log(10000) / (half_dim - 1)
|
||||
emb = torch.exp(torch.arange(half_dim, dtype=torch.float32) * -emb)
|
||||
emb = emb.to(device=timesteps.device)
|
||||
emb = timesteps.float()[:, None] * emb[None, :]
|
||||
emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
|
||||
if embedding_dim % 2 == 1: # zero pad
|
||||
emb = torch.nn.functional.pad(emb, (0, 1, 0, 0))
|
||||
return emb
|
||||
|
||||
|
||||
# unet_glide.py
|
||||
def timestep_embedding(timesteps, dim, max_period=10000):
|
||||
"""
|
||||
Create sinusoidal timestep embeddings.
|
||||
|
||||
:param timesteps: a 1-D Tensor of N indices, one per batch element.
|
||||
These may be fractional.
|
||||
:param dim: the dimension of the output.
|
||||
:param embedding_dim: the dimension of the output.
|
||||
:param max_period: controls the minimum frequency of the embeddings.
|
||||
:return: an [N x dim] Tensor of positional embeddings.
|
||||
"""
|
||||
half = dim // 2
|
||||
freqs = torch.exp(-math.log(max_period) * torch.arange(start=0, end=half, dtype=torch.float32) / half).to(
|
||||
device=timesteps.device
|
||||
)
|
||||
args = timesteps[:, None].float() * freqs[None]
|
||||
embedding = torch.cat([torch.cos(args), torch.sin(args)], dim=-1)
|
||||
if dim % 2:
|
||||
embedding = torch.cat([embedding, torch.zeros_like(embedding[:, :1])], dim=-1)
|
||||
return embedding
|
||||
assert len(timesteps.shape) == 1, "Timesteps should be a 1d-array"
|
||||
|
||||
half_dim = embedding_dim // 2
|
||||
|
||||
emb_coeff = -math.log(max_period) / (half_dim - downscale_freq_shift)
|
||||
emb = torch.arange(half_dim, dtype=torch.float32, device=timesteps.device)
|
||||
emb = torch.exp(emb * emb_coeff)
|
||||
emb = timesteps[:, None].float() * emb[None, :]
|
||||
|
||||
# scale embeddings
|
||||
emb = scale * emb
|
||||
|
||||
# concat sine and cosine embeddings
|
||||
emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=-1)
|
||||
|
||||
# flip sine and cosine embeddings
|
||||
if flip_sin_to_cos:
|
||||
emb = torch.cat([emb[:, half_dim:], emb[:, :half_dim]], dim=-1)
|
||||
|
||||
# zero pad
|
||||
if embedding_dim % 2 == 1:
|
||||
emb = torch.nn.functional.pad(emb, (0, 1, 0, 0))
|
||||
return emb
|
||||
|
||||
|
||||
# unet_sde_score_estimation.py
|
||||
class GaussianFourierProjection(nn.Module):
|
||||
"""Gaussian Fourier embeddings for noise levels."""
|
||||
|
||||
def __init__(self, embedding_size=256, scale=1.0):
|
||||
super().__init__()
|
||||
self.W = nn.Parameter(torch.randn(embedding_size) * scale, requires_grad=False)
|
||||
|
||||
def forward(self, x):
|
||||
x_proj = x[:, None] * self.W[None, :] * 2 * np.pi
|
||||
return torch.cat([torch.sin(x_proj), torch.cos(x_proj)], dim=-1)
|
||||
|
||||
|
||||
# unet_rl.py - TODO(need test)
|
||||
class SinusoidalPosEmb(nn.Module):
|
||||
def __init__(self, dim):
|
||||
super().__init__()
|
||||
self.dim = dim
|
||||
|
||||
def forward(self, x):
|
||||
device = x.device
|
||||
half_dim = self.dim // 2
|
||||
emb = math.log(10000) / (half_dim - 1)
|
||||
emb = torch.exp(torch.arange(half_dim, device=device) * -emb)
|
||||
emb = x[:, None] * emb[None, :]
|
||||
emb = torch.cat((emb.sin(), emb.cos()), dim=-1)
|
||||
return emb
|
||||
|
||||
@@ -30,27 +30,7 @@ from tqdm import tqdm
|
||||
|
||||
from ..configuration_utils import ConfigMixin
|
||||
from ..modeling_utils import ModelMixin
|
||||
|
||||
|
||||
def get_timestep_embedding(timesteps, embedding_dim):
|
||||
"""
|
||||
This matches the implementation in Denoising Diffusion Probabilistic Models:
|
||||
From Fairseq.
|
||||
Build sinusoidal embeddings.
|
||||
This matches the implementation in tensor2tensor, but differs slightly
|
||||
from the description in Section 3.5 of "Attention Is All You Need".
|
||||
"""
|
||||
assert len(timesteps.shape) == 1
|
||||
|
||||
half_dim = embedding_dim // 2
|
||||
emb = math.log(10000) / (half_dim - 1)
|
||||
emb = torch.exp(torch.arange(half_dim, dtype=torch.float32) * -emb)
|
||||
emb = emb.to(device=timesteps.device)
|
||||
emb = timesteps.float()[:, None] * emb[None, :]
|
||||
emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
|
||||
if embedding_dim % 2 == 1: # zero pad
|
||||
emb = torch.nn.functional.pad(emb, (0, 1, 0, 0))
|
||||
return emb
|
||||
from .embeddings import get_timestep_embedding
|
||||
|
||||
|
||||
def nonlinearity(x):
|
||||
|
||||
@@ -7,6 +7,7 @@ import torch.nn.functional as F
|
||||
|
||||
from ..configuration_utils import ConfigMixin
|
||||
from ..modeling_utils import ModelMixin
|
||||
from .embeddings import get_timestep_embedding
|
||||
|
||||
|
||||
def convert_module_to_f16(l):
|
||||
@@ -86,27 +87,6 @@ def normalization(channels, swish=0.0):
|
||||
return GroupNorm32(num_channels=channels, num_groups=32, swish=swish)
|
||||
|
||||
|
||||
def timestep_embedding(timesteps, dim, max_period=10000):
|
||||
"""
|
||||
Create sinusoidal timestep embeddings.
|
||||
|
||||
:param timesteps: a 1-D Tensor of N indices, one per batch element.
|
||||
These may be fractional.
|
||||
:param dim: the dimension of the output.
|
||||
:param max_period: controls the minimum frequency of the embeddings.
|
||||
:return: an [N x dim] Tensor of positional embeddings.
|
||||
"""
|
||||
half = dim // 2
|
||||
freqs = torch.exp(-math.log(max_period) * torch.arange(start=0, end=half, dtype=torch.float32) / half).to(
|
||||
device=timesteps.device
|
||||
)
|
||||
args = timesteps[:, None].float() * freqs[None]
|
||||
embedding = torch.cat([torch.cos(args), torch.sin(args)], dim=-1)
|
||||
if dim % 2:
|
||||
embedding = torch.cat([embedding, torch.zeros_like(embedding[:, :1])], dim=-1)
|
||||
return embedding
|
||||
|
||||
|
||||
def zero_module(module):
|
||||
"""
|
||||
Zero out the parameters of a module and return it.
|
||||
@@ -627,7 +607,9 @@ class GlideUNetModel(ModelMixin, ConfigMixin):
|
||||
"""
|
||||
|
||||
hs = []
|
||||
emb = self.time_embed(timestep_embedding(timesteps, self.model_channels))
|
||||
emb = self.time_embed(
|
||||
get_timestep_embedding(timesteps, self.model_channels, flip_sin_to_cos=True, downscale_freq_shift=0)
|
||||
)
|
||||
|
||||
h = x.type(self.dtype)
|
||||
for module in self.input_blocks:
|
||||
@@ -714,7 +696,9 @@ class GlideTextToImageUNetModel(GlideUNetModel):
|
||||
|
||||
def forward(self, x, timesteps, transformer_out=None):
|
||||
hs = []
|
||||
emb = self.time_embed(timestep_embedding(timesteps, self.model_channels))
|
||||
emb = self.time_embed(
|
||||
get_timestep_embedding(timesteps, self.model_channels, flip_sin_to_cos=True, downscale_freq_shift=0)
|
||||
)
|
||||
|
||||
# project the last token
|
||||
transformer_proj = self.transformer_proj(transformer_out[:, -1])
|
||||
@@ -806,7 +790,9 @@ class GlideSuperResUNetModel(GlideUNetModel):
|
||||
x = torch.cat([x, upsampled], dim=1)
|
||||
|
||||
hs = []
|
||||
emb = self.time_embed(timestep_embedding(timesteps, self.model_channels))
|
||||
emb = self.time_embed(
|
||||
get_timestep_embedding(timesteps, self.model_channels, flip_sin_to_cos=True, downscale_freq_shift=0)
|
||||
)
|
||||
|
||||
h = x
|
||||
for module in self.input_blocks:
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
import math
|
||||
|
||||
import torch
|
||||
|
||||
|
||||
@@ -11,6 +9,7 @@ except:
|
||||
|
||||
from ..configuration_utils import ConfigMixin
|
||||
from ..modeling_utils import ModelMixin
|
||||
from .embeddings import get_timestep_embedding
|
||||
|
||||
|
||||
class Mish(torch.nn.Module):
|
||||
@@ -107,21 +106,6 @@ class Residual(torch.nn.Module):
|
||||
return output
|
||||
|
||||
|
||||
class SinusoidalPosEmb(torch.nn.Module):
|
||||
def __init__(self, dim):
|
||||
super(SinusoidalPosEmb, self).__init__()
|
||||
self.dim = dim
|
||||
|
||||
def forward(self, x, scale=1000):
|
||||
device = x.device
|
||||
half_dim = self.dim // 2
|
||||
emb = math.log(10000) / (half_dim - 1)
|
||||
emb = torch.exp(torch.arange(half_dim, device=device).float() * -emb)
|
||||
emb = scale * x.unsqueeze(1) * emb.unsqueeze(0)
|
||||
emb = torch.cat((emb.sin(), emb.cos()), dim=-1)
|
||||
return emb
|
||||
|
||||
|
||||
class UNetGradTTSModel(ModelMixin, ConfigMixin):
|
||||
def __init__(self, dim, dim_mults=(1, 2, 4), groups=8, n_spks=None, spk_emb_dim=64, n_feats=80, pe_scale=1000):
|
||||
super(UNetGradTTSModel, self).__init__()
|
||||
@@ -149,7 +133,6 @@ class UNetGradTTSModel(ModelMixin, ConfigMixin):
|
||||
torch.nn.Linear(spk_emb_dim, spk_emb_dim * 4), Mish(), torch.nn.Linear(spk_emb_dim * 4, n_feats)
|
||||
)
|
||||
|
||||
self.time_pos_emb = SinusoidalPosEmb(dim)
|
||||
self.mlp = torch.nn.Sequential(torch.nn.Linear(dim, dim * 4), Mish(), torch.nn.Linear(dim * 4, dim))
|
||||
|
||||
dims = [2 + (1 if n_spks > 1 else 0), *map(lambda m: dim * m, dim_mults)]
|
||||
@@ -198,7 +181,7 @@ class UNetGradTTSModel(ModelMixin, ConfigMixin):
|
||||
if not isinstance(spk, type(None)):
|
||||
s = self.spk_mlp(spk)
|
||||
|
||||
t = self.time_pos_emb(timesteps, scale=self.pe_scale)
|
||||
t = get_timestep_embedding(timesteps, self.dim, scale=self.pe_scale)
|
||||
t = self.mlp(t)
|
||||
|
||||
if self.n_spks < 2:
|
||||
|
||||
@@ -16,6 +16,7 @@ except:
|
||||
|
||||
from ..configuration_utils import ConfigMixin
|
||||
from ..modeling_utils import ModelMixin
|
||||
from .embeddings import get_timestep_embedding
|
||||
|
||||
|
||||
def exists(val):
|
||||
@@ -316,36 +317,6 @@ def normalization(channels, swish=0.0):
|
||||
return GroupNorm32(num_channels=channels, num_groups=32, swish=swish)
|
||||
|
||||
|
||||
def timestep_embedding(timesteps, dim, max_period=10000):
|
||||
"""
|
||||
Create sinusoidal timestep embeddings.
|
||||
|
||||
:param timesteps: a 1-D Tensor of N indices, one per batch element.
|
||||
These may be fractional.
|
||||
:param dim: the dimension of the output.
|
||||
:param max_period: controls the minimum frequency of the embeddings.
|
||||
:return: an [N x dim] Tensor of positional embeddings.
|
||||
"""
|
||||
half = dim // 2
|
||||
freqs = torch.exp(-math.log(max_period) * torch.arange(start=0, end=half, dtype=torch.float32) / half).to(
|
||||
device=timesteps.device
|
||||
)
|
||||
args = timesteps[:, None].float() * freqs[None]
|
||||
embedding = torch.cat([torch.cos(args), torch.sin(args)], dim=-1)
|
||||
if dim % 2:
|
||||
embedding = torch.cat([embedding, torch.zeros_like(embedding[:, :1])], dim=-1)
|
||||
return embedding
|
||||
|
||||
|
||||
def zero_module(module):
|
||||
"""
|
||||
Zero out the parameters of a module and return it.
|
||||
"""
|
||||
for p in module.parameters():
|
||||
p.detach().zero_()
|
||||
return module
|
||||
|
||||
|
||||
## go
|
||||
class AttentionPool2d(nn.Module):
|
||||
"""
|
||||
@@ -1026,7 +997,7 @@ class UNetLDMModel(ModelMixin, ConfigMixin):
|
||||
hs = []
|
||||
if not torch.is_tensor(timesteps):
|
||||
timesteps = torch.tensor([timesteps], dtype=torch.long, device=x.device)
|
||||
t_emb = timestep_embedding(timesteps, self.model_channels)
|
||||
t_emb = get_timestep_embedding(timesteps, self.model_channels, flip_sin_to_cos=True, downscale_freq_shift=0)
|
||||
emb = self.time_embed(t_emb)
|
||||
|
||||
if self.num_classes is not None:
|
||||
@@ -1240,7 +1211,9 @@ class EncoderUNetModel(nn.Module):
|
||||
:param timesteps: a 1-D batch of timesteps.
|
||||
:return: an [N x K] Tensor of outputs.
|
||||
"""
|
||||
emb = self.time_embed(timestep_embedding(timesteps, self.model_channels))
|
||||
emb = self.time_embed(
|
||||
get_timestep_embedding(timesteps, self.model_channels, flip_sin_to_cos=True, downscale_freq_shift=0)
|
||||
)
|
||||
|
||||
results = []
|
||||
h = x.type(self.dtype)
|
||||
|
||||
@@ -16,7 +16,6 @@
|
||||
# helpers functions
|
||||
|
||||
import functools
|
||||
import math
|
||||
import string
|
||||
|
||||
import numpy as np
|
||||
@@ -26,6 +25,7 @@ import torch.nn.functional as F
|
||||
|
||||
from ..configuration_utils import ConfigMixin
|
||||
from ..modeling_utils import ModelMixin
|
||||
from .embeddings import GaussianFourierProjection, get_timestep_embedding
|
||||
|
||||
|
||||
def upfirdn2d(input, kernel, up=1, down=1, pad=(0, 0)):
|
||||
@@ -381,23 +381,6 @@ def get_act(nonlinearity):
|
||||
raise NotImplementedError("activation function does not exist!")
|
||||
|
||||
|
||||
def get_timestep_embedding(timesteps, embedding_dim, max_positions=10000):
|
||||
assert len(timesteps.shape) == 1 # and timesteps.dtype == tf.int32
|
||||
half_dim = embedding_dim // 2
|
||||
# magic number 10000 is from transformers
|
||||
emb = math.log(max_positions) / (half_dim - 1)
|
||||
# emb = math.log(2.) / (half_dim - 1)
|
||||
emb = torch.exp(torch.arange(half_dim, dtype=torch.float32, device=timesteps.device) * -emb)
|
||||
# emb = tf.range(num_embeddings, dtype=jnp.float32)[:, None] * emb[None, :]
|
||||
# emb = tf.cast(timesteps, dtype=jnp.float32)[:, None] * emb[None, :]
|
||||
emb = timesteps.float()[:, None] * emb[None, :]
|
||||
emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
|
||||
if embedding_dim % 2 == 1: # zero pad
|
||||
emb = F.pad(emb, (0, 1), mode="constant")
|
||||
assert emb.shape == (timesteps.shape[0], embedding_dim)
|
||||
return emb
|
||||
|
||||
|
||||
def default_init(scale=1.0):
|
||||
"""The same initialization used in DDPM."""
|
||||
scale = 1e-10 if scale == 0 else scale
|
||||
@@ -434,18 +417,6 @@ def variance_scaling(scale, mode, distribution, in_axis=1, out_axis=0, dtype=tor
|
||||
return init
|
||||
|
||||
|
||||
class GaussianFourierProjection(nn.Module):
|
||||
"""Gaussian Fourier embeddings for noise levels."""
|
||||
|
||||
def __init__(self, embedding_size=256, scale=1.0):
|
||||
super().__init__()
|
||||
self.W = nn.Parameter(torch.randn(embedding_size) * scale, requires_grad=False)
|
||||
|
||||
def forward(self, x):
|
||||
x_proj = x[:, None] * self.W[None, :] * 2 * np.pi
|
||||
return torch.cat([torch.sin(x_proj), torch.cos(x_proj)], dim=-1)
|
||||
|
||||
|
||||
class Combine(nn.Module):
|
||||
"""Combine information from skip connections."""
|
||||
|
||||
|
||||
@@ -21,718 +21,95 @@ import unittest
|
||||
import numpy as np
|
||||
import torch
|
||||
|
||||
from diffusers import (
|
||||
BDDMPipeline,
|
||||
DDIMPipeline,
|
||||
DDIMScheduler,
|
||||
DDPMPipeline,
|
||||
DDPMScheduler,
|
||||
GlidePipeline,
|
||||
GlideSuperResUNetModel,
|
||||
GlideTextToImageUNetModel,
|
||||
GradTTSPipeline,
|
||||
GradTTSScheduler,
|
||||
LatentDiffusionPipeline,
|
||||
PNDMPipeline,
|
||||
PNDMScheduler,
|
||||
UNetGradTTSModel,
|
||||
UNetLDMModel,
|
||||
UNetModel,
|
||||
)
|
||||
from diffusers.configuration_utils import ConfigMixin
|
||||
from diffusers.pipeline_utils import DiffusionPipeline
|
||||
from diffusers.pipelines.pipeline_bddm import DiffWave
|
||||
from diffusers.models.embeddings import get_timestep_embedding
|
||||
from diffusers.testing_utils import floats_tensor, slow, torch_device
|
||||
|
||||
|
||||
torch.backends.cuda.matmul.allow_tf32 = False
|
||||
|
||||
|
||||
class ConfigTester(unittest.TestCase):
|
||||
def test_load_not_from_mixin(self):
|
||||
with self.assertRaises(ValueError):
|
||||
ConfigMixin.from_config("dummy_path")
|
||||
class EmbeddingsTests(unittest.TestCase):
|
||||
def test_timestep_embeddings(self):
|
||||
embedding_dim = 256
|
||||
timesteps = torch.arange(16)
|
||||
|
||||
def test_save_load(self):
|
||||
class SampleObject(ConfigMixin):
|
||||
config_name = "config.json"
|
||||
t1 = get_timestep_embedding(timesteps, embedding_dim)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
a=2,
|
||||
b=5,
|
||||
c=(2, 5),
|
||||
d="for diffusion",
|
||||
e=[1, 3],
|
||||
):
|
||||
self.register_to_config(a=a, b=b, c=c, d=d, e=e)
|
||||
# first vector should always be composed only of 0's and 1's
|
||||
assert (t1[0, : embedding_dim // 2] - 0).abs().sum() < 1e-5
|
||||
assert (t1[0, embedding_dim // 2 :] - 1).abs().sum() < 1e-5
|
||||
|
||||
obj = SampleObject()
|
||||
config = obj.config
|
||||
# last element of each vector should be one
|
||||
assert (t1[:, -1] - 1).abs().sum() < 1e-5
|
||||
|
||||
assert config["a"] == 2
|
||||
assert config["b"] == 5
|
||||
assert config["c"] == (2, 5)
|
||||
assert config["d"] == "for diffusion"
|
||||
assert config["e"] == [1, 3]
|
||||
# For large embeddings (e.g. 128) the frequency of every vector is higher
|
||||
# than the previous one which means that the gradients of later vectors are
|
||||
# ALWAYS higher than the previous ones
|
||||
grad_mean = np.abs(np.gradient(t1, axis=-1)).mean(axis=1)
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdirname:
|
||||
obj.save_config(tmpdirname)
|
||||
new_obj = SampleObject.from_config(tmpdirname)
|
||||
new_config = new_obj.config
|
||||
prev_grad = 0.0
|
||||
for grad in grad_mean:
|
||||
assert grad > prev_grad
|
||||
prev_grad = grad
|
||||
|
||||
# unfreeze configs
|
||||
config = dict(config)
|
||||
new_config = dict(new_config)
|
||||
def test_timestep_defaults(self):
|
||||
embedding_dim = 16
|
||||
timesteps = torch.arange(10)
|
||||
|
||||
assert config.pop("c") == (2, 5) # instantiated as tuple
|
||||
assert new_config.pop("c") == [2, 5] # saved & loaded as list because of json
|
||||
assert config == new_config
|
||||
|
||||
|
||||
class ModelTesterMixin:
|
||||
def test_from_pretrained_save_pretrained(self):
|
||||
init_dict, inputs_dict = self.prepare_init_args_and_inputs_for_common()
|
||||
|
||||
model = self.model_class(**init_dict)
|
||||
model.to(torch_device)
|
||||
model.eval()
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdirname:
|
||||
model.save_pretrained(tmpdirname)
|
||||
new_model = self.model_class.from_pretrained(tmpdirname)
|
||||
new_model.to(torch_device)
|
||||
|
||||
with torch.no_grad():
|
||||
image = model(**inputs_dict)
|
||||
new_image = new_model(**inputs_dict)
|
||||
|
||||
max_diff = (image - new_image).abs().sum().item()
|
||||
self.assertLessEqual(max_diff, 1e-5, "Models give different forward passes")
|
||||
|
||||
def test_determinism(self):
|
||||
init_dict, inputs_dict = self.prepare_init_args_and_inputs_for_common()
|
||||
model = self.model_class(**init_dict)
|
||||
model.to(torch_device)
|
||||
model.eval()
|
||||
with torch.no_grad():
|
||||
first = model(**inputs_dict)
|
||||
second = model(**inputs_dict)
|
||||
|
||||
out_1 = first.cpu().numpy()
|
||||
out_2 = second.cpu().numpy()
|
||||
out_1 = out_1[~np.isnan(out_1)]
|
||||
out_2 = out_2[~np.isnan(out_2)]
|
||||
max_diff = np.amax(np.abs(out_1 - out_2))
|
||||
self.assertLessEqual(max_diff, 1e-5)
|
||||
|
||||
def test_output(self):
|
||||
init_dict, inputs_dict = self.prepare_init_args_and_inputs_for_common()
|
||||
model = self.model_class(**init_dict)
|
||||
model.to(torch_device)
|
||||
model.eval()
|
||||
|
||||
with torch.no_grad():
|
||||
output = model(**inputs_dict)
|
||||
|
||||
self.assertIsNotNone(output)
|
||||
expected_shape = inputs_dict["x"].shape
|
||||
self.assertEqual(output.shape, expected_shape, "Input and output shapes do not match")
|
||||
|
||||
def test_forward_signature(self):
|
||||
init_dict, _ = self.prepare_init_args_and_inputs_for_common()
|
||||
|
||||
model = self.model_class(**init_dict)
|
||||
signature = inspect.signature(model.forward)
|
||||
# signature.parameters is an OrderedDict => so arg_names order is deterministic
|
||||
arg_names = [*signature.parameters.keys()]
|
||||
|
||||
expected_arg_names = ["x", "timesteps"]
|
||||
self.assertListEqual(arg_names[:2], expected_arg_names)
|
||||
|
||||
def test_model_from_config(self):
|
||||
init_dict, inputs_dict = self.prepare_init_args_and_inputs_for_common()
|
||||
|
||||
model = self.model_class(**init_dict)
|
||||
model.to(torch_device)
|
||||
model.eval()
|
||||
|
||||
# test if the model can be loaded from the config
|
||||
# and has all the expected shape
|
||||
with tempfile.TemporaryDirectory() as tmpdirname:
|
||||
model.save_config(tmpdirname)
|
||||
new_model = self.model_class.from_config(tmpdirname)
|
||||
new_model.to(torch_device)
|
||||
new_model.eval()
|
||||
|
||||
# check if all paramters shape are the same
|
||||
for param_name in model.state_dict().keys():
|
||||
param_1 = model.state_dict()[param_name]
|
||||
param_2 = new_model.state_dict()[param_name]
|
||||
self.assertEqual(param_1.shape, param_2.shape)
|
||||
|
||||
with torch.no_grad():
|
||||
output_1 = model(**inputs_dict)
|
||||
output_2 = new_model(**inputs_dict)
|
||||
|
||||
self.assertEqual(output_1.shape, output_2.shape)
|
||||
|
||||
def test_training(self):
|
||||
init_dict, inputs_dict = self.prepare_init_args_and_inputs_for_common()
|
||||
|
||||
model = self.model_class(**init_dict)
|
||||
model.to(torch_device)
|
||||
model.train()
|
||||
output = model(**inputs_dict)
|
||||
noise = torch.randn((inputs_dict["x"].shape[0],) + self.get_output_shape).to(torch_device)
|
||||
loss = torch.nn.functional.mse_loss(output, noise)
|
||||
loss.backward()
|
||||
|
||||
|
||||
class UnetModelTests(ModelTesterMixin, unittest.TestCase):
|
||||
model_class = UNetModel
|
||||
|
||||
@property
|
||||
def dummy_input(self):
|
||||
batch_size = 4
|
||||
num_channels = 3
|
||||
sizes = (32, 32)
|
||||
|
||||
noise = floats_tensor((batch_size, num_channels) + sizes).to(torch_device)
|
||||
time_step = torch.tensor([10]).to(torch_device)
|
||||
|
||||
return {"x": noise, "timesteps": time_step}
|
||||
|
||||
@property
|
||||
def get_input_shape(self):
|
||||
return (3, 32, 32)
|
||||
|
||||
@property
|
||||
def get_output_shape(self):
|
||||
return (3, 32, 32)
|
||||
|
||||
def prepare_init_args_and_inputs_for_common(self):
|
||||
init_dict = {
|
||||
"ch": 32,
|
||||
"ch_mult": (1, 2),
|
||||
"num_res_blocks": 2,
|
||||
"attn_resolutions": (16,),
|
||||
"resolution": 32,
|
||||
}
|
||||
inputs_dict = self.dummy_input
|
||||
return init_dict, inputs_dict
|
||||
|
||||
def test_from_pretrained_hub(self):
|
||||
model, loading_info = UNetModel.from_pretrained("fusing/ddpm_dummy", output_loading_info=True)
|
||||
self.assertIsNotNone(model)
|
||||
self.assertEqual(len(loading_info["missing_keys"]), 0)
|
||||
|
||||
model.to(torch_device)
|
||||
image = model(**self.dummy_input)
|
||||
|
||||
assert image is not None, "Make sure output is not None"
|
||||
|
||||
def test_output_pretrained(self):
|
||||
model = UNetModel.from_pretrained("fusing/ddpm_dummy")
|
||||
model.eval()
|
||||
|
||||
torch.manual_seed(0)
|
||||
if torch.cuda.is_available():
|
||||
torch.cuda.manual_seed_all(0)
|
||||
|
||||
noise = torch.randn(1, model.config.in_channels, model.config.resolution, model.config.resolution)
|
||||
time_step = torch.tensor([10])
|
||||
|
||||
with torch.no_grad():
|
||||
output = model(noise, time_step)
|
||||
|
||||
output_slice = output[0, -1, -3:, -3:].flatten()
|
||||
# fmt: off
|
||||
expected_output_slice = torch.tensor([0.2891, -0.1899, 0.2595, -0.6214, 0.0968, -0.2622, 0.4688, 0.1311, 0.0053])
|
||||
# fmt: on
|
||||
self.assertTrue(torch.allclose(output_slice, expected_output_slice, atol=1e-3))
|
||||
|
||||
|
||||
class GlideSuperResUNetTests(ModelTesterMixin, unittest.TestCase):
|
||||
model_class = GlideSuperResUNetModel
|
||||
|
||||
@property
|
||||
def dummy_input(self):
|
||||
batch_size = 4
|
||||
num_channels = 6
|
||||
sizes = (32, 32)
|
||||
low_res_size = (4, 4)
|
||||
|
||||
noise = torch.randn((batch_size, num_channels // 2) + sizes).to(torch_device)
|
||||
low_res = torch.randn((batch_size, 3) + low_res_size).to(torch_device)
|
||||
time_step = torch.tensor([10] * noise.shape[0], device=torch_device)
|
||||
|
||||
return {"x": noise, "timesteps": time_step, "low_res": low_res}
|
||||
|
||||
@property
|
||||
def get_input_shape(self):
|
||||
return (3, 32, 32)
|
||||
|
||||
@property
|
||||
def get_output_shape(self):
|
||||
return (6, 32, 32)
|
||||
|
||||
def prepare_init_args_and_inputs_for_common(self):
|
||||
init_dict = {
|
||||
"attention_resolutions": (2,),
|
||||
"channel_mult": (1, 2),
|
||||
"in_channels": 6,
|
||||
"out_channels": 6,
|
||||
"model_channels": 32,
|
||||
"num_head_channels": 8,
|
||||
"num_heads_upsample": 1,
|
||||
"num_res_blocks": 2,
|
||||
"resblock_updown": True,
|
||||
"resolution": 32,
|
||||
"use_scale_shift_norm": True,
|
||||
}
|
||||
inputs_dict = self.dummy_input
|
||||
return init_dict, inputs_dict
|
||||
|
||||
def test_output(self):
|
||||
init_dict, inputs_dict = self.prepare_init_args_and_inputs_for_common()
|
||||
model = self.model_class(**init_dict)
|
||||
model.to(torch_device)
|
||||
model.eval()
|
||||
|
||||
with torch.no_grad():
|
||||
output = model(**inputs_dict)
|
||||
|
||||
output, _ = torch.split(output, 3, dim=1)
|
||||
|
||||
self.assertIsNotNone(output)
|
||||
expected_shape = inputs_dict["x"].shape
|
||||
self.assertEqual(output.shape, expected_shape, "Input and output shapes do not match")
|
||||
|
||||
def test_from_pretrained_hub(self):
|
||||
model, loading_info = GlideSuperResUNetModel.from_pretrained(
|
||||
"fusing/glide-super-res-dummy", output_loading_info=True
|
||||
t1 = get_timestep_embedding(timesteps, embedding_dim)
|
||||
t2 = get_timestep_embedding(
|
||||
timesteps, embedding_dim, flip_sin_to_cos=False, downscale_freq_shift=1, max_period=10_000
|
||||
)
|
||||
self.assertIsNotNone(model)
|
||||
self.assertEqual(len(loading_info["missing_keys"]), 0)
|
||||
|
||||
model.to(torch_device)
|
||||
image = model(**self.dummy_input)
|
||||
assert torch.allclose(t1.cpu(), t2.cpu(), 1e-3)
|
||||
|
||||
assert image is not None, "Make sure output is not None"
|
||||
def test_timestep_flip_sin_cos(self):
|
||||
embedding_dim = 16
|
||||
timesteps = torch.arange(10)
|
||||
|
||||
def test_output_pretrained(self):
|
||||
model = GlideSuperResUNetModel.from_pretrained("fusing/glide-super-res-dummy")
|
||||
t1 = get_timestep_embedding(timesteps, embedding_dim, flip_sin_to_cos=True)
|
||||
t1 = torch.cat([t1[:, embedding_dim // 2 :], t1[:, : embedding_dim // 2]], dim=-1)
|
||||
|
||||
torch.manual_seed(0)
|
||||
if torch.cuda.is_available():
|
||||
torch.cuda.manual_seed_all(0)
|
||||
t2 = get_timestep_embedding(timesteps, embedding_dim, flip_sin_to_cos=False)
|
||||
|
||||
noise = torch.randn(1, 3, 64, 64)
|
||||
low_res = torch.randn(1, 3, 4, 4)
|
||||
time_step = torch.tensor([42] * noise.shape[0])
|
||||
assert torch.allclose(t1.cpu(), t2.cpu(), 1e-3)
|
||||
|
||||
with torch.no_grad():
|
||||
output = model(noise, time_step, low_res)
|
||||
def test_timestep_downscale_freq_shift(self):
|
||||
embedding_dim = 16
|
||||
timesteps = torch.arange(10)
|
||||
|
||||
output, _ = torch.split(output, 3, dim=1)
|
||||
output_slice = output[0, -1, -3:, -3:].flatten()
|
||||
# fmt: off
|
||||
expected_output_slice = torch.tensor([-22.8782, -23.2652, -15.3966, -22.8034, -23.3159, -15.5640, -15.3970, -15.4614, - 10.4370])
|
||||
# fmt: on
|
||||
self.assertTrue(torch.allclose(output_slice, expected_output_slice, atol=1e-3))
|
||||
t1 = get_timestep_embedding(timesteps, embedding_dim, downscale_freq_shift=0)
|
||||
t2 = get_timestep_embedding(timesteps, embedding_dim, downscale_freq_shift=1)
|
||||
|
||||
# get cosine half (vectors that are wrapped into cosine)
|
||||
cosine_half = (t1 - t2)[:, embedding_dim // 2 :]
|
||||
|
||||
class GlideTextToImageUNetModelTests(ModelTesterMixin, unittest.TestCase):
|
||||
model_class = GlideTextToImageUNetModel
|
||||
# cosine needs to be negative
|
||||
assert (np.abs((cosine_half <= 0).numpy()) - 1).sum() < 1e-5
|
||||
|
||||
@property
|
||||
def dummy_input(self):
|
||||
batch_size = 4
|
||||
num_channels = 3
|
||||
sizes = (32, 32)
|
||||
transformer_dim = 32
|
||||
seq_len = 16
|
||||
def test_sinoid_embeddings_hardcoded(self):
|
||||
embedding_dim = 64
|
||||
timesteps = torch.arange(128)
|
||||
|
||||
noise = torch.randn((batch_size, num_channels) + sizes).to(torch_device)
|
||||
emb = torch.randn((batch_size, seq_len, transformer_dim)).to(torch_device)
|
||||
time_step = torch.tensor([10] * noise.shape[0], device=torch_device)
|
||||
# standard unet, score_vde
|
||||
t1 = get_timestep_embedding(timesteps, embedding_dim, downscale_freq_shift=1, flip_sin_to_cos=False)
|
||||
# glide, ldm
|
||||
t2 = get_timestep_embedding(timesteps, embedding_dim, downscale_freq_shift=0, flip_sin_to_cos=True)
|
||||
# grad-tts
|
||||
t3 = get_timestep_embedding(timesteps, embedding_dim, scale=1000)
|
||||
|
||||
return {"x": noise, "timesteps": time_step, "transformer_out": emb}
|
||||
|
||||
@property
|
||||
def get_input_shape(self):
|
||||
return (3, 32, 32)
|
||||
|
||||
@property
|
||||
def get_output_shape(self):
|
||||
return (6, 32, 32)
|
||||
|
||||
def prepare_init_args_and_inputs_for_common(self):
|
||||
init_dict = {
|
||||
"attention_resolutions": (2,),
|
||||
"channel_mult": (1, 2),
|
||||
"in_channels": 3,
|
||||
"out_channels": 6,
|
||||
"model_channels": 32,
|
||||
"num_head_channels": 8,
|
||||
"num_heads_upsample": 1,
|
||||
"num_res_blocks": 2,
|
||||
"resblock_updown": True,
|
||||
"resolution": 32,
|
||||
"use_scale_shift_norm": True,
|
||||
"transformer_dim": 32,
|
||||
}
|
||||
inputs_dict = self.dummy_input
|
||||
return init_dict, inputs_dict
|
||||
|
||||
def test_output(self):
|
||||
init_dict, inputs_dict = self.prepare_init_args_and_inputs_for_common()
|
||||
model = self.model_class(**init_dict)
|
||||
model.to(torch_device)
|
||||
model.eval()
|
||||
|
||||
with torch.no_grad():
|
||||
output = model(**inputs_dict)
|
||||
|
||||
output, _ = torch.split(output, 3, dim=1)
|
||||
|
||||
self.assertIsNotNone(output)
|
||||
expected_shape = inputs_dict["x"].shape
|
||||
self.assertEqual(output.shape, expected_shape, "Input and output shapes do not match")
|
||||
|
||||
def test_from_pretrained_hub(self):
|
||||
model, loading_info = GlideTextToImageUNetModel.from_pretrained(
|
||||
"fusing/unet-glide-text2im-dummy", output_loading_info=True
|
||||
assert torch.allclose(
|
||||
t1[23:26, 47:50].flatten().cpu(),
|
||||
torch.tensor([0.9646, 0.9804, 0.9892, 0.9615, 0.9787, 0.9882, 0.9582, 0.9769, 0.9872]),
|
||||
1e-3,
|
||||
)
|
||||
self.assertIsNotNone(model)
|
||||
self.assertEqual(len(loading_info["missing_keys"]), 0)
|
||||
|
||||
model.to(torch_device)
|
||||
image = model(**self.dummy_input)
|
||||
|
||||
assert image is not None, "Make sure output is not None"
|
||||
|
||||
def test_output_pretrained(self):
|
||||
model = GlideTextToImageUNetModel.from_pretrained("fusing/unet-glide-text2im-dummy")
|
||||
|
||||
torch.manual_seed(0)
|
||||
if torch.cuda.is_available():
|
||||
torch.cuda.manual_seed_all(0)
|
||||
|
||||
noise = torch.randn((1, model.config.in_channels, model.config.resolution, model.config.resolution)).to(
|
||||
torch_device
|
||||
assert torch.allclose(
|
||||
t2[23:26, 47:50].flatten().cpu(),
|
||||
torch.tensor([0.3019, 0.2280, 0.1716, 0.3146, 0.2377, 0.1790, 0.3272, 0.2474, 0.1864]),
|
||||
1e-3,
|
||||
)
|
||||
emb = torch.randn((1, 16, model.config.transformer_dim)).to(torch_device)
|
||||
time_step = torch.tensor([10] * noise.shape[0], device=torch_device)
|
||||
|
||||
with torch.no_grad():
|
||||
output = model(noise, time_step, emb)
|
||||
|
||||
output, _ = torch.split(output, 3, dim=1)
|
||||
output_slice = output[0, -1, -3:, -3:].flatten()
|
||||
# fmt: off
|
||||
expected_output_slice = torch.tensor([2.7766, -10.3558, -14.9149, -0.9376, -14.9175, -17.7679, -5.5565, -12.9521, -12.9845])
|
||||
# fmt: on
|
||||
self.assertTrue(torch.allclose(output_slice, expected_output_slice, atol=1e-3))
|
||||
|
||||
|
||||
class UNetLDMModelTests(ModelTesterMixin, unittest.TestCase):
|
||||
model_class = UNetLDMModel
|
||||
|
||||
@property
|
||||
def dummy_input(self):
|
||||
batch_size = 4
|
||||
num_channels = 4
|
||||
sizes = (32, 32)
|
||||
|
||||
noise = floats_tensor((batch_size, num_channels) + sizes).to(torch_device)
|
||||
time_step = torch.tensor([10]).to(torch_device)
|
||||
|
||||
return {"x": noise, "timesteps": time_step}
|
||||
|
||||
@property
|
||||
def get_input_shape(self):
|
||||
return (4, 32, 32)
|
||||
|
||||
@property
|
||||
def get_output_shape(self):
|
||||
return (4, 32, 32)
|
||||
|
||||
def prepare_init_args_and_inputs_for_common(self):
|
||||
init_dict = {
|
||||
"image_size": 32,
|
||||
"in_channels": 4,
|
||||
"out_channels": 4,
|
||||
"model_channels": 32,
|
||||
"num_res_blocks": 2,
|
||||
"attention_resolutions": (16,),
|
||||
"channel_mult": (1, 2),
|
||||
"num_heads": 2,
|
||||
"conv_resample": True,
|
||||
}
|
||||
inputs_dict = self.dummy_input
|
||||
return init_dict, inputs_dict
|
||||
|
||||
def test_from_pretrained_hub(self):
|
||||
model, loading_info = UNetLDMModel.from_pretrained("fusing/unet-ldm-dummy", output_loading_info=True)
|
||||
self.assertIsNotNone(model)
|
||||
self.assertEqual(len(loading_info["missing_keys"]), 0)
|
||||
|
||||
model.to(torch_device)
|
||||
image = model(**self.dummy_input)
|
||||
|
||||
assert image is not None, "Make sure output is not None"
|
||||
|
||||
def test_output_pretrained(self):
|
||||
model = UNetLDMModel.from_pretrained("fusing/unet-ldm-dummy")
|
||||
model.eval()
|
||||
|
||||
torch.manual_seed(0)
|
||||
if torch.cuda.is_available():
|
||||
torch.cuda.manual_seed_all(0)
|
||||
|
||||
noise = torch.randn(1, model.config.in_channels, model.config.image_size, model.config.image_size)
|
||||
time_step = torch.tensor([10] * noise.shape[0])
|
||||
|
||||
with torch.no_grad():
|
||||
output = model(noise, time_step)
|
||||
|
||||
output_slice = output[0, -1, -3:, -3:].flatten()
|
||||
# fmt: off
|
||||
expected_output_slice = torch.tensor([-13.3258, -20.1100, -15.9873, -17.6617, -23.0596, -17.9419, -13.3675, -16.1889, -12.3800])
|
||||
# fmt: on
|
||||
|
||||
self.assertTrue(torch.allclose(output_slice, expected_output_slice, atol=1e-3))
|
||||
|
||||
|
||||
class UNetGradTTSModelTests(ModelTesterMixin, unittest.TestCase):
|
||||
model_class = UNetGradTTSModel
|
||||
|
||||
@property
|
||||
def dummy_input(self):
|
||||
batch_size = 4
|
||||
num_features = 32
|
||||
seq_len = 16
|
||||
|
||||
noise = floats_tensor((batch_size, num_features, seq_len)).to(torch_device)
|
||||
condition = floats_tensor((batch_size, num_features, seq_len)).to(torch_device)
|
||||
mask = floats_tensor((batch_size, 1, seq_len)).to(torch_device)
|
||||
time_step = torch.tensor([10] * batch_size).to(torch_device)
|
||||
|
||||
return {"x": noise, "timesteps": time_step, "mu": condition, "mask": mask}
|
||||
|
||||
@property
|
||||
def get_input_shape(self):
|
||||
return (4, 32, 16)
|
||||
|
||||
@property
|
||||
def get_output_shape(self):
|
||||
return (4, 32, 16)
|
||||
|
||||
def prepare_init_args_and_inputs_for_common(self):
|
||||
init_dict = {
|
||||
"dim": 64,
|
||||
"groups": 4,
|
||||
"dim_mults": (1, 2),
|
||||
"n_feats": 32,
|
||||
"pe_scale": 1000,
|
||||
"n_spks": 1,
|
||||
}
|
||||
inputs_dict = self.dummy_input
|
||||
return init_dict, inputs_dict
|
||||
|
||||
def test_from_pretrained_hub(self):
|
||||
model, loading_info = UNetGradTTSModel.from_pretrained("fusing/unet-grad-tts-dummy", output_loading_info=True)
|
||||
self.assertIsNotNone(model)
|
||||
self.assertEqual(len(loading_info["missing_keys"]), 0)
|
||||
|
||||
model.to(torch_device)
|
||||
image = model(**self.dummy_input)
|
||||
|
||||
assert image is not None, "Make sure output is not None"
|
||||
|
||||
def test_output_pretrained(self):
|
||||
model = UNetGradTTSModel.from_pretrained("fusing/unet-grad-tts-dummy")
|
||||
model.eval()
|
||||
|
||||
torch.manual_seed(0)
|
||||
if torch.cuda.is_available():
|
||||
torch.cuda.manual_seed_all(0)
|
||||
|
||||
num_features = model.config.n_feats
|
||||
seq_len = 16
|
||||
noise = torch.randn((1, num_features, seq_len))
|
||||
condition = torch.randn((1, num_features, seq_len))
|
||||
mask = torch.randn((1, 1, seq_len))
|
||||
time_step = torch.tensor([10])
|
||||
|
||||
with torch.no_grad():
|
||||
output = model(noise, time_step, condition, mask)
|
||||
|
||||
output_slice = output[0, -3:, -3:].flatten()
|
||||
# fmt: off
|
||||
expected_output_slice = torch.tensor([-0.0690, -0.0531, 0.0633, -0.0660, -0.0541, 0.0650, -0.0656, -0.0555, 0.0617])
|
||||
# fmt: on
|
||||
|
||||
self.assertTrue(torch.allclose(output_slice, expected_output_slice, atol=1e-3))
|
||||
|
||||
|
||||
class PipelineTesterMixin(unittest.TestCase):
|
||||
def test_from_pretrained_save_pretrained(self):
|
||||
# 1. Load models
|
||||
model = UNetModel(ch=32, ch_mult=(1, 2), num_res_blocks=2, attn_resolutions=(16,), resolution=32)
|
||||
schedular = DDPMScheduler(timesteps=10)
|
||||
|
||||
ddpm = DDPMPipeline(model, schedular)
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdirname:
|
||||
ddpm.save_pretrained(tmpdirname)
|
||||
new_ddpm = DDPMPipeline.from_pretrained(tmpdirname)
|
||||
|
||||
generator = torch.manual_seed(0)
|
||||
|
||||
image = ddpm(generator=generator)
|
||||
generator = generator.manual_seed(0)
|
||||
new_image = new_ddpm(generator=generator)
|
||||
|
||||
assert (image - new_image).abs().sum() < 1e-5, "Models don't give the same forward pass"
|
||||
|
||||
@slow
|
||||
def test_from_pretrained_hub(self):
|
||||
model_path = "fusing/ddpm-cifar10"
|
||||
|
||||
ddpm = DDPMPipeline.from_pretrained(model_path)
|
||||
ddpm_from_hub = DiffusionPipeline.from_pretrained(model_path)
|
||||
|
||||
ddpm.noise_scheduler.num_timesteps = 10
|
||||
ddpm_from_hub.noise_scheduler.num_timesteps = 10
|
||||
|
||||
generator = torch.manual_seed(0)
|
||||
|
||||
image = ddpm(generator=generator)
|
||||
generator = generator.manual_seed(0)
|
||||
new_image = ddpm_from_hub(generator=generator)
|
||||
|
||||
assert (image - new_image).abs().sum() < 1e-5, "Models don't give the same forward pass"
|
||||
|
||||
@slow
|
||||
def test_ddpm_cifar10(self):
|
||||
generator = torch.manual_seed(0)
|
||||
model_id = "fusing/ddpm-cifar10"
|
||||
|
||||
unet = UNetModel.from_pretrained(model_id)
|
||||
noise_scheduler = DDPMScheduler.from_config(model_id)
|
||||
noise_scheduler = noise_scheduler.set_format("pt")
|
||||
|
||||
ddpm = DDPMPipeline(unet=unet, noise_scheduler=noise_scheduler)
|
||||
image = ddpm(generator=generator)
|
||||
|
||||
image_slice = image[0, -1, -3:, -3:].cpu()
|
||||
|
||||
assert image.shape == (1, 3, 32, 32)
|
||||
expected_slice = torch.tensor([0.2250, 0.3375, 0.2360, 0.0930, 0.3440, 0.3156, 0.1937, 0.3585, 0.1761])
|
||||
assert (image_slice.flatten() - expected_slice).abs().max() < 1e-2
|
||||
|
||||
@slow
|
||||
def test_ddim_cifar10(self):
|
||||
generator = torch.manual_seed(0)
|
||||
model_id = "fusing/ddpm-cifar10"
|
||||
|
||||
unet = UNetModel.from_pretrained(model_id)
|
||||
noise_scheduler = DDIMScheduler(tensor_format="pt")
|
||||
|
||||
ddim = DDIMPipeline(unet=unet, noise_scheduler=noise_scheduler)
|
||||
image = ddim(generator=generator, eta=0.0)
|
||||
|
||||
image_slice = image[0, -1, -3:, -3:].cpu()
|
||||
|
||||
assert image.shape == (1, 3, 32, 32)
|
||||
expected_slice = torch.tensor(
|
||||
[-0.7383, -0.7385, -0.7298, -0.7364, -0.7414, -0.7239, -0.6737, -0.6813, -0.7068]
|
||||
assert torch.allclose(
|
||||
t3[23:26, 47:50].flatten().cpu(),
|
||||
torch.tensor([-0.9801, -0.9464, -0.9349, -0.3952, 0.8887, -0.9709, 0.5299, -0.2853, -0.9927]),
|
||||
1e-3,
|
||||
)
|
||||
assert (image_slice.flatten() - expected_slice).abs().max() < 1e-2
|
||||
|
||||
@slow
|
||||
def test_pndm_cifar10(self):
|
||||
generator = torch.manual_seed(0)
|
||||
model_id = "fusing/ddpm-cifar10"
|
||||
|
||||
unet = UNetModel.from_pretrained(model_id)
|
||||
noise_scheduler = PNDMScheduler(tensor_format="pt")
|
||||
|
||||
pndm = PNDMPipeline(unet=unet, noise_scheduler=noise_scheduler)
|
||||
image = pndm(generator=generator)
|
||||
|
||||
image_slice = image[0, -1, -3:, -3:].cpu()
|
||||
|
||||
assert image.shape == (1, 3, 32, 32)
|
||||
expected_slice = torch.tensor(
|
||||
[-0.7888, -0.7870, -0.7759, -0.7823, -0.8014, -0.7608, -0.6818, -0.7130, -0.7471]
|
||||
)
|
||||
assert (image_slice.flatten() - expected_slice).abs().max() < 1e-2
|
||||
|
||||
@slow
|
||||
def test_ldm_text2img(self):
|
||||
model_id = "fusing/latent-diffusion-text2im-large"
|
||||
ldm = LatentDiffusionPipeline.from_pretrained(model_id)
|
||||
|
||||
prompt = "A painting of a squirrel eating a burger"
|
||||
generator = torch.manual_seed(0)
|
||||
image = ldm([prompt], generator=generator, num_inference_steps=20)
|
||||
|
||||
image_slice = image[0, -1, -3:, -3:].cpu()
|
||||
|
||||
assert image.shape == (1, 3, 256, 256)
|
||||
expected_slice = torch.tensor([0.7295, 0.7358, 0.7256, 0.7435, 0.7095, 0.6884, 0.7325, 0.6921, 0.6458])
|
||||
assert (image_slice.flatten() - expected_slice).abs().max() < 1e-2
|
||||
|
||||
@slow
|
||||
def test_glide_text2img(self):
|
||||
model_id = "fusing/glide-base"
|
||||
glide = GlidePipeline.from_pretrained(model_id)
|
||||
|
||||
prompt = "a pencil sketch of a corgi"
|
||||
generator = torch.manual_seed(0)
|
||||
image = glide(prompt, generator=generator, num_inference_steps_upscale=20)
|
||||
|
||||
image_slice = image[0, :3, :3, -1].cpu()
|
||||
|
||||
assert image.shape == (1, 256, 256, 3)
|
||||
expected_slice = torch.tensor([0.7119, 0.7073, 0.6460, 0.7780, 0.7423, 0.6926, 0.7378, 0.7189, 0.7784])
|
||||
assert (image_slice.flatten() - expected_slice).abs().max() < 1e-2
|
||||
|
||||
@slow
|
||||
def test_grad_tts(self):
|
||||
model_id = "fusing/grad-tts-libri-tts"
|
||||
grad_tts = GradTTSPipeline.from_pretrained(model_id)
|
||||
noise_scheduler = GradTTSScheduler()
|
||||
grad_tts.noise_scheduler = noise_scheduler
|
||||
|
||||
text = "Hello world, I missed you so much."
|
||||
generator = torch.manual_seed(0)
|
||||
|
||||
# generate mel spectograms using text
|
||||
mel_spec = grad_tts(text, generator=generator)
|
||||
|
||||
assert mel_spec.shape == (1, 80, 143)
|
||||
expected_slice = torch.tensor(
|
||||
[-6.7584, -6.8347, -6.3293, -6.6437, -6.7233, -6.4684, -6.1187, -6.3172, -6.6890]
|
||||
)
|
||||
assert (mel_spec[0, :3, :3].cpu().flatten() - expected_slice).abs().max() < 1e-2
|
||||
|
||||
def test_module_from_pipeline(self):
|
||||
model = DiffWave(num_res_layers=4)
|
||||
noise_scheduler = DDPMScheduler(timesteps=12)
|
||||
|
||||
bddm = BDDMPipeline(model, noise_scheduler)
|
||||
|
||||
# check if the library name for the diffwave moduel is set to pipeline module
|
||||
self.assertTrue(bddm.config["diffwave"][0] == "pipeline_bddm")
|
||||
|
||||
# check if we can save and load the pipeline
|
||||
with tempfile.TemporaryDirectory() as tmpdirname:
|
||||
bddm.save_pretrained(tmpdirname)
|
||||
_ = BDDMPipeline.from_pretrained(tmpdirname)
|
||||
# check if the same works using the DifusionPipeline class
|
||||
_ = DiffusionPipeline.from_pretrained(tmpdirname)
|
||||
|
||||
Reference in New Issue
Block a user