# ai-model-002 / app.py
# jiuface's picture
# Update app.py
# facc283 verified
import spaces
from typing import Tuple, Union, List
import os
import time
import numpy as np
from PIL import Image
import requests
import torch
from diffusers import StableDiffusionControlNetImg2ImgPipeline, ControlNetModel, DDIMScheduler
from diffusers.pipelines.stable_diffusion import StableDiffusionSafetyChecker
from diffusers.models import AutoencoderKL
from diffusers.models.attention_processor import AttnProcessor2_0
from diffusers.pipelines.controlnet import StableDiffusionControlNetInpaintPipeline
from diffusers import ControlNetModel, UniPCMultistepScheduler, AutoPipelineForText2Image
from transformers import AutoImageProcessor, UperNetForSemanticSegmentation, AutoModelForDepthEstimation
from colors import ade_palette
from utils import map_colors_rgb
from diffusers import StableDiffusionXLPipeline
import gradio as gr
import gc
# Device string and tensor precision used by the model-loading and inference
# code throughout this module.
device = "cuda"
dtype = torch.float16
# Stylesheet limiting the height of the elements with ids
# img-display-container / img-display-input / img-display-output.
# NOTE(review): the visible code calls gr.Blocks() without a css argument, so
# these rules look unused — confirm whether that is intentional.
css = """
#img-display-container {
max-height: 50vh;
}
#img-display-input {
max-height: 40vh;
}
#img-display-output {
max-height: 40vh;
}
"""
def download_file(url, folder_path, filename):
    """Download ``url`` to ``folder_path/filename`` unless that file already exists.

    Args:
        url: HTTP(S) address of the file to fetch.
        folder_path: Destination directory; created if missing.
        filename: Name of the file inside ``folder_path``.

    Returns:
        None. Progress and HTTP errors are reported with ``print``; a non-200
        status is reported and leaves no file behind.

    Raises:
        requests.RequestException: on connection problems or timeouts.
        OSError: if the destination cannot be written.
    """
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, filename)
    if os.path.isfile(file_path):
        print(f"File already exists: {file_path}")
        return

    # Stream into a temporary sibling file and rename it only once the transfer
    # finished: an interrupted download must never leave a truncated file that
    # the "already exists" check above would accept on the next run.
    partial_path = file_path + ".part"
    # The context manager releases the connection; the (connect, read) timeout
    # keeps a stalled server from hanging start-up forever.
    with requests.get(url, stream=True, timeout=(10, 60)) as response:
        if response.status_code != 200:
            print(f"Error downloading the file. Status code: {response.status_code}")
            return
        try:
            with open(partial_path, 'wb') as file:
                # 1 MiB chunks: model files are large, tiny chunks only add overhead.
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    file.write(chunk)
            os.replace(partial_path, file_path)
        finally:
            # Only still present when the transfer or the rename failed.
            if os.path.exists(partial_path):
                os.remove(partial_path)
    print(f"File successfully downloaded and saved: {file_path}")
def download_models():
    """Fetch every model asset listed below through ``download_file``.

    Each entry is ``(url, destination folder, file name)``; the dictionary keys
    are labels only and are not used for the download itself.
    """
    assets = {
        "MODEL": ("https://huggingface.co/dantea1118/juggernaut_reborn/resolve/main/juggernaut_reborn.safetensors?download=true", "models/models/Stable-diffusion", "juggernaut_reborn.safetensors"),
        "UPSCALER_X2": ("https://huggingface.co/ai-forever/Real-ESRGAN/resolve/main/RealESRGAN_x2.pth?download=true", "models/upscalers/", "RealESRGAN_x2.pth"),
        "UPSCALER_X4": ("https://huggingface.co/ai-forever/Real-ESRGAN/resolve/main/RealESRGAN_x4.pth?download=true", "models/upscalers/", "RealESRGAN_x4.pth"),
        "NEGATIVE_1": ("https://huggingface.co/philz1337x/embeddings/resolve/main/verybadimagenegative_v1.3.pt?download=true", "models/embeddings", "verybadimagenegative_v1.3.pt"),
        "NEGATIVE_2": ("https://huggingface.co/datasets/AddictiveFuture/sd-negative-embeddings/resolve/main/JuggernautNegative-neg.pt?download=true", "models/embeddings", "JuggernautNegative-neg.pt"),
        "LORA_1": ("https://huggingface.co/philz1337x/loras/resolve/main/SDXLrender_v2.0.safetensors?download=true", "models/Lora", "SDXLrender_v2.0.safetensors"),
        "LORA_2": ("https://huggingface.co/philz1337x/loras/resolve/main/more_details.safetensors?download=true", "models/Lora", "more_details.safetensors"),
        "CONTROLNET": ("https://huggingface.co/lllyasviel/ControlNet-v1-1/resolve/main/control_v11f1e_sd15_tile.pth?download=true", "models/ControlNet", "control_v11f1e_sd15_tile.pth"),
        "VAE": ("https://huggingface.co/stabilityai/sd-vae-ft-mse-original/resolve/main/vae-ft-mse-840000-ema-pruned.safetensors?download=true", "models/VAE", "vae-ft-mse-840000-ema-pruned.safetensors"),
    }
    # Entries are processed in the order they are listed above.
    for asset in assets.values():
        download_file(*asset)
