|
import os, json, tempfile, requests, runpod |
|
|
|
import torch |
|
from torchvision import transforms |
|
from transformers import AutoModelForImageSegmentation |
|
from PIL import Image |
|
|
|
def download_file(url, save_dir='/content/input', timeout=60):
    """Download ``url`` into ``save_dir`` and return the local file path.

    The file name is taken from the last segment of the URL *path*, so query
    strings and fragments (e.g. signed-URL parameters) never end up in the
    name. If the path has no usable last segment, ``"download"`` is used.

    Args:
        url: HTTP(S) address of the file to fetch.
        save_dir: Directory to store the file in; created if missing.
        timeout: Seconds passed to ``requests.get`` so a stalled server
            cannot block the worker indefinitely.

    Returns:
        Path of the written file inside ``save_dir``.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection problems or timeout.
    """
    # Local import keeps this block self-contained without touching the
    # module's import section.
    from urllib.parse import urlparse

    os.makedirs(save_dir, exist_ok=True)

    # basename() of the parsed path drops both the query string and any
    # directory components.
    file_name = os.path.basename(urlparse(url).path)
    if file_name in ('', '.', '..'):
        # A URL such as "https://host/" has no file segment; without a
        # fallback the target path would be the directory itself.
        file_name = 'download'
    file_path = os.path.join(save_dir, file_name)

    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    with open(file_path, 'wb') as file:
        file.write(response.content)
    return file_path
|
|
|
# One-time setup executed at import: load the segmentation model onto the GPU
# and build the preprocessing pipeline that `generate` reuses for every job.
with torch.inference_mode():
    # "high" lets float32 matmuls trade a little precision for speed.
    torch.set_float32_matmul_precision("high")

    birefnet = AutoModelForImageSegmentation.from_pretrained(
        "/content/BiRefNet",
        trust_remote_code=True,
    ).to("cuda")

    # Resize to the fixed 1024x1024 input, convert to a tensor, then
    # normalise per channel (values look like the usual ImageNet mean/std).
    transform_image = transforms.Compose([
        transforms.Resize((1024, 1024)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ])
|
|
|
# Seconds to wait on each outbound HTTP call so a stalled endpoint cannot
# hang the worker forever.
_HTTP_TIMEOUT = 60


def _pop_with_env_fallback(values, key, env_name):
    """Remove ``key`` from ``values``; use ``env_name`` if it holds the placeholder (its own name)."""
    value = values.pop(key)
    if value == key:
        return os.getenv(env_name)
    return value


def _send_job_status(payload, notify_uri, notify_token, web_notify_uri, web_notify_token):
    """POST ``payload`` as JSON to the web endpoint and, unless it is the placeholder, to ``notify_uri``."""
    body = json.dumps(payload)
    requests.post(
        web_notify_uri,
        data=body,
        headers={'Content-Type': 'application/json', "Authorization": web_notify_token},
        timeout=_HTTP_TIMEOUT,
    )
    if notify_uri != "notify_uri":
        requests.post(
            notify_uri,
            data=body,
            headers={'Content-Type': 'application/json', "Authorization": notify_token},
            timeout=_HTTP_TIMEOUT,
        )


@torch.inference_mode()
def generate(input):
    """Remove the background of an image and publish the result.

    Downloads ``input["input"]["input_image_url"]``, runs the module-level
    ``birefnet`` model on it, stores the predicted mask as the alpha channel,
    uploads the PNG to a Discord channel and reports the attachment URL to the
    notification endpoint(s).

    Args:
        input: Job dict whose ``"input"`` entry holds ``input_image_url``,
            ``notify_uri``, ``notify_token``, ``discord_id``,
            ``discord_channel``, ``discord_token`` and ``job_id``. A value
            equal to its own key name is a placeholder: the Discord fields
            then come from environment variables, and ``notify_uri`` means
            "no extra endpoint". The bookkeeping keys are removed from the
            dict before it is echoed into the Discord message.

    Returns:
        ``{"jobId", "result", "status"}`` where ``status`` is ``"DONE"`` with
        the attachment URL, or ``"FAILED"`` with the error text when the
        upload/notification stage fails.

    Raises:
        Exception: Errors while downloading or processing the image are not
            caught here and propagate to the caller.
    """
    values = input["input"]
    result = "/content/birefnet_tost.png"

    input_path = download_file(values['input_image_url'])
    try:
        # Force three channels: the normalisation step is defined for RGB, so
        # RGBA / greyscale / palette inputs would otherwise fail. convert()
        # also fully loads the pixels, letting the file be closed right away.
        with Image.open(input_path) as source_image:
            input_image = source_image.convert("RGB")
    finally:
        # The download is only needed to build the in-memory image; removing
        # it keeps the input directory from growing across jobs.
        if os.path.exists(input_path):
            os.remove(input_path)

    image_size = input_image.size
    input_images = transform_image(input_image).unsqueeze(0).to("cuda")
    # The last model output is used as the mask logits.
    preds = birefnet(input_images)[-1].sigmoid().cpu()
    pred = preds[0].squeeze()
    # Scale the mask back to the original resolution before attaching it.
    mask = transforms.ToPILImage()(pred).resize(image_size)
    input_image.putalpha(mask)
    input_image.save(result)

    # Bind everything the failure handler reads *before* entering the try, so
    # an early error (e.g. a missing key) still produces a FAILED response
    # instead of an UnboundLocalError inside the handler.
    job_id = values.get('job_id')
    notify_uri = values.get('notify_uri', 'notify_uri')
    notify_token = values.get('notify_token')
    web_notify_uri = os.getenv('com_camenduru_web_notify_uri')
    web_notify_token = os.getenv('com_camenduru_web_notify_token')

    try:
        notify_uri = values.pop('notify_uri')
        notify_token = values.pop('notify_token')
        discord_id = _pop_with_env_fallback(values, 'discord_id', 'com_camenduru_discord_id')
        discord_channel = _pop_with_env_fallback(values, 'discord_channel', 'com_camenduru_discord_channel')
        discord_token = _pop_with_env_fallback(values, 'discord_token', 'com_camenduru_discord_token')
        job_id = values.pop('job_id')

        default_filename = os.path.basename(result)
        with open(result, "rb") as file:
            files = {default_filename: file.read()}
        # Only the remaining (non-bookkeeping) inputs are echoed to Discord.
        payload = {"content": f"{json.dumps(values)} <@{discord_id}>"}
        response = requests.post(
            f"https://discord.com/api/v9/channels/{discord_channel}/messages",
            data=payload,
            headers={"Authorization": f"Bot {discord_token}"},
            files=files,
            timeout=_HTTP_TIMEOUT,
        )
        response.raise_for_status()
        result_url = response.json()['attachments'][0]['url']

        notify_payload = {"jobId": job_id, "result": result_url, "status": "DONE"}
        _send_job_status(notify_payload, notify_uri, notify_token, web_notify_uri, web_notify_token)
        return {"jobId": job_id, "result": result_url, "status": "DONE"}
    except Exception as e:
        error_payload = {"jobId": job_id, "status": "FAILED"}
        try:
            _send_job_status(error_payload, notify_uri, notify_token, web_notify_uri, web_notify_token)
        except Exception:
            # Best effort: a broken notification endpoint must not mask the
            # original failure that is reported below.
            pass
        return {"jobId": job_id, "result": f"FAILED: {str(e)}", "status": "FAILED"}
    finally:
        if os.path.exists(result):
            os.remove(result)
|
|
|
# Register `generate` as the handler and start the RunPod serverless worker.
runpod.serverless.start(
    {"handler": generate},
)
|
|