''' |
Copyright 2023-2024 LangBridge Inc. |
All Rights Reserved. |
LangBridge Restricted |
''' |
import os |
import gradio as gr |
import whisperx |
import numpy as np |
import moviepy.editor as mp |
from moviepy.audio.AudioClip import AudioArrayClip |
from pytube import YouTube |
import deepl |
import torch |
import pyrubberband as pyrb |
import soundfile as sf |
import librosa |
from TTS.api import TTS |
# Credentials are read once at import time; a missing variable raises KeyError.
# HF_TOKEN is handed to the WhisperX diarization pipeline, DEEPL_TOKEN to the
# DeepL translator (translation and usage queries).
HF_TOKEN = os.environ["HF_TOKEN"]
DEEPL_TOKEN = os.environ["DEEPL_TOKEN"]
# NOTE(review): presumably accepts the Coqui TTS terms of service
# non-interactively so model loading does not prompt -- confirm.
os.environ["COQUI_TOS_AGREED"] = "1"
def extract_audio(video_path):
    """Extract the audio track of a video into a WAV file next to it.

    Args:
        video_path: Path to the source video file.

    Returns:
        Path of the written WAV file: ``video_path`` with its extension
        replaced by ``.wav``.

    Raises:
        ValueError: If the video has no audio track.
    """
    audio_path = os.path.splitext(video_path)[0] + ".wav"
    # The context manager closes the clip once the audio has been written,
    # including when writing fails.
    with mp.VideoFileClip(video_path) as clip:
        if clip.audio is None:
            raise ValueError(f"No audio track found in '{video_path}'")
        clip.audio.write_audiofile(audio_path)
    return audio_path
def speech_diarization(audio_path, hf_token):
    """Transcribe an audio file and label every segment with a speaker.

    Runs three WhisperX stages in sequence: transcription with the
    "large-v2" model, alignment, and speaker diarization. Each model is
    dropped before the next one is loaded to keep peak memory down.

    Args:
        audio_path: Path to the audio file to process.
        hf_token: Hugging Face token passed to the diarization pipeline.

    Returns:
        The list of aligned segment dicts. Every segment is guaranteed to
        carry a 'speaker' key.
    """
    import gc

    def release_memory():
        # Must run AFTER the model reference is deleted, otherwise the
        # memory is still in use and nothing can be handed back.
        gc.collect()
        torch.cuda.empty_cache()

    use_cuda = torch.cuda.is_available()
    device = "cuda" if use_cuda else "cpu"
    # float16 is kept for CUDA only.
    # NOTE(review): int8 is assumed to be the right CPU compute type -- confirm.
    compute_type = "float16" if use_cuda else "int8"
    batch_size = 16

    # Stage 1: transcription
    model = whisperx.load_model("large-v2", device, compute_type=compute_type)
    audio = whisperx.load_audio(audio_path)
    result = model.transcribe(audio, batch_size=batch_size)
    del model
    release_memory()

    # Stage 2: alignment
    model_a, metadata = whisperx.load_align_model(language_code=result["language"], device=device)
    result = whisperx.align(result["segments"], model_a, metadata, audio, device, return_char_alignments=False)
    del model_a
    release_memory()

    # Stage 3: diarization and speaker assignment
    diarize_model = whisperx.DiarizationPipeline(use_auth_token=hf_token, device=device)
    diarize_segments = diarize_model(audio)
    result = whisperx.assign_word_speakers(diarize_segments, result)
    del diarize_model
    release_memory()

    segments = result["segments"]
    # A segment that could not be matched to a diarization turn may come back
    # without a 'speaker' key, which the rest of the pipeline indexes
    # unconditionally. Reuse the previous speaker in that case.
    # NOTE(review): "SPEAKER_00" is assumed to match the diarizer's label
    # format for the very first segment -- confirm.
    previous_speaker = "SPEAKER_00"
    for segment in segments:
        previous_speaker = segment.setdefault("speaker", previous_speaker)

    print(f'\n[Original transcript]:\n{segments}\n')
    return segments
