"""Summarize PDF/DOCX/TXT documents with an OpenAI chat model, translate the
summary, and optionally synthesize it to speech with the OpenAI TTS API.

Scanned PDFs (pages with no embedded text layer) are rasterized and run
through Tesseract OCR.
"""

import os

import docx  # python-docx, for .docx files
import fitz  # PyMuPDF, for PDF text extraction and rasterization
import openai
import PyPDF2  # noqa: F401  -- kept; other parts of the project may rely on it
import pytesseract
import tiktoken
from deep_translator import GoogleTranslator
from dotenv import load_dotenv
from PIL import Image

# Load environment variables (expects OPENAI_API_KEY in .env or the environment).
load_dotenv()

# Initialize OpenAI client.
openai_api_key = os.getenv("OPENAI_API_KEY")
client = openai.OpenAI(api_key=openai_api_key)

# Per-model context/output limits, used to decide when input must be chunked.
MODEL_SPECS = {
    'gpt-4o': {
        'max_context_tokens': 128000,
        'max_output_tokens': 4096,
    },
    'gpt-4o-mini': {
        'max_context_tokens': 128000,
        'max_output_tokens': 16384,
    },
    'gpt-4': {
        'max_context_tokens': 8192,
        'max_output_tokens': 8192,
    },
    # Add other models as needed
}

# Path to the Tesseract binary (only needed on Windows; adjust accordingly).
# NOTE: the original used a raw string with doubled backslashes, producing
# literal double separators in the path; normalized to a plain raw string.
pytesseract.pytesseract.tesseract_cmd = r'C:\Program Files\Tesseract-OCR\tesseract.exe'


def extract_text_from_pdf(pdf_path):
    """Extract text from a PDF, falling back to OCR for scanned pages.

    A page whose text layer is empty is assumed to be a scanned image: it is
    rasterized with PyMuPDF and passed through Tesseract.

    :param pdf_path: path to the PDF file
    :return: concatenated text of all pages
    """
    text = ""
    doc = fitz.open(pdf_path)
    try:
        for page_num in range(doc.page_count):
            page = doc[page_num]
            page_text = page.get_text()
            # No embedded text (likely a scanned page): rasterize and OCR.
            if not page_text.strip():
                pix = page.get_pixmap()
                img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
                page_text = pytesseract.image_to_string(img)
            text += page_text
    finally:
        # The original leaked the document handle; always close it.
        doc.close()
    return text


def load_docx_file(docx_path):
    """Return the full text of a .docx file, paragraphs joined by newlines."""
    doc = docx.Document(docx_path)
    full_text = [para.text for para in doc.paragraphs]
    return '\n'.join(full_text)


def load_txt_file(txt_path):
    """Return the contents of a UTF-8 text file."""
    with open(txt_path, 'r', encoding='utf-8') as f:
        return f.read()


def load_file_based_on_extension(file_path):
    """Dispatch to the right loader based on the file extension.

    Extension matching is case-insensitive (e.g. ``.PDF`` works too).

    :raises ValueError: for unsupported file formats
    """
    ext = os.path.splitext(file_path)[1].lower()
    if ext == '.pdf':
        return extract_text_from_pdf(file_path)
    elif ext == '.docx':
        return load_docx_file(file_path)
    elif ext == '.txt':
        return load_txt_file(file_path)
    else:
        raise ValueError(f"Unsupported file format: {file_path}")


def process_folder(folder_path):
    """Load every supported file in *folder_path* and return their combined text.

    Unsupported files are skipped with a message rather than aborting the run.
    """
    documents = []
    for filename in os.listdir(folder_path):
        file_path = os.path.join(folder_path, filename)
        if os.path.isfile(file_path):
            try:
                documents.append(load_file_based_on_extension(file_path))
            except ValueError as e:
                print(f"Skipping unsupported file: {file_path} ({e})")
    # Combine all documents' text into one string.
    return ' '.join(documents)


def count_tokens(text, model_name):
    """Return the number of tokens *text* encodes to for *model_name*."""
    encoding = tiktoken.encoding_for_model(model_name)
    return len(encoding.encode(text))


def split_text_into_chunks(text, max_tokens, model_name):
    """Split *text* into chunks of at most *max_tokens* tokens each.

    Splitting happens on token boundaries of the model's tokenizer, so each
    chunk decodes back to valid text.
    """
    encoding = tiktoken.encoding_for_model(model_name)
    tokens = encoding.encode(text)
    chunks = []
    start = 0
    text_length = len(tokens)
    while start < text_length:
        end = start + max_tokens
        chunks.append(encoding.decode(tokens[start:end]))
        start = end
    return chunks


