Spaces:
Sleeping
Sleeping
# For downloading from youtube and transcribing audio | |
from pytube import YouTube | |
from moviepy.editor import * | |
from pydub import AudioSegment | |
from pydub.utils import make_chunks | |
import pydub | |
from yt_dlp import YoutubeDL | |
from pathlib import Path | |
import subprocess | |
# For getting text from PDF | |
from zipfile import ZipFile | |
from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter | |
from pdfminer.converter import TextConverter | |
from pdfminer.layout import LAParams | |
from pdfminer.pdfpage import PDFPage | |
from io import StringIO | |
# For transcription | |
import openai, whisper, torch | |
from faster_whisper import WhisperModel | |
import tiktoken | |
from nltk import tokenize | |
# For other stuff | |
import os, re | |
import time, math | |
from threading import Thread | |
# USEFUL CONSTANTS | |
# Duration is set to 6 minutes = 360 seconds = 360000 milliseconds | |
DURATION = 360000 | |
# Maximum audio file size is 18MB | |
MAX_FILE_SIZE_BYTES = 18000000 | |
# The model to use for transcription | |
WHISPER_MODEL = "tiny" | |
MODEL_SIZE = "base" | |
class DownloadAudio: | |
"""Downloads the audio from a youtube video and saves it to multiple .wav files in the specified folder""" | |
def __init__(self, link) -> None: | |
self.link = link | |
with YoutubeDL() as ydl: | |
self.yt = ydl.extract_info(self.link, download=False) | |
self.YOUTUBE_VIDEO_ID = link.split("=")[1] | |
self.WAV_FILE_NAME = f"{self.YOUTUBE_VIDEO_ID}.wav" | |
def get_yt_title(self) -> str: | |
"""Returns the title of the youtube video""" | |
return self.yt["title"] | |
def download(self, pathname:str) -> list: | |
""" | |
Download the audio from the youtube video and saves it to multiple .wav files | |
in the specified folder. Returns a list of the paths to the .wav files. | |
""" | |
# Check if the folder for the VIDEO_ID exists | |
if not os.path.exists(pathname): | |
os.mkdir(pathname) | |
FINAL_WAV_PATH = f"{pathname}/{self.WAV_FILE_NAME}" | |
if not os.path.exists(FINAL_WAV_PATH): | |
print("\n\n\n DOWNLOADING AUDIO \n\n\n") | |
current_dir = os.getcwd() | |
print(current_dir) | |
executable_path = os.path.join(current_dir, "exec/yt-dlp_linux") | |
# Download the video as an audio file using youtube-dl | |
original_download_path = f"{pathname}/audio.wav" | |
result = subprocess.run([executable_path, "-x", "--audio-format", "wav", "-o", original_download_path, self.link]) | |
if result.returncode != 0: | |
print("Failed to download audio. Retrying...") | |
return "FAILED" | |
sound = AudioSegment.from_wav(original_download_path) | |
sound.set_channels(1) | |
sound = sound.set_frame_rate(16000) | |
sound = sound.set_channels(1) | |
sound.export(FINAL_WAV_PATH, format="wav") | |
os.remove(original_download_path) | |
# Load the input .wav file | |
audio = AudioSegment.from_wav(FINAL_WAV_PATH) | |
# Get the duration of the input file in milliseconds | |
total_byte_size = os.path.getsize(FINAL_WAV_PATH) | |
# If the total duration is less than the duration of each segment, | |
# then just return the original file | |
if total_byte_size < MAX_FILE_SIZE_BYTES: | |
return [FINAL_WAV_PATH] | |
# Get the size of the wav file | |
channels = audio.channels | |
sample_width = audio.sample_width | |
duration_in_sec = math.ceil(len(audio) / 1000) | |
sample_rate = audio.frame_rate | |
bit_rate = sample_width * 8 | |
wav_file_size = (sample_rate * bit_rate * channels * duration_in_sec) / 8 | |
# Get the length of each chunk in milliseconds and make the chunks | |
chunk_length_in_sec = math.ceil((duration_in_sec * MAX_FILE_SIZE_BYTES ) / wav_file_size) #in sec | |
chunk_length_ms = chunk_length_in_sec * 1000 | |
chunks = make_chunks(audio, chunk_length_ms) | |
# Export all of the individual chunks as wav files | |
chunk_names = [] | |
for i, chunk in enumerate(chunks): | |
print(f"exporting chunk {i}") | |
chunk_name = f"{self.YOUTUBE_VIDEO_ID}_{i}.wav" | |
output_chunk_path = f"{pathname}/{chunk_name}" | |
chunk_names.append(output_chunk_path) | |
chunk.export(f"{output_chunk_path}", format="wav") | |
return chunk_names | |
class VideoTranscription: | |
"""Performs transcription on a PDF or a link to a youtube video""" | |
def __init__(self, datalink) -> None: | |
self.datalink = datalink | |
self.encoding = tiktoken.encoding_for_model("gpt-3.5-turbo") | |
self.model = WhisperModel(WHISPER_MODEL, device="cpu", compute_type="int8") | |
openai.api_key = os.environ.get("OPENAI_API_KEY") | |
def transcribe(self) -> dict: | |
"""Returns the transcription of the PDF or youtube video as a string""" | |
start_time = time.time() | |
if self.datalink.startswith("http"): | |
transcript = self.get_text_from_link() | |
else: | |
transcript = self.get_text_from_pdf() | |
end_time = time.time() | |
print(f"transcription took {end_time - start_time} seconds") | |
return transcript | |
def get_text_from_link(self) -> dict: | |
# Get the names of the stored wav files | |
YOUTUBE_VIDEO_ID = self.datalink.split("=")[1] | |
FOLDER_NAME = f"./tests/{YOUTUBE_VIDEO_ID}" | |
# Get the audio file | |
audio_file = DownloadAudio(self.datalink) | |
# Get the names of the stored wav files | |
file_names = audio_file.download(FOLDER_NAME) | |
print("FILE NAMES", file_names) | |
text_transcriptions = [""] * len(file_names) | |
def perform_transcription(file_name, i): | |
print("transcribing", file_name, " for ", i) | |
chunk_segments, _ = self.model.transcribe(file_name, beam_size=5) | |
for chunk_segment in chunk_segments: | |
text_transcriptions[i] += chunk_segment.text.replace("$", "\$") | |
# Initialize the threads | |
threads = [] | |
for i, file_name in enumerate(file_names): | |
threads.append(Thread(target=perform_transcription, args=(file_name, i))) | |
# Start the threads | |
for thread in threads: | |
thread.start() | |
# Wait for the threads to finish | |
for thread in threads: | |
thread.join() | |
# Get the transcription of each audio chunk | |
# for file_name in file_names: | |
# Get the transcription | |
# chunk_segments, _ = self.model.transcribe(original_file_name, beam_size=5) | |
# for chunk_segment in chunk_segments: | |
# text_transcriptions += chunk_segment.text.replace("$", "\$") | |
final_text_transcription = " ".join(text_transcriptions) | |
# Tokenize each sentence of the transcription. | |
sentences = tokenize.sent_tokenize(final_text_transcription) | |
segments = [] | |
for i, sentence in enumerate(sentences): | |
segment = { | |
"id":i, | |
"text":sentence, | |
"tokens":self.encoding.encode(sentence) | |
} | |
segments.append(segment) | |
final_transcription = { | |
"title": audio_file.get_yt_title(), | |
"text": final_text_transcription, | |
"segments": segments | |
} | |
return final_transcription | |
class AudioTranscription: | |
"""Performs transcription on a MP3 file""" | |
def __init__(self, audio_file) -> None: | |
self.file = audio_file | |
self.title = self.file.name | |
self.folder_name = f"./tests/{self.title}".replace(' ', '') | |
self.folder_name = self.folder_name[:self.folder_name.rindex('.')] | |
self.encoding = tiktoken.encoding_for_model("gpt-3.5-turbo") | |
self.model = WhisperModel(WHISPER_MODEL, device="cpu", compute_type="int8") | |
openai.api_key = os.environ.get("OPENAI_API_KEY") | |
def get_redacted_name(self): | |
return self.folder_name | |
def transcribe(self) -> dict: | |
"""Returns the transcription of the MP3 audio as a string""" | |
start_time = time.time() | |
if not os.path.exists(self.folder_name): | |
os.mkdir(self.folder_name) | |
if self.title.endswith('wav'): | |
audio = pydub.AudioSegment.from_wav(self.file) | |
file_type = 'wav' | |
elif self.title.endswith('mp3'): | |
audio = pydub.AudioSegment.from_mp3(self.file) | |
file_type = 'mp3' | |
save_path = Path(self.folder_name) / self.file.name | |
audio.export(save_path, format=file_type) | |
final_wav_path = save_path | |
if file_type == 'mp3': | |
sound = AudioSegment.from_mp3(save_path) | |
final_wav_path = self.folder_name + "/" + self.title[:-4]+'.wav' | |
sound.export(final_wav_path, format="wav") | |
chunk_segments, info = self.model.transcribe(final_wav_path, beam_size=5) | |
text_transcriptions = "" | |
for chunk_segment in chunk_segments: | |
text_transcriptions += chunk_segment.text.replace("$", "\$") | |
# Tokenize each sentence of the transcription. | |
sentences = tokenize.sent_tokenize(text_transcriptions) | |
segments = [] | |
for i, sentence in enumerate(sentences): | |
segment = { | |
"id":i, | |
"text":sentence, | |
"tokens":self.encoding.encode(sentence) | |
} | |
segments.append(segment) | |
final_transcription = { | |
"title": self.title, | |
"text": text_transcriptions, | |
"segments": segments | |
} | |
end_time = time.time() | |
print(f"transcription took {end_time - start_time} seconds") | |
return final_transcription | |
def convert_pdf_to_txt_pages(path): | |
texts = [] | |
rsrcmgr = PDFResourceManager() | |
retstr = StringIO() | |
laparams = LAParams() | |
device = TextConverter(rsrcmgr, retstr, laparams=laparams) | |
interpreter = PDFPageInterpreter(rsrcmgr, device) | |
size = 0 | |
c = 0 | |
file_pages = PDFPage.get_pages(path) | |
nbPages = len(list(file_pages)) | |
for page in PDFPage.get_pages(path): | |
interpreter.process_page(page) | |
t = retstr.getvalue() | |
if c == 0: | |
texts.append(t) | |
else: | |
texts.append(t[size:]) | |
c = c + 1 | |
size = len(t) | |
device.close() | |
retstr.close() | |
return texts, nbPages | |
class PDFTranscription: | |
def __init__(self, pdf_file): | |
self.file = pdf_file | |
self.title = pdf_file.name | |
self.folder_name = f"./tests/{self.title}".replace(' ', '') | |
self.folder_name = self.folder_name[:self.folder_name.rindex('.')] | |
self.encoding = tiktoken.encoding_for_model("gpt-3.5-turbo") | |
def get_redacted_name(self): | |
return self.folder_name | |
def transcribe(self): | |
text, nbpages = convert_pdf_to_txt_pages(self.file) | |
pdf_transcription = ''.join(text) | |
sentences = tokenize.sent_tokenize(pdf_transcription) | |
segments = [] | |
for i, sentence in enumerate(sentences): | |
segment = { | |
"id":i, | |
"text":sentence, | |
"tokens":self.encoding.encode(sentence) | |
} | |
segments.append(segment) | |
final_transcription = { | |
"title":self.title, | |
"text":pdf_transcription, | |
"segments":segments | |
} | |
return final_transcription | |