# NOTE: the text "Spaces: Build error" that preceded this module is page-scrape residue, not source code.
# --- standard library ---
import json
import os
import pathlib
import time
from pathlib import Path
from shutil import rmtree

# --- third-party ---
import anvil.server
import anvil.media
import dotenv
import gradio as gr
import requests

# --- project-local ---
from download import download_generator, caption_generator

# Load environment variables (e.g. SERVER_PORT / GRADIO_URL, read by
# call_gradio_api) from a .env file, if one is present.
dotenv.load_dotenv()
def call_gradio_api(api_name='test_api', data=()):
    """POST ``data`` to a named Gradio API endpoint and return its result.

    The endpoint URL is the ``GRADIO_URL`` environment variable (default
    ``http://127.0.0.1:<SERVER_PORT or 8111>/api/``) followed by ``api_name``.

    :param api_name: name of the endpoint; appended to the base URL.
    :param data: inputs for the endpoint, sent as the ``data`` field of the
        JSON request body.
    :return: for the ``caption`` and ``transcribe_translate`` endpoints, the
        decoded JSON response as-is. For any other endpoint, a dict with the
        keys ``translated_video`` (an Anvil media object created from the
        returned file path), ``log``, ``text``, ``language`` and ``duration``,
        or ``None`` when the first entry of the response's ``data`` list is
        empty.
    """
    port = os.environ.get('SERVER_PORT', 8111)
    gradio_base_url = os.environ.get('GRADIO_URL', f'http://127.0.0.1:{port}/api/')
    gradio_url = f"{gradio_base_url}{api_name}"
    payload = json.dumps({"data": data})
    headers = {
        'accept': 'application/json',
        'Content-Type': 'application/json',
    }

    print(f"Calling {gradio_url} with {payload}")
    resp = requests.request("POST", gradio_url, headers=headers, data=payload)
    print("Finished calling gradio API with result")
    print(resp.text)

    # Both endpoints hand back the JSON document the caller wants unchanged.
    if api_name in ('caption', 'transcribe_translate'):
        return json.loads(resp.text)

    # Decode the body once; it is needed for both the outputs and the duration.
    body = resp.json()
    outputs = body['data'][0]
    if not outputs:
        return None

    video_path = outputs[0]
    return {
        'translated_video': anvil.media.from_file(video_path),
        'log': outputs[1],
        'text': outputs[2],
        'language': outputs[3],
        'duration': body['duration'],
    }
def remote_download(url):
    """Drive ``download_generator`` for ``url`` and collect its final outputs.

    Each item yielded by the generator is read as a mapping that always has a
    ``message`` entry and may carry ``whisper_result`` and/or ``sub_video``.
    The last value seen for each is kept.

    :param url: the URL handed to ``download_generator``.
    :return: a 4-tuple ``(sub_video, last_message, text, language)``. ``text``
        and ``language`` are empty strings when no ``whisper_result`` was
        produced (previously this case raised ``TypeError``).
    """
    print(f"remote_download: Downloading {url}")
    final_response = ''
    subbed_video_media = None
    whisper_result = None

    for response in download_generator(url):
        if 'whisper_result' in response:
            whisper_result = response.get('whisper_result')
        final_response = response['message']
        print(final_response)
        if 'sub_video' in response:
            subbed_video_media = response['sub_video']

    # The generator may finish (or yield nothing) without a transcription;
    # subscripting None here used to crash the whole request.
    if whisper_result is None:
        return subbed_video_media, final_response, '', ''

    return subbed_video_media, final_response, whisper_result["text"], whisper_result["language"]
def test_api(url='', delay=15):
    """Simulate a slow request so queued API calls can be exercised.

    :param url: only echoed in the log line; nothing is fetched.
    :param delay: number of seconds to block before returning. Defaults to
        15, which reproduces the original fixed behaviour.
    :return: a message stating how long the call slept.
    """
    print(f'Request from Anvil with URL {url}, faking a long ass request that takes {delay} seconds')
    # fake a long ass request
    time.sleep(delay)
    # TODO: add a video output here to test how events are done
    # TODO: add an anvil server pingback to show we completed the queue operation
    return f"I've slept for {delay} seconds and now I'm done. "
def caption(downloadable_url="", uid="", language="Autodetect", override_model_size=""):
    """Generate captions and start an Anvil background task to attach them.

    All four arguments are forwarded unchanged to ``caption_generator``; the
    resulting captions are then passed, together with ``uid``, to the
    ``add_captions_to_video`` background task.

    :param downloadable_url: first argument for ``caption_generator``.
    :param uid: identifier passed to ``caption_generator`` and to the
        background task; also echoed in the returned message.
    :param language: third argument for ``caption_generator``
        (defaults to ``"Autodetect"``).
    :param override_model_size: fourth argument for ``caption_generator``.
    :return: dict with ``status`` (as reported by ``caption_generator``) and a
        human-readable ``message``.
    """
    status, whisper_result_captions, _detected_language = caption_generator(
        downloadable_url, uid, language, override_model_size
    )
    anvil.server.launch_background_task('add_captions_to_video', uid, whisper_result_captions)
    # Must be an f-string: without the prefix the message contained the
    # literal text "{uid}" instead of the actual identifier.
    return {
        'status': status,
        'message': f'started a background process to upload subtitles to {uid}',
    }
