File size: 4,559 Bytes
8f558df
21fcfe6
29c0cfc
0e31dfe
21fcfe6
8f558df
21fcfe6
a533ef3
425e364
29c0cfc
c35b197
aa31cd8
02558d9
a16627e
57b80dc
29c0cfc
 
 
 
 
a16627e
21fcfe6
5de9904
21fcfe6
29c0cfc
 
 
 
e539fb0
aa31cd8
 
e539fb0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aa31cd8
 
 
 
 
 
 
 
 
 
 
3549904
 
c35b197
 
393a49a
3549904
 
 
 
aa31cd8
 
29c0cfc
dcf6d05
a16627e
dcf6d05
 
 
a16627e
 
29c0cfc
dcf6d05
13775ff
dcf6d05
 
 
8f558df
29c0cfc
 
 
 
6bf8982
 
 
 
 
 
a16627e
 
29c0cfc
 
 
 
 
 
 
 
 
 
 
 
 
8f558df
 
 
 
 
 
 
 
 
 
a16627e
25d5485
6bf8982
 
29c0cfc
aa31cd8
29c0cfc
25d5485
 
8f558df
25d5485
21fcfe6
29c0cfc
 
 
8f558df
755339c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import gradio as gr
import spaces
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor, TextIteratorStreamer
from qwen_vl_utils import process_vision_info
import torch
from PIL import Image
import subprocess
import numpy as np
import os
from threading import Thread
import uuid
import io

# Checkpoint identifier shared by the model weights and the processor.
# Everything in this section runs once, at import time.
MODEL_ID = "Qwen/Qwen2-VL-7B-Instruct"

# Load the weights in half precision, then move them to the GPU and switch
# the module to evaluation mode.
model = Qwen2VLForConditionalGeneration.from_pretrained(
    MODEL_ID, trust_remote_code=True, torch_dtype=torch.float16
)
model = model.to("cuda").eval()

# Processor used for chat templating and for packing text/vision inputs.
processor = AutoProcessor.from_pretrained(MODEL_ID, trust_remote_code=True)

# Markdown rendered at the top of the UI.
DESCRIPTION = "[Qwen2-VL-7B Demo](https://huggingface.co/Qwen/Qwen2-VL-7B-Instruct)"

# Filename suffixes used to classify an upload as image or video.
# NOTE(review): the video entries carry no leading dot and are matched with
# str.endswith, so a name merely ending in e.g. "mp4" also matches; "wav" is
# listed here as well -- confirm both are intended.
image_extensions = Image.registered_extensions()
video_extensions = (
    "avi",
    "mp4",
    "mov",
    "mkv",
    "flv",
    "wmv",
    "mjpeg",
    "wav",
    "gif",
    "webm",
    "m4v",
    "3gp",
)


def identify_and_save_blob(blob_path):
    """Copy an upload to a new file whose suffix reflects its media type.

    The file at ``blob_path`` is read fully into memory. If Pillow can open
    and verify the bytes, the content is treated as an image and saved with a
    ``.png`` suffix; otherwise it is assumed to be a video and saved with a
    ``.mp4`` suffix. The bytes are written unchanged (no transcoding) to a
    uniquely named file relative to the current working directory.

    Args:
        blob_path: Path of the file to inspect.

    Returns:
        A ``(filename, media_type)`` tuple, where ``media_type`` is either
        ``"image"`` or ``"video"``.

    Raises:
        ValueError: If ``blob_path`` does not exist, or if any other error
            occurs while reading, inspecting or writing the data. The
            underlying exception is attached as ``__cause__``.
    """
    try:
        # Close the source file as soon as its content is in memory instead
        # of keeping it open while the copy is written.
        with open(blob_path, "rb") as source:
            blob_content = source.read()

        try:
            # verify() raises if the bytes are not a valid image.
            Image.open(io.BytesIO(blob_content)).verify()
        except (IOError, SyntaxError):
            # Not a valid image: assume it is a video, default to MP4.
            extension, media_type = ".mp4", "video"
        else:
            # Valid image: default to a PNG suffix for the copy.
            extension, media_type = ".png", "image"

        # Create a unique filename so concurrent requests do not collide.
        filename = f"temp_{uuid.uuid4()}_media{extension}"
        with open(filename, "wb") as target:
            target.write(blob_content)
    except FileNotFoundError as e:
        raise ValueError(f"The file {blob_path} was not found.") from e
    except Exception as e:
        raise ValueError(f"An error occurred while processing the file: {e}") from e

    return filename, media_type