def speaker_voice_clips(transcription, audio_path):
    """Write one reference-voice WAV file per speaker.

    For each speaker, up to the first three segments of the transcription
    are cut out of the audio and concatenated into
    ``<speaker>_voice_clips.wav`` in the directory of ``audio_path``.

    Args:
        transcription: Iterable of segment dicts with 'speaker', 'start'
            and 'end' keys.
        audio_path: Path to the audio the segment times refer to.

    Returns:
        Dict mapping each speaker label to the path of its voice-clip file.
        Empty when the transcription has no segments.
    """
    max_snippets_per_speaker = 3

    # Collect (start, end) pairs of the first few segments of each speaker
    snippets_timecodes = {}
    for segment in transcription:
        speaker_snippets = snippets_timecodes.setdefault(segment['speaker'], [])
        if len(speaker_snippets) < max_snippets_per_speaker:
            speaker_snippets.append((segment['start'], segment['end']))

    if not snippets_timecodes:
        # Nothing to cut, so do not open the audio file at all
        return {}

    audio_file_directory = os.path.dirname(audio_path)
    voice_clips = {}
    original_audio = mp.AudioFileClip(audio_path)
    try:
        for speaker, speaker_snippets in snippets_timecodes.items():
            subclips = [original_audio.subclip(start, end) for start, end in speaker_snippets]
            concatenated_clip = mp.concatenate_audioclips(subclips)
            output_filename = os.path.join(audio_file_directory, f"{speaker}_voice_clips.wav")
            concatenated_clip.write_audiofile(output_filename)
            voice_clips[speaker] = output_filename
    finally:
        # Close the source clip even when cutting or writing fails
        original_audio.close()
    return voice_clips
def translate_transcript(transcript, target_language, deepl_token):
    """Translate the text of every transcript segment with DeepL.

    Texts are sent in batches instead of one request per segment. Segments
    whose text is empty or whitespace-only are not sent and keep their
    original text.

    Args:
        transcript: Iterable of segment dicts with 'start', 'end', 'text'
            and 'speaker' keys.
        target_language: DeepL target language code.
        deepl_token: DeepL API authentication key.

    Returns:
        A new list of segment dicts with the same 'start', 'end' and
        'speaker' values and the translated 'text'.
    """
    # DeepL accepts a limited number of texts per request (50 per its API docs)
    max_texts_per_request = 50

    translator = deepl.Translator(deepl_token)
    segments = list(transcript)
    texts = [segment['text'] for segment in segments]

    # Indices of the segments that actually contain something to translate
    pending = [index for index, text in enumerate(texts) if text.strip()]
    for offset in range(0, len(pending), max_texts_per_request):
        batch = pending[offset:offset + max_texts_per_request]
        # With a list argument translate_text returns one result per text,
        # in the same order.
        results = translator.translate_text([texts[index] for index in batch], target_lang=target_language)
        for index, result in zip(batch, results):
            texts[index] = result.text

    translated_transcript = [
        {
            'start': segment['start'],
            'end': segment['end'],
            'text': text,
            'speaker': segment['speaker'],
        }
        for segment, text in zip(segments, texts)
    ]
    print(f'\n[Translated transcript]:\n{translated_transcript}\n')
    return translated_transcript
def adjust_voice_pace(sound_array, sample_rate, target_duration):
    """Time-stretch audio so that it lasts ``target_duration`` seconds.

    Args:
        sound_array: Sequence of audio samples.
        sample_rate: Sample rate of ``sound_array`` in Hz.
        target_duration: Desired duration in seconds.

    Returns:
        The stretched samples as a NumPy array. When the input is empty or
        ``target_duration`` is not positive, no valid stretch rate exists
        and the samples are returned unchanged (as a new array).
    """
    samples = np.array(sound_array)
    duration = len(samples) / sample_rate
    if duration <= 0 or target_duration <= 0:
        # Avoids a ZeroDivisionError / non-positive rate for degenerate segments
        return samples
    tempo_change = duration / target_duration
    return pyrb.time_stretch(samples, sample_rate, tempo_change)
