# DocChat_n_Talk / openai_tts_tool.py
import os
import openai
from deep_translator import GoogleTranslator
from dotenv import load_dotenv
import tiktoken
import pytesseract
import fitz # PyMuPDF for PDF processing
import docx # For processing DOCX files
from PIL import Image
# Load environment variables
load_dotenv()
# Initialize OpenAI client
openai_api_key = os.getenv("OPENAI_API_KEY")
client = openai.OpenAI(api_key=openai_api_key)
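# Sanity check (a minimal sketch, not in the original module): fail fast if the key
# is missing rather than surfacing an opaque authentication error on the first API call.
if not openai_api_key:
    raise RuntimeError("OPENAI_API_KEY is not set; add it to your environment or .env file.")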
# Define model specifications
MODEL_SPECS = {
    'gpt-4o': {
        'max_context_tokens': 128000,
        'max_output_tokens': 4096,
    },
    'gpt-4o-mini': {
        'max_context_tokens': 128000,
        'max_output_tokens': 16384,
    },
    'gpt-4': {
        'max_context_tokens': 8192,
        'max_output_tokens': 8192,
    },
    # Add other models as needed
}
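# Example lookup: MODEL_SPECS['gpt-4o-mini']['max_output_tokens'] evaluates to 16384,
# which summarize_text below uses to cap the requested summary length.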
# Set the path for Tesseract OCR (only needed on Windows; adjust the path to your install)
if os.name == 'nt':
    pytesseract.pytesseract.tesseract_cmd = r'C:\Program Files\Tesseract-OCR\tesseract.exe'
# Function to extract text from PDF, using OCR for scanned documents
def extract_text_from_pdf(pdf_path):
    doc = fitz.open(pdf_path)
    text = ""
    for page_num in range(doc.page_count):
        page = doc[page_num]
        page_text = page.get_text()
        # If the page has no text layer (i.e., a scanned PDF), fall back to OCR
        if not page_text.strip():
            pix = page.get_pixmap()
            img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
            page_text = pytesseract.image_to_string(img)
        text += page_text
    doc.close()
    return text
# Function to handle .docx files
def load_docx_file(docx_path):
    doc = docx.Document(docx_path)
    full_text = [para.text for para in doc.paragraphs]
    return '\n'.join(full_text)
# Function to handle .txt files
def load_txt_file(txt_path):
    with open(txt_path, 'r', encoding='utf-8') as f:
        return f.read()
# Function to handle file based on its extension
def load_file_based_on_extension(file_path):
    ext = os.path.splitext(file_path)[1].lower()  # case-insensitive, so .PDF also matches
    if ext == '.pdf':
        return extract_text_from_pdf(file_path)
    elif ext == '.docx':
        return load_docx_file(file_path)
    elif ext == '.txt':
        return load_txt_file(file_path)
    else:
        raise ValueError(f"Unsupported file format: {file_path}")
# Function to process a folder and index all files within it
def process_folder(folder_path):
    documents = []
    for filename in os.listdir(folder_path):
        file_path = os.path.join(folder_path, filename)
        if os.path.isfile(file_path):
            try:
                text = load_file_based_on_extension(file_path)
                documents.append(text)
            except ValueError as e:
                print(f"Skipping unsupported file: {file_path} ({e})")
    return ' '.join(documents)  # Combine the text of all documents
# Function to count tokens
def count_tokens(text, model_name):
    encoding = tiktoken.encoding_for_model(model_name)
    return len(encoding.encode(text))
# Function to split text into chunks
def split_text_into_chunks(text, max_tokens, model_name):
    encoding = tiktoken.encoding_for_model(model_name)
    tokens = encoding.encode(text)
    chunks = []
    start = 0
    text_length = len(tokens)
    while start < text_length:
        end = start + max_tokens
        chunk_tokens = tokens[start:end]
        chunks.append(encoding.decode(chunk_tokens))
        start = end
    return chunks
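# Worked example: with max_tokens=1000, a 2500-token document yields three chunks of
# roughly 1000, 1000, and 500 tokens (splits fall on token boundaries, not sentences).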
# Summarize text, recursively chunking inputs that exceed the model's context window
def summarize_text(text, length, model_name, additional_prompt):
    model_specs = MODEL_SPECS.get(model_name)
    if not model_specs:
        raise ValueError(f"Model specifications not found for model {model_name}")
    max_output_tokens = model_specs['max_output_tokens']
    max_context_tokens = model_specs['max_context_tokens']
    if length > max_output_tokens:
        length = max_output_tokens

    input_token_count = count_tokens(text, model_name)
    buffer_tokens = 500  # headroom for the prompt scaffolding and chat formatting
    if input_token_count + buffer_tokens + length > max_context_tokens:
        # Map-reduce pass: summarize each chunk, then summarize the combined summaries
        max_chunk_tokens = max(1, max_context_tokens - buffer_tokens - length)
        chunks = split_text_into_chunks(text, max_chunk_tokens, model_name)
        summaries = [summarize_text(chunk, length, model_name, additional_prompt) for chunk in chunks]
        combined_summary = ' '.join(summaries)
        final_summary = summarize_text(combined_summary, length, model_name, additional_prompt)
        return final_summary
    else:
        prompt = (
            f"Please provide a clear and concise summary of the following text in approximately {length} words. "
            "Ensure that the summary does not include any special characters, symbols, or markdown formatting. "
            "Use plain language and proper punctuation."
        )
        if additional_prompt:
            prompt += f"\n\nAdditional instructions: {additional_prompt}"
        prompt += f"\n\nText to summarize:\n{text}"
        completion = client.chat.completions.create(
            model=model_name,
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": prompt}
            ],
            # `length` is a word target, and English text averages roughly 1.3 tokens
            # per word, so allow extra output tokens to avoid truncating mid-sentence.
            max_tokens=min(int(length * 1.5), max_output_tokens)
        )
        return completion.choices[0].message.content.strip()
# Function to calculate summary length based on desired audio duration
def calculate_summary_length_by_duration(duration_minutes, voice_speed):
    words_per_minute = 150 if voice_speed == 'normal' else 120
    return int(duration_minutes * words_per_minute)
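# Worked example: a 3-minute target at 'normal' speed gives 3 * 150 = 450 words.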
# Function to translate the summarized text using deep-translator
def translate_text(text, target_language):
    return GoogleTranslator(source='auto', target=target_language).translate(text)
# Function to estimate audio duration
def estimate_audio_duration(text, voice_speed):
    word_count = len(text.split())
    words_per_minute = 150 if voice_speed == 'normal' else 120
    duration_minutes = word_count / words_per_minute
    return duration_minutes * 60  # seconds
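# Worked example: a 600-word summary at 'normal' speed is 600 / 150 = 4 minutes, i.e. 240 seconds.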
# Convert text to audio using OpenAI's TTS API (tts-1-hd model)
def text_to_speech_openai(text, audio_path, voice, speed):
    # Map the app's 'normal'/slow setting onto the API's float `speed` parameter,
    # mirroring the 150 vs. 120 words-per-minute split used elsewhere in this module.
    speed_value = 1.0 if speed == 'normal' else 0.8
    response = client.audio.speech.create(
        model="tts-1-hd",
        voice=voice,
        speed=speed_value,
        input=text
    )
    response.stream_to_file(audio_path)
def process_input(pdf_path=None, input_text=None, summary_length=None, voice=None, language=None, voice_speed=None, model_name=None, additional_prompt=None, generate_audio=True, folder_path=None):
    if folder_path:
        extracted_text = process_folder(folder_path)
    elif pdf_path:
        extracted_text = load_file_based_on_extension(pdf_path)
    elif input_text:
        extracted_text = input_text
    else:
        raise ValueError("No input provided for processing.")

    summary_text = summarize_text(extracted_text, summary_length, model_name, additional_prompt)
    translated_summary = translate_text(summary_text, language)
    estimated_audio_duration = estimate_audio_duration(translated_summary, voice_speed)

    os.makedirs('uploads', exist_ok=True)  # ensure the output directory exists before writing
    base_filename = os.path.splitext(os.path.basename(pdf_path or 'document'))[0]
    audio_file_path = os.path.join('uploads', f"{base_filename}_audio_{language}.mp3")
    summary_file_path = os.path.join('uploads', f"{base_filename}_summary_{language}.txt")

    with open(summary_file_path, "w", encoding="utf-8") as summary_file:
        summary_file.write(translated_summary)

    if generate_audio:
        text_to_speech_openai(translated_summary, audio_file_path, voice, voice_speed)

    return translated_summary, (audio_file_path if generate_audio else None), summary_file_path, estimated_audio_duration
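# Usage sketch (hypothetical paths and settings, not part of the original module):
# summarize a local PDF into English audio with gpt-4o-mini at normal speed.
if __name__ == "__main__":
    target_words = calculate_summary_length_by_duration(duration_minutes=2, voice_speed='normal')
    summary, audio_path, summary_path, duration = process_input(
        pdf_path="sample.pdf",          # hypothetical input file
        summary_length=target_words,
        voice="alloy",                  # one of the OpenAI TTS voices
        language="en",
        voice_speed="normal",
        model_name="gpt-4o-mini",
        additional_prompt=None,
    )
    print(f"Summary: {summary_path}\nAudio: {audio_path} (~{duration:.0f}s)")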