sd / handler.py
zhengqilin
handler.py
d2dca8a
# inference handler for huggingface
import os
import sys
import time
import importlib
import signal
import re
from typing import Dict, List, Any
# from fastapi import FastAPI
# from fastapi.middleware.cors import CORSMiddleware
# from fastapi.middleware.gzip import GZipMiddleware
from packaging import version
import logging
logging.getLogger("xformers").addFilter(lambda record: 'A matching Triton is not available' not in record.getMessage())
from modules import errors
from modules.call_queue import wrap_queued_call, queue_lock, wrap_gradio_gpu_call
import torch
# Truncate version number of nightly/local build of PyTorch to not cause exceptions with CodeFormer or Safetensors
if ".dev" in torch.__version__ or "+git" in torch.__version__:
torch.__long_version__ = torch.__version__
torch.__version__ = re.search(r'[\d.]+[\d]', torch.__version__).group(0)
from modules import shared, devices, ui_tempdir
import modules.codeformer_model as codeformer
import modules.face_restoration
import modules.gfpgan_model as gfpgan
import modules.img2img
import modules.lowvram
import modules.paths
import modules.scripts
import modules.sd_hijack
import modules.sd_models
import modules.sd_vae
import modules.txt2img
import modules.script_callbacks
import modules.textual_inversion.textual_inversion
import modules.progress
import modules.ui
from modules import modelloader
from modules.shared import cmd_opts, opts
import modules.hypernetworks.hypernetwork
from modules.processing import StableDiffusionProcessingTxt2Img, StableDiffusionProcessingImg2Img, process_images
import base64
import io
from fastapi import HTTPException
from io import BytesIO
import piexif
import piexif.helper
from PIL import PngImagePlugin,Image
def initialize():
# check_versions()
# extensions.list_extensions()
# localization.list_localizations(cmd_opts.localizations_dir)
# if cmd_opts.ui_debug_mode:
# shared.sd_upscalers = upscaler.UpscalerLanczos().scalers
# modules.scripts.load_scripts()
# return
modelloader.cleanup_models()
modules.sd_models.setup_model()
codeformer.setup_model(cmd_opts.codeformer_models_path)
gfpgan.setup_model(cmd_opts.gfpgan_models_path)
modelloader.list_builtin_upscalers()
# modules.scripts.load_scripts()
modelloader.load_upscalers()
modules.sd_vae.refresh_vae_list()
# modules.textual_inversion.textual_inversion.list_textual_inversion_templates()
try:
modules.sd_models.load_model()
except Exception as e:
errors.display(e, "loading stable diffusion model")
print("", file=sys.stderr)
print("Stable diffusion model failed to load, exiting", file=sys.stderr)
exit(1)
shared.opts.data["sd_model_checkpoint"] = shared.sd_model.sd_checkpoint_info.title
shared.opts.onchange("sd_model_checkpoint", wrap_queued_call(lambda: modules.sd_models.reload_model_weights()))
shared.opts.onchange("sd_vae", wrap_queued_call(lambda: modules.sd_vae.reload_vae_weights()), call=False)
shared.opts.onchange("sd_vae_as_default", wrap_queued_call(lambda: modules.sd_vae.reload_vae_weights()), call=False)
shared.opts.onchange("temp_dir", ui_tempdir.on_tmpdir_changed)
# shared.reload_hypernetworks()
# ui_extra_networks.intialize()
# ui_extra_networks.register_page(ui_extra_networks_textual_inversion.ExtraNetworksPageTextualInversion())
# ui_extra_networks.register_page(ui_extra_networks_hypernets.ExtraNetworksPageHypernetworks())
# ui_extra_networks.register_page(ui_extra_networks_checkpoints.ExtraNetworksPageCheckpoints())
# extra_networks.initialize()
# extra_networks.register_extra_network(extra_networks_hypernet.ExtraNetworkHypernet())
# if cmd_opts.tls_keyfile is not None and cmd_opts.tls_keyfile is not None:
# try:
# if not os.path.exists(cmd_opts.tls_keyfile):
# print("Invalid path to TLS keyfile given")
# if not os.path.exists(cmd_opts.tls_certfile):
# print(f"Invalid path to TLS certfile: '{cmd_opts.tls_certfile}'")
# except TypeError:
# cmd_opts.tls_keyfile = cmd_opts.tls_certfile = None
# print("TLS setup invalid, running webui without TLS")
# else:
# print("Running with TLS")
# make the program just exit at ctrl+c without waiting for anything
def sigint_handler(sig, frame):
print(f'Interrupted with signal {sig} in {frame}')
os._exit(0)
signal.signal(signal.SIGINT, sigint_handler)
class EndpointHandler():
def __init__(self, path=""):
# Preload all the elements you are going to need at inference.
# pseudo:
# self.model= load_model(path)
initialize()
self.shared = shared
def __call__(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
data args:
inputs (:obj: `str` | `PIL.Image` | `np.array`)
kwargs
Return:
A :obj:`list` | `dict`: will be serialized and returned
"""
txt2img_args = {
"do_not_save_samples": True,
"do_not_save_grid": True,
"outpath_samples": "./output",
"prompt": "lora:koreanDollLikeness_v15:0.66, best quality, ultra high res, (photorealistic:1.4), 1girl, beige sweater, black choker, smile, laughing, bare shoulders, solo focus, ((full body), (brown hair:1), looking at viewer",
"negative_prompt": "paintings, sketches, (worst quality:2), (low quality:2), (normal quality:2), lowres, normal quality, ((monochrome)), ((grayscale)), skin spots, acnes, skin blemishes, age spot, glans, (ugly:1.331), (duplicate:1.331), (morbid:1.21), (mutilated:1.21), (tranny:1.331), mutated hands, (poorly drawn hands:1.331), blurry, 3hands,4fingers,3arms, bad anatomy, missing fingers, extra digit, fewer digits, cropped, jpeg artifacts,poorly drawn face,mutation,deformed",
"sampler_name": "DPM++ SDE Karras",
"steps": 20, # 25
"cfg_scale": 8,
"width": 512,
"height": 768,
"seed": -1,
}
img2img_args = {
"init_images": ["data:image/png;base64,"],
"resize_mode": 0,
"denoising_strength": 0.75,
"image_cfg_scale": 0,
"mask_blur": 4,
"inpainting_fill": 0,
"inpaint_full_res": 1,
"inpaint_full_res_padding": 0,
"inpainting_mask_invert": 0,
"initial_noise_multiplier": 0,
"prompt": "lora:koreanDollLikeness_v15:0.66, best quality, ultra high res, (photorealistic:1.4), 1girl, beige sweater, black choker, smile, laughing, bare shoulders, solo focus, ((full body), (brown hair:1), looking at viewer",
"styles": [],
"seed": -1,
"subseed": -1,
"subseed_strength": 0,
"seed_resize_from_h": -1,
"seed_resize_from_w": -1,
"sampler_name": "Euler a",
"batch_size": 1,
"n_iter": 1,
"steps": 50,
"cfg_scale": 7,
"width": 512,
"height": 512,
"restore_faces": 0,
"tiling": 0,
"negative_prompt": "paintings, sketches, (worst quality:2), (low quality:2), (normal quality:2), lowres, normal quality, ((monochrome)), ((grayscale)), skin spots, acnes, skin blemishes, age spot, glans, (ugly:1.331), (duplicate:1.331), (morbid:1.21), (mutilated:1.21), (tranny:1.331), mutated hands, (poorly drawn hands:1.331), blurry, 3hands,4fingers,3arms, bad anatomy, missing fingers, extra digit, fewer digits, cropped, jpeg artifacts,poorly drawn face,mutation,deformed",
"eta": 0,
"s_churn": 0,
"s_tmax": 0,
"s_tmin": 0,
"s_noise": 1,
"override_settings": {},
"override_settings_restore_afterwards": 1,
"script_args": [],
"sampler_index": "Euler"
}
p = None
if data["type"] == "txt2img":
if data["inputs"]:
for field in txt2img_args:
if field in data["inputs"].keys():
txt2img_args[field] = data["inputs"][field]
# if "prompt" in data["inputs"].keys():
# prompt = data["inputs"]["prompt"]
# print("get prompt from request: ", prompt)
# args["prompt"] = prompt
p = StableDiffusionProcessingTxt2Img(sd_model=self.shared.sd_model, **txt2img_args)
if data["type"] == "img2img":
if data["inputs"]:
for field in img2img_args:
if field in data["inputs"].keys():
img2img_args[field] = data["inputs"][field]
p = StableDiffusionProcessingImg2Img(sd_model=self.shared.sd_model, **img2img_args)
if p is None:
raise Exception("No processing object created")
processed = process_images(p)
single_image_b64 = encode_pil_to_base64(processed.images[0]).decode('utf-8')
return {
"img_data": single_image_b64,
"parameters": processed.images[0].info.get('parameters', ""),
}
def manual_hack():
initialize()
args = {
# todo: don't output res
"outpath_samples": "C:\\Users\\wolvz\\Desktop",
"prompt": "lora:koreanDollLikeness_v15:0.66, best quality, ultra high res, (photorealistic:1.4), 1girl, beige sweater, black choker, smile, laughing, bare shoulders, solo focus, ((full body), (brown hair:1), looking at viewer",
"negative_prompt": "paintings, sketches, (worst quality:2), (low quality:2), (normal quality:2), lowres, normal quality, ((monochrome)), ((grayscale)), skin spots, acnes, skin blemishes, age spot, glans",
"sampler_name": "DPM++ SDE Karras",
"steps": 20, # 25
"cfg_scale": 8,
"width": 512,
"height": 768,
"seed": -1,
}
p = StableDiffusionProcessingTxt2Img(sd_model=shared.sd_model, **args)
processed = process_images(p)
def decode_base64_to_image(encoding):
if encoding.startswith("data:image/"):
encoding = encoding.split(";")[1].split(",")[1]
try:
image = Image.open(BytesIO(base64.b64decode(encoding)))
return image
except Exception as err:
raise HTTPException(status_code=500, detail="Invalid encoded image")
def encode_pil_to_base64(image):
with io.BytesIO() as output_bytes:
if opts.samples_format.lower() == 'png':
use_metadata = False
metadata = PngImagePlugin.PngInfo()
for key, value in image.info.items():
if isinstance(key, str) and isinstance(value, str):
metadata.add_text(key, value)
use_metadata = True
image.save(output_bytes, format="PNG", pnginfo=(metadata if use_metadata else None), quality=opts.jpeg_quality)
elif opts.samples_format.lower() in ("jpg", "jpeg", "webp"):
parameters = image.info.get('parameters', None)
exif_bytes = piexif.dump({
"Exif": { piexif.ExifIFD.UserComment: piexif.helper.UserComment.dump(parameters or "", encoding="unicode") }
})
if opts.samples_format.lower() in ("jpg", "jpeg"):
image.save(output_bytes, format="JPEG", exif = exif_bytes, quality=opts.jpeg_quality)
else:
image.save(output_bytes, format="WEBP", exif = exif_bytes, quality=opts.jpeg_quality)
else:
raise HTTPException(status_code=500, detail="Invalid image format")
bytes_data = output_bytes.getvalue()
return base64.b64encode(bytes_data)
if __name__ == "__main__":
# manual_hack()
handler = EndpointHandler("./")
res = handler.__call__({})
# print(res)