"""FastAPI service that serves Fluently Stable Diffusion models over HTTP.

A single ``POST /generate`` endpoint accepts form fields (and, for the
inpainting model, two uploaded images) and responds with one PNG image.
Pipelines are loaded on first use and kept in memory for later requests.
"""

import io
import os
import random

import numpy as np
import torch
from diffusers import (
    DPMSolverSinglestepScheduler,
    EulerAncestralDiscreteScheduler,
    StableDiffusionPipeline,
    StableDiffusionXLPipeline,
)
from diffusers.pipelines import StableDiffusionInpaintPipeline
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.responses import StreamingResponse
from huggingface_hub import hf_hub_download
from PIL import Image

app = FastAPI()

MAX_SEED = np.iinfo(np.int32).max
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Half precision is only used on the GPU; many CPU kernels do not support
# float16, so fall back to float32 when CUDA is unavailable.
TORCH_DTYPE = torch.float16 if device.type == "cuda" else torch.float32

# Load HF token from environment variable
HF_TOKEN = os.getenv("HF_TOKEN")

# Name of the only model that takes an input image and a mask.
INPAINT_MODEL = "Fluently v4 inpaint"

# Dictionary to store loaded pipelines, keyed by model name
loaded_pipelines = {}


class UnknownModelError(ValueError):
    """Raised when a requested model name is not one this service can load."""


def load_pipeline(model_name: str):
    """Return the diffusers pipeline for ``model_name``, loading it if needed.

    The pipeline is moved to the module-level ``device`` and stored in
    ``loaded_pipelines`` so that later calls with the same name reuse it.

    Args:
        model_name: One of the model names handled below, e.g.
            ``"Fluently XL Final"`` or ``"Fluently v4 inpaint"``.

    Returns:
        The loaded pipeline object.

    Raises:
        UnknownModelError: If ``model_name`` is not recognised. This is a
            ``ValueError`` subclass, so ``except ValueError`` still catches it.
    """
    cached = loaded_pipelines.get(model_name)
    if cached is not None:
        return cached

    if model_name == "Fluently XL Final":
        checkpoint_path = hf_hub_download(
            repo_id="fluently/Fluently-XL-Final",
            filename="FluentlyXL-Final.safetensors",
            token=HF_TOKEN,
        )
        pipe = StableDiffusionXLPipeline.from_single_file(
            checkpoint_path,
            torch_dtype=TORCH_DTYPE,
            use_safetensors=True,
        )
        pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(
            pipe.scheduler.config
        )
    elif model_name == "Fluently Anime":
        pipe = StableDiffusionPipeline.from_pretrained(
            "fluently/Fluently-anime",
            torch_dtype=TORCH_DTYPE,
            use_safetensors=True,
        )
        pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(
            pipe.scheduler.config
        )
    elif model_name == "Fluently Epic":
        pipe = StableDiffusionPipeline.from_pretrained(
            "fluently/Fluently-epic",
            torch_dtype=TORCH_DTYPE,
            use_safetensors=True,
        )
        pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(
            pipe.scheduler.config
        )
    elif model_name == "Fluently XL v4":
        pipe = StableDiffusionXLPipeline.from_pretrained(
            "fluently/Fluently-XL-v4",
            torch_dtype=TORCH_DTYPE,
            use_safetensors=True,
        )
        pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(
            pipe.scheduler.config
        )
    elif model_name == "Fluently XL v3 Lightning":
        pipe = StableDiffusionXLPipeline.from_pretrained(
            "fluently/Fluently-XL-v3-lightning",
            torch_dtype=TORCH_DTYPE,
            use_safetensors=True,
        )
        pipe.scheduler = DPMSolverSinglestepScheduler.from_config(
            pipe.scheduler.config,
            use_karras_sigmas=False,
            timestep_spacing="trailing",
            lower_order_final=True,
        )
    elif model_name == INPAINT_MODEL:
        # The inpainting pipeline keeps the scheduler it was saved with.
        pipe = StableDiffusionInpaintPipeline.from_pretrained(
            "fluently/Fluently-v4-inpainting",
            torch_dtype=TORCH_DTYPE,
            use_safetensors=True,
        )
    else:
        raise UnknownModelError(f"Unknown model: {model_name}")

    pipe.to(device)
    loaded_pipelines[model_name] = pipe
    return pipe


