File size: 3,575 Bytes
c0e5ce3
 
 
 
 
326cf1a
 
c0e5ce3
6784228
94b5388
c0e5ce3
4837465
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c0e5ce3
 
 
cbd8427
c0e5ce3
 
 
326cf1a
 
cbd8427
4837465
 
 
 
 
f516724
 
 
c0e5ce3
4837465
326cf1a
 
 
 
5c87c18
326cf1a
 
 
 
 
 
 
5c87c18
326cf1a
5c87c18
c0e5ce3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
326cf1a
c0e5ce3
 
 
 
9ae4002
c0e5ce3
cbd8427
 
d1aeee7
 
 
cbd8427
 
 
 
 
 
 
c0e5ce3
34976a0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import gradio as gr
from prodiapy import Prodia
from PIL import Image
from io import BytesIO
import requests
import random
import os
import base64
import json
import time


class Prodia:
    """Minimal HTTP client for the Prodia REST API (face-swap jobs).

    NOTE: this class reuses the name ``Prodia`` that the file also imports
    from ``prodiapy``; after this definition the name refers to this class.

    Args:
        api_key: Value sent in the ``X-Prodia-Key`` header. When omitted
            (``None``), the ``PRODIA_API_KEY`` environment variable is read
            at construction time rather than once at import time.
        base: API base URL. Defaults to ``https://api.prodia.com/v1``.
        timeout: Seconds passed to ``requests`` as the per-request timeout,
            so a stalled connection cannot block a worker forever.
    """

    # Per-request timeout (seconds) used when the caller does not supply one.
    DEFAULT_TIMEOUT = 30

    # Job states after which polling stops.
    _TERMINAL_STATUSES = ('succeeded', 'failed')

    def __init__(self, api_key=None, base=None, timeout=DEFAULT_TIMEOUT):
        if api_key is None:
            # Resolve lazily so the environment is consulted per instance.
            api_key = os.getenv("PRODIA_API_KEY")

        self.base = base or "https://api.prodia.com/v1"
        self.timeout = timeout
        self.headers = {
            "X-Prodia-Key": api_key
        }

    def faceswap(self, params):
        """POST ``params`` to the ``/faceswap`` endpoint and return the decoded JSON."""
        response = self._post(f"{self.base}/faceswap", params)
        return response.json()

    def get_job(self, job_id):
        """Fetch ``/job/{job_id}`` and return the decoded JSON."""
        response = self._get(f"{self.base}/job/{job_id}")
        return response.json()

    def wait(self, job, poll_interval=0.25, timeout=None):
        """Poll a job until its status is ``succeeded`` or ``failed``.

        Args:
            job: Job dict containing at least ``'status'`` and ``'job'`` (the id).
            poll_interval: Seconds to sleep between polls.
            timeout: Optional overall deadline in seconds. ``None`` (the
                default) polls indefinitely, matching the original behaviour.

        Returns:
            The last job dict retrieved (or ``job`` itself if it was already
            in a terminal state).

        Raises:
            TimeoutError: If ``timeout`` is given and elapses before the job
                reaches a terminal state.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        job_result = job

        while job_result['status'] not in self._TERMINAL_STATUSES:
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Prodia job {job['job']} did not finish within {timeout} seconds"
                )
            time.sleep(poll_interval)
            job_result = self.get_job(job['job'])

        return job_result

    def _post(self, url, params):
        """POST ``params`` as a JSON body; raise on any non-200 status."""
        headers = {
            **self.headers,
            "Content-Type": "application/json"
        }
        response = requests.post(
            url, headers=headers, data=json.dumps(params), timeout=self.timeout
        )
        return self._ensure_ok(response)

    def _get(self, url):
        """GET ``url`` with the API-key header; raise on any non-200 status."""
        response = requests.get(url, headers=self.headers, timeout=self.timeout)
        return self._ensure_ok(response)

    @staticmethod
    def _ensure_ok(response):
        """Return ``response`` unchanged if its status is 200, otherwise raise."""
        if response.status_code != 200:
            raise Exception(f"Bad Prodia Response: {response.status_code}")

        return response


# Shared module-level API client; constructed with no arguments, so it relies
# on the class defaults for the API key and base URL.
client = Prodia()


def infer(source, target):
    """Run a Prodia face-swap job for two images and return the result URL.

    Args:
        source: Value of the "Source Image" input (the UI passes a file
            path), or ``None`` when no image was provided.
        target: Value of the "Target Image" input, same conventions.

    Returns:
        The ``imageUrl`` reported by the finished job, or ``None`` when an
        input is missing or the job ended with status ``"failed"``.
    """
    # Check the arguments themselves. The original tested the module-level
    # Gradio components, which are never None, so a missing input slipped
    # through and crashed later when the file was opened.
    if source is None or target is None:
        return None

    source_url = upload_image(source)
    target_url = upload_image(target)

    job = client.faceswap({
        "sourceUrl": source_url,
        "targetUrl": target_url
    })
    res = client.wait(job)

    if res['status'] == "failed":
        return None

    return res['imageUrl']


def upload_image(file):
    """Upload a local image through the two configured endpoints and return its URL.

    The file is first posted to the endpoint in ``IMAGE_API_1``, which
    returns an attachment id. A message referencing that id is then posted
    to the endpoint in ``IMAGE_API_2`` (authenticated with ``SESSION_TOKEN``);
    its response supplies the stored filename used to build the final URL.

    Args:
        file: Path of the image file to upload.

    Returns:
        ``"{IMAGE_API_1}/{id}/{filename}"`` for the uploaded attachment.

    Raises:
        requests.HTTPError: If either endpoint answers with a 4xx/5xx status.
    """
    upload_endpoint = os.getenv("IMAGE_API_1")

    # Use a context manager so the file handle is closed even if the upload fails.
    with open(file, 'rb') as handle:
        upload = requests.post(upload_endpoint, files={'file': handle}, timeout=60)
    upload.raise_for_status()
    img_id = upload.json()['id']

    payload = {
        "content": "",
        "nonce": f"{random.randint(1, 10000000)}H9X42KSEJFNNH",
        "replies": [],
        "attachments": [img_id]
    }
    res = requests.post(
        os.getenv("IMAGE_API_2"),
        json=payload,
        headers={"x-session-token": os.getenv("SESSION_TOKEN")},
        timeout=60,
    )
    # Fail with a clear HTTP error instead of a KeyError on an error body.
    res.raise_for_status()

    filename = res.json()['attachments'][0]['filename']
    return f"{upload_endpoint}/{img_id}/{filename}"


def image_to_base64(image: "Image.Image") -> str:
    """Encode an image as a base64 string of its PNG serialization.

    Args:
        image: Object exposing ``save(fp, format=...)``; in this file that is
            a ``PIL.Image.Image`` (``Image`` alone is the module, not the type).

    Returns:
        The PNG bytes, base64-encoded and decoded to a UTF-8 ``str``.
    """
    # Serialize into an in-memory buffer instead of a temporary file.
    buffered = BytesIO()
    image.save(buffered, format="PNG")

    # b64encode returns bytes; decode so callers get a plain string.
    return base64.b64encode(buffered.getvalue()).decode('utf-8')


# Gradio UI: a title, two image inputs, a result image with its trigger
# button, and a gallery of example pairs wired to the same handler.
with gr.Blocks() as demo:
    with gr.Column():
        gr.HTML("<h1><center>Face Swap</center></h1>")

    with gr.Row():
        # Both inputs use type="filepath", which is what `infer` receives.
        with gr.Row():
            source_image = gr.Image(type="filepath", label="Source Image")
            target_image = gr.Image(type="filepath", label="Target Image")
        with gr.Column():
            result = gr.Image()
            run_button = gr.Button("Swap Face", variant="primary")

    # Each entry is a [source, target] pair of bundled example files.
    example_pairs = [
        ["example1.jpg", "example2.jpg"],
        ["example3.jpg", "example4.jpg"],
        ["example5.jpg", "example6.jpg"]
    ]
    gr.Examples(
        examples=example_pairs,
        fn=infer,
        inputs=[source_image, target_image],
        outputs=[result]
    )

    run_button.click(
        fn=infer,
        inputs=[source_image, target_image],
        outputs=[result]
    )

# Enable the request queue, then start the server with API docs hidden.
demo.queue(max_size=20, api_open=False)
demo.launch(show_api=False, max_threads=400)