# Content processing pipeline: loads PDF, DOCX, YouTube, and audio sources
# and splits them into overlapping text chunks for retrieval.
import logging
import os
import re
import tempfile

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import (
    PyPDFLoader,
    UnstructuredWordDocumentLoader,
    YoutubeLoader,
)
from langchain_community.document_loaders.generic import GenericLoader
from langchain_community.document_loaders.parsers.audio import OpenAIWhisperParser
from pytube import YouTube
from transformers import pipeline
from youtube_transcript_api import YouTubeTranscriptApi
class ContentProcessor:
    """Load PDFs, Word documents, YouTube videos, and audio files, and split
    each into overlapping text chunks suitable for downstream indexing.

    All loaders share one ``RecursiveCharacterTextSplitter`` so chunking is
    consistent across source types.
    """

    def __init__(self):
        # ~1000-char chunks with 200-char overlap so sentences that straddle
        # a boundary appear in both neighboring chunks.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
        )
        # Initialize the ASR pipeline once at startup; constructing it per
        # call would reload the Whisper weights every time.
        self.transcriber = pipeline(
            "automatic-speech-recognition",
            model="openai/whisper-small",
            device="cpu",  # switch to "cuda" if a GPU is available
        )

    def process_pdf(self, file_path):
        """Load a PDF at *file_path* and return its split Document chunks."""
        loader = PyPDFLoader(file_path)
        return loader.load_and_split(self.text_splitter)

    def process_docx(self, file_path):
        """Load a Word document at *file_path* and return split Document chunks."""
        loader = UnstructuredWordDocumentLoader(file_path)
        return loader.load_and_split(self.text_splitter)

    def process_youtube(self, video_url):
        """Return split Document chunks for the YouTube video at *video_url*.

        Tries the (fast, free) transcript API first; if that fails — e.g. no
        subtitles exist — falls back to downloading the audio track and
        transcribing it locally with Whisper.

        Raises:
            ValueError: if *video_url* is not a recognizable YouTube URL.
            Exception: if both the transcript API and audio transcription fail;
                the audio error is chained as the cause.
        """
        video_id = self._extract_video_id(video_url)
        if not video_id:
            raise ValueError("Invalid YouTube URL")
        try:
            # First attempt: official transcript via the YouTube API.
            return self._get_transcript_via_api(video_id)
        except Exception:
            # Don't swallow the reason silently — record it before falling back.
            logging.getLogger(__name__).info(
                "Transcript API failed for video %s; falling back to audio "
                "transcription",
                video_id,
                exc_info=True,
            )
        try:
            # Second attempt: download the audio and transcribe locally.
            return self._transcribe_audio(video_url)
        except Exception as audio_error:
            # Chain the cause so callers can inspect why the fallback failed.
            raise Exception(
                f"Failed to process video. No subtitles available and audio "
                f"transcription failed: {str(audio_error)}"
            ) from audio_error

    def _extract_video_id(self, url):
        """Return the 11-character YouTube video ID from *url*, or None."""
        patterns = [
            r'(?:v=|\/)([0-9A-Za-z_-]{11}).*',      # watch?v=..., /embed/..., /v/...
            r'(?:youtu\.be\/)([0-9A-Za-z_-]{11})',  # short youtu.be/... links
        ]
        for pattern in patterns:
            match = re.search(pattern, url)
            if match:
                return match.group(1)
        return None

    def _get_transcript_via_api(self, video_id):
        """Fetch the official transcript for *video_id* and split it."""
        transcript_list = YouTubeTranscriptApi.get_transcript(video_id)
        full_transcript = " ".join(entry['text'] for entry in transcript_list)
        return self._split_text(
            full_transcript,
            source=f"https://www.youtube.com/watch?v={video_id}",
        )

    def _transcribe_audio(self, video_url):
        """Download *video_url*'s audio track and transcribe it with Whisper."""
        yt = YouTube(video_url)
        audio_stream = yt.streams.filter(only_audio=True).first()
        if audio_stream is None:
            # .first() returns None when no audio-only stream exists; fail with
            # a clear message instead of an AttributeError on .download below.
            raise Exception("No audio stream available for this video")
        # TemporaryDirectory cleans up the download even if transcription fails.
        with tempfile.TemporaryDirectory() as temp_dir:
            audio_file = os.path.join(temp_dir, "audio.mp4")
            audio_stream.download(output_path=temp_dir, filename="audio.mp4")
            result = self.transcriber(audio_file)
        return self._split_text(result['text'], source=video_url)

    def _split_text(self, text, source):
        """Wrap raw *text* in a Document tagged with *source* and split it."""
        from langchain.schema import Document
        doc = Document(page_content=text, metadata={"source": source})
        return self.text_splitter.split_documents([doc])

    def process_audio(self, audio_file):
        """Transcribe *audio_file* with OpenAI Whisper and return split chunks."""
        # NOTE(review): GenericLoader's second positional parameter is the blob
        # parser; confirm the `parser=` keyword against the installed
        # langchain_community version.
        loader = GenericLoader(
            audio_file,
            parser=OpenAIWhisperParser()
        )
        transcript = loader.load()
        return self.text_splitter.split_documents(transcript)