def summarize_text(text, length, model_name, additional_prompt):
    """Summarize *text* to roughly *length* words with the given chat model.

    If the input does not fit the model's context window, it is split into
    chunks, each chunk is summarized recursively, and the concatenated
    partial summaries are summarized once more (map-reduce style).

    :param text: text to summarize
    :param length: target summary length in words
    :param model_name: key into MODEL_SPECS
    :param additional_prompt: optional extra instructions appended to the prompt
    :raises ValueError: if *model_name* is not in MODEL_SPECS
    """
    model_specs = MODEL_SPECS.get(model_name)
    if not model_specs:
        raise ValueError(f"Model specifications not found for model {model_name}")

    max_output_tokens = model_specs['max_output_tokens']
    max_context_tokens = model_specs['max_context_tokens']
    if length > max_output_tokens:
        length = max_output_tokens

    # *length* is a WORD target but max_tokens is a TOKEN budget; a word
    # averages ~1.3 tokens, so give headroom to avoid mid-sentence
    # truncation, capped at the model's output limit.
    output_budget = min(int(length * 1.5), max_output_tokens)

    input_token_count = count_tokens(text, model_name)
    buffer_tokens = 500  # headroom for the prompt scaffolding itself

    if input_token_count + buffer_tokens + output_budget > max_context_tokens:
        # Input too large for one call: summarize chunk-by-chunk, then
        # summarize the combined partial summaries.
        max_chunk_tokens = max_context_tokens - buffer_tokens - output_budget
        chunks = split_text_into_chunks(text, max_chunk_tokens, model_name)
        summaries = [
            summarize_text(chunk, length, model_name, additional_prompt)
            for chunk in chunks
        ]
        combined_summary = ' '.join(summaries)
        return summarize_text(combined_summary, length, model_name, additional_prompt)

    prompt = (
        f"Please provide a clear and concise summary of the following text in approximately {length} words. "
        "Ensure that the summary does not include any special characters, symbols, or markdown formatting. "
        "Use plain language and proper punctuation."
    )
    if additional_prompt:
        prompt += f"\n\nAdditional instructions: {additional_prompt}"
    prompt += f"\n\nText to summarize:\n{text}"

    completion = client.chat.completions.create(
        model=model_name,
        messages=[
            {"role": "system", "content": "You are a helpful assistant"},
            {"role": "user", "content": prompt},
        ],
        max_tokens=output_budget,
    )
    return completion.choices[0].message.content.strip()


def calculate_summary_length_by_duration(duration_minutes, voice_speed):
    """Return the word count that fills *duration_minutes* at *voice_speed*.

    Assumes 150 words/minute for 'normal' speech, 120 otherwise.
    """
    words_per_minute = 150 if voice_speed == 'normal' else 120
    return int(duration_minutes * words_per_minute)


def translate_text(text, target_language):
    """Translate *text* into *target_language* using Google Translate."""
    return GoogleTranslator(source='auto', target=target_language).translate(text)


def estimate_audio_duration(text, voice_speed):
    """Estimate spoken duration of *text* in seconds (150/120 wpm heuristic)."""
    word_count = len(text.split())
    words_per_minute = 150 if voice_speed == 'normal' else 120
    duration_minutes = word_count / words_per_minute
    return duration_minutes * 60


def text_to_speech_openai(text, audio_path, voice, speed):
    """Synthesize *text* to an audio file using the OpenAI TTS API.

    :param text: text to speak
    :param audio_path: output file path (mp3)
    :param voice: OpenAI voice name (e.g. 'alloy')
    :param speed: app-level voice speed ('normal' or slower); the original
        accepted this parameter but never used it. It is now mapped to the
        API's numeric speed: 1.0 for 'normal', 0.8 otherwise (matching the
        120/150 wpm ratio used elsewhere in this module).
    """
    api_speed = 1.0 if speed == 'normal' else 0.8
    response = client.audio.speech.create(
        model="tts-1-hd",
        voice=voice,
        input=text,
        speed=api_speed,
    )
    response.stream_to_file(audio_path)


def process_input(pdf_path=None, input_text=None, summary_length=None, voice=None,
                  language=None, voice_speed=None, model_name=None,
                  additional_prompt=None, generate_audio=True, folder_path=None):
    """End-to-end pipeline: load → summarize → translate → (optionally) speak.

    Exactly one of *folder_path*, *pdf_path*, or *input_text* must be
    provided (checked in that order of precedence).

    :return: tuple of (translated summary, audio path or None,
             summary file path, estimated audio duration in seconds)
    :raises ValueError: if no input source is provided
    """
    if folder_path:
        extracted_text = process_folder(folder_path)
    elif pdf_path:
        extracted_text = load_file_based_on_extension(pdf_path)
    elif input_text:
        extracted_text = input_text
    else:
        raise ValueError("No input provided for processing.")

    summary_text = summarize_text(extracted_text, summary_length, model_name,
                                  additional_prompt)
    translated_summary = translate_text(summary_text, language)
    estimated_audio_duration = estimate_audio_duration(translated_summary, voice_speed)

    # Falls back to 'document' when there is no single source file
    # (folder or raw-text input).
    base_filename = os.path.splitext(os.path.basename(pdf_path or 'document'))[0]
    # The original assumed 'uploads' already existed; create it if needed.
    os.makedirs('uploads', exist_ok=True)
    audio_file_path = os.path.join('uploads', f"{base_filename}_audio_{language}.mp3")
    summary_file_path = os.path.join('uploads', f"{base_filename}_summary_{language}.txt")

    with open(summary_file_path, "w", encoding="utf-8") as summary_file:
        summary_file.write(translated_summary)

    if generate_audio:
        text_to_speech_openai(translated_summary, audio_file_path, voice, voice_speed)

    return (translated_summary,
            audio_file_path if generate_audio else None,
            summary_file_path,
            estimated_audio_duration)