def voice_cloning_translation(translated_transcription, speakers_voice_clips, target_language_codes, speaker_model, audio_path):
    """Synthesize the translated transcript into a single dubbed audio track.

    Every segment is spoken by a TTS model using the reference clip of its
    speaker, sped up when it is longer than the original segment, converted
    to 44.1 kHz stereo and placed on a timeline with silence filling the
    gaps. The result is also written to ``translated_voice_track.wav`` in
    the directory of ``audio_path``.

    Args:
        translated_transcription: List of segment dicts with 'start', 'end',
            'text' and 'speaker' keys.
        speakers_voice_clips: Dict mapping speaker label to the path of a
            reference voice recording.
        target_language_codes: Sequence whose item 0 is the XTTS v2 language
            code and item 1 the fairseq VITS language code.
        speaker_model: Name of the selected TTS model; VITS is used when it
            contains "vits" or when the XTTS code is 'uk'.
        audio_path: Path whose directory receives the output WAV file.

    Returns:
        The concatenated audio clip of the whole dubbed track.

    Raises:
        ValueError: If ``translated_transcription`` is empty.
    """
    import gc

    output_fps = 44100
    # Tail removed from every synthesized clip, in seconds.
    # NOTE(review): presumably trims trailing TTS artefacts -- confirm.
    tail_trim = 0.2

    device = "cuda" if torch.cuda.is_available() else "cpu"
    xtts2_language_code = target_language_codes[0]
    vits_language_code = target_language_codes[1]

    if 'vits' in speaker_model.lower() or xtts2_language_code == 'uk':
        selected_model = f'tts_models/{vits_language_code}/fairseq/vits'
    else:
        selected_model = 'tts_models/multilingual/multi-dataset/xtts_v2'
    print(selected_model)

    if not translated_transcription:
        raise ValueError("Cannot synthesize speech: the translated transcript is empty")

    uses_vits = 'vits' in selected_model
    tts = None
    final_audio_track = None
    try:
        tts = TTS(selected_model).to(device)
        last_end_time = 0  # position (seconds) already filled on the track
        clips = []
        for speech_item in translated_transcription:
            speech_item_duration = speech_item['end'] - speech_item['start']

            # Pad with silence up to the segment start
            gap_duration = speech_item['start'] - last_end_time
            gap_samples = int(output_fps * gap_duration)
            if gap_samples > 0:
                clips.append(AudioArrayClip(np.zeros((gap_samples, 2)), fps=output_fps))
                print(f"\nAdded silence: Start={last_end_time}, Duration={gap_duration}")
            # If the previous clip overran this segment's start, the clip is
            # appended later than 'start'; track the real position so the
            # following gaps are computed from where the audio actually ends.
            clip_start = max(last_end_time, speech_item['start'])

            print(f"[{speech_item['speaker']}]")
            tts_arguments = {
                'text': speech_item['text'],
                'speaker_wav': speakers_voice_clips[speech_item['speaker']],
            }
            if not uses_vits:
                # Only the multilingual XTTS model takes a language argument
                tts_arguments['language'] = xtts2_language_code
            audio = tts.tts(**tts_arguments)
            sample_rate = tts.synthesizer.output_sample_rate

            # Speed the speech up when it does not fit the original segment;
            # zero-length segments are skipped because no stretch rate exists.
            audio_duration = len(audio) / sample_rate
            if 0 < speech_item_duration < audio_duration:
                audio = adjust_voice_pace(audio, sample_rate, speech_item_duration)

            # Resample to the track rate and duplicate mono into two channels
            audio = librosa.resample(np.array(audio), orig_sr=sample_rate, target_sr=output_fps)
            audio_stereo = np.repeat(np.expand_dims(audio, axis=1), 2, axis=1)
            audio_clip = AudioArrayClip(audio_stereo, fps=output_fps)
            if audio_clip.duration > tail_trim:
                # Clips not longer than the trim are kept whole; trimming
                # them would produce a zero or negative duration.
                audio_clip = audio_clip.subclip(0, audio_clip.duration - tail_trim)
            clips.append(audio_clip)
            print(f"Added speech: Start={speech_item['start']}, Final duration={audio_clip.duration}, Original duration={speech_item_duration}")
            last_end_time = clip_start + audio_clip.duration

        final_audio_track = mp.concatenate_audioclips(clips)
        audio_files_directory = os.path.dirname(audio_path)
        final_audio_track.write_audiofile(os.path.join(audio_files_directory, "translated_voice_track.wav"), fps=output_fps)
    finally:
        # Release the TTS model on success as well as on failure; the
        # reference must be dropped before the cache is emptied.
        if tts is not None:
            del tts
            gc.collect()
            torch.cuda.empty_cache()
    return final_audio_track
