import json

import numpy as np
import torch
import triton_python_backend_utils as pb_utils
from diffusers import (AutoencoderKL,
                       ControlNetModel,
                       StableDiffusionPipeline,
                       StableDiffusionXLControlNetImg2ImgPipeline,
                       StableDiffusionXLControlNetPipeline,
                       StableDiffusionXLImg2ImgPipeline,
                       StableDiffusionXLPipeline)
from diffusers.utils import load_image
from PIL import Image

def prepare_tpose_image(img):
    """Pre-compute the T-pose control image for every supported aspect ratio.

    The keys match ``TritonPythonModel.ratio_dict``: the square pose image is
    resized to the shorter side of each target canvas and centered on a black
    background.
    """
    tpose_img_ratio = {}
    padding_color = (0, 0, 0)

    # ratio 0: 1024x768 landscape
    padded_image = Image.new(img.mode, (1024, 768), padding_color)
    img768 = img.resize((768, 768))
    padded_image.paste(img768, ((1024 - 768) // 2, 0))
    tpose_img_ratio[0] = padded_image

    # ratio 1: 800x800 square
    img800 = img.resize((800, 800))
    tpose_img_ratio[1] = img800

    # ratio 2: 600x800 portrait
    padded_image = Image.new(img.mode, (600, 800), padding_color)
    img600 = img.resize((600, 600))
    padded_image.paste(img600, (0, (800 - 600) // 2))
    tpose_img_ratio[2] = padded_image

    # ratio 3: 1024x576 widescreen
    padded_image = Image.new(img.mode, (1024, 576), padding_color)
    img576 = img.resize((576, 576))
    padded_image.paste(img576, ((1024 - 576) // 2, 0))
    tpose_img_ratio[3] = padded_image

    # ratio 4: 448x800 tall portrait
    padded_image = Image.new(img.mode, (448, 800), padding_color)
    img448 = img.resize((448, 448))
    padded_image.paste(img448, (0, (800 - 448) // 2))
    tpose_img_ratio[4] = padded_image

    # ratio 5: 1024x680 landscape
    padded_image = Image.new(img.mode, (1024, 680), padding_color)
    img680 = img.resize((680, 680))
    padded_image.paste(img680, ((1024 - 680) // 2, 0))
    tpose_img_ratio[5] = padded_image

    # ratio 6: 528x800 portrait
    padded_image = Image.new(img.mode, (528, 800), padding_color)
    img528 = img.resize((528, 528))
    padded_image.paste(img528, (0, (800 - 528) // 2))
    tpose_img_ratio[6] = padded_image

    return tpose_img_ratio

class TritonPythonModel:
    """Your Python model must use the same class name.

    Every Python model that is created must have "TritonPythonModel" as the
    class name.
    """
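    # The tensor names below match what `execute` reads from each request. A
    # hypothetical config.pbtxt for this backend might declare them roughly as
    # follows (shapes and datatypes are assumptions inferred from the code,
    # not taken from the actual deployment):
    #
    #   input  [ {name: "PROMPT",   data_type: TYPE_STRING, dims: [1]},
    #            {name: "STYLE",    data_type: TYPE_STRING, dims: [1]},
    #            {name: "REFIMAGE", data_type: TYPE_UINT8,  dims: [-1, -1, 3]},
    #            {name: "TPOSE",    data_type: TYPE_INT32,  dims: [1]},
    #            {name: "RATIO",    data_type: TYPE_INT32,  dims: [1]} ]
    #   output [ {name: "OUTPUT",   data_type: TYPE_UINT8,  dims: [-1, -1, 3]} ]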

    def initialize(self, args):
        """`initialize` is called only once when the model is being loaded.
        Implementing `initialize` function is optional. This function allows
        the model to initialize any state associated with this model.

        Parameters
        ----------
        args : dict
          Both keys and values are strings. The dictionary keys and values are:
          * model_config: A JSON string containing the model configuration
          * model_instance_kind: A string containing model instance kind
          * model_instance_device_id: A string containing model instance
            device ID
          * model_repository: Model repository path
          * model_version: Model version
          * model_name: Model name
        """
        print(args)

        self.model_config = json.loads(args['model_config'])
        weight_dtype = torch.float16

        self.controlnet = ControlNetModel.from_pretrained(
            "/nvme/shared/huggingface_hub/models/controlnet-openpose-sdxl-1.0",
            torch_dtype=weight_dtype)
        self.controlnet = self.controlnet.to(f"cuda:{args['model_instance_device_id']}")

        self.tpose_image = load_image('/nvme/liuwenran/repos/magicmaker2-image-generation/data/t-pose.jpg')
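
        # Several SDXL checkpoints/LoRAs are loaded below, one per STYLE value
        # handled in `execute`: 'manghe' (chibi LoRA), 'guofeng' (minimalist
        # flat-illustration LoRA), 'xieshi' (realistic), 'youhua' (oil-painting
        # LoRA), 'chahua' (sketch, reusing the realistic checkpoint), with the
        # anime checkpoint as the default.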

        anime_ckpt_dir = '/nvme/shared/civitai_models/ckpts/models--gsdf--CounterfeitXL/snapshots/4708675873bd09833aabc3fd4cb2de5fcd1726ac'
        self.pipeline_anime = StableDiffusionXLPipeline.from_pretrained(
            anime_ckpt_dir, torch_dtype=weight_dtype
        )
        self.pipeline_anime = self.pipeline_anime.to(f"cuda:{args['model_instance_device_id']}")

        realistic_ckpt_dir = '/nvme/shared/civitai_models/ckpt_save_pretrained/copaxTimelessxlSDXL1_v8'
        self.pipeline_realistic = StableDiffusionXLPipeline.from_pretrained(
            realistic_ckpt_dir, torch_dtype=weight_dtype
        )
        self.pipeline_realistic = self.pipeline_realistic.to(f"cuda:{args['model_instance_device_id']}")

        dim3_ckpt_dir = '/nvme/shared/civitai_models/ckpt_save_pretrained/protovisionXLHighFidelity3D_release0630Bakedvae'
        self.pipeline_oil_painting = StableDiffusionXLPipeline.from_pretrained(
            dim3_ckpt_dir, torch_dtype=weight_dtype
        )
        oil_painting_lora_dir = '/nvme/shared/civitai_models/loras/ClassipeintXL1.9.safetensors'
        self.pipeline_oil_painting.load_lora_weights(oil_painting_lora_dir)
        self.pipeline_oil_painting = self.pipeline_oil_painting.to(f"cuda:{args['model_instance_device_id']}")

        pretrained_model_name_or_path = '/nvme/shared/huggingface_hub/huggingface/hub/models--stabilityai--stable-diffusion-xl-base-1.0/snapshots/76d28af79639c28a79fa5c6c6468febd3490a37e'
        vae_path = '/nvme/shared/huggingface_hub/huggingface/hub/models--madebyollin--sdxl-vae-fp16-fix/snapshots/4df413ca49271c25289a6482ab97a433f8117d15'
        vae = AutoencoderKL.from_pretrained(
            vae_path,
            torch_dtype=weight_dtype,
        )
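
        # madebyollin/sdxl-vae-fp16-fix is a patched SDXL VAE that stays
        # numerically stable in float16; it is shared by the two LoRA-based
        # pipelines below, which are built on the SDXL base checkpoint.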

        guofeng_lora_dir = '/nvme/shared/civitai_models/loras/minimalism.safetensors'
        self.pipeline_guofeng = StableDiffusionXLPipeline.from_pretrained(
            pretrained_model_name_or_path, vae=vae, torch_dtype=weight_dtype
        )
        self.pipeline_guofeng.load_lora_weights(guofeng_lora_dir)
        self.pipeline_guofeng = self.pipeline_guofeng.to(f"cuda:{args['model_instance_device_id']}")

        manghe_lora_dir = '/nvme/shared/civitai_models/loras/mengwa.safetensors'
        self.pipeline_manghe = StableDiffusionXLPipeline.from_pretrained(
            pretrained_model_name_or_path, vae=vae, torch_dtype=weight_dtype
        )
        self.pipeline_manghe.load_lora_weights(manghe_lora_dir)
        self.pipeline_manghe = self.pipeline_manghe.to(f"cuda:{args['model_instance_device_id']}")

        # RATIO index -> (width, height) of the generated image.
        self.ratio_dict = {
            0: (1024, 768),
            1: (800, 800),
            2: (600, 800),
            3: (1024, 576),
            4: (448, 800),
            5: (1024, 680),
            6: (528, 800)
        }
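
        # The same ratio indices key the padded T-pose canvases prepared below,
        # so each ControlNet conditioning image matches the requested output size.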
        self.tpose_image_ratio = prepare_tpose_image(self.tpose_image)

        # Stable Diffusion 1.5 is loaded only so that its safety checker can be
        # reused on the generated SDXL images in `execute`.
        sd15_dir = '/nvme/shared/stable-diffusion-v1-5'
        self.sd15 = StableDiffusionPipeline.from_pretrained(sd15_dir)
        self.sd15 = self.sd15.to(f"cuda:{args['model_instance_device_id']}")

    def execute(self, requests):
        """`execute` must be implemented in every Python model. `execute`
        function receives a list of pb_utils.InferenceRequest as the only
        argument. This function is called when an inference is requested
        for this model. Depending on the batching configuration (e.g. Dynamic
        Batching) used, `requests` may contain multiple requests. Every
        Python model must create one pb_utils.InferenceResponse for every
        pb_utils.InferenceRequest in `requests`. If there is an error, you can
        set the error argument when creating a pb_utils.InferenceResponse.

        Parameters
        ----------
        requests : list
          A list of pb_utils.InferenceRequest

        Returns
        -------
        list
          A list of pb_utils.InferenceResponse. The length of this list must
          be the same as `requests`
        """
        responses = []

        for request in requests:
            prompt = pb_utils.get_input_tensor_by_name(request, 'PROMPT').as_numpy()
            prompt = prompt.item().decode('utf-8')

            style = pb_utils.get_input_tensor_by_name(request, 'STYLE').as_numpy()
            style = style.item().decode('utf-8')

            ref_img = pb_utils.get_input_tensor_by_name(request, 'REFIMAGE').as_numpy()
            tpose = pb_utils.get_input_tensor_by_name(request, 'TPOSE').as_numpy()
            ratio = pb_utils.get_input_tensor_by_name(request, 'RATIO').as_numpy()

            print(f"prompt:{prompt} style:{style} ref_img:{ref_img.shape} tpose:{tpose} ratio:{ratio}")
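
            # Assumed tensor layout (inferred from the reads above, not from the
            # model config): PROMPT/STYLE are single UTF-8 byte strings, REFIMAGE
            # is an HxWx3 image where a 1x1x3 placeholder means "no reference
            # image", and TPOSE/RATIO are one-element integer arrays.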

            tpose = tpose[0]
            pipeline_infer = self.pipeline_anime

            if style == 'manghe':
                pipeline_infer = self.pipeline_manghe
                prompt = 'chibi,' + prompt
            elif style == 'guofeng':
                pipeline_infer = self.pipeline_guofeng
                prompt = 'minimalist style, Flat illustration, Chinese style,' + prompt
            elif style == 'xieshi':
                pipeline_infer = self.pipeline_realistic
            elif style == 'youhua':
                pipeline_infer = self.pipeline_oil_painting
                prompt = 'oil painting,' + prompt
            elif style == 'chahua':
                pipeline_infer = self.pipeline_realistic
                prompt = 'sketch, sketch painting,' + prompt

            prompt_to_append = ', best quality, extremely detailed, perfect, 8k, masterpiece'
            prompt = prompt + prompt_to_append

            negative_prompt = 'nude'

            if ref_img.shape != (1, 1, 3):
                if tpose:
                    pipeline_infer = StableDiffusionXLControlNetImg2ImgPipeline(
                        pipeline_infer.vae, pipeline_infer.text_encoder, pipeline_infer.text_encoder_2,
                        pipeline_infer.tokenizer, pipeline_infer.tokenizer_2, pipeline_infer.unet,
                        self.controlnet, pipeline_infer.scheduler)
                else:
                    pipeline_infer = StableDiffusionXLImg2ImgPipeline(
                        pipeline_infer.vae, pipeline_infer.text_encoder, pipeline_infer.text_encoder_2,
                        pipeline_infer.tokenizer, pipeline_infer.tokenizer_2, pipeline_infer.unet,
                        pipeline_infer.scheduler)
            else:
                if tpose:
                    pipeline_infer = StableDiffusionXLControlNetPipeline(
                        pipeline_infer.vae, pipeline_infer.text_encoder, pipeline_infer.text_encoder_2,
                        pipeline_infer.tokenizer, pipeline_infer.tokenizer_2, pipeline_infer.unet,
                        self.controlnet, pipeline_infer.scheduler)
                else:
                    pipeline_infer = StableDiffusionXLPipeline(
                        pipeline_infer.vae, pipeline_infer.text_encoder, pipeline_infer.text_encoder_2,
                        pipeline_infer.tokenizer, pipeline_infer.tokenizer_2, pipeline_infer.unet,
                        pipeline_infer.scheduler)
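
            # Rebuilding the pipeline from the already-loaded components is cheap:
            # the diffusers pipeline constructors register the passed-in modules by
            # reference, so no weights are reloaded or copied per request.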

            ratio_type = ratio[0]
            width, height = self.ratio_dict[ratio_type]

            controlnet_conditioning_scale = 1.0

            if ref_img.shape != (1, 1, 3):
                init_image = Image.fromarray(ref_img)
                if tpose:
                    image = pipeline_infer(prompt, negative_prompt=negative_prompt,
                                           controlnet_conditioning_scale=controlnet_conditioning_scale,
                                           image=init_image.resize((width, height)),
                                           control_image=self.tpose_image_ratio[ratio_type],
                                           strength=0.5).images[0]
                else:
                    image = pipeline_infer(prompt, negative_prompt=negative_prompt, image=init_image,
                                           width=width, height=height, strength=0.5).images[0]
            else:
                if tpose:
                    image = pipeline_infer(prompt, negative_prompt=negative_prompt,
                                           controlnet_conditioning_scale=controlnet_conditioning_scale,
                                           image=self.tpose_image_ratio[ratio_type]).images[0]
                else:
                    image = pipeline_infer(prompt, negative_prompt=negative_prompt, num_inference_steps=25,
                                           width=width, height=height).images[0]
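
            # Screen the result with the SD 1.5 safety checker; flagged outputs are
            # replaced with a black image of the same size before being returned.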
            image_np = np.array(image).astype(np.float32) / 255.0
            image_pt = torch.from_numpy(image_np.transpose(2, 0, 1)).unsqueeze(0)
            image_pt = image_pt.to('cuda')
            check_res, nsfw = self.sd15.run_safety_checker(image_pt, 'cuda', torch.float32)
            if nsfw[0]:
                image = Image.new("RGB", image.size, (0, 0, 0))

            image = np.array(image).astype(np.uint8)
            print(f"final result: {image.shape}, [{np.min(image)}-{np.max(image)}]")

            out_tensor = pb_utils.Tensor('OUTPUT', image)

            inference_response = pb_utils.InferenceResponse(
                output_tensors=[out_tensor])
            responses.append(inference_response)

        return responses

    def finalize(self):
        """`finalize` is called only once when the model is being unloaded.

        Implementing `finalize` function is optional. This function allows the
        model to perform any necessary clean ups before exit.
        """
        print('Cleaning up...')
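
# Example client call (a minimal sketch, not part of the deployed code): the
# tensor names match what `execute` reads, but the model name, server URL, and
# datatypes are assumptions and would need to match the actual config.pbtxt.
#
#   import numpy as np
#   import tritonclient.http as httpclient
#   from tritonclient.utils import np_to_triton_dtype
#
#   client = httpclient.InferenceServerClient(url="localhost:8000")
#   inputs = []
#   for name, value in [("PROMPT", np.array([b"a knight in armor"], dtype=object)),
#                       ("STYLE", np.array([b"xieshi"], dtype=object)),
#                       ("REFIMAGE", np.zeros((1, 1, 3), dtype=np.uint8)),
#                       ("TPOSE", np.array([0], dtype=np.int32)),
#                       ("RATIO", np.array([1], dtype=np.int32))]:
#       tensor = httpclient.InferInput(name, list(value.shape),
#                                      np_to_triton_dtype(value.dtype))
#       tensor.set_data_from_numpy(value)
#       inputs.append(tensor)
#   result = client.infer(model_name="sdxl_text2image", inputs=inputs)
#   generated = result.as_numpy("OUTPUT")  # HxWx3 uint8 array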