# Copyright (c) OpenMMLab. All rights reserved.
import json
import numpy as np
import torch
# triton_python_backend_utils is available in every Triton Python model. You
# need to use this module to create inference requests and responses. It also
# contains some utility functions for extracting information from model_config
# and converting Triton input/output types to numpy types.
import triton_python_backend_utils as pb_utils
from diffusers import (StableDiffusionXLPipeline,
AutoencoderKL,
ControlNetModel,
StableDiffusionXLImg2ImgPipeline,
StableDiffusionXLControlNetPipeline,
StableDiffusionXLControlNetImg2ImgPipeline,
StableDiffusionPipeline)
from diffusers.utils import load_image
from PIL import Image
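# Triton Python backend for multi-style SDXL image generation: requests are
# routed to per-style pipelines (anime, realistic, oil painting, guofeng,
# manghe), optionally guided by a reference image and/or an openpose t-pose
# ControlNet, and screened by the SD 1.5 safety checker before returning.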
def prepare_tpose_image(img):
    """Build one letterboxed t-pose control image per aspect-ratio index.

    The square t-pose reference is resized to the shorter side of each
    target canvas and centered on black padding, so the pose stays
    undistorted at every output ratio.
    """
    canvas_sizes = [(1024, 768), (800, 800), (600, 800), (1024, 576),
                    (448, 800), (1024, 680), (528, 800)]
    padding_color = (0, 0, 0)
    tpose_img_ratio = {}
    for idx, (width, height) in enumerate(canvas_sizes):
        side = min(width, height)
        resized = img.resize((side, side))
        padded_image = Image.new(img.mode, (width, height), padding_color)
        padded_image.paste(resized, ((width - side) // 2, (height - side) // 2))
        tpose_img_ratio[idx] = padded_image
    return tpose_img_ratio
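# Illustrative example (not executed): for a square t-pose reference,
#   tpose = prepare_tpose_image(load_image('t-pose.jpg'))
#   tpose[0].size -> (1024, 768); tpose[6].size -> (528, 800)
# The dict keys match the RATIO presets in TritonPythonModel.initialize.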
class TritonPythonModel:
"""Your Python model must use the same class name.
Every Python model that is created must have "TritonPythonModel" as the
class name.
"""
def initialize(self, args):
"""`initialize` is called only once when the model is being loaded.
Implementing `initialize` function is optional. This function allows
the model to initialize any state associated with this model.
Parameters
----------
args : dict
Both keys and values are strings. The dictionary keys and values are:
* model_config: A JSON string containing the model configuration
* model_instance_kind: A string containing model instance kind
* model_instance_device_id: A string containing model instance
device ID
* model_repository: Model repository path
* model_version: Model version
* model_name: Model name
"""
print(args)
        # `model_config` arrives as a JSON string; it must be parsed manually.
        self.model_config = json.loads(args['model_config'])
weight_dtype = torch.float16
# pose control
self.controlnet = ControlNetModel.from_pretrained("/nvme/shared/huggingface_hub/models/controlnet-openpose-sdxl-1.0", torch_dtype=weight_dtype)
self.controlnet = self.controlnet.to(f"cuda:{args['model_instance_device_id']}")
self.tpose_image = load_image('/nvme/liuwenran/repos/magicmaker2-image-generation/data/t-pose.jpg')
# anime style
anime_ckpt_dir = '/nvme/shared/civitai_models/ckpts/models--gsdf--CounterfeitXL/snapshots/4708675873bd09833aabc3fd4cb2de5fcd1726ac'
self.pipeline_anime = StableDiffusionXLPipeline.from_pretrained(
anime_ckpt_dir, torch_dtype=weight_dtype
)
self.pipeline_anime = self.pipeline_anime.to(f"cuda:{args['model_instance_device_id']}")
# realistic style
realistic_ckpt_dir = '/nvme/shared/civitai_models/ckpt_save_pretrained/copaxTimelessxlSDXL1_v8'
self.pipeline_realistic = StableDiffusionXLPipeline.from_pretrained(
realistic_ckpt_dir, torch_dtype=weight_dtype
)
self.pipeline_realistic = self.pipeline_realistic.to(f"cuda:{args['model_instance_device_id']}")
        # protovision 3D checkpoint for the oil painting style (the sketch
        # style is routed to the realistic pipeline in execute)
dim3_ckpt_dir = '/nvme/shared/civitai_models/ckpt_save_pretrained/protovisionXLHighFidelity3D_release0630Bakedvae'
self.pipeline_oil_painting = StableDiffusionXLPipeline.from_pretrained(
dim3_ckpt_dir, torch_dtype=weight_dtype
)
oil_painting_lora_dir = '/nvme/shared/civitai_models/loras/ClassipeintXL1.9.safetensors'
self.pipeline_oil_painting.load_lora_weights(oil_painting_lora_dir)
self.pipeline_oil_painting = self.pipeline_oil_painting.to(f"cuda:{args['model_instance_device_id']}")
# sd xl base
# pretrained_model_name_or_path = "stabilityai/stable-diffusion-xl-base-1.0"
pretrained_model_name_or_path = '/nvme/shared/huggingface_hub/huggingface/hub/models--stabilityai--stable-diffusion-xl-base-1.0/snapshots/76d28af79639c28a79fa5c6c6468febd3490a37e'
# vae_path = "madebyollin/sdxl-vae-fp16-fix"
vae_path = '/nvme/shared/huggingface_hub/huggingface/hub/models--madebyollin--sdxl-vae-fp16-fix/snapshots/4df413ca49271c25289a6482ab97a433f8117d15'
vae = AutoencoderKL.from_pretrained(
vae_path,
torch_dtype=weight_dtype,
)
        # guofeng (traditional Chinese) style, via the minimalism LoRA
guofeng_lora_dir = '/nvme/shared/civitai_models/loras/minimalism.safetensors'
self.pipeline_guofeng = StableDiffusionXLPipeline.from_pretrained(
pretrained_model_name_or_path, vae=vae, torch_dtype=weight_dtype
)
self.pipeline_guofeng.load_lora_weights(guofeng_lora_dir)
self.pipeline_guofeng = self.pipeline_guofeng.to(f"cuda:{args['model_instance_device_id']}")
        # manghe (blind-box figurine) style, via the mengwa LoRA
manghe_lora_dir = '/nvme/shared/civitai_models/loras/mengwa.safetensors'
self.pipeline_manghe = StableDiffusionXLPipeline.from_pretrained(
pretrained_model_name_or_path, vae=vae, torch_dtype=weight_dtype
)
self.pipeline_manghe.load_lora_weights(manghe_lora_dir)
self.pipeline_manghe = self.pipeline_manghe.to(f"cuda:{args['model_instance_device_id']}")
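        # Output canvas sizes keyed by the RATIO input index; these match the
        # letterboxed t-pose control images built by prepare_tpose_image.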
self.ratio_dict = {
0: (1024, 768),
1: (800, 800),
2: (600, 800),
3: (1024, 576),
4: (448, 800),
5: (1024, 680),
6: (528, 800)
}
self.tpose_image_ratio = prepare_tpose_image(self.tpose_image)
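        # SD 1.5 is loaded only so its safety checker can screen SDXL outputs.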
sd15_dir = '/nvme/shared/stable-diffusion-v1-5'
self.sd15 = StableDiffusionPipeline.from_pretrained(sd15_dir)
self.sd15 = self.sd15.to(f"cuda:{args['model_instance_device_id']}")
def execute(self, requests):
"""`execute` must be implemented in every Python model. `execute`
function receives a list of pb_utils.InferenceRequest as the only
argument. This function is called when an inference is requested
for this model. Depending on the batching configuration (e.g. Dynamic
Batching) used, `requests` may contain multiple requests. Every
Python model, must create one pb_utils.InferenceResponse for every
pb_utils.InferenceRequest in `requests`. If there is an error, you can
set the error argument when creating a pb_utils.InferenceResponse.
Parameters
----------
requests : list
A list of pb_utils.InferenceRequest
Returns
-------
list
A list of pb_utils.InferenceResponse. The length of this list must
be the same as `requests`
"""
responses = []
        # Every Python backend must iterate over every one of the requests
# and create a pb_utils.InferenceResponse for each of them.
for request in requests:
            # Get INPUT tensors
            prompt = pb_utils.get_input_tensor_by_name(request, 'PROMPT').as_numpy()
            prompt = prompt.item().decode('utf-8')
            style = pb_utils.get_input_tensor_by_name(request, 'STYLE').as_numpy()
            style = style.item().decode('utf-8')
            ref_img = pb_utils.get_input_tensor_by_name(request, 'REFIMAGE').as_numpy()
            tpose = pb_utils.get_input_tensor_by_name(request, 'TPOSE').as_numpy()
            ratio = pb_utils.get_input_tensor_by_name(request, 'RATIO').as_numpy()
            print(f"prompt:{prompt} style:{style} ref_img:{ref_img.shape} tpose:{tpose} ratio:{ratio}")
            tpose = tpose[0]
            # Default to the anime pipeline; style branches below override it.
            pipeline_infer = self.pipeline_anime
            if style == 'manghe':  # blind-box figurine style
                pipeline_infer = self.pipeline_manghe
                prompt = 'chibi,' + prompt
            elif style == 'guofeng':  # traditional Chinese style
                pipeline_infer = self.pipeline_guofeng
                prompt = 'minimalist style, Flat illustration, Chinese style,' + prompt
            elif style == 'xieshi':  # realistic style
                pipeline_infer = self.pipeline_realistic
            elif style == 'youhua':  # oil painting style
                pipeline_infer = self.pipeline_oil_painting
                prompt = 'oil painting,' + prompt
            elif style == 'chahua':  # sketch / illustration style
                pipeline_infer = self.pipeline_realistic
                prompt = 'sketch, sketch painting,' + prompt
            prompt_to_append = ', best quality, extremely detailed, perfect, 8k, masterpiece'
            prompt = prompt + prompt_to_append
            negative_prompt = 'nude'
            # A real reference image (anything but the 1x1x3 placeholder) routes
            # to an img2img pipeline; a t-pose request additionally wires in the
            # openpose ControlNet. Pipelines are reassembled from the already
            # loaded components, so no extra weights are loaded.
            if ref_img.shape != (1, 1, 3):
                if tpose:
                    pipeline_infer = StableDiffusionXLControlNetImg2ImgPipeline(
                        pipeline_infer.vae, pipeline_infer.text_encoder,
                        pipeline_infer.text_encoder_2, pipeline_infer.tokenizer,
                        pipeline_infer.tokenizer_2, pipeline_infer.unet,
                        self.controlnet, pipeline_infer.scheduler)
                else:
                    pipeline_infer = StableDiffusionXLImg2ImgPipeline(
                        pipeline_infer.vae, pipeline_infer.text_encoder,
                        pipeline_infer.text_encoder_2, pipeline_infer.tokenizer,
                        pipeline_infer.tokenizer_2, pipeline_infer.unet,
                        pipeline_infer.scheduler)
            else:
                if tpose:
                    pipeline_infer = StableDiffusionXLControlNetPipeline(
                        pipeline_infer.vae, pipeline_infer.text_encoder,
                        pipeline_infer.text_encoder_2, pipeline_infer.tokenizer,
                        pipeline_infer.tokenizer_2, pipeline_infer.unet,
                        self.controlnet, pipeline_infer.scheduler)
                else:
                    pipeline_infer = StableDiffusionXLPipeline(
                        pipeline_infer.vae, pipeline_infer.text_encoder,
                        pipeline_infer.text_encoder_2, pipeline_infer.tokenizer,
                        pipeline_infer.tokenizer_2, pipeline_infer.unet,
                        pipeline_infer.scheduler)
ratio_type = ratio[0]
width, height = self.ratio_dict[ratio_type]
            controlnet_conditioning_scale = 1.0
            if ref_img.shape != (1, 1, 3):
                init_image = Image.fromarray(ref_img)
                if tpose:
                    image = pipeline_infer(
                        prompt, negative_prompt=negative_prompt,
                        controlnet_conditioning_scale=controlnet_conditioning_scale,
                        image=init_image.resize((width, height)),
                        control_image=self.tpose_image_ratio[ratio_type],
                        strength=0.5).images[0]
                else:
                    image = pipeline_infer(
                        prompt, negative_prompt=negative_prompt, image=init_image,
                        width=width, height=height, strength=0.5).images[0]
            else:
                if tpose:
                    image = pipeline_infer(
                        prompt, negative_prompt=negative_prompt,
                        controlnet_conditioning_scale=controlnet_conditioning_scale,
                        image=self.tpose_image_ratio[ratio_type]).images[0]
                else:
                    image = pipeline_infer(
                        prompt, negative_prompt=negative_prompt,
                        num_inference_steps=25, width=width, height=height).images[0]
            # Convert to a [0, 1] CHW tensor and run the SD 1.5 safety checker;
            # a flagged NSFW result is replaced with an all-black image.
            image_np = np.array(image).astype(np.float32) / 255.0
            image_pt = torch.from_numpy(image_np.transpose(2, 0, 1)).unsqueeze(0)
            image_pt = image_pt.to('cuda')
            check_res, nsfw = self.sd15.run_safety_checker(image_pt, 'cuda', torch.float32)
            if nsfw[0]:
                image = Image.new("RGB", image.size, (0, 0, 0))
            image = np.array(image).astype(np.uint8)
            print(f"final result: {image.shape}, [{np.min(image)}-{np.max(image)}]")
# Create output tensors. You need pb_utils.Tensor
# objects to create pb_utils.InferenceResponse.
out_tensor = pb_utils.Tensor('OUTPUT', image)
# Create InferenceResponse. You can set an error here in case
# there was a problem with handling this inference request.
# Below is an example of how you can set errors in inference
# response:
#
# pb_utils.InferenceResponse(
# output_tensors=..., TritonError("An error occurred"))
inference_response = pb_utils.InferenceResponse(
output_tensors=[out_tensor])
responses.append(inference_response)
# You should return a list of pb_utils.InferenceResponse. Length
# of this list must match the length of `requests` list.
return responses
def finalize(self):
"""`finalize` is called only once when the model is being unloaded.
Implementing `finalize` function is optional. This function allows the
model to perform any necessary clean ups before exit.
"""
print('Cleaning up...')
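# ---------------------------------------------------------------------------
# Hedged usage sketch (illustrative, not part of the model): a minimal Triton
# HTTP client for this backend. The model name 'magicmaker' and the server URL
# are assumptions; the tensor names and the TPOSE/RATIO dtypes mirror how
# execute() reads them above, but config.pbtxt is the authoritative contract.
#
#   import numpy as np
#   import tritonclient.http as httpclient
#
#   client = httpclient.InferenceServerClient(url='localhost:8000')
#   inputs = []
#   for name, text in [('PROMPT', 'a knight in shining armor'),
#                      ('STYLE', 'xieshi')]:
#       t = httpclient.InferInput(name, [1], 'BYTES')
#       t.set_data_from_numpy(np.array([text.encode('utf-8')], dtype=np.object_))
#       inputs.append(t)
#   ref = httpclient.InferInput('REFIMAGE', [1, 1, 3], 'UINT8')
#   ref.set_data_from_numpy(np.zeros((1, 1, 3), dtype=np.uint8))  # placeholder
#   tpose = httpclient.InferInput('TPOSE', [1], 'INT32')  # dtype assumed
#   tpose.set_data_from_numpy(np.array([0], dtype=np.int32))
#   ratio = httpclient.InferInput('RATIO', [1], 'INT32')  # dtype assumed
#   ratio.set_data_from_numpy(np.array([1], dtype=np.int32))  # 800x800 preset
#   result = client.infer('magicmaker', inputs + [ref, tpose, ratio])
#   image = result.as_numpy('OUTPUT')  # HxWx3 uint8 array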