def dub_video(video_path, translated_audio_track, target_language):
    """Mix the translated voice track into the video and write the result.

    The original audio is kept at 15% volume underneath the translated
    track. The output is cut to the shorter of the video and the translated
    track and written next to the input as ``<name>_<target_language>.mp4``.

    Args:
        video_path: Path to the original video.
        translated_audio_track: Audio clip containing the dubbed speech.
        target_language: Suffix appended to the output file name.

    Returns:
        Path of the written dubbed video.
    """
    video_with_dubbing_path = os.path.splitext(video_path)[0] + "_" + target_language + ".mp4"
    with mp.VideoFileClip(video_path) as source_video:
        # Never cut past the end of the video: the dubbed track can be
        # longer than the footage when synthesized speech overruns.
        dubbed_duration = min(source_video.duration, translated_audio_track.duration)
        video = source_video.subclip(0, dubbed_duration)
        voice_track = translated_audio_track.subclip(0, dubbed_duration).set_start(0)

        if video.audio is None:
            # Silent source video: there is no original audio to keep
            dubbed_audio = voice_track
        else:
            original_audio = video.audio.volumex(0.15)
            dubbed_audio = mp.CompositeAudioClip([original_audio, voice_track])

        video_with_dubbing = video.set_audio(dubbed_audio)
        video_with_dubbing.write_videofile(video_with_dubbing_path)
    return video_with_dubbing_path
def video_translation(video_path, target_language_codes, speaker_model, hf_token, deepl_token):
    """Run the whole dubbing pipeline on one video.

    Steps, in order: extract the audio, transcribe and diarize it, translate
    the transcript, cut a reference voice clip per speaker, synthesize the
    translated speech and mux it back into the video.

    Args:
        video_path: Path to the video to translate.
        target_language_codes: Sequence of language codes; item 2 is given
            to the translator, item 0 is used as the output file suffix and
            the whole sequence is forwarded to the speech synthesis step.
        speaker_model: Name of the text-to-speech model to use.
        hf_token: Token forwarded to the diarization step.
        deepl_token: Key forwarded to the translation step.

    Returns:
        Path of the dubbed video file.
    """
    audio_file = extract_audio(video_path)
    segments = speech_diarization(audio_file, hf_token)
    translated_segments = translate_transcript(
        segments,
        target_language_codes[2],
        deepl_token,
    )
    reference_voices = speaker_voice_clips(segments, audio_file)
    dubbed_track = voice_cloning_translation(
        translated_segments,
        reference_voices,
        target_language_codes,
        speaker_model,
        audio_file,
    )
    return dub_video(video_path, dubbed_track, target_language_codes[0])
# Supported target languages. Each value is a 3-tuple:
#   [0] XTTS v2 language code (also used as the output file suffix),
#   [1] fairseq VITS language code,
#   [2] DeepL target language code.
# DeepL rejects the bare "pt" (like bare "en") as a deprecated target and
# requires a regional variant.
# NOTE(review): "pt-br" was chosen over "pt-pt" -- confirm the intended variant.
language_codes = {
    "Chinese": ("zh-cn", "zho", "zh"),
    "Czech": ("cs", "ces", "cs"),
    "Dutch": ("nl", "nld", "nl"),
    "English": ("en", "eng", "en-us"),
    "French": ("fr", "fra", "fr"),
    "German": ("de", "deu", "de"),
    "Hungarian": ("hu", "hun", "hu"),
    "Italian": ("it", "ita", "it"),
    "Japanese": ("ja", "jpn", "ja"),
    "Korean": ("ko", "kor", "ko"),
    "Polish": ("pl", "pol", "pl"),
    "Portuguese": ("pt", "por", "pt-br"),
    "Russian": ("ru", "rus", "ru"),
    "Spanish": ("es", "spa", "es"),
    "Turkish": ("tr", "tur", "tr"),
    "Ukrainian": ("uk", "ukr", "uk"),
}
def check_video_duration(video_path, max_duration=180):
    """Tell whether a video is longer than the allowed duration.

    Args:
        video_path: Path to the video file.
        max_duration: Maximum allowed duration in seconds (default 180,
            i.e. three minutes).

    Returns:
        True when the video is strictly longer than ``max_duration``.
    """
    with mp.VideoFileClip(video_path) as video:
        duration = video.duration
    return duration > max_duration