def randomize_seed_fn(seed: int, randomize_seed: bool) -> int:
    """Return ``seed``, or a random value in ``[0, MAX_SEED]`` if requested."""
    if randomize_seed:
        seed = random.randint(0, MAX_SEED)
    return seed


async def _read_upload_image(upload, field_name: str):
    """Decode an optional uploaded file into a PIL image.

    Returns ``None`` when no file was uploaded. Responds with HTTP 400 when
    the uploaded bytes cannot be opened as an image.
    """
    if not upload:
        return None
    try:
        return Image.open(io.BytesIO(await upload.read()))
    except OSError as exc:  # includes PIL.UnidentifiedImageError
        raise HTTPException(
            status_code=400,
            detail=f"{field_name} is not a readable image",
        ) from exc


@app.post("/generate")
async def generate(
    model: str = Form(...),
    prompt: str = Form(...),
    negative_prompt: str = Form(""),
    use_negative_prompt: bool = Form(False),
    seed: int = Form(0),
    width: int = Form(1024),
    height: int = Form(1024),
    guidance_scale: float = Form(3),
    randomize_seed: bool = Form(False),
    inpaint_image: UploadFile = File(None),
    mask_image: UploadFile = File(None),
    blur_factor: float = Form(1.0),
    strength: float = Form(0.75),
):
    """Generate one image with the selected model and return it as PNG.

    Form fields:
        model: Name of the model to use (see ``load_pipeline``).
        prompt: Text prompt.
        negative_prompt: Negative prompt; ignored unless
            ``use_negative_prompt`` is true.
        use_negative_prompt: Whether ``negative_prompt`` is applied.
        seed: Seed for the random generator passed to the pipeline.
        width, height: Requested output size in pixels.
        guidance_scale: Guidance scale passed to the pipeline.
        randomize_seed: If true, ``seed`` is replaced by a random value.
        inpaint_image, mask_image: Source image and mask; both are required
            when ``model`` is the inpainting model.
        blur_factor: Blur applied to the mask before inpainting.
        strength: Inpainting strength passed to the pipeline.

    Returns:
        A streaming ``image/png`` response. The seed that was actually used
        is reported in the ``X-Seed`` response header.

    Raises:
        HTTPException: 400 for an unknown model, an unreadable upload, or
            missing images for the inpainting model.
    """
    # NOTE: the pipeline call below is synchronous, so while an image is being
    # generated this coroutine does not yield to other requests.
    seed = int(randomize_seed_fn(seed, randomize_seed))
    if not use_negative_prompt:
        negative_prompt = ""

    inpaint_image_pil = await _read_upload_image(inpaint_image, "inpaint_image")
    mask_image_pil = await _read_upload_image(mask_image, "mask_image")

    is_inpaint = model == INPAINT_MODEL
    if is_inpaint and (inpaint_image_pil is None or mask_image_pil is None):
        # Reject early instead of failing inside the mask processor.
        raise HTTPException(
            status_code=400,
            detail=f"{INPAINT_MODEL} requires both inpaint_image and mask_image",
        )

    try:
        pipe = load_pipeline(model)
    except UnknownModelError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    # Without an explicit generator the seed would have no effect on the
    # output. A CPU generator is accepted regardless of the pipeline device.
    generator = torch.Generator(device="cpu").manual_seed(seed)

    if is_inpaint:
        blurred_mask = pipe.mask_processor.blur(
            mask_image_pil, blur_factor=blur_factor
        )
        images = pipe(
            prompt=prompt,
            image=inpaint_image_pil,
            mask_image=blurred_mask,
            negative_prompt=negative_prompt,
            width=width,
            height=height,
            guidance_scale=guidance_scale,
            num_inference_steps=30,
            strength=strength,
            num_images_per_prompt=1,
            generator=generator,
            output_type="pil",
        ).images
    else:
        images = pipe(
            prompt=prompt,
            negative_prompt=negative_prompt,
            width=width,
            height=height,
            guidance_scale=guidance_scale,
            num_inference_steps=25 if model == "Fluently XL Final" else 30,
            num_images_per_prompt=1,
            generator=generator,
            output_type="pil",
        ).images

    img_byte_arr = io.BytesIO()
    images[0].save(img_byte_arr, format="PNG")
    img_byte_arr.seek(0)
    return StreamingResponse(
        img_byte_arr,
        media_type="image/png",
        # Lets the client reproduce a result produced with randomize_seed.
        headers={"X-Seed": str(seed)},
    )


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)