DocChat_n_Talk / app.py
capradeepgujaran's picture
Update app.py
4e6b972 verified
raw
history blame
10.4 kB
# app.py
import os
import cv2
import numpy as np
from PIL import Image
import pytesseract
import gradio as gr
from pdf2image import convert_from_path
import PyPDF2
from llama_index.core import VectorStoreIndex, Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.core import get_response_synthesizer
from sentence_transformers import SentenceTransformer, util
import logging
from openai_tts_tool import generate_audio_and_text
import tempfile
# Set up logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s | %(levelname)s | %(message)s')
# Initialize global variables
vector_index = None
query_log = []
sentence_model = SentenceTransformer('all-MiniLM-L6-v2')
# Define available languages for TTS
AVAILABLE_LANGUAGES = [
"English", "Arabic", "German", "Marathi", "Kannada",
"Filipino (Tagalog)", "French", "Gujarati", "Hindi",
"Malayalam", "Tamil", "Telugu", "Urdu", "Sinhala"
]
LANGUAGE_CODES = {
"English": "en", "Arabic": "ar", "German": "de",
"Marathi": "mr", "Kannada": "kn", "Filipino (Tagalog)": "tl",
"French": "fr", "Gujarati": "gu", "Hindi": "hi",
"Malayalam": "ml", "Tamil": "ta", "Telugu": "te",
"Urdu": "ur", "Sinhala": "si"
}
# Get available languages for OCR
try:
langs = os.popen('tesseract --list-langs').read().split('\n')[1:-1]
except:
langs = ['eng'] # Fallback to English if tesseract isn't properly configured
def create_temp_dir():
"""Create temporary directory if it doesn't exist"""
temp_dir = os.path.join(os.getcwd(), 'temp')
if not os.path.exists(temp_dir):
os.makedirs(temp_dir)
return temp_dir
def preprocess_image(image_path):
"""Preprocess the image for better OCR results"""
img = cv2.imread(image_path)
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
gray = cv2.equalizeHist(gray)
gray = cv2.GaussianBlur(gray, (5, 5), 0)
processed_image = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY, 11, 2)
temp_dir = create_temp_dir()
temp_filename = os.path.join(temp_dir, "processed_image.png")
cv2.imwrite(temp_filename, processed_image)
return temp_filename
def extract_text_from_image(image_path, lang='eng'):
"""Extract text from image using OCR"""
processed_image_path = preprocess_image(image_path)
text = pytesseract.image_to_string(Image.open(processed_image_path), lang=lang)
try:
os.remove(processed_image_path)
except:
pass
return text
def extract_text_from_pdf(pdf_path, lang='eng'):
"""Extract text from PDF file"""
text = ""
temp_dir = create_temp_dir()
try:
with open(pdf_path, 'rb') as file:
pdf_reader = PyPDF2.PdfReader(file)
for page_num in range(len(pdf_reader.pages)):
page = pdf_reader.pages[page_num]
page_text = page.extract_text()
if page_text and page_text.strip():
text += page_text
else:
images = convert_from_path(pdf_path, first_page=page_num + 1, last_page=page_num + 1)
for image in images:
temp_image_path = os.path.join(temp_dir, f'temp_image_{page_num}.png')
image.save(temp_image_path, 'PNG')
text += extract_text_from_image(temp_image_path, lang=lang)
text += f"\n[OCR applied on page {page_num + 1}]\n"
try:
os.remove(temp_image_path)
except:
pass
except Exception as e:
return f"Error processing PDF: {str(e)}"
return text
def extract_text(file_path, lang='eng'):
"""Extract text from uploaded file"""
file_ext = file_path.lower().split('.')[-1]
if file_ext in ['pdf']:
return extract_text_from_pdf(file_path, lang)
elif file_ext in ['png', 'jpg', 'jpeg']:
return extract_text_from_image(file_path, lang)
else:
return f"Unsupported file type: {file_ext}"
def process_upload(api_key, files, lang):
"""Process uploaded files and create vector index"""
global vector_index
if not api_key:
return "Please provide a valid OpenAI API Key."
if not files:
return "No files uploaded."
documents = []
error_messages = []
image_heavy_docs = []
for file_path in files:
try:
text = extract_text(file_path, lang)
if text.strip(): # Only add non-empty documents
documents.append(Document(text=text))
else:
error_messages.append(f"No text extracted from {os.path.basename(file_path)}")
except Exception as e:
error_message = f"Error processing file {os.path.basename(file_path)}: {str(e)}"
logging.error(error_message)
error_messages.append(error_message)
if documents:
try:
embed_model = OpenAIEmbedding(model="text-embedding-3-large", api_key=api_key)
vector_index = VectorStoreIndex.from_documents(documents, embed_model=embed_model)
success_message = f"Successfully indexed {len(documents)} files."
if error_messages:
success_message += f"\nErrors: {'; '.join(error_messages)}"
return success_message
except Exception as e:
return f"Error creating index: {str(e)}"
else:
return f"No valid documents were indexed. Errors: {'; '.join(error_messages)}"
def query_app(query, model_name, use_similarity_check, api_key):
"""Process query and return response"""
global vector_index, query_log
if vector_index is None:
return "No documents indexed yet. Please upload documents first."
if not api_key:
return "Please provide a valid OpenAI API Key."
try:
llm = OpenAI(model=model_name, api_key=api_key)
response_synthesizer = get_response_synthesizer(llm=llm)
query_engine = vector_index.as_query_engine(llm=llm, response_synthesizer=response_synthesizer)
response = query_engine.query(query)
return response.response
except Exception as e:
logging.error(f"Error during query processing: {e}")
return f"Error during query processing: {str(e)}"
def create_gradio_interface():
"""Create and configure the Gradio interface"""
with gr.Blocks(title="Document Processing and TTS App") as demo:
gr.Markdown("# πŸ“„ Document Processing, Text & Audio Generation App")
with gr.Tab("πŸ“€ Upload Documents"):
api_key_input = gr.Textbox(
label="Enter OpenAI API Key",
placeholder="Paste your OpenAI API Key here",
type="password"
)
file_upload = gr.File(label="Upload Files", file_count="multiple", type="filepath")
lang_dropdown = gr.Dropdown(choices=langs, label="Select OCR Language", value='eng')
upload_button = gr.Button("Upload and Index")
upload_status = gr.Textbox(label="Status", interactive=False)
with gr.Tab("❓ Ask a Question"):
query_input = gr.Textbox(label="Enter your question")
model_dropdown = gr.Dropdown(
choices=["gpt-4-0125-preview", "gpt-3.5-turbo-0125"],
label="Select Model",
value="gpt-3.5-turbo-0125"
)
similarity_checkbox = gr.Checkbox(label="Use Similarity Check", value=False)
query_button = gr.Button("Ask")
answer_output = gr.Textbox(label="Answer", interactive=False)
with gr.Tab("πŸ—£οΈ Generate Audio and Text"):
text_input = gr.Textbox(label="Enter text for generation")
voice_type = gr.Dropdown(
choices=["alloy", "echo", "fable", "onyx", "nova", "shimmer"],
label="Voice Type",
value="alloy"
)
voice_speed = gr.Slider(
minimum=0.25,
maximum=4.0,
value=1.0,
label="Voice Speed"
)
language = gr.Dropdown(
choices=AVAILABLE_LANGUAGES,
label="Language for Audio and Script",
value="English"
)
output_option = gr.Radio(
choices=["audio", "script_text", "both"],
label="Output Option",
value="both"
)
generate_button = gr.Button("Generate")
audio_output = gr.Audio(label="Generated Audio")
script_output = gr.File(label="Script Text File")
status_output = gr.Textbox(label="Status", interactive=False)
# Wire up the components
upload_button.click(
fn=process_upload,
inputs=[api_key_input, file_upload, lang_dropdown],
outputs=[upload_status]
)
query_button.click(
fn=query_app,
inputs=[query_input, model_dropdown, similarity_checkbox, api_key_input],
outputs=[answer_output]
)
answer_output.change(
fn=lambda ans: ans,
inputs=[answer_output],
outputs=[text_input]
)
def process_generation(api_key, input_text, model_name, voice_type, voice_speed, language, output_option):
"""Wrapper function to process generation with updated parameters"""
# Convert language name to code
language_code = LANGUAGE_CODES.get(language, "en") # Default to English if not found
return generate_audio_and_text(api_key, input_text, model_name, voice_type, voice_speed, language_code, output_option)
generate_button.click(
fn=process_generation,
inputs=[
api_key_input, text_input, "gpt-4o-mini", voice_type,
voice_speed, language, output_option
],
outputs=[audio_output, script_output, status_output]
)
return demo
if __name__ == "__main__":
demo = create_gradio_interface()
demo.launch()