#! /usr/bin/env python
# coding=utf-8
# Copyright 2022 Bofeng Huang
import datetime
import logging
import os
import re
import warnings
import gradio as gr
import pandas as pd
import psutil
import pytube as pt
import torch
# import whisper
from faster_whisper import WhisperModel
from huggingface_hub import hf_hub_download, snapshot_download
from transformers.utils.logging import disable_progress_bar
import nltk
nltk.download("punkt")
from nltk.tokenize import sent_tokenize
warnings.filterwarnings("ignore")
disable_progress_bar()
# DEFAULT_MODEL_NAME = "bofenghuang/whisper-large-v2-cv11-german"
DEFAULT_MODEL_NAME = "bofenghuang/whisper-large-v2-cv11-german-ct2"
# CHECKPOINT_FILENAME = "checkpoint_openai.pt"
GEN_KWARGS = {
"task": "transcribe",
"language": "de",
# "without_timestamps": True,
# decode options
# "beam_size": 5,
# "patience": 2,
# disable fallback
# "compression_ratio_threshold": None,
# "logprob_threshold": None,
# vad threshold
# "no_speech_threshold": None,
}
logging.basicConfig(
format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s",
datefmt="%Y-%m-%dT%H:%M:%SZ",
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# device = 0 if torch.cuda.is_available() else "cpu"
# device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device = "cuda" if torch.cuda.is_available() else "cpu"
logger.info(f"Model will be loaded on device `{device}`")
cached_models = {}
def format_timestamp(seconds):
return str(datetime.timedelta(seconds=round(seconds)))
def _return_yt_html_embed(yt_url):
video_id = yt_url.split("?v=")[-1]
HTML_str = (
f'
' "
"
)
return HTML_str
def download_audio_from_youtube(yt_url, downloaded_filename="audio.wav"):
yt = pt.YouTube(yt_url)
stream = yt.streams.filter(only_audio=True)[0]
# stream.download(filename="audio.mp3")
stream.download(filename=downloaded_filename)
return downloaded_filename
def download_video_from_youtube(yt_url, downloaded_filename="video.mp4"):
yt = pt.YouTube(yt_url)
stream = yt.streams.filter(progressive=True, file_extension="mp4").order_by("resolution").desc().first()
stream.download(filename=downloaded_filename)
logger.info(f"Download YouTube video from {yt_url}")
return downloaded_filename
def _print_memory_info():
memory = psutil.virtual_memory()
logger.info(
f"Memory info - Free: {memory.available / (1024 ** 3):.2f} Gb, used: {memory.percent}%, total: {memory.total / (1024 ** 3):.2f} Gb"
)
def _print_cuda_memory_info():
used_mem, tot_mem = torch.cuda.mem_get_info()
logger.info(
f"CUDA memory info - Free: {used_mem / 1024 ** 3:.2f} Gb, used: {(tot_mem - used_mem) / 1024 ** 3:.2f} Gb, total: {tot_mem / 1024 ** 3:.2f} Gb"
)
def print_memory_info():
_print_memory_info()
_print_cuda_memory_info()
def maybe_load_cached_pipeline(model_name):
model = cached_models.get(model_name)
if model is None:
# downloaded_model_path = hf_hub_download(repo_id=model_name, filename=CHECKPOINT_FILENAME)
downloaded_model_path = snapshot_download(repo_id=model_name)
# model = whisper.load_model(downloaded_model_path, device=device)
model = WhisperModel(downloaded_model_path, device=device, compute_type="float16")
logger.info(f"`{model_name}` has been loaded on device `{device}`")
print_memory_info()
cached_models[model_name] = model
return model
def infer(model, filename, with_timestamps, return_df=False):
if with_timestamps:
# model_outputs = model.transcribe(filename, **GEN_KWARGS)
model_outputs, _ = model.transcribe(filename, **GEN_KWARGS)
model_outputs = [segment._asdict() for segment in model_outputs]
if return_df:
# model_outputs_df = pd.DataFrame(model_outputs["segments"])
model_outputs_df = pd.DataFrame(model_outputs)
# print(model_outputs)
# print(model_outputs_df)
# print(model_outputs_df.info(verbose=True))
model_outputs_df = model_outputs_df[["start", "end", "text"]]
model_outputs_df["start"] = model_outputs_df["start"].map(format_timestamp)
model_outputs_df["end"] = model_outputs_df["end"].map(format_timestamp)
model_outputs_df["text"] = model_outputs_df["text"].str.strip()
return model_outputs_df
else:
return "\n\n".join(
[
f'Segment {segment["id"]+1} from {segment["start"]:.2f}s to {segment["end"]:.2f}s:\n{segment["text"].strip()}'
# for segment in model_outputs["segments"]
for segment in model_outputs
]
)
else:
# text = model.transcribe(filename, without_timestamps=True, **GEN_KWARGS)["text"]
model_outputs, _ = model.transcribe(filename, without_timestamps=True, **GEN_KWARGS)
text = " ".join([segment.text for segment in model_outputs])
if return_df:
return pd.DataFrame({"text": sent_tokenize(text)})
else:
return text
def transcribe(microphone, file_upload, with_timestamps, model_name=DEFAULT_MODEL_NAME):
warn_output = ""
if (microphone is not None) and (file_upload is not None):
warn_output = (
"WARNING: You've uploaded an audio file and used the microphone. "
"The recorded file from the microphone will be used and the uploaded audio will be discarded.\n"
)
elif (microphone is None) and (file_upload is None):
return "ERROR: You have to either use the microphone or upload an audio file"
file = microphone if microphone is not None else file_upload
model = maybe_load_cached_pipeline(model_name)
# text = model.transcribe(file, **GEN_KWARGS)["text"]
# text = infer(model, file, with_timestamps)
text = infer(model, file, with_timestamps, return_df=True)
logger.info(f'Transcription by `{model_name}`:\n{text.to_json(orient="index", force_ascii=False, indent=2)}\n')
# return warn_output + text
return text
def yt_transcribe(yt_url, with_timestamps, model_name=DEFAULT_MODEL_NAME):
# html_embed_str = _return_yt_html_embed(yt_url)
audio_file_path = download_audio_from_youtube(yt_url)
model = maybe_load_cached_pipeline(model_name)
# text = model.transcribe("audio.mp3", **GEN_KWARGS)["text"]
# text = infer(model, audio_file_path, with_timestamps)
text = infer(model, audio_file_path, with_timestamps, return_df=True)
logger.info(f'Transcription by `{model_name}` of "{yt_url}":\n{text.to_json(orient="index", force_ascii=False, indent=2)}\n')
# return html_embed_str, text
return text
def video_transcribe(video_file_path, with_timestamps, model_name=DEFAULT_MODEL_NAME):
if video_file_path is None:
raise ValueError("Failed to transcribe video as no video_file_path has been defined")
audio_file_path = re.sub(r"\.mp4$", ".wav", video_file_path)
os.system(f'ffmpeg -hide_banner -loglevel error -y -i "{video_file_path}" -ar 16000 -ac 1 -c:a pcm_s16le "{audio_file_path}"')
model = maybe_load_cached_pipeline(model_name)
# text = model.transcribe("audio.mp3", **GEN_KWARGS)["text"]
text = infer(model, audio_file_path, with_timestamps, return_df=True)
logger.info(f'Transcription by `{model_name}`:\n{text.to_json(orient="index", force_ascii=False, indent=2)}\n')
return text
# load default model
maybe_load_cached_pipeline(DEFAULT_MODEL_NAME)
# default_text_output_df = pd.DataFrame(columns=["start", "end", "text"])
default_text_output_df = pd.DataFrame(columns=["text"])
with gr.Blocks() as demo:
with gr.Tab("Transcribe Audio"):
gr.Markdown(
f"""
Whisper German Demo: Transcribe Audio
Transcribe long-form microphone or audio inputs!
Demo uses the fine-tuned checkpoint: {DEFAULT_MODEL_NAME} to transcribe audio files of arbitrary length.
Efficient inference is supported by [faster-whisper](https://github.com/guillaumekln/faster-whisper) and [CTranslate2](https://github.com/OpenNMT/CTranslate2).
"""
)
microphone_input = gr.inputs.Audio(source="microphone", type="filepath", label="Record", optional=True)
upload_input = gr.inputs.Audio(source="upload", type="filepath", label="Upload File", optional=True)
with_timestamps_input = gr.Checkbox(label="With timestamps?")
microphone_transcribe_btn = gr.Button("Transcribe Audio")
# gr.Markdown('''
# Here you will get generated transcrit.
# ''')
# microphone_text_output = gr.outputs.Textbox(label="Transcription")
text_output_df2 = gr.DataFrame(
value=default_text_output_df,
label="Transcription",
row_count=(0, "dynamic"),
max_rows=10,
wrap=True,
overflow_row_behaviour="paginate",
)
microphone_transcribe_btn.click(
transcribe, inputs=[microphone_input, upload_input, with_timestamps_input], outputs=text_output_df2
)
# with gr.Tab("Transcribe YouTube"):
# gr.Markdown(
# f"""
#
#
Whisper German Demo: Transcribe YouTube
#
# Transcribe long-form YouTube videos!
# Demo uses the fine-tuned checkpoint: {DEFAULT_MODEL_NAME} to transcribe video files of arbitrary length.
# """
# )
# yt_link_input2 = gr.inputs.Textbox(lines=1, placeholder="Paste the URL to a YouTube video here", label="YouTube URL")
# with_timestamps_input2 = gr.Checkbox(label="With timestamps?", value=True)
# yt_transcribe_btn = gr.Button("Transcribe YouTube")
# # yt_text_output = gr.outputs.Textbox(label="Transcription")
# text_output_df3 = gr.DataFrame(
# value=default_text_output_df,
# label="Transcription",
# row_count=(0, "dynamic"),
# max_rows=10,
# wrap=True,
# overflow_row_behaviour="paginate",
# )
# # yt_html_output = gr.outputs.HTML(label="YouTube Page")
# yt_transcribe_btn.click(yt_transcribe, inputs=[yt_link_input2, with_timestamps_input2], outputs=[text_output_df3])
with gr.Tab("Transcribe Video"):
gr.Markdown(
f"""
Whisper German Demo: Transcribe Video
Transcribe long-form YouTube videos or uploaded video inputs!
Demo uses the fine-tuned checkpoint: {DEFAULT_MODEL_NAME} to transcribe video files of arbitrary length.
Efficient inference is supported by [faster-whisper](https://github.com/guillaumekln/faster-whisper) and [CTranslate2](https://github.com/OpenNMT/CTranslate2).
"""
)
yt_link_input = gr.inputs.Textbox(lines=1, placeholder="Paste the URL to a YouTube video here", label="YouTube URL")
download_youtube_btn = gr.Button("Download Youtube video")
downloaded_video_output = gr.Video(label="Video file", mirror_webcam=False)
download_youtube_btn.click(download_video_from_youtube, inputs=[yt_link_input], outputs=[downloaded_video_output])
with_timestamps_input3 = gr.Checkbox(label="With timestamps?", value=True)
video_transcribe_btn = gr.Button("Transcribe video")
text_output_df = gr.DataFrame(
value=default_text_output_df,
label="Transcription",
row_count=(0, "dynamic"),
max_rows=10,
wrap=True,
overflow_row_behaviour="paginate",
)
video_transcribe_btn.click(video_transcribe, inputs=[downloaded_video_output, with_timestamps_input3], outputs=[text_output_df])
# demo.launch(server_name="0.0.0.0", debug=True)
# demo.launch(server_name="0.0.0.0", debug=True, share=True)
demo.launch(enable_queue=True)