def transcribe_translate(downloadable_url="", uid="", language="Autodetect", override_model_size=""):
    """Run ``caption_generator`` and return its outcome as a JSON string.

    :param downloadable_url: first argument for ``caption_generator``.
    :param uid: second argument for ``caption_generator``.
    :param language: third argument for ``caption_generator``
        (defaults to ``"Autodetect"``).
    :param override_model_size: fourth argument for ``caption_generator``.
    :return: JSON-encoded object with the keys ``status``, ``captions`` and
        ``language``, taken in that order from the generator's 3-tuple.
    """
    generator_output = caption_generator(downloadable_url, uid, language, override_model_size)
    status, captions, detected_language = generator_output
    response = {
        "status": status,
        "captions": captions,
        "language": detected_language,
    }
    return json.dumps(response)
def render_api_elements(url_input, download_status, output_text, sub_video, output_file):
    """Create a group of invisible buttons whose click events expose named APIs.

    Endpoints registered (via ``api_name``): ``cleanup_output_dir``,
    ``test_api``, ``remote_download``, ``caption`` and
    ``transcribe_translate``.

    :param url_input: component used as the input of ``test_api`` and
        ``remote_download``.
    :param download_status: first output component of ``remote_download``.
    :param output_text: second output component of ``remote_download``.
    :param sub_video: accepted but not used by this function.
    :param output_file: accepted but not used by this function.
    :return: the ``gr.Group`` that contains the created components.
    """

    def _placeholder_caption_inputs():
        # Fresh components on every call: each endpoint gets its own set,
        # exactly as when they were written out inline.
        return [
            gr.Text(label='tweet_url'),
            gr.Text(label='media_uid'),
            gr.Text(label='language (optional)'),
            gr.Dropdown(label='Model Size', choices=['base', 'tiny', 'small', 'medium', 'large']),
        ]

    with gr.Group(elem_id='fake_ass_group') as api_buttons:
        # This is a hack to get APIs registered with the blocks interface
        translate_result = gr.Textbox(visible=False)
        translate_language = gr.Textbox(visible=False)

        cleanup_button = gr.Button("API", visible=False)
        cleanup_button.click(
            api_name='cleanup_output_dir',
            fn=cleanup_output_dir,
            queue=True,
            inputs=[],
            outputs=[],
        )

        test_button = gr.Button("API", visible=False)
        test_button.click(
            api_name='test_api',
            queue=True,
            fn=test_api,
            inputs=[url_input],
            outputs=[],
        )

        download_button = gr.Button("remote_download", visible=False)
        download_button.click(
            api_name='remote_download',
            queue=True,
            fn=remote_download,
            inputs=[url_input],
            outputs=[download_status, output_text, translate_result, translate_language],
        )

        # Placeholder components are created only so these endpoints have
        # inputs/outputs to bind to (per the original author's note, there was
        # no other way found to declare an API signature).
        caption_button = gr.Button("caption", visible=False)
        caption_button.click(
            api_name='caption',
            queue=True,
            fn=caption,
            inputs=_placeholder_caption_inputs(),
            outputs=[gr.Text(label='response_json')],
        )

        transcribe_button = gr.Button("transcribe_translate", visible=False)
        transcribe_button.click(
            api_name='transcribe_translate',
            queue=True,
            fn=transcribe_translate,
            inputs=_placeholder_caption_inputs(),
            outputs=[gr.Text(label='response_json')],
        )

    return api_buttons
def cleanup_output_dir(output_dir=None):
    """Delete everything inside the output directory, keeping the directory.

    :param output_dir: directory to empty. When ``None`` (the default) the
        process working directory is first changed to the parent of this
        file's directory and the ``output`` folder found there is emptied,
        which is the original behaviour. When given, that directory is emptied
        and the working directory is left untouched.
    :return: ``None``. A missing directory is a no-op.
    """
    if output_dir is None:
        # make sure we're in the main directory
        os.chdir(pathlib.Path(__file__).parent.parent.absolute())
        target = Path("output")
    else:
        target = Path(output_dir)

    if not target.is_dir():
        return

    # Only the direct children need visiting: rmtree already removes whole
    # subtrees. Snapshot them first so nothing is deleted underneath a live
    # directory iterator (the old recursive glob walked into trees that were
    # being removed).
    for path in list(target.iterdir()):
        if path.is_dir() and not path.is_symlink():
            rmtree(path)
        else:
            # Files and symlinks (including links to directories, which
            # rmtree refuses, and dangling links) are removed with unlink.
            path.unlink()