|
'''
Copyright 2023-2024 LangBridge Inc.
All Rights Reserved.

LangBridge Restricted
'''
|
|
|
import gc
import os

import deepl
import gradio as gr
import librosa
import moviepy.editor as mp
import numpy as np
import pyrubberband as pyrb
import soundfile as sf
import torch
import whisperx
from moviepy.audio.AudioClip import AudioArrayClip
from pytube import YouTube
from TTS.api import TTS
|
|
|
# Required API tokens; os.environ[...] raises KeyError if either is missing.
HF_TOKEN = os.environ["HF_TOKEN"]
DEEPL_TOKEN = os.environ["DEEPL_TOKEN"]

# Accept the Coqui TTS terms of service non-interactively.
os.environ["COQUI_TOS_AGREED"] = "1"
|
|
def extract_audio(video_path):
    """Extract the audio track of a video to a .wav file next to it."""
    clip = mp.VideoFileClip(video_path)
    audio_path = os.path.splitext(video_path)[0] + ".wav"
    clip.audio.write_audiofile(audio_path)
    return audio_path
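# Illustrative call ("interview.mp4" is a hypothetical local file):
#   audio_path = extract_audio("interview.mp4")  # writes and returns "interview.wav"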
|
|
def speech_diarization(audio_path, hf_token):
    """Transcribe audio with WhisperX and label every segment with its speaker."""
    device = "cuda"
    batch_size = 16
    compute_type = "float16"

    # 1. Transcribe with batched Whisper.
    model = whisperx.load_model("large-v2", device, compute_type=compute_type)
    audio = whisperx.load_audio(audio_path)
    result = model.transcribe(audio, batch_size=batch_size)

    # Free GPU memory before loading the alignment model.
    del model
    gc.collect()
    torch.cuda.empty_cache()

    # 2. Align the transcript for accurate word-level timestamps.
    model_a, metadata = whisperx.load_align_model(language_code=result["language"], device=device)
    result = whisperx.align(result["segments"], model_a, metadata, audio, device, return_char_alignments=False)

    del model_a
    gc.collect()
    torch.cuda.empty_cache()

    # 3. Diarize and assign speaker labels to the aligned segments.
    diarize_model = whisperx.DiarizationPipeline(use_auth_token=hf_token, device=device)
    diarize_segments = diarize_model(audio)
    result = whisperx.assign_word_speakers(diarize_segments, result)

    print(f'\n[Original transcript]:\n{result["segments"]}\n')
    return result["segments"]
|
|
def speaker_voice_clips(transcription, audio_path):
    """Save a short reference clip (up to three segments) for each detected speaker."""
    # Collect up to three start/end timecodes per speaker.
    snippets_timecodes = {}
    for segment in transcription:
        speaker = segment['speaker']
        if speaker not in snippets_timecodes:
            snippets_timecodes[speaker] = []
        if len(snippets_timecodes[speaker]) < 3:
            snippets_timecodes[speaker].append({'start': segment['start'], 'end': segment['end']})

    # Cut those snippets out of the original audio and save one clip per speaker.
    original_audio = mp.AudioFileClip(audio_path)
    audio_file_directory = os.path.dirname(audio_path)

    voice_clips = {}
    for speaker, speaker_snippets in snippets_timecodes.items():
        subclips = []
        for snippet in speaker_snippets:
            subclips.append(original_audio.subclip(snippet['start'], snippet['end']))

        concatenated_clip = mp.concatenate_audioclips(subclips)

        output_filename = os.path.join(audio_file_directory, f"{speaker}_voice_clips.wav")
        concatenated_clip.write_audiofile(output_filename)
        voice_clips[speaker] = output_filename

    return voice_clips
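# Example result (hypothetical paths), assuming two diarized speakers:
#   {'SPEAKER_00': '/tmp/SPEAKER_00_voice_clips.wav',
#    'SPEAKER_01': '/tmp/SPEAKER_01_voice_clips.wav'}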
|
|
def translate_transcript(transcript, target_language, deepl_token):
    """Translate every segment's text with DeepL, keeping timing and speaker labels."""
    translator = deepl.Translator(deepl_token)

    translated_transcript = []
    for segment in transcript:
        translated_text = translator.translate_text(segment['text'], target_lang=target_language)

        translated_transcript.append({
            'start': segment['start'],
            'end': segment['end'],
            'text': translated_text.text,
            'speaker': segment['speaker']
        })

    print(f'\n[Translated transcript]:\n{translated_transcript}\n')
    return translated_transcript
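# Illustrative call (hypothetical values); "ja" is the DeepL target code for
# Japanese from the language_codes table below:
#   translated = translate_transcript(transcription, "ja", DEEPL_TOKEN)
#   # -> [{'start': 0.5, 'end': 3.2, 'text': 'こんにちは。', 'speaker': 'SPEAKER_00'}, ...]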
|
|
def adjust_voice_pace(sound_array, sample_rate, target_duration):
    """Time-stretch audio so it fits target_duration (a rate > 1 shortens it)."""
    duration = len(sound_array) / sample_rate
    tempo_change = duration / target_duration
    sound_array_stretched = pyrb.time_stretch(np.array(sound_array), sample_rate, tempo_change)
    return sound_array_stretched
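# Worked example: fitting 3.0 s of synthesized speech into a 2.0 s slot gives
# a stretch rate of 3.0 / 2.0 = 1.5, i.e. the audio plays 1.5x faster:
#   shorter = adjust_voice_pace(audio, 24000, 2.0)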
|
|
def voice_cloning_translation(translated_transcription, speakers_voice_clips, target_language_codes, speaker_model, audio_path):
    """Synthesize every translated segment with a cloned voice and build one audio track."""
    device = "cuda"
    xtts2_language_code = target_language_codes[0]
    vits_language_code = target_language_codes[1]

    # XTTS v2 does not support Ukrainian, so fall back to the fairseq VITS
    # model for it (or whenever the user explicitly selects VITS).
    if 'vits' in speaker_model.lower() or xtts2_language_code == 'uk':
        selected_model = f'tts_models/{vits_language_code}/fairseq/vits'
    else:
        selected_model = 'tts_models/multilingual/multi-dataset/xtts_v2'
    print(selected_model)

    tts = None
    final_audio_track = None
    try:
        tts = TTS(selected_model).to(device)

        last_end_time = 0
        clips = []
        for speech_item in translated_transcription:
            speech_item_duration = speech_item['end'] - speech_item['start']

            # Pad the gap since the previous segment with silence so the
            # dubbed track stays aligned with the video.
            gap_duration = speech_item['start'] - last_end_time
            if gap_duration > 0:
                silent_audio = np.zeros((int(44100 * gap_duration), 2))
                clips.append(AudioArrayClip(silent_audio, fps=44100))
                print(f"\nAdded silence: Start={last_end_time}, Duration={gap_duration}")

            print(f"[{speech_item['speaker']}]")

            # XTTS v2 needs an explicit language code; the single-language
            # VITS models do not.
            if 'vits' in selected_model:
                audio = tts.tts(text=speech_item['text'], speaker_wav=speakers_voice_clips[speech_item['speaker']])
            else:
                audio = tts.tts(text=speech_item['text'], speaker_wav=speakers_voice_clips[speech_item['speaker']], language=xtts2_language_code)
            sample_rate = tts.synthesizer.output_sample_rate

            # Speed the speech up if it overruns its original time slot.
            audio_duration = len(audio) / sample_rate
            if speech_item_duration < audio_duration:
                audio = adjust_voice_pace(audio, sample_rate, speech_item_duration)

            # Resample to 44.1 kHz and duplicate the mono channel to stereo.
            audio = librosa.resample(np.array(audio), orig_sr=sample_rate, target_sr=44100)
            audio_stereo = np.repeat(np.expand_dims(audio, axis=1), 2, axis=1)
            audio_clip = AudioArrayClip(audio_stereo, fps=44100)

            # Trim the last 0.2 s of the generated clip.
            audio_clip = audio_clip.subclip(0, audio_clip.duration - 0.2)
            clips.append(audio_clip)
            print(f"Added speech: Start={speech_item['start']}, Final duration={audio_clip.duration}, Original duration={speech_item_duration}")

            last_end_time = speech_item['start'] + audio_clip.duration

        final_audio_track = mp.concatenate_audioclips(clips)

        audio_files_directory = os.path.dirname(audio_path)
        final_audio_track.write_audiofile(os.path.join(audio_files_directory, "translated_voice_track.wav"), fps=44100)
    finally:
        # Free the TTS model whether or not synthesis succeeded; exceptions
        # still propagate to the caller.
        if tts is not None:
            del tts
            gc.collect()
            torch.cuda.empty_cache()

    return final_audio_track
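# Model selection examples, following the rule above:
#   Ukrainian ("uk", "ukr", "uk") -> 'tts_models/ukr/fairseq/vits'
#   Japanese with XTTS_V2 selected -> 'tts_models/multilingual/multi-dataset/xtts_v2'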
|
|
def dub_video(video_path, translated_audio_track, target_language):
    """Mix the translated track over the quieted original audio and render the video."""
    video = mp.VideoFileClip(video_path)
    video = video.subclip(0, translated_audio_track.duration)
    # Keep the original audio at 15% volume underneath the dub.
    original_audio = video.audio.volumex(0.15)
    dubbed_audio = mp.CompositeAudioClip([original_audio, translated_audio_track.set_start(0)])
    video_with_dubbing = video.set_audio(dubbed_audio)

    video_with_dubbing_path = os.path.splitext(video_path)[0] + "_" + target_language + ".mp4"
    video_with_dubbing.write_videofile(video_with_dubbing_path)

    return video_with_dubbing_path
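# Example (hypothetical path): dubbing "interview.mp4" into Japanese
# ("ja" as the first language code) writes "interview_ja.mp4".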
|
|
def video_translation(video_path, target_language_codes, speaker_model, hf_token, deepl_token):
    """Full pipeline: extract audio, transcribe and diarize, translate, clone voices, dub."""
    original_audio_path = extract_audio(video_path)
    transcription = speech_diarization(original_audio_path, hf_token)
    translated_transcription = translate_transcript(transcription, target_language_codes[2], deepl_token)
    speakers_voice_clips = speaker_voice_clips(transcription, original_audio_path)
    translated_audio_track = voice_cloning_translation(translated_transcription, speakers_voice_clips, target_language_codes, speaker_model, original_audio_path)
    video_with_dubbing = dub_video(video_path, translated_audio_track, target_language_codes[0])
    return video_with_dubbing
|
|
# Language name -> (XTTS v2 code, fairseq VITS code, DeepL target code).
language_codes = {
    "Chinese": ("zh-cn", "zho", "zh"),
    "Czech": ("cs", "ces", "cs"),
    "Dutch": ("nl", "nld", "nl"),
    "English": ("en", "eng", "en-us"),
    "French": ("fr", "fra", "fr"),
    "German": ("de", "deu", "de"),
    "Hungarian": ("hu", "hun", "hu"),
    "Italian": ("it", "ita", "it"),
    "Japanese": ("ja", "jpn", "ja"),
    "Korean": ("ko", "kor", "ko"),
    "Polish": ("pl", "pol", "pl"),
    "Portuguese": ("pt", "por", "pt"),
    "Russian": ("ru", "rus", "ru"),
    "Spanish": ("es", "spa", "es"),
    "Turkish": ("tr", "tur", "tr"),
    "Ukrainian": ("uk", "ukr", "uk")
}
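# Example lookup:
#   language_codes["Japanese"] -> ("ja", "jpn", "ja")
# Index 0 also becomes the dubbed file's suffix, index 1 picks the fairseq
# VITS model, and index 2 is passed to DeepL.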
|
|
|
def check_video_duration(video_path):
    """Return True if the video exceeds the 3-minute limit."""
    with mp.VideoFileClip(video_path) as video:
        return video.duration > 180
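# Example: a 200 s upload returns True (rejected), a 90 s one returns False.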
|
|
|
def download_youtube_video(url):
    yt = YouTube(url)
    if yt.age_restricted:
        gr.Warning("The YouTube video you are trying to translate is age-restricted. Download it manually via this link (https://en.savefrom.net/) and use the file upload instead, as the pytube library cannot download restricted videos.")
        return None
    stream = yt.streams.filter(file_extension='mp4').first()
    output_path = stream.download()
    return output_path
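# Illustrative call (the URL from the demo's examples); returns a local .mp4
# path, or None when the video is age-restricted:
#   path = download_youtube_video("https://www.youtube.com/watch?v=q4kkQSkrrtI")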
|
|
|
def translation_limit():
    """Render the DeepL character quota as an HTML progress bar, expressed in minutes."""
    translator = deepl.Translator(DEEPL_TOKEN)
    usage = translator.get_usage()
    if not usage.character.valid:
        return "<div style='color: red; text-align: center;'>Translation limit is reached</div>"

    # Rough heuristic: treat ~750 characters as one minute of speech.
    characters_used = usage.character.count
    minutes_used = characters_used / 750
    max_minutes = usage.character.limit / 750
    percent_used = (minutes_used / max_minutes) * 100

    used_time_str = f"{int(minutes_used)} min used"
    max_time_str = f"{int(max_minutes)} min total"
    if minutes_used >= 60:
        hours_used = int(minutes_used // 60)
        minutes_used = int(minutes_used % 60)
        used_time_str = f"{hours_used} hrs, {minutes_used} min used"
    if max_minutes >= 60:
        hours_max = int(max_minutes // 60)
        remaining_minutes_max = int(max_minutes % 60)
        max_time_str = f"{hours_max} hrs, {remaining_minutes_max} min total"

    progress_bar_html = (
        "<div style='width: 100%; background-color: #adb5bd; position: relative; text-align: center; "
        "line-height: 2em; color: white; font-weight: bold;'>"
        "<div style='position: absolute; width: 100%; left: 0; top: 0; z-index: 1;'>"
        f"{used_time_str} / {max_time_str}"
        "</div>"
        f"<div style='height: 2em; background-color: #4caf50; width: {percent_used}%; z-index: 0;'>"
        "</div>"
        "</div>"
    )
    return progress_bar_html
|
|
|
def clear_inputs():
    # Reset the dropdowns to their defaults (rather than None) so a
    # subsequent Translate click still has valid selections.
    return None, "", "English", "(Recommended) XTTS_V2"
|
|
|
def translate_video(video_path, youtube_link, target_language, speaker_model):
    try:
        if not video_path and not youtube_link:
            gr.Warning("You should either upload a video or paste in a YouTube link")
            return translation_limit(), None
        if youtube_link:
            video_path = download_youtube_video(youtube_link)
            if video_path is None:
                gr.Warning("The video could not be downloaded, please try again")
                return translation_limit(), None

        if check_video_duration(video_path):
            gr.Warning("Video is longer than 3 minutes, please provide a shorter one")
            return translation_limit(), None

        target_language_codes = language_codes[target_language]
        dubbed_video_path = video_translation(video_path, target_language_codes, speaker_model, HF_TOKEN, DEEPL_TOKEN)
        limit_info = translation_limit()
        return limit_info, dubbed_video_path
    except Exception as e:
        print(f"An error occurred: {e}")
        raise
|
|
|
css = """ |
|
.column-frame { |
|
border: 2px solid #AAA; |
|
border-radius: 10px; |
|
padding: 10px; |
|
margin: 10px; |
|
} |
|
""" |
|
|
|
# Rendered once at startup so the quota bar is populated when the UI loads.
initial_usage_info = translation_limit()
|
|
|
with gr.Blocks(theme=gr.themes.Soft(), css=css) as demo:
    gr.Markdown("<h1 style='text-align: center;'>🌐 AI Video Translation</h1>")

    with gr.Row():
        with gr.Column(elem_classes=["column-frame"]):
            gr.Markdown("<h2 style='text-align: center;'>Inputs</h2>")
            translation_limit_info = gr.HTML(value=initial_usage_info)
            video = gr.Video(label="Upload a video file")
            gr.Markdown("<h3 style='text-align: center;'>OR</h3>")
            youtube_link = gr.Textbox(label="Paste YouTube link")
            gr.Markdown("⚠️ If you get a warning that the video is age-restricted, download it manually using this [link](https://downloaderto.com/) and use the file upload instead, as the pytube library cannot download restricted videos.")
            gr.Markdown("---")
            target_language = gr.Dropdown(list(language_codes.keys()), value="English", label="Select translation target language")
            speaker_model = gr.Dropdown(["(Recommended) XTTS_V2", "VITs (will be default for Ukrainian)"], value="(Recommended) XTTS_V2", label="Select text-to-speech generation model")
            with gr.Row():
                clear_btn = gr.Button("Clear inputs")
                translate_btn = gr.Button("Translate")

        with gr.Column():
            with gr.Row(elem_classes=["column-frame"]):
                with gr.Column():
                    gr.Markdown("<h2 style='text-align: center;'>Translated Video</h2>")
                    output_video = gr.Video(label="Translated video")
                    gr.Examples(
                        [[None, 'https://www.youtube.com/watch?v=q4kkQSkrrtI', 'Japanese', "(Recommended) XTTS_V2"]],
                        [video, youtube_link, target_language, speaker_model],
                        [translation_limit_info, output_video],
                        translate_video,
                        run_on_click=True,
                    )

    translate_btn.click(
        fn=translate_video,
        inputs=[video, youtube_link, target_language, speaker_model],
        outputs=[translation_limit_info, output_video]
    )

    clear_btn.click(
        fn=clear_inputs,
        inputs=[],
        outputs=[video, youtube_link, target_language, speaker_model]
    )

demo.launch(show_error=True, debug=True, share=True)