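# Page header captured along with the file (apparently from a web file viewer; not part of the module):
#   BleachNick's picture | upload required packages | 87d40d2 | raw | history blame | 32.4 kB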
import gc
import random
import traceback
import unittest
import numpy as np
import torch
from PIL import Image
from transformers import (
CLIPImageProcessor,
CLIPTextModel,
CLIPTokenizer,
CLIPVisionModelWithProjection,
GPT2Tokenizer,
)
from diffusers import (
AutoencoderKL,
DPMSolverMultistepScheduler,
UniDiffuserModel,
UniDiffuserPipeline,
UniDiffuserTextDecoder,
)
from diffusers.utils.testing_utils import (
enable_full_determinism,
floats_tensor,
load_image,
nightly,
require_torch_2,
require_torch_gpu,
run_test_in_subprocess,
torch_device,
)
from diffusers.utils.torch_utils import randn_tensor
from ..pipeline_params import (
IMAGE_TO_IMAGE_IMAGE_PARAMS,
TEXT_GUIDED_IMAGE_VARIATION_BATCH_PARAMS,
TEXT_GUIDED_IMAGE_VARIATION_PARAMS,
)
from ..test_pipelines_common import PipelineKarrasSchedulerTesterMixin, PipelineLatentTesterMixin, PipelineTesterMixin
# Runs once at import time, before any test in this module is defined.
# NOTE(review): presumably puts torch into a deterministic mode so the hard-coded
# expected slices in the tests below are reproducible -- confirm against
# diffusers.utils.testing_utils.enable_full_determinism.
enable_full_determinism()
# Will be run via run_test_in_subprocess
def _test_unidiffuser_compile(in_queue, out_queue, timeout):
    """Run the `torch.compile` check for the UniDiffuser UNet and report the outcome on a queue.

    Args:
        in_queue: Queue from which the pipeline call kwargs are read. The dict must also carry
            `"torch_device"` and `"seed"`, which are popped and turned into a `torch.Generator`.
        out_queue: Queue that receives `{"error": <traceback string or None>}`.
        timeout: Timeout passed to both the `get` and the `put` queue operations.

    Any exception (including a failed assertion) is captured as a formatted traceback instead of
    being raised, so the caller can inspect it from `out_queue`.
    """
    failure = None
    try:
        call_kwargs = in_queue.get(timeout=timeout)
        device = call_kwargs.pop("torch_device")
        seed = call_kwargs.pop("seed")
        # The generator is rebuilt here from the seed and device sent through the queue.
        call_kwargs["generator"] = torch.Generator(device=device).manual_seed(seed)

        pipe = UniDiffuserPipeline.from_pretrained("thu-ml/unidiffuser-v1")
        pipe = pipe.to(device)

        pipe.unet.to(memory_format=torch.channels_last)
        pipe.unet = torch.compile(pipe.unet, mode="reduce-overhead", fullgraph=True)

        pipe.set_progress_bar_config(disable=None)

        image = pipe(**call_kwargs).images
        bottom_right = image[0, -3:, -3:, -1].flatten()

        assert image.shape == (1, 512, 512, 3)
        reference = np.array([0.2402, 0.2375, 0.2285, 0.2378, 0.2407, 0.2263, 0.2354, 0.2307, 0.2520])
        assert np.abs(bottom_right - reference).max() < 1e-1
    except Exception:
        failure = traceback.format_exc()

    out_queue.put({"error": failure}, timeout=timeout)
    out_queue.join()
class UniDiffuserPipelineFastTests(
    PipelineTesterMixin, PipelineLatentTesterMixin, PipelineKarrasSchedulerTesterMixin, unittest.TestCase
):
    """Fast tests for `UniDiffuserPipeline` using the `hf-internal-testing` checkpoints.

    Covers the joint, text2img, img, text and img2text modes on CPU with 32x32 outputs, plus
    fp16 variants on CUDA that are gated by `require_torch_gpu`.
    """

    pipeline_class = UniDiffuserPipeline
    params = TEXT_GUIDED_IMAGE_VARIATION_PARAMS
    batch_params = TEXT_GUIDED_IMAGE_VARIATION_BATCH_PARAMS
    image_params = IMAGE_TO_IMAGE_IMAGE_PARAMS
    # The argument carrying VAE latent inputs is named `vae_latents`, not `latents`.
    image_latents_params = frozenset(["vae_latents"])

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _unidiffuser_generator(device, seed):
        """Return a seeded generator; on `mps` devices the global torch generator is seeded instead."""
        if str(device).startswith("mps"):
            return torch.manual_seed(seed)
        return torch.Generator(device=device).manual_seed(seed)

    @staticmethod
    def _unidiffuser_finalize_pipe(pipe, device):
        """Move `pipe` to `device`, configure its progress bar and return it."""
        pipe = pipe.to(device)
        pipe.set_progress_bar_config(disable=None)
        return pipe

    def _unidiffuser_dummy_pipe(self, device):
        """Build a pipeline from `get_dummy_components()` and place it on `device`."""
        pipe = UniDiffuserPipeline(**self.get_dummy_components())
        return self._unidiffuser_finalize_pipe(pipe, device)

    def _unidiffuser_v1_test_pipe(self, device, **load_kwargs):
        """Load the `hf-internal-testing/unidiffuser-test-v1` pipeline and place it on `device`."""
        pipe = UniDiffuserPipeline.from_pretrained("hf-internal-testing/unidiffuser-test-v1", **load_kwargs)
        return self._unidiffuser_finalize_pipe(pipe, device)

    @staticmethod
    def _unidiffuser_max_slice_diff(images, expected_slice):
        """Return the largest absolute deviation of the first image's trailing 3x3 slice (last channel)."""
        return np.abs(images[0, -3:, -3:, -1].flatten() - expected_slice).max()

    # ----------------------------------------------------------------- fixtures

    def get_dummy_components(self):
        """Load all pipeline components from the dummy test repository and return them keyed by name."""
        repo_id = "hf-internal-testing/unidiffuser-diffusers-test"

        unet = UniDiffuserModel.from_pretrained(repo_id, subfolder="unet")
        scheduler = DPMSolverMultistepScheduler(
            beta_start=0.00085,
            beta_end=0.012,
            beta_schedule="scaled_linear",
            solver_order=3,
        )
        vae = AutoencoderKL.from_pretrained(repo_id, subfolder="vae")
        text_encoder = CLIPTextModel.from_pretrained(repo_id, subfolder="text_encoder")
        clip_tokenizer = CLIPTokenizer.from_pretrained(repo_id, subfolder="clip_tokenizer")
        image_encoder = CLIPVisionModelWithProjection.from_pretrained(repo_id, subfolder="image_encoder")
        # Processor settings taken from the Stable Diffusion Image Variation pipeline tests.
        clip_image_processor = CLIPImageProcessor(crop_size=32, size=32)
        text_tokenizer = GPT2Tokenizer.from_pretrained(repo_id, subfolder="text_tokenizer")
        text_decoder = UniDiffuserTextDecoder.from_pretrained(repo_id, subfolder="text_decoder")

        return {
            "vae": vae,
            "text_encoder": text_encoder,
            "image_encoder": image_encoder,
            "clip_image_processor": clip_image_processor,
            "clip_tokenizer": clip_tokenizer,
            "text_decoder": text_decoder,
            "text_tokenizer": text_tokenizer,
            "unet": unet,
            "scheduler": scheduler,
        }

    def get_dummy_inputs(self, device, seed=0):
        """Return pipeline kwargs with a prompt, a generated 32x32 RGB PIL image and a seeded generator."""
        pixels = floats_tensor((1, 3, 32, 32), rng=random.Random(seed)).to(device)
        pixels = pixels.cpu().permute(0, 2, 3, 1)[0]
        pil_image = Image.fromarray(np.uint8(pixels)).convert("RGB")

        return {
            "prompt": "an elephant under the sea",
            "image": pil_image,
            "generator": self._unidiffuser_generator(device, seed),
            "num_inference_steps": 2,
            "guidance_scale": 6.0,
            "output_type": "np",
        }

    def get_fixed_latents(self, device, seed=0):
        """Draw the prompt, VAE and CLIP latents from one seeded generator created on `device`."""
        if isinstance(device, str):
            device = torch.device(device)
        rng = torch.Generator(device=device).manual_seed(seed)

        # Shapes are hard-coded for now; the draw order (prompt, vae, clip) is significant.
        latent_shapes = {
            "prompt_latents": (1, 77, 32),
            "vae_latents": (1, 4, 16, 16),
            "clip_latents": (1, 1, 32),
        }
        return {
            name: randn_tensor(shape, generator=rng, device=device, dtype=torch.float32)
            for name, shape in latent_shapes.items()
        }

    def get_dummy_inputs_with_latents(self, device, seed=0):
        """Return pipeline kwargs with the downloaded example image (resized to 32x32) and fixed latents."""
        example_image = load_image(
            "https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/unidiffuser/unidiffuser_example_image.jpg",
        )
        example_image = example_image.resize((32, 32))

        fixed_latents = self.get_fixed_latents(device, seed=seed)

        return {
            "prompt": "an elephant under the sea",
            "image": example_image,
            "generator": self._unidiffuser_generator(device, seed),
            "num_inference_steps": 2,
            "guidance_scale": 6.0,
            "output_type": "np",
            **fixed_latents,
        }

    # ---------------------------------------------------------------- CPU tests

    def test_dict_tuple_outputs_equivalent(self):
        """Run the mixin check, supplying a reference slice only when the test device is the CPU."""
        cpu_slice = None
        if torch_device == "cpu":
            cpu_slice = np.array([0.7489, 0.3722, 0.4475, 0.5630, 0.5923, 0.4992, 0.3936, 0.5844, 0.4975])
        super().test_dict_tuple_outputs_equivalent(expected_slice=cpu_slice)

    def test_unidiffuser_default_joint_v0(self):
        """Joint mode on the dummy components yields the reference image slice and text prefix."""
        device = "cpu"  # ensure determinism for the device-dependent torch.Generator
        pipe = self._unidiffuser_dummy_pipe(device)

        pipe.set_joint_mode()
        assert pipe.mode == "joint"

        inputs = self.get_dummy_inputs_with_latents(device)
        # Prompt and image are dropped for joint inference.
        inputs.pop("prompt")
        inputs.pop("image")

        output = pipe(**inputs)
        images, texts = output.images, output.text
        assert images.shape == (1, 32, 32, 3)

        reference = np.array([0.5760, 0.6270, 0.6571, 0.4965, 0.4638, 0.5663, 0.5254, 0.5068, 0.5716])
        assert self._unidiffuser_max_slice_diff(images, reference) < 1e-3

        assert texts[0][:10] == " no no no "

    def test_unidiffuser_default_joint_no_cfg_v0(self):
        """Joint mode with `guidance_scale=1.0` (CFG off) yields the reference image slice and text prefix."""
        device = "cpu"  # ensure determinism for the device-dependent torch.Generator
        pipe = self._unidiffuser_dummy_pipe(device)

        pipe.set_joint_mode()
        assert pipe.mode == "joint"

        inputs = self.get_dummy_inputs_with_latents(device)
        # Prompt and image are dropped for joint inference.
        inputs.pop("prompt")
        inputs.pop("image")
        # A guidance scale of 1.0 turns classifier-free guidance off.
        inputs["guidance_scale"] = 1.0

        output = pipe(**inputs)
        images, texts = output.images, output.text
        assert images.shape == (1, 32, 32, 3)

        reference = np.array([0.5760, 0.6270, 0.6571, 0.4965, 0.4638, 0.5663, 0.5254, 0.5068, 0.5716])
        assert self._unidiffuser_max_slice_diff(images, reference) < 1e-3

        assert texts[0][:10] == " no no no "

    def test_unidiffuser_default_text2img_v0(self):
        """text2img mode on the dummy components yields the reference image slice."""
        device = "cpu"  # ensure determinism for the device-dependent torch.Generator
        pipe = self._unidiffuser_dummy_pipe(device)

        pipe.set_text_to_image_mode()
        assert pipe.mode == "text2img"

        inputs = self.get_dummy_inputs_with_latents(device)
        # Only the prompt conditions the generation, so the image is dropped.
        inputs.pop("image")

        images = pipe(**inputs).images
        assert images.shape == (1, 32, 32, 3)

        reference = np.array([0.5758, 0.6269, 0.6570, 0.4967, 0.4639, 0.5664, 0.5257, 0.5067, 0.5715])
        assert self._unidiffuser_max_slice_diff(images, reference) < 1e-3

    def test_unidiffuser_default_image_0(self):
        """img mode on the dummy components yields the reference image slice."""
        device = "cpu"  # ensure determinism for the device-dependent torch.Generator
        pipe = self._unidiffuser_dummy_pipe(device)

        pipe.set_image_mode()
        assert pipe.mode == "img"

        inputs = self.get_dummy_inputs(device)
        # Prompt and image are dropped for unconditional ("marginal") image generation.
        inputs.pop("prompt")
        inputs.pop("image")

        images = pipe(**inputs).images
        assert images.shape == (1, 32, 32, 3)

        reference = np.array([0.5760, 0.6270, 0.6571, 0.4966, 0.4638, 0.5663, 0.5254, 0.5068, 0.5715])
        assert self._unidiffuser_max_slice_diff(images, reference) < 1e-3

    def test_unidiffuser_default_text_v0(self):
        """text mode on the dummy components yields the reference text prefix."""
        device = "cpu"  # ensure determinism for the device-dependent torch.Generator
        pipe = self._unidiffuser_dummy_pipe(device)

        pipe.set_text_mode()
        assert pipe.mode == "text"

        inputs = self.get_dummy_inputs(device)
        # Prompt and image are dropped for unconditional ("marginal") text generation.
        inputs.pop("prompt")
        inputs.pop("image")

        texts = pipe(**inputs).text
        assert texts[0][:10] == " no no no "

    def test_unidiffuser_default_img2text_v0(self):
        """img2text mode on the dummy components yields the reference text prefix."""
        device = "cpu"  # ensure determinism for the device-dependent torch.Generator
        pipe = self._unidiffuser_dummy_pipe(device)

        pipe.set_image_to_text_mode()
        assert pipe.mode == "img2text"

        inputs = self.get_dummy_inputs_with_latents(device)
        # Only the image conditions the generation, so the prompt is dropped.
        inputs.pop("prompt")

        texts = pipe(**inputs).text
        assert texts[0][:10] == " no no no "

    def test_unidiffuser_default_joint_v1(self):
        """Joint mode on the v1 test checkpoint (with `data_type=1`) yields the reference outputs."""
        device = "cpu"  # ensure determinism for the device-dependent torch.Generator
        pipe = self._unidiffuser_v1_test_pipe(device)

        pipe.set_joint_mode()
        assert pipe.mode == "joint"

        inputs = self.get_dummy_inputs_with_latents(device)
        # Prompt and image are dropped for joint inference.
        inputs.pop("prompt")
        inputs.pop("image")
        inputs["data_type"] = 1

        output = pipe(**inputs)
        images, texts = output.images, output.text
        assert images.shape == (1, 32, 32, 3)

        reference = np.array([0.5760, 0.6270, 0.6571, 0.4965, 0.4638, 0.5663, 0.5254, 0.5068, 0.5716])
        assert self._unidiffuser_max_slice_diff(images, reference) < 1e-3

        assert texts[0][:10] == " no no no "

    def test_unidiffuser_default_text2img_v1(self):
        """text2img mode on the v1 test checkpoint yields the reference image slice."""
        device = "cpu"  # ensure determinism for the device-dependent torch.Generator
        pipe = self._unidiffuser_v1_test_pipe(device)

        pipe.set_text_to_image_mode()
        assert pipe.mode == "text2img"

        inputs = self.get_dummy_inputs_with_latents(device)
        # Only the prompt conditions the generation, so the image is dropped.
        inputs.pop("image")

        images = pipe(**inputs).images
        assert images.shape == (1, 32, 32, 3)

        reference = np.array([0.5758, 0.6269, 0.6570, 0.4967, 0.4639, 0.5664, 0.5257, 0.5067, 0.5715])
        assert self._unidiffuser_max_slice_diff(images, reference) < 1e-3

    def test_unidiffuser_default_img2text_v1(self):
        """img2text mode on the v1 test checkpoint yields the reference text prefix."""
        device = "cpu"  # ensure determinism for the device-dependent torch.Generator
        pipe = self._unidiffuser_v1_test_pipe(device)

        pipe.set_image_to_text_mode()
        assert pipe.mode == "img2text"

        inputs = self.get_dummy_inputs_with_latents(device)
        # Only the image conditions the generation, so the prompt is dropped.
        inputs.pop("prompt")

        texts = pipe(**inputs).text
        assert texts[0][:10] == " no no no "

    def test_unidiffuser_text2img_multiple_images(self):
        """text2img returns `num_images_per_prompt` images when both multiplicity arguments are set."""
        device = "cpu"  # ensure determinism for the device-dependent torch.Generator
        pipe = self._unidiffuser_dummy_pipe(device)

        pipe.set_text_to_image_mode()
        assert pipe.mode == "text2img"

        inputs = self.get_dummy_inputs(device)
        # Only the prompt conditions the generation, so the image is dropped.
        inputs.pop("image")
        inputs.update(num_images_per_prompt=2, num_prompts_per_image=3)

        images = pipe(**inputs).images
        assert images.shape == (2, 32, 32, 3)

    def test_unidiffuser_img2text_multiple_prompts(self):
        """img2text returns `num_prompts_per_image` texts when both multiplicity arguments are set."""
        device = "cpu"  # ensure determinism for the device-dependent torch.Generator
        pipe = self._unidiffuser_dummy_pipe(device)

        pipe.set_image_to_text_mode()
        assert pipe.mode == "img2text"

        inputs = self.get_dummy_inputs(device)
        # Only the image conditions the generation, so the prompt is dropped.
        inputs.pop("prompt")
        inputs.update(num_images_per_prompt=2, num_prompts_per_image=3)

        texts = pipe(**inputs).text
        assert len(texts) == 3

    def test_unidiffuser_text2img_multiple_images_with_latents(self):
        """Same as the multiple-images test, but with the fixed latents passed in."""
        device = "cpu"  # ensure determinism for the device-dependent torch.Generator
        pipe = self._unidiffuser_dummy_pipe(device)

        pipe.set_text_to_image_mode()
        assert pipe.mode == "text2img"

        inputs = self.get_dummy_inputs_with_latents(device)
        # Only the prompt conditions the generation, so the image is dropped.
        inputs.pop("image")
        inputs.update(num_images_per_prompt=2, num_prompts_per_image=3)

        images = pipe(**inputs).images
        assert images.shape == (2, 32, 32, 3)

    def test_unidiffuser_img2text_multiple_prompts_with_latents(self):
        """Same as the multiple-prompts test, but with the fixed latents passed in."""
        device = "cpu"  # ensure determinism for the device-dependent torch.Generator
        pipe = self._unidiffuser_dummy_pipe(device)

        pipe.set_image_to_text_mode()
        assert pipe.mode == "img2text"

        inputs = self.get_dummy_inputs_with_latents(device)
        # Only the image conditions the generation, so the prompt is dropped.
        inputs.pop("prompt")
        inputs.update(num_images_per_prompt=2, num_prompts_per_image=3)

        texts = pipe(**inputs).text
        assert len(texts) == 3

    def test_inference_batch_single_identical(self):
        """Run the mixin check with a maximum allowed difference of 2e-4."""
        super().test_inference_batch_single_identical(expected_max_diff=2e-4)

    # --------------------------------------------------------- CUDA fp16 tests

    @require_torch_gpu
    def test_unidiffuser_default_joint_v1_cuda_fp16(self):
        """Joint mode on the fp16 v1 test checkpoint on CUDA yields the reference outputs."""
        device = "cuda"
        pipe = self._unidiffuser_v1_test_pipe(device, torch_dtype=torch.float16)

        pipe.set_joint_mode()
        assert pipe.mode == "joint"

        inputs = self.get_dummy_inputs_with_latents(device)
        # Prompt and image are dropped for joint inference.
        inputs.pop("prompt")
        inputs.pop("image")
        inputs["data_type"] = 1

        output = pipe(**inputs)
        images, texts = output.images, output.text
        assert images.shape == (1, 32, 32, 3)

        reference = np.array([0.5049, 0.5498, 0.5854, 0.3052, 0.4460, 0.6489, 0.5122, 0.4810, 0.6138])
        assert self._unidiffuser_max_slice_diff(images, reference) < 1e-3

        prefix = '" This This'
        assert texts[0][: len(prefix)] == prefix

    @require_torch_gpu
    def test_unidiffuser_default_text2img_v1_cuda_fp16(self):
        """text2img mode on the fp16 v1 test checkpoint on CUDA yields the reference image slice."""
        device = "cuda"
        pipe = self._unidiffuser_v1_test_pipe(device, torch_dtype=torch.float16)

        pipe.set_text_to_image_mode()
        assert pipe.mode == "text2img"

        inputs = self.get_dummy_inputs_with_latents(device)
        # Only the prompt conditions the generation, so the image is dropped.
        inputs.pop("image")
        inputs["data_type"] = 1

        images = pipe(**inputs).images
        assert images.shape == (1, 32, 32, 3)

        reference = np.array([0.5054, 0.5498, 0.5854, 0.3052, 0.4458, 0.6489, 0.5122, 0.4810, 0.6138])
        assert self._unidiffuser_max_slice_diff(images, reference) < 1e-3

    @require_torch_gpu
    def test_unidiffuser_default_img2text_v1_cuda_fp16(self):
        """img2text mode on the fp16 v1 test checkpoint on CUDA yields the reference text prefix."""
        device = "cuda"
        pipe = self._unidiffuser_v1_test_pipe(device, torch_dtype=torch.float16)

        pipe.set_image_to_text_mode()
        assert pipe.mode == "img2text"

        inputs = self.get_dummy_inputs_with_latents(device)
        # Only the image conditions the generation, so the prompt is dropped.
        inputs.pop("prompt")
        inputs["data_type"] = 1

        texts = pipe(**inputs).text

        prefix = '" This This'
        assert texts[0][: len(prefix)] == prefix
@nightly
@require_torch_gpu
class UniDiffuserPipelineSlowTests(unittest.TestCase):
    """GPU tests that run the full `thu-ml/unidiffuser-v1` checkpoint at 512x512 output size."""

    @staticmethod
    def _release_memory():
        """Collect garbage and empty the CUDA cache."""
        gc.collect()
        torch.cuda.empty_cache()

    def setUp(self):
        super().setUp()
        self._release_memory()

    def tearDown(self):
        super().tearDown()
        self._release_memory()

    def _load_pipe(self):
        """Load the v1 checkpoint onto `torch_device` with attention slicing enabled."""
        pipe = UniDiffuserPipeline.from_pretrained("thu-ml/unidiffuser-v1")
        pipe.to(torch_device)
        pipe.set_progress_bar_config(disable=None)
        pipe.enable_attention_slicing()
        return pipe

    def get_inputs(self, device, seed=0, generate_latents=False):
        """Return pipeline kwargs; the fixed latents are merged in when `generate_latents` is true."""
        rng = torch.manual_seed(seed)
        example_image = load_image(
            "https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/unidiffuser/unidiffuser_example_image.jpg"
        )
        call_kwargs = {
            "prompt": "an elephant under the sea",
            "image": example_image,
            "generator": rng,
            "num_inference_steps": 3,
            "guidance_scale": 8.0,
            "output_type": "np",
        }
        if generate_latents:
            call_kwargs.update(self.get_fixed_latents(device, seed=seed))
        return call_kwargs

    def get_fixed_latents(self, device, seed=0):
        """Draw prompt, VAE and CLIP latents from a seeded CPU generator and place them on `device`."""
        if isinstance(device, str):
            device = torch.device(device)
        # The generator always lives on the CPU, whatever the target device is.
        cpu_rng = torch.Generator(device=torch.device("cpu")).manual_seed(seed)

        # Shapes are hard-coded for now; the draw order (prompt, vae, clip) is significant.
        latent_shapes = {
            "prompt_latents": (1, 77, 768),
            "vae_latents": (1, 4, 64, 64),
            "clip_latents": (1, 1, 512),
        }
        return {
            name: randn_tensor(shape, generator=cpu_rng, device=device, dtype=torch.float32).to(device)
            for name, shape in latent_shapes.items()
        }

    def test_unidiffuser_default_joint_v1(self):
        """Joint generation yields the reference image slice (within 1e-1) and text prefix."""
        pipe = self._load_pipe()

        inputs = self.get_inputs(device=torch_device, generate_latents=True)
        # Prompt and image are dropped for joint inference.
        inputs.pop("prompt")
        inputs.pop("image")

        output = pipe(**inputs)
        images, texts = output.images, output.text
        assert images.shape == (1, 512, 512, 3)

        observed = images[0, -3:, -3:, -1].flatten()
        reference = np.array([0.2402, 0.2375, 0.2285, 0.2378, 0.2407, 0.2263, 0.2354, 0.2307, 0.2520])
        assert np.abs(observed - reference).max() < 1e-1

        prefix = "a living room"
        assert texts[0][: len(prefix)] == prefix

    def test_unidiffuser_default_text2img_v1(self):
        """Prompt-conditioned image generation yields the reference image slice (within 1e-1)."""
        pipe = self._load_pipe()

        inputs = self.get_inputs(device=torch_device, generate_latents=True)
        inputs.pop("image")

        images = pipe(**inputs).images
        assert images.shape == (1, 512, 512, 3)

        observed = images[0, -3:, -3:, -1].flatten()
        reference = np.array([0.0242, 0.0103, 0.0022, 0.0129, 0.0000, 0.0090, 0.0376, 0.0508, 0.0005])
        assert np.abs(observed - reference).max() < 1e-1

    def test_unidiffuser_default_img2text_v1(self):
        """Image-conditioned text generation yields the reference text prefix."""
        pipe = self._load_pipe()

        inputs = self.get_inputs(device=torch_device, generate_latents=True)
        inputs.pop("prompt")

        texts = pipe(**inputs).text

        prefix = "An astronaut"
        assert texts[0][: len(prefix)] == prefix

    @unittest.skip(reason="Skip torch.compile test to speed up the slow test suite.")
    @require_torch_2
    def test_unidiffuser_compile(self, seed=0):
        """Hand the joint-mode inputs to `_test_unidiffuser_compile` via `run_test_in_subprocess`."""
        inputs = self.get_inputs(torch_device, seed=seed, generate_latents=True)
        # Prompt and image are dropped for joint inference; the generator is dropped because
        # a Generator object can't be pickled. Device and seed are sent in its place.
        for key in ("prompt", "image", "generator"):
            inputs.pop(key)
        inputs.update(torch_device=torch_device, seed=seed)

        run_test_in_subprocess(test_case=self, target_func=_test_unidiffuser_compile, inputs=inputs)
@nightly
@require_torch_gpu
class UniDiffuserPipelineNightlyTests(unittest.TestCase):
    """GPU tests that run the `thu-ml/unidiffuser-v1` checkpoint loaded in `torch.float16`."""

    @staticmethod
    def _release_memory():
        """Collect garbage and empty the CUDA cache."""
        gc.collect()
        torch.cuda.empty_cache()

    def setUp(self):
        super().setUp()
        self._release_memory()

    def tearDown(self):
        super().tearDown()
        self._release_memory()

    def _load_fp16_pipe(self):
        """Load the v1 checkpoint in fp16 onto `torch_device` with attention slicing enabled."""
        pipe = UniDiffuserPipeline.from_pretrained("thu-ml/unidiffuser-v1", torch_dtype=torch.float16)
        pipe.to(torch_device)
        pipe.set_progress_bar_config(disable=None)
        pipe.enable_attention_slicing()
        return pipe

    def get_inputs(self, device, seed=0, generate_latents=False):
        """Return pipeline kwargs; the fixed latents are merged in when `generate_latents` is true."""
        rng = torch.manual_seed(seed)
        example_image = load_image(
            "https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/unidiffuser/unidiffuser_example_image.jpg"
        )
        call_kwargs = {
            "prompt": "an elephant under the sea",
            "image": example_image,
            "generator": rng,
            "num_inference_steps": 3,
            "guidance_scale": 8.0,
            "output_type": "np",
        }
        if generate_latents:
            call_kwargs.update(self.get_fixed_latents(device, seed=seed))
        return call_kwargs

    def get_fixed_latents(self, device, seed=0):
        """Draw prompt, VAE and CLIP latents from a seeded CPU generator and place them on `device`."""
        if isinstance(device, str):
            device = torch.device(device)
        # The generator always lives on the CPU, whatever the target device is.
        cpu_rng = torch.Generator(device=torch.device("cpu")).manual_seed(seed)

        # Shapes are hard-coded for now; the draw order (prompt, vae, clip) is significant.
        latent_shapes = {
            "prompt_latents": (1, 77, 768),
            "vae_latents": (1, 4, 64, 64),
            "clip_latents": (1, 1, 512),
        }
        return {
            name: randn_tensor(shape, generator=cpu_rng, device=device, dtype=torch.float32).to(device)
            for name, shape in latent_shapes.items()
        }

    def test_unidiffuser_default_joint_v1_fp16(self):
        """fp16 joint generation yields the reference image slice (within 2e-1) and text prefix."""
        pipe = self._load_fp16_pipe()

        inputs = self.get_inputs(device=torch_device, generate_latents=True)
        # Prompt and image are dropped for joint inference.
        inputs.pop("prompt")
        inputs.pop("image")

        output = pipe(**inputs)
        images, texts = output.images, output.text
        assert images.shape == (1, 512, 512, 3)

        observed = images[0, -3:, -3:, -1].flatten()
        reference = np.array([0.2402, 0.2375, 0.2285, 0.2378, 0.2407, 0.2263, 0.2354, 0.2307, 0.2520])
        assert np.abs(observed - reference).max() < 2e-1

        prefix = "a living room"
        assert texts[0][: len(prefix)] == prefix

    def test_unidiffuser_default_text2img_v1_fp16(self):
        """fp16 prompt-conditioned image generation yields the reference image slice (within 1e-1)."""
        pipe = self._load_fp16_pipe()

        inputs = self.get_inputs(device=torch_device, generate_latents=True)
        inputs.pop("image")

        images = pipe(**inputs).images
        assert images.shape == (1, 512, 512, 3)

        observed = images[0, -3:, -3:, -1].flatten()
        reference = np.array([0.0242, 0.0103, 0.0022, 0.0129, 0.0000, 0.0090, 0.0376, 0.0508, 0.0005])
        assert np.abs(observed - reference).max() < 1e-1

    def test_unidiffuser_default_img2text_v1_fp16(self):
        """fp16 image-conditioned text generation yields the reference text prefix."""
        pipe = self._load_fp16_pipe()

        inputs = self.get_inputs(device=torch_device, generate_latents=True)
        inputs.pop("prompt")

        texts = pipe(**inputs).text

        prefix = "An astronaut"
        assert texts[0][: len(prefix)] == prefix