def download_youtube_video(url):
    """Download a YouTube video as an mp4 file.

    Args:
        url: URL of the YouTube video.

    Returns:
        Path of the downloaded file, or None (after showing a warning) when
        the video is age restricted or has no suitable stream.
    """
    yt = YouTube(url)
    if yt.age_restricted:
        gr.Warning("The Youtube video you are trying to translate is age restricted. Manually download it using the following link(https://en.savefrom.net/) and use file upload, as pytube library doesn't support restricted videos download.")
        return None
    # Progressive streams carry audio and video in one file; adaptive mp4
    # streams may be video-only, which would leave nothing to transcribe.
    stream = yt.streams.filter(progressive=True, file_extension='mp4').first()
    if stream is None:
        gr.Warning("No downloadable mp4 stream with audio was found for this YouTube video.")
        return None
    return stream.download()
def _format_usage_time(total_minutes, suffix):
    """Format minutes as 'H hrs, M min <suffix>', or 'M min <suffix>' under an hour."""
    if total_minutes >= 60:
        return f"{int(total_minutes // 60)} hrs, {int(total_minutes % 60)} min {suffix}"
    return f"{int(total_minutes)} min {suffix}"


def translation_limit():
    """Build an HTML progress bar describing the DeepL character usage.

    The character count and limit reported by DeepL are converted to
    "minutes" at 750 characters per minute and rendered as
    "<used> / <total>" on top of a bar filled proportionally.

    Returns:
        An HTML string: the progress bar when the character usage reported
        by DeepL is valid, otherwise a red notice.
    """
    # NOTE(review): 750 characters per minute looks like this app's estimate
    # of translated text per minute of video -- confirm.
    characters_per_minute = 750

    translator = deepl.Translator(DEEPL_TOKEN)
    usage = translator.get_usage()
    if not usage.character.valid:
        return "<div style='color: red; text-align: center;'>Translation limit is reached</div>"

    minutes_used = usage.character.count / characters_per_minute
    max_minutes = usage.character.limit / characters_per_minute
    if max_minutes > 0:
        # Clamp so the bar never grows wider than its container
        percent_used = min(100.0, (minutes_used / max_minutes) * 100)
    else:
        # A zero limit means nothing is left; show a full bar instead of
        # dividing by zero.
        percent_used = 100.0

    used_time_str = _format_usage_time(minutes_used, "used")
    max_time_str = _format_usage_time(max_minutes, "total")
    progress_bar_html = (
        "<div style='width: 100%; background-color: #adb5bd; position: relative; text-align: center; "
        "line-height: 2em; color: white; font-weight: bold;'>"
        "<div style='position: absolute; width: 100%; left: 0; top: 0; z-index: 1;'>"
        f"{used_time_str} / {max_time_str}"
        "</div>"
        f"<div style='height: 2em; background-color: #4caf50; width: {percent_used}%; z-index: 0;'>"
        "</div>"
        "</div>"
    )
    return progress_bar_html
def clear_inputs():
    """Return the reset values for the four input widgets.

    Returns:
        A 4-tuple: None for the video, an empty string for the YouTube link,
        and None for each of the two dropdowns.
    """
    cleared_video = None
    cleared_link = ""
    cleared_language = None
    cleared_model = None
    return cleared_video, cleared_link, cleared_language, cleared_model
