# Document summarizer: extracts text from PDF/DOCX/TXT (with OCR fallback),
# summarizes it with OpenAI chat models, translates the summary, and
# optionally renders it to speech.
import os | |
import openai | |
import PyPDF2 | |
from deep_translator import GoogleTranslator | |
from dotenv import load_dotenv | |
import tiktoken | |
import pytesseract | |
import fitz # PyMuPDF for PDF processing | |
import docx # For processing DOCX files | |
from PIL import Image | |
# Pull configuration (OPENAI_API_KEY) from a local .env file, if present.
load_dotenv()

# One shared OpenAI client for every function in this module.
openai_api_key = os.getenv("OPENAI_API_KEY")
client = openai.OpenAI(api_key=openai_api_key)
# Per-model token limits, used to decide when input must be chunked.
# 'max_context_tokens' bounds prompt + completion together;
# 'max_output_tokens' bounds the completion alone.
MODEL_SPECS = {
    'gpt-4o': {'max_context_tokens': 128000, 'max_output_tokens': 4096},
    'gpt-4o-mini': {'max_context_tokens': 128000, 'max_output_tokens': 16384},
    'gpt-4': {'max_context_tokens': 8192, 'max_output_tokens': 8192},
    # Extend with additional models as needed.
}
# Point pytesseract at the Tesseract binary. Only Windows needs an explicit
# path; elsewhere the system-installed binary is found via PATH, so we no
# longer clobber it with a Windows path on every platform.
# NOTE: the original combined a raw string with doubled backslashes
# (r'C:\\...'), producing literal double backslashes in the path.
if os.name == 'nt':
    pytesseract.pytesseract.tesseract_cmd = r'C:\Program Files\Tesseract-OCR\tesseract.exe'  # Adjust path accordingly
# Function to extract text from PDF, using OCR for scanned documents
def extract_text_from_pdf(pdf_path):
    """Extract text from a PDF, falling back to OCR for pages without a text layer.

    Args:
        pdf_path: Path to the PDF file.

    Returns:
        The concatenated text of every page.
    """
    text_parts = []
    # Context manager closes the document handle even if extraction or OCR
    # raises — the original opened the document and never closed it.
    with fitz.open(pdf_path) as doc:
        for page in doc:
            page_text = page.get_text()
            # Empty text layer -> likely a scanned page: rasterize and OCR it.
            if not page_text.strip():
                pix = page.get_pixmap()
                img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
                page_text = pytesseract.image_to_string(img)
            text_parts.append(page_text)
    # join() instead of repeated += (avoids quadratic string building).
    return "".join(text_parts)
# Function to handle .docx files
def load_docx_file(docx_path):
    """Return the full text of a .docx file, one paragraph per line."""
    document = docx.Document(docx_path)
    return '\n'.join(paragraph.text for paragraph in document.paragraphs)
# Function to handle .txt files
def load_txt_file(txt_path):
    """Read a UTF-8 text file and return its entire contents as one string."""
    with open(txt_path, encoding='utf-8') as handle:
        return handle.read()
# Function to handle file based on its extension
def load_file_based_on_extension(file_path):
    """Dispatch to the right loader based on the file's extension.

    Extensions are compared case-insensitively — the original used
    str.endswith with lower-case literals, so files like 'REPORT.PDF'
    were wrongly rejected.

    Args:
        file_path: Path to a .pdf, .docx, or .txt file.

    Returns:
        The extracted text of the file.

    Raises:
        ValueError: If the file's extension is not supported.
    """
    extension = os.path.splitext(file_path)[1].lower()
    if extension == '.pdf':
        return extract_text_from_pdf(file_path)
    if extension == '.docx':
        return load_docx_file(file_path)
    if extension == '.txt':
        return load_txt_file(file_path)
    raise ValueError(f"Unsupported file format: {file_path}")
