import shutil import sys import time from pathlib import Path import anvil.server import anvil.media from whisper.utils import write_srt, write_vtt from yt_dlp import YoutubeDL from yt_dlp.utils import DownloadError import os import tempfile import json import argparse import whisper from whisper.tokenizer import LANGUAGES, TO_LANGUAGE_CODE import ffmpeg from utils.subs import bake_subs, get_srt from utils.utils import get_args original_dir = os.getcwd() output_dir = Path('output') args = get_args() model_size: str = args.get("model", os.environ.get("WHISPER_MODEL", "large")) preload_model: bool = args.get("preload") if preload_model: print("Preloading model") model = whisper.load_model(model_size) def download_generator(url, translate_action=True, source_language='Autodetect', corrected_subtitles=None): # Step 1 : check if video is available yield {"message": f"Checking {url} for videos"} try: meta = check_download(url) # print(json.dumps(meta, indent=2)) # if(meta['duration'] > 159) : # raise Exception("Video is too long, please use videos less than 159 seconds") yield {"message": f"Found video with {meta['duration']} seconds duration from {meta['extractor']}", "meta": meta} tempdir = output_dir/f"{meta['id']}" except Exception as e: yield {"message": f"{e}"} return # Step 2 : Download video and extract audio try: # check if we already have the folder and the main files if(tempdir.is_dir() and (tempdir/f"{meta['id']}.{meta['ext']}").is_file() and (tempdir/f"{meta['id']}.mp3").is_file()): yield {"message": f"Using cached files"} video = str((tempdir/f"{meta['id']}.{meta['ext']}").resolve()) audio = str((tempdir/f"{meta['id']}.mp3").resolve()) else: yield {"message": f"Starting download with URL {url}, this may take a while"} meta, video, audio = download(url, tempdir) yield {"message": f"Downloaded video and extracted audio", "video": video, "audio": audio, "meta": meta} except Exception as e: os.chdir(original_dir) yield {"message": f"{e}"} raise e srt_path = tempdir / f"{meta['id']}.srt" vtt_path = tempdir / f"{meta['id']}.vtt" if not corrected_subtitles: ### Step 3 : Transcribe with whisper yield {"message": f"[PLEASE WAIT] Starting whisper transcribe with {meta['id']}.mp3"} try: whisper_result = transcribe(audio, translate_action, source_language) with open(srt_path, "w", encoding="utf-8") as srt: write_srt(whisper_result["segments"], file=srt) with open(vtt_path, "w", encoding="utf-8") as vtt: write_vtt(whisper_result["segments"], file=vtt) whisper_result["srt"] = Path(srt_path).read_text() whisper_result["vtt"] = Path(vtt_path).read_text() yield {"message": f"Transcribe successful", "whisper_result": whisper_result, "meta": meta, "srt_path": srt_path, "vtt_path": vtt_path} except Exception as e: os.chdir(original_dir) yield {"message": f"{e}"} raise e else: ### step 3.5 : use corrected subtitles yield {"message": f"Using corrected subtitles"} with open(srt_path, "w", encoding="utf-8") as srt: srt.write(corrected_subtitles) yield {"message": f"Transcribe successful", "srt_path": srt_path, "meta": meta} ### Step 4 : Bake subtitles into video with ffmpeg yield {"message": f"[PLEASE WAIT] baking subtitles into video"} try: print('Stating to bake subtitles') subbed_video_path = tempdir / f"{meta['id']}_translated.mp4" fontsdir = Path('fonts') bake_subs(video, subbed_video_path.absolute() , srt_path.absolute(), fontsdir, translate_action) yield {"message": f"Subtitled video ready!", "sub_video": str(subbed_video_path.absolute()), "meta": meta, "vtt_path": vtt_path} except ffmpeg.Error as e: print('stdout:', e.stdout.decode('utf8')) print('stderr:', e.stderr.decode('utf8')) raise e except Exception as e: print('stdout:', e.stdout.decode('utf8')) print('stderr:', e.stderr.decode('utf8')) os.chdir(original_dir) print('error', file=sys.stderr) raise e yield {"message": f"{e}"} def user_uploaded_video_generator(video, translate_action=True, source_language='Autodetect', corrected_subtitles=None): video_name = Path(video).stem # create tempdir tempdir = output_dir / video_name tempdir.mkdir(parents=True, exist_ok=True) # copy video with shutil.copy2 video_path = tempdir / Path(video).name shutil.copy2(video, video_path) yield {"message": f"Extracting audio from {video_name}", "video": video_path} # TODO : extract audio from videos output_audio = tempdir / f"{video_name}.mp3" ffmpeg.input(video_path).output(filename=output_audio).run() yield {"message": f"Got audio from {video_name}", "video": video, "audio": output_audio} # Run whisper on the audio with language unless auto try: audio_file = output_audio print(f"Starting whisper transcribe with {output_audio}") transcribe_whisper_result = transcribe(audio_file, translate_action=False, language='Autodetect', override_model_size=model_size) yield {"message": f"Finished transcription, starting translation to {transcribe_whisper_result['language']}"} detected_language = LANGUAGES[transcribe_whisper_result["language"]] translate_whisper_result = transcribe(audio_file, translate_action=True, language=detected_language, override_model_size=model_size) yield {"message": f"Finished translation to English, preparing subtitle files"} with open(tempdir / f"{video_name}.vtt", "w", encoding="utf-8") as vtt: write_vtt(transcribe_whisper_result['segments'], file=vtt) # yield {"message": f"Created VTT files", "vtt_path": f"{video_name}.vtt", "vtt_en_path": f"{video_name}.en.vtt"} # write_srt(transcribe_whisper_result['segments'], tempdir / f"{video_name}.srt") # write_srt(translate_whisper_result['segments'], tempdir / f"{video_name}_en.srt") # yield {"message": f"Created SRT files", "srt_path": f"{video_name}.srt", "srt_en_path": f"{video_name}.en.srt"} # print(f"Transcribe successful!") except Exception as e: print(f"Could not transcribe file: {e}") return def caption_generator(social_media_url,uid, language="Autodetect", model_size=model_size): with tempfile.TemporaryDirectory() as tempdir: tempdir = Path(tempdir) # try: # print(f"Downloading {social_media_url} ") # meta = check_download(social_media_url) # print(f"Downloaded {meta['id']}.mp3 from {meta['uploader_id']} and url {meta['webpage_url']}") # except Exception as e: # print(f"Could not download file: {e}") # raise try: print(f"Starting audio only download with URL {social_media_url}, this may take a while") meta, audio = download_audio(social_media_url, tempdir, id=uid) print(f"Downloaded video and extracted audio") except Exception as e: print(f"Could not download file: {e}") raise # Run whisper on the audio with language unless auto try: print(f"Starting whisper transcribe with {uid}.mp3") transcribe_whisper_result = transcribe(audio, translate_action=False, language=language, override_model_size=model_size) detected_language = LANGUAGES[transcribe_whisper_result["language"]] print(f"Transcribe successful!, writing files") vtt_path = tempdir / f"{transcribe_whisper_result['language']}.vtt" with open(vtt_path.resolve(), "w", encoding="utf-8") as vtt: write_vtt(transcribe_whisper_result["segments"], file=vtt) whisper_result_captions = [ { "language_tag": transcribe_whisper_result["language"], "vtt_text": vtt_path.read_text(encoding="utf-8"), }, ] if detected_language != "en": print(f"Transcribe successful! Starting translation to English") translate_whisper_result = transcribe(audio, translate_action=True, language=detected_language, override_model_size=model_size) en_vtt_path = tempdir / f"en.vtt" with open(en_vtt_path.resolve(), "w", encoding="utf-8") as en_vtt: write_vtt(translate_whisper_result["segments"], file=en_vtt) print(f"Finished translation to English, preparing subtitle files") whisper_result_captions.append( { "language_tag": "en", "vtt_text": en_vtt_path.read_text(encoding="utf-8"), } ) except Exception as e: print(f"Could not transcribe file: {e}") raise print(f"Finished processing {uid} file, returning results") print(whisper_result_captions) return 'success', whisper_result_captions, detected_language # Run whisper with translation task enabled (and save to different srt file) # Call anvil background task with both files, and both the plain texts def progress_hook(d): if d['status'] == 'downloading': print("downloading " + str(round(float(d['downloaded_bytes']) / float(d['total_bytes']) * 100, 1)) + "%") yield f"{d['_percent_str']} downloaded" if d['status'] == 'finished': filename = d['filename'] print(filename) yield f"Downloaded {filename}" def download(url, tempdir, format="bestvideo[ext=mp4]+bestaudio/best", verbose=False, keepVideo=True, filename="%(id)s.%(ext)s"): try: ydl_opts = { "format": format, "keepvideo": keepVideo, 'postprocessors': [{ 'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3', 'preferredquality': '192', }], "skip_download": False, "outtmpl": f"{tempdir}/{filename}", "noplaylist": True, "verbose": verbose, "quiet": False, "progress_hooks": [progress_hook], } ydl = YoutubeDL(ydl_opts) meta = ydl.extract_info( url, download=True, ) except DownloadError as e: raise e else: audio = tempdir / f"{meta['id']}.mp3" if (keepVideo): video = tempdir / f"{meta['id']}.{meta['ext']}" return meta, str(video.resolve()), str(audio.resolve()) else: return meta, None, str(audio.resolve()) def download_audio(url, tempdir, format="bestaudio/best", verbose=False, id=None): filename = f"{id}.%(ext)s" try: ydl_opts = { "format": format, "keepvideo": False, 'postprocessors': [{ 'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3', 'preferredquality': '192', }], "skip_download": False, "outtmpl": f"{tempdir}/{filename}", "noplaylist": True, "verbose": verbose, "quiet": False, "progress_hooks": [progress_hook], } ydl = YoutubeDL(ydl_opts) meta = ydl.extract_info( url, download=True, ) except DownloadError as e: raise e else: audio = tempdir / f"{id}.mp3" return meta, str(audio.resolve()) def check_download(url): ydl_opts = { "format": "bestvideo[ext=mp4]+bestaudio/best", "skip_download": True, "verbose": False, } ydl = YoutubeDL(ydl_opts) try: meta = ydl.extract_info( url, download=False, ) except DownloadError as e: raise e else: return meta def transcribe(audio, translate_action=True, language='Autodetect', override_model_size=''): """ Transcribe audio file with whisper :param audio: - The audio file to transcribe :param translate_action: Bool - Whether to translate to English or keep original language :param language: String - The language to transcribe to, default is Autodetect :param override_model_size: Bool - Whether to override the model size :return: """ task = "translate" if translate_action else "transcribe" model_size_to_load = override_model_size if override_model_size else model_size print(f'Starting {task} with whisper size {model_size_to_load} on {audio}') global model if not preload_model or model_size != override_model_size: model = whisper.load_model(model_size_to_load) props = { "task": task, } if language != 'Autodetect': props["language"] = TO_LANGUAGE_CODE[language.lower()] if len(language) > 2 else language output = model.transcribe(audio, verbose=True, **props) output['segments'] = output['segments'] output['requested_language'] = language.lower() print(f'Finished transcribe from {LANGUAGES[output["language"]].capitalize()}', output["text"]) return output