import io import os import base64 from PIL import Image import cv2 import numpy as np from scripts.generate_prompt import load_wd14_tagger_model, generate_tags, preprocess_image as wd14_preprocess_image from scripts.lineart_util import scribble_xdog, get_sketch, canny import torch from diffusers import StableDiffusionPipeline, StableDiffusionControlNetPipeline, ControlNetModel, UniPCMultistepScheduler, AutoencoderKL import gc from peft import PeftModel from dotenv import load_dotenv from scripts.hf_utils import download_file # グローバル変数 use_local = False model = None device = None torch_dtype = None # torch.float16 if device == "cuda" else torch.float32 sotai_gen_pipe = None refine_gen_pipe = None def get_file_path(filename, subfolder): if use_local: return os.path.join(subfolder, filename) else: return download_file(filename, subfolder) def ensure_rgb(image): if image.mode != 'RGB': return image.convert('RGB') return image def initialize(_use_local, use_gpu): # load_dotenv() global model, sotai_gen_pipe, refine_gen_pipe, use_local, device, torch_dtype device = "cuda" if use_gpu and torch.cuda.is_available() else "cpu" torch_dtype = torch.float16 if device == "cuda" else torch.float32 use_local = _use_local print('') print(f"Device: {device}, Local model: {_use_local}") print('') model = load_wd14_tagger_model() sotai_gen_pipe = initialize_sotai_model() refine_gen_pipe = initialize_refine_model() def load_lora(pipeline, lora_path, alpha=0.75): pipeline.load_lora_weights(lora_path) pipeline.fuse_lora(lora_scale=alpha) def initialize_sotai_model(): global device, torch_dtype sotai_sd_model_path = get_file_path(os.environ["sotai_sd_model_name"], subfolder=os.environ["sd_models_dir"]) # controlnet_path1 = get_file_path(os.environ["controlnet_name1"], subfolder=os.environ["controlnet_dir2"]) controlnet_path1 = get_file_path(os.environ["controlnet_name2"], subfolder=os.environ["controlnet_dir1"]) controlnet_path2 = get_file_path(os.environ["controlnet_name2"], subfolder=os.environ["controlnet_dir1"]) print(use_local, controlnet_path1) # Load the Stable Diffusion model sd_pipe = StableDiffusionPipeline.from_single_file( sotai_sd_model_path, torch_dtype=torch_dtype, use_safetensors=True ).to(device) # Load the ControlNet model controlnet1 = ControlNetModel.from_single_file( controlnet_path1, torch_dtype=torch_dtype ).to(device) # Load the ControlNet model controlnet2 = ControlNetModel.from_single_file( controlnet_path2, torch_dtype=torch_dtype ).to(device) # Create the ControlNet pipeline sotai_gen_pipe = StableDiffusionControlNetPipeline( vae=sd_pipe.vae, text_encoder=sd_pipe.text_encoder, tokenizer=sd_pipe.tokenizer, unet=sd_pipe.unet, scheduler=sd_pipe.scheduler, safety_checker=sd_pipe.safety_checker, feature_extractor=sd_pipe.feature_extractor, controlnet=[controlnet1, controlnet2] ).to(device) # LoRAの適用 lora_names = [ (os.environ["lora_name1"], 1.0), # (os.environ["lora_name2"], 0.3), ] for lora_name, alpha in lora_names: lora_path = get_file_path(lora_name, subfolder=os.environ["lora_dir"]) load_lora(sotai_gen_pipe, lora_path, alpha) # スケジューラーの設定 sotai_gen_pipe.scheduler = UniPCMultistepScheduler.from_config(sotai_gen_pipe.scheduler.config) return sotai_gen_pipe def initialize_refine_model(): global device, torch_dtype refine_sd_model_path = get_file_path(os.environ["refine_sd_model_name"], subfolder=os.environ["sd_models_dir"]) controlnet_path3 = get_file_path(os.environ["controlnet_name3"], subfolder=os.environ["controlnet_dir1"]) controlnet_path4 = get_file_path(os.environ["controlnet_name4"], subfolder=os.environ["controlnet_dir1"]) vae_path = get_file_path(os.environ["vae_name"], subfolder=os.environ["vae_dir"]) # Load the Stable Diffusion model sd_pipe = StableDiffusionPipeline.from_single_file( refine_sd_model_path, torch_dtype=torch_dtype, use_safetensors=True ).to(device) # controlnet_path = "models/cn/control_v11p_sd15_canny.pth" controlnet1 = ControlNetModel.from_single_file( controlnet_path3, torch_dtype=torch_dtype ).to(device) # Load the ControlNet model controlnet2 = ControlNetModel.from_single_file( controlnet_path4, torch_dtype=torch_dtype ).to(device) # Create the ControlNet pipeline refine_gen_pipe = StableDiffusionControlNetPipeline( vae=AutoencoderKL.from_single_file(vae_path, torch_dtype=torch_dtype).to(device), text_encoder=sd_pipe.text_encoder, tokenizer=sd_pipe.tokenizer, unet=sd_pipe.unet, scheduler=sd_pipe.scheduler, safety_checker=sd_pipe.safety_checker, feature_extractor=sd_pipe.feature_extractor, controlnet=[controlnet1, controlnet2], # 複数のControlNetを指定 ).to(device) # スケジューラーの設定 refine_gen_pipe.scheduler = UniPCMultistepScheduler.from_config(refine_gen_pipe.scheduler.config) return refine_gen_pipe def get_wd_tags(images: list) -> list: global model if model is None: raise ValueError("Model is not initialized") # initialize() preprocessed_images = [wd14_preprocess_image(img) for img in images] preprocessed_images = np.array(preprocessed_images) return generate_tags(preprocessed_images, os.environ["wd_model_name"], model) def preprocess_image_for_generation(image): if isinstance(image, str): # base64文字列の場合 image = Image.open(io.BytesIO(base64.b64decode(image))) elif isinstance(image, np.ndarray): # numpy配列の場合 image = Image.fromarray(image) elif not isinstance(image, Image.Image): raise ValueError("Unsupported image type") # 画像サイズの計算 input_width, input_height = image.size max_size = 736 output_width = max_size if input_height < input_width else int(input_width / input_height * max_size) output_height = max_size if input_height > input_width else int(input_height / input_width * max_size) image = image.resize((output_width, output_height)) return image, output_width, output_height def binarize_image(image: Image.Image) -> np.ndarray: image = np.array(image.convert('L')) # 色反転 image = 255 - image # ヒストグラム平坦化 clahe = cv2.createCLAHE(clipLimit=1.0, tileGridSize=(8, 8)) image = clahe.apply(image) # ガウシアンブラー適用 image = cv2.GaussianBlur(image, (5, 5), 0) # 適応的二値化 binary_image = cv2.adaptiveThreshold(image, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 9, -8) return binary_image def create_rgba_image(binary_image: np.ndarray, color: list) -> Image.Image: rgba_image = np.zeros((binary_image.shape[0], binary_image.shape[1], 4), dtype=np.uint8) rgba_image[:, :, 0] = color[0] rgba_image[:, :, 1] = color[1] rgba_image[:, :, 2] = color[2] rgba_image[:, :, 3] = binary_image return Image.fromarray(rgba_image, 'RGBA') def generate_sotai_image(input_image: Image.Image, output_width: int, output_height: int) -> Image.Image: input_image = ensure_rgb(input_image) global sotai_gen_pipe if sotai_gen_pipe is None: raise ValueError("Model is not initialized") # initialize() prompt = "anime pose, girl, (white background:1.5), (monochrome:1.5), full body, sketch, eyes, breasts, (slim legs, skinny legs:1.2)" try: # 入力画像のリサイズ if input_image.size[0] > input_image.size[1]: input_image = input_image.resize((512, int(512 * input_image.size[1] / input_image.size[0]))) else: input_image = input_image.resize((int(512 * input_image.size[0] / input_image.size[1]), 512)) # EasyNegativeV2の内容 easy_negative_v2 = "(worst quality, low quality, normal quality:1.4), lowres, bad anatomy, bad hands, text, error, missing fingers, extra digit, fewer digits, cropped, jpeg artifacts, signature, watermark, username, blurry, artist name, (bad_prompt_version2:0.8)" output = sotai_gen_pipe( prompt, image=[input_image, input_image], negative_prompt=f"(wings:1.6), (clothes, garment, lighting, gray, missing limb, extra line, extra limb, extra arm, extra legs, hair, bangs, fringe, forelock, front hair, fill:1.4), (ink pool:1.6)", # negative_prompt=f"{easy_negative_v2}, (wings:1.6), (clothes, garment, lighting, gray, missing limb, extra line, extra limb, extra arm, extra legs, hair, bangs, fringe, forelock, front hair, fill:1.4), (ink pool:1.6)", num_inference_steps=20, guidance_scale=8, width=output_width, height=output_height, denoising_strength=0.13, num_images_per_prompt=1, # Equivalent to batch_size guess_mode=[True, True], # Equivalent to pixel_perfect controlnet_conditioning_scale=[1.2, 1.3], # 各ControlNetの重み guidance_start=[0.0, 0.0], guidance_end=[1.0, 1.0], ) generated_image = output.images[0] return generated_image finally: # メモリ解放 if device == "cuda": torch.cuda.empty_cache() gc.collect() def generate_refined_image(prompt: str, original_image: Image.Image, output_width: int, output_height: int, weight1: float, weight2: float) -> Image.Image: original_image = ensure_rgb(original_image) global refine_gen_pipe if refine_gen_pipe is None: raise ValueError("Model is not initialized") # initialize() try: original_image_np = np.array(original_image) # scribble_xdog scribble_image, _ = scribble_xdog(original_image_np, 2048, 20) original_image = original_image.resize((output_width, output_height)) output = refine_gen_pipe( prompt, image=[scribble_image, original_image], # 2つのControlNetに対応する入力画像 negative_prompt="extra limb, monochrome, black and white", num_inference_steps=20, width=output_width, height=output_height, controlnet_conditioning_scale=[weight1, weight2], # 各ControlNetの重み control_guidance_start=[0.0, 0.0], control_guidance_end=[1.0, 1.0], guess_mode=[False, False], # pixel_perfect ) generated_image = output.images[0] return generated_image finally: # メモリ解放 if device == "cuda": torch.cuda.empty_cache() gc.collect() def process_image(input_image, mode: str, weight1: float = 0.4, weight2: float = 0.3): input_image = ensure_rgb(input_image) # サイズを取得 input_width, input_height = input_image.size max_size = 736 output_width = max_size if input_height < input_width else int(input_width / input_height * max_size) output_height = max_size if input_height > input_width else int(input_height / input_width * max_size) if mode == "refine": # WD-14 taggerを使用してプロンプトを生成 image_np = np.array(ensure_rgb(input_image)) prompt = get_wd_tags([image_np])[0] prompt = f"{prompt}" print(prompt) refined_image = generate_refined_image(prompt, input_image, output_width, output_height, weight1, weight2) refined_image = refined_image.convert('RGB') # スケッチ画像を生成 refined_image_np = np.array(refined_image) sketch_image = get_sketch(refined_image_np, "both", 2048, 10) sketch_image = sketch_image.resize((output_width, output_height)) # 画像サイズを合わせる # スケッチ画像の二値化 sketch_binary = binarize_image(sketch_image) # RGBAに変換(透明なベース画像を作成)して、青い線を設定 sketch_image = create_rgba_image(sketch_binary, [0, 0, 255]) # 素体画像の生成 sotai_image = generate_sotai_image(refined_image, output_width, output_height) elif mode == "original": sotai_image = generate_sotai_image(input_image, output_width, output_height) # スケッチ画像の生成 input_image_np = np.array(input_image) sketch_image = get_sketch(input_image_np, "both", 2048, 16) elif mode == "sketch": # スケッチ画像の生成 input_image_np = np.array(input_image) sketch_image = get_sketch(input_image_np, "both", 2048, 16) # 素体画像の生成 sotai_image = generate_sotai_image(sketch_image, output_width, output_height) else: raise ValueError("Invalid mode") # 素体画像の二値化 sotai_binary = binarize_image(sotai_image) # RGBAに変換(透明なベース画像を作成)して、赤い線を設定 sotai_image = create_rgba_image(sotai_binary, [255, 0, 0]) return sotai_image, sketch_image def image_to_base64(img_array): buffered = io.BytesIO() img_array.save(buffered, format="PNG") return base64.b64encode(buffered.getvalue()).decode() def process_image_as_base64(input_image, mode: str, weight1: float = 0.4, weight2: float = 0.3): sotai_image, sketch_image = process_image(input_image, mode, weight1, weight2) return image_to_base64(sotai_image), image_to_base64(sketch_image)