# Function to process a folder and index all files within it
def process_folder(folder_path):
    """Load every supported file in `folder_path` and join their text.

    Unsupported files are skipped with a console notice instead of
    aborting the whole run. Subdirectories are ignored.
    """
    collected = []
    for entry in os.listdir(folder_path):
        full_path = os.path.join(folder_path, entry)
        if not os.path.isfile(full_path):
            continue
        try:
            collected.append(load_file_based_on_extension(full_path))
        except ValueError as err:
            print(f"Skipping unsupported file: {full_path} ({err})")
    # Combine all documents' text into a single string.
    return ' '.join(collected)
# Function to count tokens
def count_tokens(text, model_name):
    """Return how many tokens `text` occupies under `model_name`'s encoding."""
    return len(tiktoken.encoding_for_model(model_name).encode(text))
# Function to split text into chunks
def split_text_into_chunks(text, max_tokens, model_name):
    """Split `text` into pieces of at most `max_tokens` tokens each.

    Tokenizes once, slices the token stream in fixed strides, and decodes
    each slice back to text, so chunk boundaries always fall on token
    boundaries. Empty input yields an empty list.
    """
    encoding = tiktoken.encoding_for_model(model_name)
    tokens = encoding.encode(text)
    return [
        encoding.decode(tokens[offset:offset + max_tokens])
        for offset in range(0, len(tokens), max_tokens)
    ]
# Modified summarize_text function
def summarize_text(text, length, model_name, additional_prompt):
    """Summarize `text` to roughly `length` words using a chat model.

    If the input does not fit in the model's context window, it is split
    into chunks, each chunk is summarized, and the combined chunk
    summaries are summarized again (recursive map-reduce).

    Args:
        text: The text to summarize.
        length: Target summary length in *words*.
        model_name: Key into MODEL_SPECS (e.g. 'gpt-4o').
        additional_prompt: Optional extra instructions appended to the prompt.

    Returns:
        The summary text, stripped of surrounding whitespace.

    Raises:
        ValueError: If `model_name` has no MODEL_SPECS entry, or the
            requested length leaves no room for input in the context window.
    """
    model_specs = MODEL_SPECS.get(model_name)
    if not model_specs:
        raise ValueError(f"Model specifications not found for model {model_name}")
    max_output_tokens = model_specs['max_output_tokens']
    max_context_tokens = model_specs['max_context_tokens']
    if length > max_output_tokens:
        length = max_output_tokens

    # BUG FIX: `length` is a word count, but the original passed it directly
    # as max_tokens (a *token* cap), truncating summaries mid-sentence.
    # Budget ~2 tokens per word, clamped to what the model can emit.
    completion_budget = min(max(int(length * 2), 1), max_output_tokens)

    input_token_count = count_tokens(text, model_name)
    buffer_tokens = 500  # headroom for the prompt scaffolding itself

    if input_token_count + buffer_tokens + completion_budget > max_context_tokens:
        max_chunk_tokens = max_context_tokens - buffer_tokens - completion_budget
        if max_chunk_tokens <= 0:
            # Guard against infinite recursion: with no room for input,
            # split_text_into_chunks could never make progress.
            raise ValueError(
                f"Requested summary length {length} does not fit in the "
                f"context window of {model_name}"
            )
        chunks = split_text_into_chunks(text, max_chunk_tokens, model_name)
        summaries = [
            summarize_text(chunk, length, model_name, additional_prompt)
            for chunk in chunks
        ]
        # Reduce step: summarize the concatenated chunk summaries once more.
        combined_summary = ' '.join(summaries)
        return summarize_text(combined_summary, length, model_name, additional_prompt)

    prompt = (
        f"Please provide a clear and concise summary of the following text in approximately {length} words. "
        "Ensure that the summary does not include any special characters, symbols, or markdown formatting. "
        "Use plain language and proper punctuation."
    )
    if additional_prompt:
        prompt += f"\n\nAdditional instructions: {additional_prompt}"
    prompt += f"\n\nText to summarize:\n{text}"

    completion = client.chat.completions.create(
        model=model_name,
        messages=[
            {"role": "system", "content": "You are a helpful assistant"},
            {"role": "user", "content": prompt}
        ],
        max_tokens=completion_budget
    )
    return completion.choices[0].message.content.strip()