def translate_video(video_path, youtube_link, target_language, speaker_model):
    """Gradio handler: validate the inputs and translate the video.

    A YouTube link, when given, takes precedence over an uploaded file.
    Invalid input produces a warning in the UI and no output video.

    Args:
        video_path: Path of the uploaded video, or a falsy value.
        youtube_link: YouTube URL, or a falsy value.
        target_language: Key of ``language_codes`` selected in the UI.
        speaker_model: Name of the selected text-to-speech model.

    Returns:
        A pair ``(usage_html, dubbed_video_path)``; the path is None when
        the request was rejected.

    Raises:
        Exception: Any pipeline error is printed and re-raised unchanged.
    """
    try:
        if not video_path and not youtube_link:
            gr.Warning("You should either upload video or input a YouTube link")
            return translation_limit(), None
        # "Clear inputs" resets both dropdowns to None; reject that here
        # instead of failing deep inside the pipeline.
        if target_language not in language_codes:
            gr.Warning("Please select a translation target language")
            return translation_limit(), None
        if not speaker_model:
            gr.Warning("Please select a text-to-speech generation model")
            return translation_limit(), None
        if youtube_link:
            video_path = download_youtube_video(youtube_link)
            if video_path is None:
                gr.Warning("Video input did not process well, try again")
                return translation_limit(), None
        if check_video_duration(video_path):
            gr.Warning("Video is longer than 3 minutes, please provide a shorter one")
            return translation_limit(), None
        target_language_codes = language_codes[target_language]
        dubbed_video_path = video_translation(video_path, target_language_codes, speaker_model, HF_TOKEN, DEEPL_TOKEN)
        limit_info = translation_limit()
        return limit_info, dubbed_video_path
    except Exception as e:
        print(f"An error occurred: {e}")
        # Bare raise keeps the original traceback intact
        raise
# Frame drawn around the input and output panels
css = """
.column-frame {
    border: 2px solid #AAA;
    border-radius: 10px;
    padding: 10px;
    margin: 10px;
}
"""

# Query the DeepL usage once at start-up and reuse it for the initial render
initial_usage_info = translation_limit()

# NOTE(review): the nesting below was reconstructed from the component order;
# confirm the placement of the examples against the intended layout.
with gr.Blocks(theme=gr.themes.Soft(), css=css) as demo:
    gr.Markdown("<h1 style='text-align: center;'>🌐AI Video Translation</h1>")
    with gr.Row():
        # Left panel: inputs and actions
        with gr.Column(elem_classes=["column-frame"]):
            gr.Markdown("<h2 style='text-align: center;'>Inputs</h2>")
            translation_limit_info = gr.HTML(value=initial_usage_info)
            video = gr.Video(label="Upload a video file")
            gr.Markdown("<h3 style='text-align: center;'>OR</h3>")
            youtube_link = gr.Textbox(label="Paste YouTube link")
            gr.Markdown("⚠️If you get a warning that the video is age restricted, manually download it using the following [link](https://downloaderto.com/) and use file upload, as pytube library doesn't support restricted videos download.")
            gr.Markdown("---")
            target_language = gr.Dropdown(list(language_codes.keys()), value="English", label="Select translation target language")
            speaker_model = gr.Dropdown(["(Recommended) XTTS_V2", "VITs (will be default for Ukrainian)"], value="(Recommended) XTTS_V2", label="Select text-to-speech generation model")
            with gr.Row():
                clear_btn = gr.Button("Clear inputs")
                translate_btn = gr.Button("Translate")
        # Right panel: translated video and a ready-made example
        with gr.Column():
            with gr.Row(elem_classes=["column-frame"]):
                with gr.Column():
                    gr.Markdown("<h2 style='text-align: center;'>Translated Video</h2>")
                    output_video = gr.Video(label="Translated video")
            gr.Examples(
                [[None, 'https://www.youtube.com/watch?v=q4kkQSkrrtI', 'Japanese', "(Recommended) XTTS_V2"]],
                [video, youtube_link, target_language, speaker_model],
                [translation_limit_info, output_video],
                translate_video,
                run_on_click=True,
            )
    translate_btn.click(
        fn=translate_video,
        inputs=[video, youtube_link, target_language, speaker_model],
        outputs=[translation_limit_info, output_video]
    )
    clear_btn.click(
        fn=clear_inputs,
        inputs=[],
        outputs=[video, youtube_link, target_language, speaker_model]
    )

demo.launch(show_error=True, debug=True, share=True)