Spaces:
Build error
Build error
import shutil | |
import sys | |
import time | |
from pathlib import Path | |
import anvil.server | |
import anvil.media | |
from whisper.utils import write_srt, write_vtt | |
from yt_dlp import YoutubeDL | |
from yt_dlp.utils import DownloadError | |
import os | |
import tempfile | |
import json | |
import argparse | |
import whisper | |
from whisper.tokenizer import LANGUAGES, TO_LANGUAGE_CODE | |
import ffmpeg | |
from utils.subs import bake_subs, get_srt | |
from utils.utils import get_args | |
original_dir = os.getcwd() | |
output_dir = Path('output') | |
args = get_args() | |
model_size: str = args.get("model", os.environ.get("WHISPER_MODEL", "large")) | |
preload_model: bool = args.get("preload") | |
if preload_model: | |
print("Preloading model") | |
model = whisper.load_model(model_size) | |
def download_generator(url, translate_action=True, source_language='Autodetect', corrected_subtitles=None): | |
# Step 1 : check if video is available | |
yield {"message": f"Checking {url} for videos"} | |
try: | |
meta = check_download(url) | |
# print(json.dumps(meta, indent=2)) | |
# if(meta['duration'] > 159) : | |
# raise Exception("Video is too long, please use videos less than 159 seconds") | |
yield {"message": f"Found video with {meta['duration']} seconds duration from {meta['extractor']}", "meta": meta} | |
tempdir = output_dir/f"{meta['id']}" | |
except Exception as e: | |
yield {"message": f"{e}"} | |
return | |
# Step 2 : Download video and extract audio | |
try: | |
# check if we already have the folder and the main files | |
if(tempdir.is_dir() and (tempdir/f"{meta['id']}.{meta['ext']}").is_file() and (tempdir/f"{meta['id']}.mp3").is_file()): | |
yield {"message": f"Using cached files"} | |
video = str((tempdir/f"{meta['id']}.{meta['ext']}").resolve()) | |
audio = str((tempdir/f"{meta['id']}.mp3").resolve()) | |
else: | |
yield {"message": f"Starting download with URL {url}, this may take a while"} | |
meta, video, audio = download(url, tempdir) | |
yield {"message": f"Downloaded video and extracted audio", "video": video, "audio": audio, "meta": meta} | |
except Exception as e: | |
os.chdir(original_dir) | |
yield {"message": f"{e}"} | |
raise e | |
srt_path = tempdir / f"{meta['id']}.srt" | |
vtt_path = tempdir / f"{meta['id']}.vtt" | |
if not corrected_subtitles: | |
### Step 3 : Transcribe with whisper | |
yield {"message": f"[PLEASE WAIT] Starting whisper transcribe with {meta['id']}.mp3"} | |
try: | |
whisper_result = transcribe(audio, translate_action, source_language) | |
with open(srt_path, "w", encoding="utf-8") as srt: | |
write_srt(whisper_result["segments"], file=srt) | |
with open(vtt_path, "w", encoding="utf-8") as vtt: | |
write_vtt(whisper_result["segments"], file=vtt) | |
whisper_result["srt"] = Path(srt_path).read_text() | |
whisper_result["vtt"] = Path(vtt_path).read_text() | |
yield {"message": f"Transcribe successful", "whisper_result": whisper_result, "meta": meta, "srt_path": srt_path, "vtt_path": vtt_path} | |
except Exception as e: | |
os.chdir(original_dir) | |
yield {"message": f"{e}"} | |
raise e | |
else: | |
### step 3.5 : use corrected subtitles | |
yield {"message": f"Using corrected subtitles"} | |
with open(srt_path, "w", encoding="utf-8") as srt: | |
srt.write(corrected_subtitles) | |
yield {"message": f"Transcribe successful", "srt_path": srt_path, "meta": meta} | |
### Step 4 : Bake subtitles into video with ffmpeg | |
yield {"message": f"[PLEASE WAIT] baking subtitles into video"} | |
try: | |
print('Stating to bake subtitles') | |
subbed_video_path = tempdir / f"{meta['id']}_translated.mp4" | |
fontsdir = Path('fonts') | |
bake_subs(video, subbed_video_path.absolute() , srt_path.absolute(), fontsdir, translate_action) | |
yield {"message": f"Subtitled video ready!", "sub_video": str(subbed_video_path.absolute()), "meta": meta, "vtt_path": vtt_path} | |
except ffmpeg.Error as e: | |
print('stdout:', e.stdout.decode('utf8')) | |
print('stderr:', e.stderr.decode('utf8')) | |
raise e | |
except Exception as e: | |
print('stdout:', e.stdout.decode('utf8')) | |
print('stderr:', e.stderr.decode('utf8')) | |
os.chdir(original_dir) | |
print('error', file=sys.stderr) | |
raise e | |
yield {"message": f"{e}"} | |
def user_uploaded_video_generator(video, translate_action=True, source_language='Autodetect', corrected_subtitles=None): | |
video_name = Path(video).stem | |
# create tempdir | |
tempdir = output_dir / video_name | |
tempdir.mkdir(parents=True, exist_ok=True) | |
# copy video with shutil.copy2 | |
video_path = tempdir / Path(video).name | |
shutil.copy2(video, video_path) | |
yield {"message": f"Extracting audio from {video_name}", "video": video_path} | |
# TODO : extract audio from videos | |
output_audio = tempdir / f"{video_name}.mp3" | |
ffmpeg.input(video_path).output(filename=output_audio).run() | |
yield {"message": f"Got audio from {video_name}", "video": video, "audio": output_audio} | |
# Run whisper on the audio with language unless auto | |
try: | |
audio_file = output_audio | |
print(f"Starting whisper transcribe with {output_audio}") | |
transcribe_whisper_result = transcribe(audio_file, translate_action=False, language='Autodetect', override_model_size=model_size) | |
yield {"message": f"Finished transcription, starting translation to {transcribe_whisper_result['language']}"} | |
detected_language = LANGUAGES[transcribe_whisper_result["language"]] | |
translate_whisper_result = transcribe(audio_file, translate_action=True, language=detected_language, override_model_size=model_size) | |
yield {"message": f"Finished translation to English, preparing subtitle files"} | |
with open(tempdir / f"{video_name}.vtt", "w", encoding="utf-8") as vtt: | |
write_vtt(transcribe_whisper_result['segments'], file=vtt) | |
# yield {"message": f"Created VTT files", "vtt_path": f"{video_name}.vtt", "vtt_en_path": f"{video_name}.en.vtt"} | |
# write_srt(transcribe_whisper_result['segments'], tempdir / f"{video_name}.srt") | |
# write_srt(translate_whisper_result['segments'], tempdir / f"{video_name}_en.srt") | |
# yield {"message": f"Created SRT files", "srt_path": f"{video_name}.srt", "srt_en_path": f"{video_name}.en.srt"} | |
# print(f"Transcribe successful!") | |
except Exception as e: | |
print(f"Could not transcribe file: {e}") | |
return | |
def caption_generator(social_media_url,uid, language="Autodetect", model_size=model_size): | |
with tempfile.TemporaryDirectory() as tempdir: | |
tempdir = Path(tempdir) | |
# try: | |
# print(f"Downloading {social_media_url} ") | |
# meta = check_download(social_media_url) | |
# print(f"Downloaded {meta['id']}.mp3 from {meta['uploader_id']} and url {meta['webpage_url']}") | |
# except Exception as e: | |
# print(f"Could not download file: {e}") | |
# raise | |
try: | |
print(f"Starting audio only download with URL {social_media_url}, this may take a while") | |
meta, audio = download_audio(social_media_url, tempdir, id=uid) | |
print(f"Downloaded video and extracted audio") | |
except Exception as e: | |
print(f"Could not download file: {e}") | |
raise | |
# Run whisper on the audio with language unless auto | |
try: | |
print(f"Starting whisper transcribe with {uid}.mp3") | |
transcribe_whisper_result = transcribe(audio, translate_action=False, language=language, override_model_size=model_size) | |
detected_language = LANGUAGES[transcribe_whisper_result["language"]] | |
print(f"Transcribe successful!, writing files") | |
vtt_path = tempdir / f"{transcribe_whisper_result['language']}.vtt" | |
with open(vtt_path.resolve(), "w", encoding="utf-8") as vtt: | |
write_vtt(transcribe_whisper_result["segments"], file=vtt) | |
whisper_result_captions = [ | |
{ | |
"language_tag": transcribe_whisper_result["language"], | |
"vtt_text": vtt_path.read_text(encoding="utf-8"), | |
}, | |
] | |
if detected_language != "en": | |
print(f"Transcribe successful! Starting translation to English") | |
translate_whisper_result = transcribe(audio, translate_action=True, language=detected_language, override_model_size=model_size) | |
en_vtt_path = tempdir / f"en.vtt" | |
with open(en_vtt_path.resolve(), "w", encoding="utf-8") as en_vtt: | |
write_vtt(translate_whisper_result["segments"], file=en_vtt) | |
print(f"Finished translation to English, preparing subtitle files") | |
whisper_result_captions.append( | |
{ | |
"language_tag": "en", | |
"vtt_text": en_vtt_path.read_text(encoding="utf-8"), | |
} | |
) | |
except Exception as e: | |
print(f"Could not transcribe file: {e}") | |
raise | |
print(f"Finished processing {uid} file, returning results") | |
print(whisper_result_captions) | |
return 'success', whisper_result_captions, detected_language | |
# Run whisper with translation task enabled (and save to different srt file) | |
# Call anvil background task with both files, and both the plain texts | |
def progress_hook(d): | |
if d['status'] == 'downloading': | |
print("downloading " + str(round(float(d['downloaded_bytes']) / float(d['total_bytes']) * 100, 1)) + "%") | |
yield f"{d['_percent_str']} downloaded" | |
if d['status'] == 'finished': | |
filename = d['filename'] | |
print(filename) | |
yield f"Downloaded {filename}" | |
def download(url, tempdir, format="bestvideo[ext=mp4]+bestaudio/best", verbose=False, keepVideo=True, filename="%(id)s.%(ext)s"): | |
try: | |
ydl_opts = { | |
"format": format, | |
"keepvideo": keepVideo, | |
'postprocessors': [{ | |
'key': 'FFmpegExtractAudio', | |
'preferredcodec': 'mp3', | |
'preferredquality': '192', | |
}], | |
"skip_download": False, | |
"outtmpl": f"{tempdir}/{filename}", | |
"noplaylist": True, | |
"verbose": verbose, | |
"quiet": False, | |
"progress_hooks": [progress_hook], | |
} | |
ydl = YoutubeDL(ydl_opts) | |
meta = ydl.extract_info( | |
url, | |
download=True, | |
) | |
except DownloadError as e: | |
raise e | |
else: | |
audio = tempdir / f"{meta['id']}.mp3" | |
if (keepVideo): | |
video = tempdir / f"{meta['id']}.{meta['ext']}" | |
return meta, str(video.resolve()), str(audio.resolve()) | |
else: | |
return meta, None, str(audio.resolve()) | |
def download_audio(url, tempdir, format="bestaudio/best", verbose=False, id=None): | |
filename = f"{id}.%(ext)s" | |
try: | |
ydl_opts = { | |
"format": format, | |
"keepvideo": False, | |
'postprocessors': [{ | |
'key': 'FFmpegExtractAudio', | |
'preferredcodec': 'mp3', | |
'preferredquality': '192', | |
}], | |
"skip_download": False, | |
"outtmpl": f"{tempdir}/{filename}", | |
"noplaylist": True, | |
"verbose": verbose, | |
"quiet": False, | |
"progress_hooks": [progress_hook], | |
} | |
ydl = YoutubeDL(ydl_opts) | |
meta = ydl.extract_info( | |
url, | |
download=True, | |
) | |
except DownloadError as e: | |
raise e | |
else: | |
audio = tempdir / f"{id}.mp3" | |
return meta, str(audio.resolve()) | |
def check_download(url): | |
ydl_opts = { | |
"format": "bestvideo[ext=mp4]+bestaudio/best", | |
"skip_download": True, | |
"verbose": False, | |
} | |
ydl = YoutubeDL(ydl_opts) | |
try: | |
meta = ydl.extract_info( | |
url, | |
download=False, | |
) | |
except DownloadError as e: | |
raise e | |
else: | |
return meta | |
def transcribe(audio, translate_action=True, language='Autodetect', override_model_size=''): | |
""" | |
Transcribe audio file with whisper | |
:param audio: - The audio file to transcribe | |
:param translate_action: Bool - Whether to translate to English or keep original language | |
:param language: String - The language to transcribe to, default is Autodetect | |
:param override_model_size: Bool - Whether to override the model size | |
:return: | |
""" | |
task = "translate" if translate_action else "transcribe" | |
model_size_to_load = override_model_size if override_model_size else model_size | |
print(f'Starting {task} with whisper size {model_size_to_load} on {audio}') | |
global model | |
if not preload_model or model_size != override_model_size: | |
model = whisper.load_model(model_size_to_load) | |
props = { | |
"task": task, | |
} | |
if language != 'Autodetect': | |
props["language"] = TO_LANGUAGE_CODE[language.lower()] if len(language) > 2 else language | |
output = model.transcribe(audio, verbose=True, **props) | |
output['segments'] = output['segments'] | |
output['requested_language'] = language.lower() | |
print(f'Finished transcribe from {LANGUAGES[output["language"]].capitalize()}', output["text"]) | |
return output | |