Spaces:
Running
Running
# import torch | |
# import gradio as gr | |
# import yt_dlp as youtube_dl | |
# from transformers import pipeline | |
# from huggingface_hub import model_info | |
# import re | |
# import tempfile | |
# import os | |
# MODEL_NAME = "razhan/whisper-small-ckb" | |
# BATCH_SIZE = 1 | |
# FILE_LIMIT_MB = 10 | |
# YT_LENGTH_LIMIT_S = 60 * 10 | |
# device = 0 if torch.cuda.is_available() else "cpu" | |
# pipe = pipeline( | |
# task="automatic-speech-recognition", | |
# model=MODEL_NAME, | |
# chunk_length_s=30, | |
# device=device, | |
# ) | |
# pipe.model.config.forced_decoder_ids = pipe.tokenizer.get_decoder_prompt_ids(task="transcribe") | |
# def transcribe(microphone, file_upload): | |
# warn_output = "" | |
# if (microphone is not None) and (file_upload is not None): | |
# warn_output = ( | |
# "WARNING: You've uploaded an audio file and used the microphone. " | |
# "The recorded file from the microphone will be used and the uploaded audio will be discarded.\n" | |
# ) | |
# elif (microphone is None) and (file_upload is None): | |
# return "ERROR: You have to either use the microphone or upload an audio file" | |
# file = microphone if microphone is not None else file_upload | |
# text = pipe(file)["text"] | |
# return warn_output + text | |
# def _return_yt_html_embed(yt_url): | |
# if 'youtu.be' in yt_url: | |
# video_id = yt_url.split('/')[-1].split('?')[0] | |
# else: | |
# video_id = yt_url.split("?v=")[-1].split('&')[0] | |
# HTML_str = ( | |
# f'<center><iframe width="560" height="315" src="https://www.youtube.com/embed/{video_id}" ' | |
# 'frameborder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" ' | |
# 'allowfullscreen></iframe></center>' | |
# ) | |
# return HTML_str | |
# def yt_transcribe(yt_url, task="transcribe", max_filesize=75.0, progress=gr.Progress()): | |
# html_embed_str = _return_yt_html_embed(yt_url) | |
# with tempfile.TemporaryDirectory() as tmpdirname: | |
# filepath = os.path.join(tmpdirname, "video.mp4") | |
# download_yt_audio(yt_url, filepath) | |
# with open(filepath, "rb") as f: | |
# inputs = f.read() | |
# inputs = ffmpeg_read(inputs, pipe.feature_extractor.sampling_rate) | |
# inputs = {"array": inputs, "sampling_rate": pipe.feature_extractor.sampling_rate} | |
# start_time = time.time() | |
# outputs = pipe(inputs, chunk_length_s=30, batch_size=BATCH_SIZE, generate_kwargs={"task": task, "language": "persian"}, return_timestamps=False) | |
# exec_time = time.time() - start_time | |
# logging.info(print(f"transcribe: {exec_time} sec.")) | |
# return html_embed_str, txt, exec_time | |
# def download_yt_audio(yt_url, filename, progress=gr.Progress()): | |
# if '&list' in yt_url: | |
# yt_url = yt_url.split('&list')[0] | |
# info_loader = youtube_dl.YoutubeDL() | |
# try: | |
# info = info_loader.extract_info(yt_url, download=False) | |
# except youtube_dl.utils.DownloadError as err: | |
# raise gr.Error(str(err)) | |
# file_length = info["duration_string"] | |
# file_h_m_s = file_length.split(":") | |
# file_h_m_s = [int(sub_length) for sub_length in file_h_m_s] | |
# if len(file_h_m_s) == 1: | |
# file_h_m_s.insert(0, 0) | |
# if len(file_h_m_s) == 2: | |
# file_h_m_s.insert(0, 0) | |
# file_length_s = file_h_m_s[0] * 3600 + file_h_m_s[1] * 60 + file_h_m_s[2] | |
# if file_length_s > YT_LENGTH_LIMIT_S: | |
# yt_length_limit_hms = time.strftime("%HH:%MM:%SS", time.gmtime(YT_LENGTH_LIMIT_S)) | |
# file_length_hms = time.strftime("%HH:%MM:%SS", time.gmtime(file_length_s)) | |
# raise gr.Error(f"Maximum YouTube length is {yt_length_limit_hms}, got {file_length_hms} YouTube video.") | |
# # ydl_opts = {"outtmpl": filename, "format": "worstvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best"} | |
# ydl_opts = {"outtmpl": filename, "format": "bestaudio/best"} | |
# with youtube_dl.YoutubeDL(ydl_opts) as ydl: | |
# try: | |
# ydl.download([yt_url]) | |
# except youtube_dl.utils.ExtractorError as err: | |
# raise gr.Error(str(err)) | |
# progress(1, desc="Video downloaded from YouTube!") | |
# mf_transcribe = gr.Interface( | |
# fn=transcribe, | |
# inputs=[ | |
# gr.Audio(sources="microphone", type="filepath"), | |
# gr.Audio(sources="upload", type="filepath"), | |
# ], | |
# outputs="text", | |
# title="Whisper Central Kurdish (Sorani) Demo: Transcribe Audio", | |
# description=( | |
# "Transcribe long-form microphone or audio inputs with the click of a button! Demo uses the the fine-tuned" | |
# f" checkpoint [{MODEL_NAME}](https://huggingface.co/{MODEL_NAME}) and 🤗 Transformers to transcribe audio files" | |
# " of arbitrary length." | |
# ), | |
# allow_flagging="never", | |
# ) | |
# yt_transcribe = gr.Interface( | |
# fn=yt_transcribe, | |
# inputs=[gr.Textbox(lines=1, placeholder="Paste the URL to a YouTube video here", label="YouTube URL")], | |
# outputs=["html", | |
# gr.Textbox( | |
# label="Output", | |
# rtl=True, | |
# show_copy_button=True, | |
# ), | |
# gr.Text(label="Transcription Time") | |
# ], | |
# title="Whisper Central Kurdish (Sorani) Demo: Transcribe YouTube", | |
# description=( | |
# "Transcribe long-form YouTube videos with the click of a button! Demo uses the the fine-tuned checkpoint:" | |
# f" [{MODEL_NAME}](https://huggingface.co/{MODEL_NAME}) and 🤗 Transformers to transcribe audio files of" | |
# " arbitrary length." | |
# ), | |
# allow_flagging="never", | |
# ) | |
# demo = gr.TabbedInterface([mf_transcribe, yt_transcribe], ["Transcribe Audio", "Transcribe YouTube"]) | |
# if __name__ == "__main__": | |
# demo.launch() | |
import spaces | |
import torch | |
import gradio as gr | |
import yt_dlp as youtube_dl | |
from pytubefix import YouTube | |
from pytubefix.cli import on_progress | |
from transformers import pipeline | |
from transformers.pipelines.audio_utils import ffmpeg_read | |
import tempfile | |
import os | |
MODEL_NAME = "razhan/whisper-base-ckb" | |
BATCH_SIZE = 1 | |
FILE_LIMIT_MB = 10 | |
YT_LENGTH_LIMIT_S = 60 * 10 # limit to 1 hour YouTube files | |
device = 0 if torch.cuda.is_available() else "cpu" | |
pipe = pipeline( | |
task="automatic-speech-recognition", | |
model=MODEL_NAME, | |
chunk_length_s=30, | |
device=device, | |
) | |
# @spaces.GPU | |
def transcribe(inputs, task="transcribe"): | |
if inputs is None: | |
raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.") | |
text = pipe(inputs, batch_size=BATCH_SIZE, generate_kwargs={"task": task}, return_timestamps=True)["text"] | |
return text | |
def _return_yt_html_embed(yt_url): | |
video_id = yt_url.split("?v=")[-1] | |
HTML_str = ( | |
f'<center> <iframe width="500" height="320" src="https://www.youtube.com/embed/{video_id}"> </iframe>' | |
" </center>" | |
) | |
return HTML_str | |
# def download_yt_audio(yt_url, filename): | |
# info_loader = youtube_dl.YoutubeDL() | |
# try: | |
# info = info_loader.extract_info(yt_url, download=False) | |
# except youtube_dl.utils.DownloadError as err: | |
# raise gr.Error(str(err)) | |
# file_length = info["duration_string"] | |
# file_h_m_s = file_length.split(":") | |
# file_h_m_s = [int(sub_length) for sub_length in file_h_m_s] | |
# if len(file_h_m_s) == 1: | |
# file_h_m_s.insert(0, 0) | |
# if len(file_h_m_s) == 2: | |
# file_h_m_s.insert(0, 0) | |
# file_length_s = file_h_m_s[0] * 3600 + file_h_m_s[1] * 60 + file_h_m_s[2] | |
# if file_length_s > YT_LENGTH_LIMIT_S: | |
# yt_length_limit_hms = time.strftime("%HH:%MM:%SS", time.gmtime(YT_LENGTH_LIMIT_S)) | |
# file_length_hms = time.strftime("%HH:%MM:%SS", time.gmtime(file_length_s)) | |
# raise gr.Error(f"Maximum YouTube length is {yt_length_limit_hms}, got {file_length_hms} YouTube video.") | |
# ydl_opts = {"outtmpl": filename, "format": "worstvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best"} | |
# with youtube_dl.YoutubeDL(ydl_opts) as ydl: | |
# try: | |
# ydl.download([yt_url]) | |
# except youtube_dl.utils.ExtractorError as err: | |
# raise gr.Error(str(err)) | |
# yt = pt.YouTube(yt_url) | |
# stream = yt.streams.filter(only_audio=True)[0] | |
# stream.download(filename=filename) | |
# @spaces.GPU | |
# def yt_transcribe(yt_url, task="transcribe", max_filesize=75.0): | |
# html_embed_str = _return_yt_html_embed(yt_url) | |
# with tempfile.TemporaryDirectory() as tmpdirname: | |
# # filepath = os.path.join(tmpdirname, "video.mp4") | |
# filepath = os.path.join(tmpdirname, "audio.mp3") | |
# download_yt_audio(yt_url, filepath) | |
# with open(filepath, "rb") as f: | |
# inputs = f.read() | |
# inputs = ffmpeg_read(inputs, pipe.feature_extractor.sampling_rate) | |
# inputs = {"array": inputs, "sampling_rate": pipe.feature_extractor.sampling_rate} | |
# text = pipe(inputs, batch_size=BATCH_SIZE, generate_kwargs={"task": task}, return_timestamps=True)["text"] | |
# return html_embed_str, text | |
def yt_transcribe(yt_url, task="transcribe", progress=gr.Progress(), max_filesize=75.0): | |
progress(0, desc="Loading audio file...") | |
html_embed_str = _return_yt_html_embed(yt_url) | |
try: | |
# yt = pytube.YouTube(yt_url) | |
# stream = yt.streams.filter(only_audio=True)[0] | |
yt = YouTube(yt_url, on_progress_callback = on_progress, use_po_token=True) | |
stream = yt.streams.get_audio_only() | |
except: | |
raise gr.Error("An error occurred while loading the YouTube video. Please try again.") | |
if stream.filesize_mb > max_filesize: | |
raise gr.Error(f"Maximum YouTube file size is {max_filesize}MB, got {stream.filesize_mb:.2f}MB.") | |
# stream.download(filename="audio.mp3") | |
stream.download(filename="audio.mp3", mp3=True) | |
with open("audio.mp3", "rb") as f: | |
inputs = f.read() | |
inputs = ffmpeg_read(inputs, pipe.feature_extractor.sampling_rate) | |
inputs = {"array": inputs, "sampling_rate": pipe.feature_extractor.sampling_rate} | |
text = pipe(inputs, batch_size=BATCH_SIZE, generate_kwargs={"task": task}, return_timestamps=True)["text"] | |
return html_embed_str, text | |
demo = gr.Blocks(theme=gr.themes.Ocean()) | |
mf_transcribe = gr.Interface( | |
fn=transcribe, | |
inputs=[ | |
gr.Audio(sources="microphone", type="filepath"), | |
# gr.Radio(["transcribe", "translate"], label="Task", value="transcribe"), | |
], | |
outputs="text", | |
title="Whisper Central Kurdish (Sorani) Demo: Transcribe Audio", | |
description=( | |
"Transcribe long-form microphone or audio inputs with the click of a button! Demo uses the" | |
f" checkpoint [{MODEL_NAME}](https://huggingface.co/{MODEL_NAME}) and 🤗 Transformers to transcribe audio files" | |
" of arbitrary length." | |
), | |
flagging_mode="never", | |
) | |
file_transcribe = gr.Interface( | |
fn=transcribe, | |
inputs=[ | |
gr.Audio(sources="upload", type="filepath", label="Audio file"), | |
# gr.Radio(["transcribe", "translate"], label="Task", value="transcribe"), | |
], | |
outputs="text", | |
title="Whisper Central Kurdish (Sorani) Demo: Transcribe Audio", | |
description=( | |
"Transcribe long-form microphone or audio inputs with the click of a button! Demo uses the" | |
f" checkpoint [{MODEL_NAME}](https://huggingface.co/{MODEL_NAME}) and 🤗 Transformers to transcribe audio files" | |
" of arbitrary length." | |
), | |
flagging_mode="never", | |
) | |
yt_transcribe = gr.Interface( | |
fn=yt_transcribe, | |
inputs=[ | |
gr.Textbox(lines=1, placeholder="Paste the URL to a YouTube video here", label="YouTube URL"), | |
# gr.Radio(["transcribe", "translate"], label="Task", value="transcribe") | |
], | |
outputs=["html", "text"], | |
title="Whisper Central Kurdish (Sorani) Demo: Transcribe YouTube", | |
description=( | |
"Transcribe long-form YouTube videos with the click of a button! Demo uses the checkpoint" | |
f" [{MODEL_NAME}](https://huggingface.co/{MODEL_NAME}) and 🤗 Transformers to transcribe video files of" | |
" arbitrary length." | |
), | |
flagging_mode="never", | |
) | |
with demo: | |
# gr.TabbedInterface([mf_transcribe, file_transcribe, yt_transcribe], ["Microphone", "Audio file", "YouTube"]) | |
gr.TabbedInterface([mf_transcribe, file_transcribe], ["Microphone", "Audio file"]) | |
demo.queue().launch(ssr_mode=False) | |