def timer_func(func):
    """Decorator that prints the wall-clock duration of each successful call.

    Args:
        func: The callable to time.

    Returns:
        A wrapper with the same call signature and return value as ``func``.
        The wrapper keeps ``func``'s name and docstring, so log lines and outer
        decorators see the real function rather than ``wrapper``.
    """
    # Imported here so the decorator does not depend on a module-level import.
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the measurement cannot go negative or
        # jump when the system clock is adjusted.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        end_time = time.perf_counter()
        print(f"{func.__name__} took {end_time - start_time:.2f} seconds")
        return result

    return wrapper
class LazyLoadPipeline:
    """Deferred loader for the ControlNet-tile img2img pipeline.

    The pipeline is built by :meth:`load` (or automatically on the first call
    of the instance) and then cached on ``self.pipe``.
    """

    def __init__(self):
        # Populated by load(); None means "not built yet".
        self.pipe = None

    @timer_func
    def load(self):
        """Build the pipeline once, move it to ``device`` and optionally compile it.

        Subsequent calls are no-ops while ``self.pipe`` is set.
        """
        if self.pipe is not None:
            return
        print("Starting to load the pipeline...")
        self.pipe = self.setup_pipeline()
        print(f"Moving pipeline to device: {device}")
        self.pipe.to(device)
        # USE_TORCH_COMPILE is an optional module-level flag. It is looked up
        # dynamically so a module that never defines it loads the pipeline
        # uncompiled instead of failing with NameError.
        if globals().get("USE_TORCH_COMPILE", False):
            print("Compiling the model...")
            self.pipe.unet = torch.compile(self.pipe.unet, mode="reduce-overhead", fullgraph=True)

    @timer_func
    def setup_pipeline(self):
        """Assemble the img2img pipeline from the files under ``models/``.

        Returns:
            The configured ``StableDiffusionControlNetImg2ImgPipeline`` (not yet
            moved to a device).
        """
        print("Setting up the pipeline...")
        controlnet = ControlNetModel.from_single_file(
            "models/ControlNet/control_v11f1e_sd15_tile.pth", torch_dtype=torch.float16
        )
        safety_checker = StableDiffusionSafetyChecker.from_pretrained("CompVis/stable-diffusion-safety-checker")
        model_path = "models/models/Stable-diffusion/juggernaut_reborn.safetensors"
        pipe = StableDiffusionControlNetImg2ImgPipeline.from_single_file(
            model_path,
            controlnet=controlnet,
            torch_dtype=torch.float16,
            use_safetensors=True,
            safety_checker=safety_checker
        )
        # Replace the checkpoint's VAE with the separately downloaded one.
        vae = AutoencoderKL.from_single_file(
            "models/VAE/vae-ft-mse-840000-ema-pruned.safetensors",
            torch_dtype=torch.float16
        )
        pipe.vae = vae
        # Negative embeddings referenced by name in the negative prompt.
        pipe.load_textual_inversion("models/embeddings/verybadimagenegative_v1.3.pt")
        pipe.load_textual_inversion("models/embeddings/JuggernautNegative-neg.pt")
        # Each LoRA is fused right after loading, with its own scale.
        pipe.load_lora_weights("models/Lora/SDXLrender_v2.0.safetensors")
        pipe.fuse_lora(lora_scale=0.5)
        pipe.load_lora_weights("models/Lora/more_details.safetensors")
        pipe.fuse_lora(lora_scale=1.)
        pipe.scheduler = DDIMScheduler.from_config(pipe.scheduler.config)
        pipe.enable_freeu(s1=0.9, s2=0.2, b1=1.3, b2=1.4)
        return pipe

    def __call__(self, *args, **kwargs):
        """Run the pipeline, building it first if :meth:`load` was never called."""
        if self.pipe is None:
            # Honour the "lazy" contract instead of failing with
            # "'NoneType' object is not callable".
            self.load()
        return self.pipe(*args, **kwargs)
