Spaces:
Paused
Paused
import os | |
from fastapi import FastAPI, File, UploadFile, Form | |
from fastapi.responses import StreamingResponse | |
import torch | |
from diffusers import StableDiffusionPipeline, StableDiffusionXLPipeline, EulerAncestralDiscreteScheduler, DPMSolverSinglestepScheduler | |
from diffusers.pipelines import StableDiffusionInpaintPipeline | |
from huggingface_hub import hf_hub_download | |
import numpy as np | |
import random | |
from PIL import Image | |
import io | |
app = FastAPI() | |
MAX_SEED = np.iinfo(np.int32).max | |
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") | |
# Load HF token from environment variable | |
HF_TOKEN = os.getenv("HF_TOKEN") | |
# Dictionary to store loaded pipelines | |
loaded_pipelines = {} | |
# Function to load pipeline dynamically | |
def load_pipeline(model_name: str): | |
if model_name in loaded_pipelines: | |
return loaded_pipelines[model_name] | |
if model_name == "Fluently XL Final": | |
pipe = StableDiffusionXLPipeline.from_single_file( | |
hf_hub_download(repo_id="fluently/Fluently-XL-Final", filename="FluentlyXL-Final.safetensors", token=HF_TOKEN), | |
torch_dtype=torch.float16, | |
use_safetensors=True, | |
) | |
pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(pipe.scheduler.config) | |
elif model_name == "Fluently Anime": | |
pipe = StableDiffusionPipeline.from_pretrained( | |
"fluently/Fluently-anime", | |
torch_dtype=torch.float16, | |
use_safetensors=True, | |
) | |
pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(pipe.scheduler.config) | |
elif model_name == "Fluently Epic": | |
pipe = StableDiffusionPipeline.from_pretrained( | |
"fluently/Fluently-epic", | |
torch_dtype=torch.float16, | |
use_safetensors=True, | |
) | |
pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(pipe.scheduler.config) | |
elif model_name == "Fluently XL v4": | |
pipe = StableDiffusionXLPipeline.from_pretrained( | |
"fluently/Fluently-XL-v4", | |
torch_dtype=torch.float16, | |
use_safetensors=True, | |
) | |
pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(pipe.scheduler.config) | |
elif model_name == "Fluently XL v3 Lightning": | |
pipe = StableDiffusionXLPipeline.from_pretrained( | |
"fluently/Fluently-XL-v3-lightning", | |
torch_dtype=torch.float16, | |
use_safetensors=True, | |
) | |
pipe.scheduler = DPMSolverSinglestepScheduler.from_config(pipe.scheduler.config, use_karras_sigmas=False, timestep_spacing="trailing", lower_order_final=True) | |
elif model_name == "Fluently v4 inpaint": | |
pipe = StableDiffusionInpaintPipeline.from_pretrained( | |
"fluently/Fluently-v4-inpainting", | |
torch_dtype=torch.float16, | |
use_safetensors=True, | |
) | |
else: | |
raise ValueError(f"Unknown model: {model_name}") | |
pipe.to(device) | |
loaded_pipelines[model_name] = pipe | |
return pipe | |
def randomize_seed_fn(seed: int, randomize_seed: bool) -> int: | |
if randomize_seed: | |
seed = random.randint(0, MAX_SEED) | |
return seed | |
async def generate( | |
model: str = Form(...), | |
prompt: str = Form(...), | |
negative_prompt: str = Form(""), | |
use_negative_prompt: bool = Form(False), | |
seed: int = Form(0), | |
width: int = Form(1024), | |
height: int = Form(1024), | |
guidance_scale: float = Form(3), | |
randomize_seed: bool = Form(False), | |
inpaint_image: UploadFile = File(None), | |
mask_image: UploadFile = File(None), | |
blur_factor: float = Form(1.0), | |
strength: float = Form(0.75) | |
): | |
seed = int(randomize_seed_fn(seed, randomize_seed)) | |
if not use_negative_prompt: | |
negative_prompt = "" | |
inpaint_image_pil = Image.open(io.BytesIO(await inpaint_image.read())) if inpaint_image else None | |
mask_image_pil = Image.open(io.BytesIO(await mask_image.read())) if mask_image else None | |
pipe = load_pipeline(model) | |
if model in ["Fluently v4 inpaint"]: | |
blurred_mask = pipe.mask_processor.blur(mask_image_pil, blur_factor=blur_factor) | |
images = pipe( | |
prompt=prompt, | |
image=inpaint_image_pil, | |
mask_image=blurred_mask, | |
negative_prompt=negative_prompt, | |
width=width, | |
height=height, | |
guidance_scale=guidance_scale, | |
num_inference_steps=30, | |
strength=strength, | |
num_images_per_prompt=1, | |
output_type="pil", | |
).images | |
else: | |
images = pipe( | |
prompt=prompt, | |
negative_prompt=negative_prompt, | |
width=width, | |
height=height, | |
guidance_scale=guidance_scale, | |
num_inference_steps=25 if model == "Fluently XL Final" else 30, | |
num_images_per_prompt=1, | |
output_type="pil", | |
).images | |
img = images[0] | |
img_byte_arr = io.BytesIO() | |
img.save(img_byte_arr, format='PNG') | |
img_byte_arr.seek(0) | |
return StreamingResponse(img_byte_arr, media_type="image/png") | |
if __name__ == "__main__": | |
import uvicorn | |
uvicorn.run(app, host="0.0.0.0", port=7860) | |