import os
import requests
import runpod
import torch
from diffusers import AutoencoderKLCogVideoX, CogVideoXImageToVideoPipeline, CogVideoXTransformer3DModel
from diffusers.utils import export_to_video, load_image
from transformers import T5EncoderModel, T5Tokenizer
from huggingface_hub import HfApi

from cogvideox.utils.lora_utils import merge_lora, unmerge_lora

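# The upload step at the end of generate() reads HF_API_TOKEN from the environment,
# so the token has to be configured on the endpoint.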
# Load the CogVideoX image-to-video components once at worker start-up, in fp16,
# straight onto the GPU, then assemble the pipeline from them.
with torch.inference_mode():
    model_id = "/runpod-volume/model"

    transformer = CogVideoXTransformer3DModel.from_pretrained(model_id, subfolder="transformer", torch_dtype=torch.float16).to("cuda")
    text_encoder = T5EncoderModel.from_pretrained(model_id, subfolder="text_encoder", torch_dtype=torch.float16).to("cuda")
    vae = AutoencoderKLCogVideoX.from_pretrained(model_id, subfolder="vae", torch_dtype=torch.float16).to("cuda")
    tokenizer = T5Tokenizer.from_pretrained(model_id, subfolder="tokenizer")

    pipe = CogVideoXImageToVideoPipeline.from_pretrained(model_id, tokenizer=tokenizer, text_encoder=text_encoder, transformer=transformer, vae=vae, torch_dtype=torch.float16).to("cuda")

    # Merge the LoRA into the pipeline weights at full strength.
    lora_path = "/content/shirtlift.safetensors"
    lora_weight = 1.0
    pipe = merge_lora(pipe, lora_path, lora_weight).to("cuda")
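# Note: unmerge_lora (imported above) can restore the base weights if the LoRA ever
# needs to be removed or swapped; this worker keeps it merged for its whole lifetime.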

def download_file(url, save_dir, file_name):
    """Download url into save_dir as file_name, preserving the original extension."""
    os.makedirs(save_dir, exist_ok=True)
    original_file_name = url.split('/')[-1]
    _, original_file_extension = os.path.splitext(original_file_name)
    file_path = os.path.join(save_dir, file_name + original_file_extension)
    response = requests.get(url)
    response.raise_for_status()
    with open(file_path, 'wb') as file:
        file.write(response.content)
    return file_path

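# Illustrative call (hypothetical URL): download_file("https://example.com/frame.png",
# "/content/input", "input_image_tost") would return "/content/input/input_image_tost.png".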
@torch.inference_mode()
def generate(input):
    values = input["input"]
    job_id = input.get("id")  # RunPod supplies the job id alongside the payload

    # Fetch the conditioning image and read the text prompt from the request.
    input_image = values['input_image_check']
    input_image = download_file(url=input_image, save_dir='/content/input', file_name='input_image_tost')
    prompt = values['prompt']

    # Fixed sampling settings.
    guidance_scale = 6
    use_dynamic_cfg = True
    num_inference_steps = 17
    fps = 9

    image = load_image(input_image)
    video = pipe(image=image, prompt=prompt, guidance_scale=guidance_scale, use_dynamic_cfg=use_dynamic_cfg, num_inference_steps=num_inference_steps).frames[0]
    export_to_video(video, "/content/cogvideox_5b_i2v_tost.mp4", fps=fps)
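    # Note: the pipeline call above passes no num_frames or resolution, so the model defaults apply.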

    # Upload the rendered video to the Hugging Face Hub and return its URL.
    result = "/content/cogvideox_5b_i2v_tost.mp4"
    try:
        default_filename = os.path.basename(result)
        print("Video saved, uploading to Hugging Face")
        hf_api = HfApi()
        repo_id = "meepmoo/h4h4jejdf"
        tokenxf = os.getenv("HF_API_TOKEN")
        hf_api.upload_file(path_or_fileobj=result, path_in_repo=default_filename, repo_id=repo_id, token=tokenxf, repo_type="model")
        result_url = f"https://huggingface.co/{repo_id}/blob/main/{default_filename}"
        return {"jobId": job_id, "result": result_url, "status": "DONE"}
    except Exception as e:
        return {"jobId": job_id, "result": f"FAILED: {str(e)}", "status": "FAILED"}
    finally:
        # Clean up local files so the worker does not accumulate outputs between jobs.
        if os.path.exists(result):
            os.remove(result)
        if os.path.exists(input_image):
            os.remove(input_image)
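
# Example request payload (illustrative values; the field names match what generate() reads):
# {
#     "input": {
#         "input_image_check": "https://example.com/first_frame.png",
#         "prompt": "the camera slowly pans while the subject moves"
#     }
# }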
runpod.serverless.start({"handler": generate})