class LazyRealESRGAN:
    """Wrapper that creates a RealESRGAN upscaler only when it is first needed.

    NOTE(review): ``RealESRGAN`` is not imported in the visible part of this
    file — confirm where it is expected to come from.
    """

    def __init__(self, device, scale):
        self.model = None  # created on the first predict()/load_model() call
        self.scale = scale
        self.device = device

    def load_model(self):
        """Instantiate the upscaler and load its weights; no-op once loaded."""
        if self.model is not None:
            return
        self.model = RealESRGAN(self.device, scale=self.scale)
        weights_path = f'models/upscalers/RealESRGAN_x{self.scale}.pth'
        self.model.load_weights(weights_path, download=False)

    def predict(self, img):
        """Return the underlying model's prediction for ``img``."""
        self.load_model()
        upscaler = self.model
        return upscaler.predict(img)
@timer_func
def resize_and_upscale(input_image, resolution):
    """Resize ``input_image`` so its short side is about ``resolution``, then upscale.

    Both target sides are rounded to a multiple of 64. The x2 upscaler is used
    for resolutions up to 2048 and the x4 upscaler above that.

    Returns:
        Whatever the selected upscaler's ``predict`` returns.
    """
    rgb = input_image.convert("RGB")
    width, height = rgb.size
    ratio = float(resolution) / min(height, width)
    target_h = int(round(height * ratio / 64.0)) * 64
    target_w = int(round(width * ratio / 64.0)) * 64
    resized = rgb.resize((target_w, target_h), resample=Image.LANCZOS)
    # Only the selected upscaler global is looked up.
    upscaler = lazy_realesrgan_x2 if resolution <= 2048 else lazy_realesrgan_x4
    return upscaler.predict(resized)
@timer_func
def create_hdr_effect(original_image, hdr):
    """Blend brightness-scaled copies of the image with OpenCV's Mertens merge.

    Args:
        original_image: RGB PIL image.
        hdr: Effect strength; ``0`` returns the input object unchanged.

    Returns:
        A new RGB PIL image, or ``original_image`` itself when ``hdr == 0``.

    NOTE(review): ``cv2`` is not imported in the visible part of this file —
    confirm it is available at runtime.
    """
    if hdr == 0:
        return original_image
    bgr = cv2.cvtColor(np.array(original_image), cv2.COLOR_RGB2BGR)
    # Brightness multipliers are 1.0 + offset * hdr, from darker to brighter.
    offsets = (-0.9, -0.7, -0.45, -0.25, 0.0, 0.2, 0.4, 0.6, 0.8)
    exposures = [
        cv2.convertScaleAbs(bgr, alpha=1.0 + offset * hdr) for offset in offsets
    ]
    fused = cv2.createMergeMertens().process(exposures)
    # The merged result is scaled by 255 and clipped into the 8-bit range.
    fused_8bit = np.clip(fused * 255, 0, 255).astype('uint8')
    return Image.fromarray(cv2.cvtColor(fused_8bit, cv2.COLOR_BGR2RGB))
def prepare_image(input_image, resolution, hdr):
    """Return the conditioning image: upscaled first, then HDR-processed."""
    upscaled = resize_and_upscale(input_image, resolution)
    return create_hdr_effect(upscaled, hdr)
