|
|
|
import os |
|
import sys |
|
import time |
|
import importlib |
|
import signal |
|
import re |
|
from typing import Dict, List, Any |
|
|
|
|
|
|
|
from packaging import version |
|
|
|
import logging |
|
logging.getLogger("xformers").addFilter(lambda record: 'A matching Triton is not available' not in record.getMessage()) |
|
|
|
from modules import errors |
|
from modules.call_queue import wrap_queued_call, queue_lock, wrap_gradio_gpu_call |
|
|
|
import torch |
|
|
|
|
|
if ".dev" in torch.__version__ or "+git" in torch.__version__: |
|
torch.__long_version__ = torch.__version__ |
|
torch.__version__ = re.search(r'[\d.]+[\d]', torch.__version__).group(0) |
|
|
|
from modules import shared, devices, ui_tempdir |
|
import modules.codeformer_model as codeformer |
|
import modules.face_restoration |
|
import modules.gfpgan_model as gfpgan |
|
import modules.img2img |
|
|
|
import modules.lowvram |
|
import modules.paths |
|
import modules.scripts |
|
import modules.sd_hijack |
|
import modules.sd_models |
|
import modules.sd_vae |
|
import modules.txt2img |
|
import modules.script_callbacks |
|
import modules.textual_inversion.textual_inversion |
|
import modules.progress |
|
|
|
import modules.ui |
|
from modules import modelloader |
|
from modules.shared import cmd_opts, opts |
|
import modules.hypernetworks.hypernetwork |
|
|
|
from modules.processing import StableDiffusionProcessingTxt2Img, StableDiffusionProcessingImg2Img, process_images |
|
import base64 |
|
import io |
|
from fastapi import HTTPException |
|
from io import BytesIO |
|
import piexif |
|
import piexif.helper |
|
from PIL import PngImagePlugin,Image |
|
|
|
|
|
def initialize(): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
modelloader.cleanup_models() |
|
modules.sd_models.setup_model() |
|
codeformer.setup_model(cmd_opts.codeformer_models_path) |
|
gfpgan.setup_model(cmd_opts.gfpgan_models_path) |
|
|
|
modelloader.list_builtin_upscalers() |
|
|
|
modelloader.load_upscalers() |
|
|
|
modules.sd_vae.refresh_vae_list() |
|
|
|
|
|
|
|
try: |
|
modules.sd_models.load_model() |
|
except Exception as e: |
|
errors.display(e, "loading stable diffusion model") |
|
print("", file=sys.stderr) |
|
print("Stable diffusion model failed to load, exiting", file=sys.stderr) |
|
exit(1) |
|
|
|
shared.opts.data["sd_model_checkpoint"] = shared.sd_model.sd_checkpoint_info.title |
|
|
|
shared.opts.onchange("sd_model_checkpoint", wrap_queued_call(lambda: modules.sd_models.reload_model_weights())) |
|
shared.opts.onchange("sd_vae", wrap_queued_call(lambda: modules.sd_vae.reload_vae_weights()), call=False) |
|
shared.opts.onchange("sd_vae_as_default", wrap_queued_call(lambda: modules.sd_vae.reload_vae_weights()), call=False) |
|
shared.opts.onchange("temp_dir", ui_tempdir.on_tmpdir_changed) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def sigint_handler(sig, frame): |
|
print(f'Interrupted with signal {sig} in {frame}') |
|
os._exit(0) |
|
|
|
signal.signal(signal.SIGINT, sigint_handler) |
|
|
|
|
|
class EndpointHandler(): |
|
def __init__(self, path=""): |
|
|
|
|
|
|
|
initialize() |
|
self.shared = shared |
|
|
|
def __call__(self, data: Dict[str, Any]) -> List[Dict[str, Any]]: |
|
""" |
|
data args: |
|
inputs (:obj: `str` | `PIL.Image` | `np.array`) |
|
kwargs |
|
Return: |
|
A :obj:`list` | `dict`: will be serialized and returned |
|
""" |
|
args = { |
|
"do_not_save_samples": True, |
|
"do_not_save_grid": True, |
|
"outpath_samples": "./output", |
|
"prompt": "lora:koreanDollLikeness_v15:0.66, best quality, ultra high res, (photorealistic:1.4), 1girl, beige sweater, black choker, smile, laughing, bare shoulders, solo focus, ((full body), (brown hair:1), looking at viewer", |
|
"negative_prompt": "paintings, sketches, (worst quality:2), (low quality:2), (normal quality:2), lowres, normal quality, ((monochrome)), ((grayscale)), skin spots, acnes, skin blemishes, age spot, glans, (ugly:1.331), (duplicate:1.331), (morbid:1.21), (mutilated:1.21), (tranny:1.331), mutated hands, (poorly drawn hands:1.331), blurry, 3hands,4fingers,3arms, bad anatomy, missing fingers, extra digit, fewer digits, cropped, jpeg artifacts,poorly drawn face,mutation,deformed", |
|
"sampler_name": "DPM++ SDE Karras", |
|
"steps": 20, |
|
"cfg_scale": 8, |
|
"width": 512, |
|
"height": 768, |
|
"seed": -1, |
|
} |
|
if data["inputs"]: |
|
if "prompt" in data["inputs"].keys(): |
|
prompt = data["inputs"]["prompt"] |
|
print("get prompt from request: ", prompt) |
|
args["prompt"] = prompt |
|
p = StableDiffusionProcessingTxt2Img(sd_model=self.shared.sd_model, **args) |
|
processed = process_images(p) |
|
single_image_b64 = encode_pil_to_base64(processed.images[0]).decode('utf-8') |
|
return { |
|
"img_data": single_image_b64, |
|
"parameters": processed.images[0].info.get('parameters', ""), |
|
} |
|
|
|
|
|
def manual_hack(): |
|
initialize() |
|
args = { |
|
|
|
"outpath_samples": "C:\\Users\\wolvz\\Desktop", |
|
"prompt": "lora:koreanDollLikeness_v15:0.66, best quality, ultra high res, (photorealistic:1.4), 1girl, beige sweater, black choker, smile, laughing, bare shoulders, solo focus, ((full body), (brown hair:1), looking at viewer", |
|
"negative_prompt": "paintings, sketches, (worst quality:2), (low quality:2), (normal quality:2), lowres, normal quality, ((monochrome)), ((grayscale)), skin spots, acnes, skin blemishes, age spot, glans", |
|
"sampler_name": "DPM++ SDE Karras", |
|
"steps": 20, |
|
"cfg_scale": 8, |
|
"width": 512, |
|
"height": 768, |
|
"seed": -1, |
|
} |
|
p = StableDiffusionProcessingTxt2Img(sd_model=shared.sd_model, **args) |
|
processed = process_images(p) |
|
|
|
|
|
def decode_base64_to_image(encoding): |
|
if encoding.startswith("data:image/"): |
|
encoding = encoding.split(";")[1].split(",")[1] |
|
try: |
|
image = Image.open(BytesIO(base64.b64decode(encoding))) |
|
return image |
|
except Exception as err: |
|
raise HTTPException(status_code=500, detail="Invalid encoded image") |
|
|
|
def encode_pil_to_base64(image): |
|
with io.BytesIO() as output_bytes: |
|
|
|
if opts.samples_format.lower() == 'png': |
|
use_metadata = False |
|
metadata = PngImagePlugin.PngInfo() |
|
for key, value in image.info.items(): |
|
if isinstance(key, str) and isinstance(value, str): |
|
metadata.add_text(key, value) |
|
use_metadata = True |
|
image.save(output_bytes, format="PNG", pnginfo=(metadata if use_metadata else None), quality=opts.jpeg_quality) |
|
|
|
elif opts.samples_format.lower() in ("jpg", "jpeg", "webp"): |
|
parameters = image.info.get('parameters', None) |
|
exif_bytes = piexif.dump({ |
|
"Exif": { piexif.ExifIFD.UserComment: piexif.helper.UserComment.dump(parameters or "", encoding="unicode") } |
|
}) |
|
if opts.samples_format.lower() in ("jpg", "jpeg"): |
|
image.save(output_bytes, format="JPEG", exif = exif_bytes, quality=opts.jpeg_quality) |
|
else: |
|
image.save(output_bytes, format="WEBP", exif = exif_bytes, quality=opts.jpeg_quality) |
|
|
|
else: |
|
raise HTTPException(status_code=500, detail="Invalid image format") |
|
|
|
bytes_data = output_bytes.getvalue() |
|
|
|
return base64.b64encode(bytes_data) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
handler = EndpointHandler("./") |
|
res = handler.__call__({}) |
|
|
|
|