import os
import textwrap
import time
from urllib.parse import parse_qs, urlparse

import streamlit as st
import yt_dlp as youtube_dl
from deep_translator import GoogleTranslator
from langchain.text_splitter import RecursiveCharacterTextSplitter
from openai import OpenAI
from pydub import AudioSegment
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from youtube_transcript_api import YouTubeTranscriptApi

# Read the API key from the environment instead of hardcoding it in source.
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))


def find_audio_files(path, extension=".mp3"):
    """Recursively collect audio files with the given extension under `path`."""
    audio_files = []
    for root, dirs, files in os.walk(path):
        for f in files:
            if f.endswith(extension):
                audio_files.append(os.path.join(root, f))
    return audio_files


def youtube_to_mp3(youtube_url: str, output_dir: str) -> str:
    """Download a YouTube video's audio track as MP3 and return its file path."""
    ydl_config = {
        "format": "bestaudio/best",
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": "mp3",
                "preferredquality": "192",
            }
        ],
        "outtmpl": os.path.join(output_dir, "%(title)s.%(ext)s"),
        "verbose": True,
    }
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    with youtube_dl.YoutubeDL(ydl_config) as ydl:
        ydl.download([youtube_url])
    return find_audio_files(output_dir)[0]


def chunk_audio(filename, segment_length: int, output_dir):
    """Split an MP3 into segments; `segment_length` is in seconds."""
    if not os.path.isdir(output_dir):
        os.mkdir(output_dir)

    # Load the audio file; pydub durations are in milliseconds.
    audio = AudioSegment.from_mp3(filename)
    duration = len(audio)

    # Ceiling division so an exact multiple doesn't produce an empty segment.
    segment_ms = segment_length * 1000
    num_segments = (duration + segment_ms - 1) // segment_ms
    print(f"Chunking into {num_segments} chunks...")

    # Iterate through segments and save them.
    for i in range(num_segments):
        start = i * segment_ms
        end = min((i + 1) * segment_ms, duration)
        segment = audio[start:end]
        segment.export(os.path.join(output_dir, f"segment_{i}.mp3"), format="mp3")

    return sorted(find_audio_files(output_dir))


def translate_text(text, dest, source="auto"):
    """Translate `text` in ~3500-character pieces to stay under the API limit."""
    wrapped_text = textwrap.wrap(text, 3500)
    tran_text = ""
    for line in wrapped_text:
        translation = GoogleTranslator(source=source, target=dest).translate(line)
        tran_text += translation + " "
    return tran_text


def transcribe_audio(audio_files: list, model_name="whisper-1"):
    """Transcribe each audio chunk with the OpenAI Whisper API and concatenate."""
    transcripts = ""
    for audio_file in audio_files:
        with open(audio_file, "rb") as audio:
            response = client.audio.transcriptions.create(
                model=model_name, file=audio, response_format="text"
            )
        transcripts += response + " "
    return transcripts


def get_video_id(youtube_url):
    """Extract the video ID from a YouTube URL."""
    parsed_url = urlparse(youtube_url)
    video_id = parse_qs(parsed_url.query).get("v")
    return video_id[0] if video_id else None


def get_transcript(video_id, language):
    """Fetch the YouTube caption track, dropping music-only cue lines."""
    try:
        transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=[language])
    except Exception:
        return None  # no caption track; caller falls back to audio transcription
    tran = []
    if language == "vi":
        tran += [t["text"] for t in transcript if t["text"] != "[âm nhạc]"]
    if language == "en":
        tran += [t["text"] for t in transcript if t["text"] != "[music]"]
    return " ".join(tran)


def chunk_text(text, chunk_size=200, overlap_size=20):
    """Split text into overlapping, token-sized chunks."""
    encoder = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        model_name="gpt-3.5-turbo", chunk_size=chunk_size, chunk_overlap=overlap_size
    )
    return encoder.split_text(text=text)


def tran_sum(sum_en, language):
    """Translate the (summary, transcript) pair into the other supported language."""
    dest = "vi" if language == "en" else "en"
    summ_tran = translate_text(sum_en[0], dest, language)
    script_tran = translate_text(sum_en[1], dest, language)
    return summ_tran, script_tran
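# Illustrative sketch (not executed): how the helpers above compose into the
# audio-fallback pipeline. Assumes ffmpeg and yt-dlp are installed, a valid
# OPENAI_API_KEY is set, and the URL/paths below are placeholders.
#
#   audio = youtube_to_mp3("https://www.youtube.com/watch?v=<id>", "outputs/raw_audio")
#   chunks = chunk_audio(audio, segment_length=10 * 60, output_dir="outputs/chunks")
#   transcript = transcribe_audio(chunks)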
def summarize_youtube_video(youtube_url, outputs_dir, language):
    # Build the full output directory path for this video.
    video_id = get_video_id(youtube_url)
    transcript = get_transcript(video_id, language)
    if not transcript:
        # No caption track available: download the audio and transcribe it.
        outputs_dir = os.path.join(outputs_dir, video_id)
        raw_audio_dir = os.path.join(outputs_dir, "raw_audio")
        chunks_dir = os.path.join(outputs_dir, "chunks")
        segment_length = 10 * 60  # chunk into 10-minute segments

        if not os.path.exists(outputs_dir):
            os.makedirs(outputs_dir)

        audio_filename = youtube_to_mp3(youtube_url, output_dir=raw_audio_dir)
        chunked_audio_files = chunk_audio(
            audio_filename, segment_length=segment_length, output_dir=chunks_dir
        )
        transcript = transcribe_audio(chunked_audio_files)

    summ = summary(transcript, language)
    tran = tran_sum(summ, language)
    return tuple(summ), tuple(tran)


def main():
    st.set_page_config(layout="wide")
    st.title("YouTube Video Summarizer 🎥")
    st.markdown("", unsafe_allow_html=True)
    st.subheader("Built with GPT-2, Streamlit and ❤️")
    st.markdown("", unsafe_allow_html=True)

    # Expander for app details
    with st.expander("About the App"):
        st.write("This app allows you to summarize while watching a YouTube video.")
        st.write(
            "Enter a YouTube URL in the input box below and click 'Submit' to start. "
            "This app is built by AI Anytime."
        )

    # Input box for YouTube URL
    youtube_url = st.text_input("Enter YouTube URL")
    language_dict = {"English": "en", "Vietnamese": "vi"}

    # Language selection radio buttons
    selected_language = st.radio("Select Language:", ("English", "Vietnamese"))
    language_code = language_dict[selected_language]

    # Submit button
    if st.button("Submit") and youtube_url:
        start_time = time.time()  # Start the timer

        # Download, transcribe, and summarize the video
        summ, tran = summarize_youtube_video(youtube_url, "outputs", language_code)
        summary_text = summ[0]
        script = summ[1]
        summary_tran = tran[0]
        script_tran = tran[1]

        end_time = time.time()  # End the timer
        elapsed_time = end_time - start_time

        # Centering the video and elapsed time (HTML wrapper reconstructed; the
        # original markup was lost, only the heading, embed URL, and timing
        # placeholder survive).
        st.markdown(
            """
            <div style="text-align: center;">
                <h3>Summarization of YouTube Video</h3>
                <iframe width="560" height="315" src="{youtube_url}"
                        frameborder="0" allowfullscreen></iframe>
                <p>Time taken: {elapsed_time:.2f} seconds</p>
            </div>
            """.format(
                youtube_url=youtube_url.replace("watch?v=", "embed/"),
                elapsed_time=elapsed_time,
            ),
            unsafe_allow_html=True,
        )

        col1, col2 = st.columns(2)
        with col1:
            st.subheader("Transcript")
            st.markdown(
                f'<div style="height: 400px; overflow-y: scroll;">{script}</div>',
                unsafe_allow_html=True,
            )
            st.subheader("Summary")
            st.write(summary_text)
        with col2:
            st.subheader("Transcript Translate")
            st.markdown(
                f'<div style="height: 400px; overflow-y: scroll;">{script_tran}</div>',
                unsafe_allow_html=True,
            )
            st.subheader("Summary Translate")
            st.write(summary_tran)
def summary(text, lang):
    """Summarize `text` chunk by chunk with a small GPT-2 model and a TL;DR prompt."""
    model_id = "distilbert/distilgpt2"
    if lang == "vi":
        model_id = "NlpHUST/gpt2-vietnamese"

    model = AutoModelForCausalLM.from_pretrained(model_id)
    tokenizer = AutoTokenizer.from_pretrained(model_id)

    # Initialize the text generation pipeline with fine-tuned parameters
    pipe = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        truncation=True,         # Ensure text is truncated properly
        max_new_tokens=35,       # Control the length of the generated summary
        do_sample=True,          # Sample rather than decode greedily
        num_beams=5,             # Use beam search for quality
        top_k=50,
        top_p=0.95,
        repetition_penalty=2.0,  # Penalize repetition to reduce redundancy
        length_penalty=1.0,      # Balance preference for length
        early_stopping=True,     # Stop early if the model is confident
    )

    chunks = chunk_text(text)
    rs = ""
    print(f"Number of chunks: {len(chunks)}")
    for t in chunks:
        prompt = t + " TL;DR "  # Append the summary prompt
        print(f"Processing chunk: {prompt[:100]}...")  # Debug: show chunk start
        sequences = pipe(
            prompt,
            num_return_sequences=1,
            return_full_text=False,
            pad_token_id=tokenizer.eos_token_id,
            clean_up_tokenization_spaces=True,
        )
        generated_summary = sequences[0]["generated_text"].strip().replace(prompt, "")
        rs += generated_summary + " "

    # Rebuild the transcript from its chunks so both summary and transcript
    # are returned to the caller.
    text = " ".join(chunks)
    return rs, text


if __name__ == "__main__":
    main()
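# To launch the Streamlit app locally (assuming this file is saved as app.py):
#   streamlit run app.py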