@spaces.GPU
@timer_func
def gradio_process_image(input_image, resolution, num_inference_steps, strength, hdr, guidance_scale):
    """Upscale ``input_image`` and refine it with the lazily loaded img2img pipeline.

    The prepared image serves both as the img2img input and as the ControlNet
    control image; the generator is always seeded with 0.

    Returns:
        ``[input_array, result_array]`` — the original input and the generated
        image, both as numpy arrays.
    """
    print("Starting image processing...")
    torch.cuda.empty_cache()
    condition_image = prepare_image(input_image, resolution, hdr)
    width, height = condition_image.size
    generator = torch.Generator(device=device).manual_seed(0)

    print("Running inference...")
    output = lazy_pipe(
        prompt="masterpiece, best quality, highres",
        negative_prompt="low quality, normal quality, ugly, blurry, blur, lowres, bad anatomy, bad hands, cropped, worst quality, verybadimagenegative_v1.3, JuggernautNegative-neg",
        image=condition_image,
        control_image=condition_image,
        width=width,
        height=height,
        strength=strength,
        num_inference_steps=num_inference_steps,
        guidance_scale=guidance_scale,
        generator=generator,
    )
    result = output.images[0]
    print("Image processing completed successfully")

    # Both images are handed back as numpy arrays, input first.
    return [np.array(input_image), np.array(result)]
def filter_items(
    colors_list: Union[List, np.ndarray],
    items_list: Union[List, np.ndarray],
    items_to_remove: Union[List, np.ndarray]
) -> Tuple[Union[List, np.ndarray], Union[List, np.ndarray]]:
    """
    Drop every (color, item) pair whose item appears in ``items_to_remove``.

    Args:
        colors_list: Colors, position-aligned with ``items_list``.
        items_list: Items corresponding to the colors.
        items_to_remove: Items that must not appear in the result.

    Returns:
        Two lists ``(filtered_colors, filtered_items)`` holding the surviving
        colors and items in their original order.
    """
    survivors = [
        (color, item)
        for color, item in zip(colors_list, items_list)
        if item not in items_to_remove
    ]
    filtered_colors = [color for color, _ in survivors]
    filtered_items = [item for _, item in survivors]
    return filtered_colors, filtered_items
def get_segmentation_pipeline(
) -> Tuple[AutoImageProcessor, UperNetForSemanticSegmentation]:
    """Load the image processor and the UperNet segmentation model.

    Both come from the same ``openmmlab/upernet-convnext-small`` checkpoint.

    Returns:
        Tuple[AutoImageProcessor, UperNetForSemanticSegmentation]: processor
        first, model second.
    """
    checkpoint = "openmmlab/upernet-convnext-small"
    return (
        AutoImageProcessor.from_pretrained(checkpoint),
        UperNetForSemanticSegmentation.from_pretrained(checkpoint),
    )
@torch.inference_mode()
@spaces.GPU
@timer_func
def segment_image(
    image: Image,
    image_processor: AutoImageProcessor,
    image_segmentor: UperNetForSemanticSegmentation
) -> Image:
    """
    Run semantic segmentation on ``image`` and paint each class with its palette color.

    Args:
        image (Image): The input image to be segmented.
        image_processor (AutoImageProcessor): Prepares the model input and
            post-processes the model output back to the image size.
        image_segmentor (UperNetForSemanticSegmentation): The segmentation model.

    Returns:
        Image: RGB image of the same size as the input where every pixel
        carries the ``ade_palette()`` color of its predicted label.
    """
    model_inputs = image_processor(image, return_tensors="pt")
    with torch.no_grad():
        outputs = image_segmentor(model_inputs.pixel_values)

    # PIL reports (width, height); the post-processing expects (height, width).
    height_width = image.size[::-1]
    seg = image_processor.post_process_semantic_segmentation(
        outputs, target_sizes=[height_width])[0]

    rows, cols = seg.shape[0], seg.shape[1]
    color_seg = np.zeros((rows, cols, 3), dtype=np.uint8)
    # Paint one label at a time; the palette index is the label id.
    for label, color in enumerate(np.array(ade_palette())):
        color_seg[seg == label, :] = color
    return Image.fromarray(color_seg.astype(np.uint8)).convert('RGB')
def get_depth_pipeline():
    """Load the depth-estimation image processor and model.

    Both are loaded from ``LiheYoung/depth-anything-large-hf`` with the
    module-level ``dtype``.

    Returns:
        ``(feature_extractor, depth_estimator)``.
    """
    repo_id = "LiheYoung/depth-anything-large-hf"
    feature_extractor = AutoImageProcessor.from_pretrained(repo_id, torch_dtype=dtype)
    depth_estimator = AutoModelForDepthEstimation.from_pretrained(repo_id, torch_dtype=dtype)
    return feature_extractor, depth_estimator