# Function to calculate summary length based on desired audio duration
def calculate_summary_length_by_duration(duration_minutes, voice_speed):
    """Convert a desired audio duration into a target word count.

    Assumes 150 words per minute at 'normal' voice speed and 120 words
    per minute at any other speed.
    """
    rate = 150 if voice_speed == 'normal' else 120
    return int(duration_minutes * rate)
# Function to translate the summarized text using deep-translator
def translate_text(text, target_language):
    """Translate `text` into `target_language`, auto-detecting the source language."""
    translator = GoogleTranslator(source='auto', target=target_language)
    return translator.translate(text)
# Function to estimate audio duration
def estimate_audio_duration(text, voice_speed):
    """Estimate the spoken duration of `text` in seconds.

    Uses 150 words per minute for 'normal' voice speed, 120 otherwise
    (the same rates as calculate_summary_length_by_duration).
    """
    rate = 150 if voice_speed == 'normal' else 120
    return len(text.split()) / rate * 60
# Function to convert text to audio using OpenAI TTS-1
def text_to_speech_openai(text, audio_path, voice, speed):
    """Render `text` to an MP3 file using OpenAI's TTS API.

    Args:
        text: The text to speak.
        audio_path: Destination file path for the MP3.
        voice: OpenAI voice name (e.g. 'alloy').
        speed: 'normal' or 'slow'; mapped to the API's numeric speed
            multiplier. The original accepted this argument but never
            used it, so the 'slow' setting had no effect on the audio.
    """
    # 0.8 matches the 120/150 words-per-minute ratio used elsewhere
    # in this module for the non-'normal' speed.
    numeric_speed = 1.0 if speed == 'normal' else 0.8
    response = client.audio.speech.create(
        model="tts-1-hd",
        voice=voice,
        input=text,
        speed=numeric_speed
    )
    response.stream_to_file(audio_path)
def process_input(pdf_path=None, input_text=None, summary_length=None, voice=None, language=None, voice_speed=None, model_name=None, additional_prompt=None, generate_audio=True, folder_path=None):
    """End-to-end pipeline: load input, summarize, translate, optionally speak.

    Exactly one of `folder_path`, `pdf_path`, or `input_text` should be
    provided; they are checked in that priority order.

    Returns:
        Tuple of (translated summary text, audio file path or None when
        `generate_audio` is False, summary file path, estimated audio
        duration in seconds).

    Raises:
        ValueError: If no input source is provided.
    """
    if folder_path:
        extracted_text = process_folder(folder_path)
    elif pdf_path:
        extracted_text = load_file_based_on_extension(pdf_path)
    elif input_text:
        extracted_text = input_text
    else:
        raise ValueError("No input provided for processing.")

    summary_text = summarize_text(extracted_text, summary_length, model_name, additional_prompt)
    translated_summary = translate_text(summary_text, language)
    estimated_audio_duration = estimate_audio_duration(translated_summary, voice_speed)

    # Derive output names from the input file ('document' for raw-text and
    # folder inputs). Ensure the output directory exists — the original
    # raised FileNotFoundError when 'uploads' was absent.
    base_filename = os.path.splitext(os.path.basename(pdf_path or 'document'))[0]
    os.makedirs('uploads', exist_ok=True)
    audio_file_path = os.path.join('uploads', f"{base_filename}_audio_{language}.mp3")
    summary_file_path = os.path.join('uploads', f"{base_filename}_summary_{language}.txt")

    with open(summary_file_path, "w", encoding="utf-8") as summary_file:
        summary_file.write(translated_summary)

    if generate_audio:
        text_to_speech_openai(translated_summary, audio_file_path, voice, voice_speed)

    return translated_summary, audio_file_path if generate_audio else None, summary_file_path, estimated_audio_duration