mirror of
https://github.com/huggingface/diffusers.git
synced 2026-01-29 07:22:12 +03:00
282 lines
12 KiB
Python
282 lines
12 KiB
Python
# coding=utf-8
|
|
# Copyright 2025 HuggingFace Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
|
|
import safetensors
|
|
|
|
from diffusers.loaders.lora_base import LORA_ADAPTER_METADATA_KEY
|
|
|
|
|
|
sys.path.append("..")
|
|
from test_examples_utils import ExamplesTestsAccelerate, run_command # noqa: E402
|
|
|
|
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
|
|
logger = logging.getLogger()
|
|
stream_handler = logging.StreamHandler(sys.stdout)
|
|
logger.addHandler(stream_handler)
|
|
|
|
|
|
class DreamBoothLoRAFlux(ExamplesTestsAccelerate):
|
|
instance_data_dir = "docs/source/en/imgs"
|
|
instance_prompt = "photo"
|
|
pretrained_model_name_or_path = "hf-internal-testing/tiny-flux-pipe"
|
|
script_path = "examples/dreambooth/train_dreambooth_lora_flux.py"
|
|
transformer_layer_type = "single_transformer_blocks.0.attn.to_k"
|
|
|
|
def test_dreambooth_lora_flux(self):
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
test_args = f"""
|
|
{self.script_path}
|
|
--pretrained_model_name_or_path {self.pretrained_model_name_or_path}
|
|
--instance_data_dir {self.instance_data_dir}
|
|
--instance_prompt {self.instance_prompt}
|
|
--resolution 64
|
|
--train_batch_size 1
|
|
--gradient_accumulation_steps 1
|
|
--max_train_steps 2
|
|
--learning_rate 5.0e-04
|
|
--scale_lr
|
|
--lr_scheduler constant
|
|
--lr_warmup_steps 0
|
|
--output_dir {tmpdir}
|
|
""".split()
|
|
|
|
run_command(self._launch_args + test_args)
|
|
# save_pretrained smoke test
|
|
self.assertTrue(os.path.isfile(os.path.join(tmpdir, "pytorch_lora_weights.safetensors")))
|
|
|
|
# make sure the state_dict has the correct naming in the parameters.
|
|
lora_state_dict = safetensors.torch.load_file(os.path.join(tmpdir, "pytorch_lora_weights.safetensors"))
|
|
is_lora = all("lora" in k for k in lora_state_dict.keys())
|
|
self.assertTrue(is_lora)
|
|
|
|
# when not training the text encoder, all the parameters in the state dict should start
|
|
# with `"transformer"` in their names.
|
|
starts_with_transformer = all(key.startswith("transformer") for key in lora_state_dict.keys())
|
|
self.assertTrue(starts_with_transformer)
|
|
|
|
def test_dreambooth_lora_text_encoder_flux(self):
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
test_args = f"""
|
|
{self.script_path}
|
|
--pretrained_model_name_or_path {self.pretrained_model_name_or_path}
|
|
--instance_data_dir {self.instance_data_dir}
|
|
--instance_prompt {self.instance_prompt}
|
|
--resolution 64
|
|
--train_batch_size 1
|
|
--train_text_encoder
|
|
--gradient_accumulation_steps 1
|
|
--max_train_steps 2
|
|
--learning_rate 5.0e-04
|
|
--scale_lr
|
|
--lr_scheduler constant
|
|
--lr_warmup_steps 0
|
|
--output_dir {tmpdir}
|
|
""".split()
|
|
|
|
run_command(self._launch_args + test_args)
|
|
# save_pretrained smoke test
|
|
self.assertTrue(os.path.isfile(os.path.join(tmpdir, "pytorch_lora_weights.safetensors")))
|
|
|
|
# make sure the state_dict has the correct naming in the parameters.
|
|
lora_state_dict = safetensors.torch.load_file(os.path.join(tmpdir, "pytorch_lora_weights.safetensors"))
|
|
is_lora = all("lora" in k for k in lora_state_dict.keys())
|
|
self.assertTrue(is_lora)
|
|
|
|
starts_with_expected_prefix = all(
|
|
(key.startswith("transformer") or key.startswith("text_encoder")) for key in lora_state_dict.keys()
|
|
)
|
|
self.assertTrue(starts_with_expected_prefix)
|
|
|
|
def test_dreambooth_lora_latent_caching(self):
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
test_args = f"""
|
|
{self.script_path}
|
|
--pretrained_model_name_or_path {self.pretrained_model_name_or_path}
|
|
--instance_data_dir {self.instance_data_dir}
|
|
--instance_prompt {self.instance_prompt}
|
|
--resolution 64
|
|
--train_batch_size 1
|
|
--gradient_accumulation_steps 1
|
|
--max_train_steps 2
|
|
--cache_latents
|
|
--learning_rate 5.0e-04
|
|
--scale_lr
|
|
--lr_scheduler constant
|
|
--lr_warmup_steps 0
|
|
--output_dir {tmpdir}
|
|
""".split()
|
|
|
|
run_command(self._launch_args + test_args)
|
|
# save_pretrained smoke test
|
|
self.assertTrue(os.path.isfile(os.path.join(tmpdir, "pytorch_lora_weights.safetensors")))
|
|
|
|
# make sure the state_dict has the correct naming in the parameters.
|
|
lora_state_dict = safetensors.torch.load_file(os.path.join(tmpdir, "pytorch_lora_weights.safetensors"))
|
|
is_lora = all("lora" in k for k in lora_state_dict.keys())
|
|
self.assertTrue(is_lora)
|
|
|
|
# when not training the text encoder, all the parameters in the state dict should start
|
|
# with `"transformer"` in their names.
|
|
starts_with_transformer = all(key.startswith("transformer") for key in lora_state_dict.keys())
|
|
self.assertTrue(starts_with_transformer)
|
|
|
|
def test_dreambooth_lora_layers(self):
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
test_args = f"""
|
|
{self.script_path}
|
|
--pretrained_model_name_or_path {self.pretrained_model_name_or_path}
|
|
--instance_data_dir {self.instance_data_dir}
|
|
--instance_prompt {self.instance_prompt}
|
|
--resolution 64
|
|
--train_batch_size 1
|
|
--gradient_accumulation_steps 1
|
|
--max_train_steps 2
|
|
--cache_latents
|
|
--learning_rate 5.0e-04
|
|
--scale_lr
|
|
--lora_layers {self.transformer_layer_type}
|
|
--lr_scheduler constant
|
|
--lr_warmup_steps 0
|
|
--output_dir {tmpdir}
|
|
""".split()
|
|
|
|
run_command(self._launch_args + test_args)
|
|
# save_pretrained smoke test
|
|
self.assertTrue(os.path.isfile(os.path.join(tmpdir, "pytorch_lora_weights.safetensors")))
|
|
|
|
# make sure the state_dict has the correct naming in the parameters.
|
|
lora_state_dict = safetensors.torch.load_file(os.path.join(tmpdir, "pytorch_lora_weights.safetensors"))
|
|
is_lora = all("lora" in k for k in lora_state_dict.keys())
|
|
self.assertTrue(is_lora)
|
|
|
|
# when not training the text encoder, all the parameters in the state dict should start
|
|
# with `"transformer"` in their names. In this test, we only params of
|
|
# transformer.single_transformer_blocks.0.attn.to_k should be in the state dict
|
|
starts_with_transformer = all(
|
|
key.startswith("transformer.single_transformer_blocks.0.attn.to_k") for key in lora_state_dict.keys()
|
|
)
|
|
self.assertTrue(starts_with_transformer)
|
|
|
|
def test_dreambooth_lora_flux_checkpointing_checkpoints_total_limit(self):
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
test_args = f"""
|
|
{self.script_path}
|
|
--pretrained_model_name_or_path={self.pretrained_model_name_or_path}
|
|
--instance_data_dir={self.instance_data_dir}
|
|
--output_dir={tmpdir}
|
|
--instance_prompt={self.instance_prompt}
|
|
--resolution=64
|
|
--train_batch_size=1
|
|
--gradient_accumulation_steps=1
|
|
--max_train_steps=6
|
|
--checkpoints_total_limit=2
|
|
--checkpointing_steps=2
|
|
""".split()
|
|
|
|
run_command(self._launch_args + test_args)
|
|
|
|
self.assertEqual(
|
|
{x for x in os.listdir(tmpdir) if "checkpoint" in x},
|
|
{"checkpoint-4", "checkpoint-6"},
|
|
)
|
|
|
|
def test_dreambooth_lora_flux_checkpointing_checkpoints_total_limit_removes_multiple_checkpoints(self):
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
test_args = f"""
|
|
{self.script_path}
|
|
--pretrained_model_name_or_path={self.pretrained_model_name_or_path}
|
|
--instance_data_dir={self.instance_data_dir}
|
|
--output_dir={tmpdir}
|
|
--instance_prompt={self.instance_prompt}
|
|
--resolution=64
|
|
--train_batch_size=1
|
|
--gradient_accumulation_steps=1
|
|
--max_train_steps=4
|
|
--checkpointing_steps=2
|
|
""".split()
|
|
|
|
run_command(self._launch_args + test_args)
|
|
|
|
self.assertEqual({x for x in os.listdir(tmpdir) if "checkpoint" in x}, {"checkpoint-2", "checkpoint-4"})
|
|
|
|
resume_run_args = f"""
|
|
{self.script_path}
|
|
--pretrained_model_name_or_path={self.pretrained_model_name_or_path}
|
|
--instance_data_dir={self.instance_data_dir}
|
|
--output_dir={tmpdir}
|
|
--instance_prompt={self.instance_prompt}
|
|
--resolution=64
|
|
--train_batch_size=1
|
|
--gradient_accumulation_steps=1
|
|
--max_train_steps=8
|
|
--checkpointing_steps=2
|
|
--resume_from_checkpoint=checkpoint-4
|
|
--checkpoints_total_limit=2
|
|
""".split()
|
|
|
|
run_command(self._launch_args + resume_run_args)
|
|
|
|
self.assertEqual({x for x in os.listdir(tmpdir) if "checkpoint" in x}, {"checkpoint-6", "checkpoint-8"})
|
|
|
|
def test_dreambooth_lora_with_metadata(self):
|
|
# Use a `lora_alpha` that is different from `rank`.
|
|
lora_alpha = 8
|
|
rank = 4
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
test_args = f"""
|
|
{self.script_path}
|
|
--pretrained_model_name_or_path {self.pretrained_model_name_or_path}
|
|
--instance_data_dir {self.instance_data_dir}
|
|
--instance_prompt {self.instance_prompt}
|
|
--resolution 64
|
|
--train_batch_size 1
|
|
--gradient_accumulation_steps 1
|
|
--max_train_steps 2
|
|
--lora_alpha={lora_alpha}
|
|
--rank={rank}
|
|
--learning_rate 5.0e-04
|
|
--scale_lr
|
|
--lr_scheduler constant
|
|
--lr_warmup_steps 0
|
|
--output_dir {tmpdir}
|
|
""".split()
|
|
|
|
run_command(self._launch_args + test_args)
|
|
# save_pretrained smoke test
|
|
state_dict_file = os.path.join(tmpdir, "pytorch_lora_weights.safetensors")
|
|
self.assertTrue(os.path.isfile(state_dict_file))
|
|
|
|
# Check if the metadata was properly serialized.
|
|
with safetensors.torch.safe_open(state_dict_file, framework="pt", device="cpu") as f:
|
|
metadata = f.metadata() or {}
|
|
|
|
metadata.pop("format", None)
|
|
raw = metadata.get(LORA_ADAPTER_METADATA_KEY)
|
|
if raw:
|
|
raw = json.loads(raw)
|
|
|
|
loaded_lora_alpha = raw["transformer.lora_alpha"]
|
|
self.assertTrue(loaded_lora_alpha == lora_alpha)
|
|
loaded_lora_rank = raw["transformer.r"]
|
|
self.assertTrue(loaded_lora_rank == rank)
|