@spaces.GPU
def qwen_inference(media_input, text_input=None):
    """Stream the model's answer to a question about an image or video.

    The media type is decided from the file name: a suffix found in
    ``image_extensions`` means image, one found in ``video_extensions`` means
    video. Anything else is handed to ``identify_and_save_blob``, which
    inspects the content and returns a copy with a recognisable suffix.

    Args:
        media_input: Filesystem path of the uploaded image or video.
        text_input: The question to ask about the media. ``None`` is treated
            as an empty question.

    Yields:
        The response text accumulated so far; each yielded value extends the
        previous one with the newest text produced by the streamer.

    Raises:
        ValueError: If no media path was supplied, or if the media type could
            not be determined.
    """
    # Without this guard a missing upload (e.g. None) fell through to the
    # code below with media_path / media_type never assigned.
    if not isinstance(media_input, str) or not media_input:
        raise ValueError("No media provided. Please upload an image or video.")

    media_path = media_input
    # Compare case-insensitively so names such as "photo.JPG" are classified
    # directly instead of taking the slower blob-copy fallback.
    lowered_path = media_path.lower()
    if lowered_path.endswith(tuple(image_extensions)):
        media_type = "image"
    elif lowered_path.endswith(video_extensions):
        media_type = "video"
    else:
        try:
            media_path, media_type = identify_and_save_blob(media_input)
            print(media_path, media_type)
        except Exception as e:
            print(e)
            raise ValueError(
                "Unsupported media type. Please upload an image or video."
            ) from e

    print(media_path)

    media_entry = {"type": media_type, media_type: media_path}
    if media_type == "video":
        # Videos additionally carry a frame-sampling rate.
        media_entry["fps"] = 8.0

    # Keep the text entry a string even when the caller omitted the question.
    # NOTE(review): how the chat template renders a None text was not
    # verified; an empty string is assumed to be the safe equivalent.
    question = "" if text_input is None else text_input

    messages = [
        {
            "role": "user",
            "content": [
                media_entry,
                {"type": "text", "text": question},
            ],
        }
    ]

    text = processor.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )
    image_inputs, video_inputs = process_vision_info(messages)
    inputs = processor(
        text=[text],
        images=image_inputs,
        videos=video_inputs,
        padding=True,
        return_tensors="pt",
    ).to("cuda")

    streamer = TextIteratorStreamer(
        processor, skip_prompt=True, skip_special_tokens=True
    )
    generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=1024)

    # Generation runs in a background thread so the streamer can be consumed
    # here while text is still being produced.
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()

    buffer = ""
    for new_text in streamer:
        buffer += new_text
        yield buffer

    # The streamer is exhausted, so generation is finishing; reap the worker.
    thread.join()

# Stylesheet for the output box. The "#output" selector only takes effect on
# a component created with elem_id="output" (see output_text below).
css = """
  #output {
    height: 500px; 
    overflow: auto; 
    border: 1px solid #ccc; 
  }
"""

# UI layout: upload + question + submit button on the left, answer on the right.
with gr.Blocks(css=css) as demo:
    gr.Markdown(DESCRIPTION)

    with gr.Tab(label="Image/Video Input"):
        with gr.Row():
            with gr.Column():
                # type="filepath" hands the handler a path string.
                input_media = gr.File(
                    label="Upload Image or Video", type="filepath"
                )
                text_input = gr.Textbox(label="Question")
                submit_btn = gr.Button(value="Submit")
            with gr.Column():
                # elem_id ties this textbox to the "#output" CSS rule above;
                # without it the stylesheet matched nothing.
                output_text = gr.Textbox(label="Output Text", elem_id="output")

        # qwen_inference is a generator, so the textbox is updated with each
        # partial answer it yields.
        submit_btn.click(
            qwen_inference, [input_media, text_input], [output_text]
        )

demo.launch(debug=True)