@torch.inference_mode()
@spaces.GPU
@timer_func
def get_depth_image(
    image: Image,
    feature_extractor: AutoImageProcessor,
    depth_estimator: AutoModelForDepthEstimation
) -> Image:
    """Estimate a depth map for ``image`` and return it as a 3-channel PIL image.

    The predicted depth is resized to the input size with bicubic
    interpolation, min-max normalised per image, and copied into three
    identical channels.

    NOTE(review): a perfectly constant depth prediction would make the
    normalisation divide by zero — confirm whether that can occur in practice.
    """
    model_inputs = feature_extractor(images=image, return_tensors="pt").to(device)
    with torch.no_grad():
        predicted = depth_estimator(**model_inputs).predicted_depth

    width, height = image.size
    resized = torch.nn.functional.interpolate(
        predicted.unsqueeze(1).float(),
        size=(height, width),
        mode="bicubic",
        align_corners=False,
    )

    # Min-max normalisation over everything except the batch dimension.
    lowest = torch.amin(resized, dim=[1, 2, 3], keepdim=True)
    highest = torch.amax(resized, dim=[1, 2, 3], keepdim=True)
    normalized = (resized - lowest) / (highest - lowest)

    # Same map in three channels, channel-last, first batch entry only.
    stacked = torch.cat([normalized] * 3, dim=1)
    pixels = stacked.permute(0, 2, 3, 1).cpu().numpy()[0]
    return Image.fromarray((pixels * 255.0).clip(0, 255).astype(np.uint8))
def resize_dimensions(dimensions, target_size):
    """
    Scale ``(width, height)`` so the longer side equals ``target_size``,
    keeping the aspect ratio (the shorter side is truncated to an int).
    Dimensions whose sides are both below ``target_size`` are returned as is.
    """
    width, height = dimensions
    # Nothing to do when even the longer side is under the target.
    if max(width, height) < target_size:
        return dimensions
    if width > height:
        # Landscape: width becomes the target, height follows the ratio.
        return (target_size, int(target_size * (height / width)))
    # Portrait or square: height becomes the target.
    return (int(target_size * (width / height)), target_size)
def flush():
    """Run Python garbage collection, then empty the CUDA cache."""
    # Collect first, then empty the CUDA cache — same order as before.
    for release in (gc.collect, torch.cuda.empty_cache):
        release()
class ControlNetDepthDesignModelMulti:
    """Generates a designed room image from an empty-room photo and a prompt.

    Holds only the generation settings (seed, negative prompt, quality suffix
    and the segment labels kept out of the inpainting mask); the models
    themselves are the module-level ``pipe``, ``guide_pipe``, segmentation and
    depth objects.
    """
    def __init__(self):
        """Set the default seed, prompts and protected segment labels."""
        #os.environ['HF_HUB_OFFLINE'] = "True"
        self.seed = 323*111
        self.neg_prompt = "window, door, low resolution, banner, logo, watermark, text, deformed, blurry, out of focus, surreal, ugly, beginner"
        # Segment labels that are removed from the mask in generate_design().
        self.control_items = ["windowpane;window", "door;double;door"]
        # Appended to every user prompt.
        self.additional_quality_suffix = "interior design, 4K, high resolution, photorealistic"
    @spaces.GPU
    @timer_func
    def generate_design(self, empty_room_image: Image, prompt: str, guidance_scale: int = 10, num_steps: int = 50, strength: float =0.9, img_size: int = 640) -> Image:
        """
        Given an image of an empty room and a prompt
        generate the designed room according to the prompt
        Inputs -
            empty_room_image - An RGB PIL Image of the empty room
            prompt - Text describing the target design elements of the room
            guidance_scale - Forwarded to the inpainting pipeline
            num_steps - Inference steps for both the guide and inpainting pipelines
            strength - Forwarded to the inpainting pipeline
            img_size - Target size passed to resize_dimensions() for the working image
        Returns -
            design_image - PIL Image of the same size as the empty room image
                           If the size is not the same the submission will fail.
        """
        print(prompt)
        flush()
        # A fresh generator per call, seeded from self.seed; the same generator
        # object is then passed to both pipelines below.
        self.generator = torch.Generator(device=device).manual_seed(self.seed)
        pos_prompt = prompt + f', {self.additional_quality_suffix}'
        # Remember the original size so the result can be resized back at the end.
        orig_w, orig_h = empty_room_image.size
        new_width, new_height = resize_dimensions(empty_room_image.size, img_size)
        input_image = empty_room_image.resize((new_width, new_height))
        real_seg = np.array(segment_image(input_image,
                                          seg_image_processor,
                                          image_segmentor))
        # Distinct colors present in the segmentation, mapped to item labels.
        unique_colors = np.unique(real_seg.reshape(-1, real_seg.shape[2]), axis=0)
        unique_colors = [tuple(color) for color in unique_colors]
        segment_items = [map_colors_rgb(i) for i in unique_colors]
        # Drop the protected labels (self.control_items) so they stay out of the mask.
        chosen_colors, segment_items = filter_items(
            colors_list=unique_colors,
            items_list=segment_items,
            items_to_remove=self.control_items
        )
        # Mask is 1 on every pixel whose segment color was kept above.
        # Presumably this is the region the inpainting pipeline may repaint.
        mask = np.zeros_like(real_seg)
        for color in chosen_colors:
            color_matches = (real_seg == color).all(axis=2)
            mask[color_matches] = 1
        image_np = np.array(input_image)
        image = Image.fromarray(image_np).convert("RGB")
        mask_image = Image.fromarray((mask * 255).astype(np.uint8)).convert("RGB")
        segmentation_cond_image = Image.fromarray(real_seg).convert("RGB")
        image_depth = get_depth_image(image, depth_feature_extractor, depth_estimator)
        # generate image that would be used as IP-adapter
        flush()
        # The guide pipeline is given dimensions rounded down to a multiple of 8.
        new_width_ip = int(new_width / 8) * 8
        new_height_ip = int(new_height / 8) * 8
        ip_image = guide_pipe(pos_prompt,
                              num_inference_steps=num_steps,
                              negative_prompt=self.neg_prompt,
                              height=new_height_ip,
                              width=new_width_ip,
                              generator=[self.generator]).images[0]
        flush()
        # Inpaint with the depth and segmentation control images (equal 0.5
        # conditioning scale each) and the guide image as IP-adapter input.
        generated_image = pipe(
            prompt=pos_prompt,
            negative_prompt=self.neg_prompt,
            num_inference_steps=num_steps,
            strength=strength,
            guidance_scale=guidance_scale,
            generator=[self.generator],
            image=image,
            mask_image=mask_image,
            ip_adapter_image=ip_image,
            control_image=[image_depth, segmentation_cond_image],
            controlnet_conditioning_scale=[0.5, 0.5]
        ).images[0]
        flush()
        # Resize back to the size of the uploaded image.
        design_image = generated_image.resize(
            (orig_w, orig_h), Image.Resampling.LANCZOS
        )
        return design_image
def create_demo(model):
    """Build the demo UI inside the currently open Gradio context.

    Creates the input column (image, prompt, advanced options, submit button)
    and the output column, and wires the submit button to
    ``model.generate_design``.

    Args:
        model: Object exposing ``seed``, ``neg_prompt``,
            ``additional_quality_suffix`` and ``generate_design``.
    """
    gr.Markdown("### Just try space ...")
    with gr.Row():
        # Left column: inputs and settings.
        with gr.Column():
            input_image = gr.Image(label="Input Image", type='pil', elem_id='img-display-input')
            input_text = gr.Textbox(label='Prompt', placeholder='Please upload your image first', lines=2)
            with gr.Accordion('Advanced options', open=False):
                num_steps = gr.Slider(label='Steps',
                                      minimum=1,
                                      maximum=50,
                                      value=50,
                                      step=1)
                img_size = gr.Slider(label='Image size',
                                     minimum=256,
                                     maximum=768,
                                     value=768,
                                     step=64)
                guidance_scale = gr.Slider(label='Guidance Scale',
                                           minimum=0.1,
                                           maximum=30.0,
                                           value=10.0,
                                           step=0.1)
                seed = gr.Slider(label='Seed',
                                 minimum=-1,
                                 maximum=2147483647,
                                 value=323*111,
                                 step=1,
                                 randomize=True)
                strength = gr.Slider(label='Strength',
                                     minimum=0.1,
                                     maximum=1.0,
                                     value=0.9,
                                     step=0.1)
                a_prompt = gr.Textbox(
                    label='Added Prompt',
                    value="interior design, 4K, high resolution, photorealistic")
                n_prompt = gr.Textbox(
                    label='Negative Prompt',
                    value="window, door, low resolution, banner, logo, watermark, text, deformed, blurry, out of focus, surreal, ugly, beginner")
            submit = gr.Button("Submit")
        # Right column: the generated image.
        with gr.Column():
            design_image = gr.Image(label="Output Mask", elem_id='img-display-output')
    def on_submit(image, text, num_steps, guidance_scale, seed, strength, a_prompt, n_prompt, img_size):
        """Copy the UI settings onto ``model`` and return the generated design."""
        # The settings are written onto the model object passed to create_demo().
        model.seed = seed
        model.neg_prompt = n_prompt
        model.additional_quality_suffix = a_prompt
        with torch.no_grad():
            out_img = model.generate_design(image, text, guidance_scale=guidance_scale, num_steps=num_steps, strength=strength, img_size=img_size)
        return out_img
    # The inputs list order must match on_submit's parameter order.
    submit.click(on_submit, inputs=[input_image, input_text, num_steps, guidance_scale, seed, strength, a_prompt, n_prompt, img_size], outputs=design_image)
# ---- Module-level model loading (runs at import time) ----
# Two ControlNets passed together to the inpainting pipeline below.
# NOTE(review): "controlnet_depth" and "own_controlnet" look like local
# directories — confirm they exist where the app is run.
controlnet_depth= ControlNetModel.from_pretrained(
    "controlnet_depth", torch_dtype=dtype, use_safetensors=True)
controlnet_seg = ControlNetModel.from_pretrained(
    "own_controlnet", torch_dtype=dtype, use_safetensors=True)
# Inpainting pipeline conditioned on [depth, segmentation]; safety checker disabled.
pipe = StableDiffusionControlNetInpaintPipeline.from_pretrained(
    "SG161222/Realistic_Vision_V6.0_B1_noVAE",
    #"models/runwayml--stable-diffusion-inpainting",
    controlnet=[controlnet_depth, controlnet_seg],
    safety_checker=None,
    torch_dtype=dtype
)
# IP-Adapter weights plus the scale applied to the adapter image.
pipe.load_ip_adapter("h94/IP-Adapter", subfolder="models",
                     weight_name="ip-adapter_sd15.bin")
pipe.set_ip_adapter_scale(0.4)
pipe.scheduler = UniPCMultistepScheduler.from_config(pipe.scheduler.config)
pipe = pipe.to(device)
# Text-to-image pipeline whose output is used as the IP-adapter image.
guide_pipe = StableDiffusionXLPipeline.from_pretrained("segmind/SSD-1B",
                                                       torch_dtype=dtype, use_safetensors=True, variant="fp16")
guide_pipe = guide_pipe.to(device)
# Segmentation and depth models; only the depth estimator is moved to `device` here.
seg_image_processor, image_segmentor = get_segmentation_pipeline()
depth_feature_extractor, depth_estimator = get_depth_pipeline()
depth_estimator = depth_estimator.to(device)
# The upscaler path (download_models, LazyRealESRGAN, LazyLoadPipeline) is
# disabled: the lines below are commented out, so lazy_pipe and
# lazy_realesrgan_x2/x4 are not defined at runtime.
#download_models()
#lazy_realesrgan_x2 = LazyRealESRGAN(device, scale=2)
#lazy_realesrgan_x4 = LazyRealESRGAN(device, scale=4)
#lazy_pipe = LazyLoadPipeline()
#lazy_pipe.load()
def main():
    """Create the design model, build the Gradio Blocks app and launch it."""
    design_model = ControlNetDepthDesignModelMulti()
    print('Models uploaded successfully')

    heading = "# Just try zeroGPU"
    blurb = "\nFor test only\n"

    # NOTE(review): the module-level `css` string is not passed to gr.Blocks()
    # here — confirm whether that is intentional.
    with gr.Blocks() as demo:
        gr.Markdown(heading)
        gr.Markdown(blurb)
        create_demo(design_model)

    # Queue requests, then serve without a public share link.
    demo.queue().launch(share=False)


if __name__ == '__main__':
    main()