|
import faiss |
|
import numpy as np |
|
from sentence_transformers import SentenceTransformer |
|
import fitz |
|
from docx import Document |
|
from pptx import Presentation |
|
import gradio as gr |
|
|
|
|
|
# Sentence-embedding model used to encode both the indexed documents and
# incoming queries (same model for both so distances are comparable).
retrieve = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

# Global in-memory state shared by the functions below:
#   documents      — raw extracted text, one entry per uploaded file
#   doc_embeddings — one embedding vector per entry in `documents`
#   index          — FAISS index over doc_embeddings (None until the
#                    first document is successfully indexed)
documents = []

doc_embeddings = []

index = None
|
|
|
|
|
def process_pdf(file_path):
    """Extract plain text from every page of a PDF.

    Args:
        file_path: Path to the PDF file.

    Returns:
        The concatenated page text, or an "Error reading PDF: ..." string
        on failure (this module's convention: errors are returned, not raised).
    """
    try:
        # Context manager guarantees the document handle is closed even if
        # extraction fails part-way; the original left the file open.
        with fitz.open(file_path) as doc:
            return "".join(page.get_text() for page in doc)
    except Exception as e:
        return f"Error reading PDF: {e}"
|
|
|
|
|
def process_docx(file_path):
    """Extract all paragraph text from a .docx file, one line per paragraph.

    Returns the joined text, or an "Error reading DOCX: ..." string on
    failure (errors are returned, not raised, by convention in this module).
    """
    try:
        paragraphs = Document(file_path).paragraphs
        return "\n".join(para.text for para in paragraphs)
    except Exception as e:
        return f"Error reading DOCX: {e}"
|
|
|
|
|
def process_pptx(file_path):
    """Extract text from every text-bearing shape on every slide of a .pptx.

    Returns the concatenated shape texts (newline-terminated each), or an
    "Error reading PPTX: ..." string on failure.
    """
    try:
        deck = Presentation(file_path)
        chunks = []
        for slide in deck.slides:
            for shape in slide.shapes:
                # Not every shape carries text (e.g. pictures); skip those.
                if hasattr(shape, "text"):
                    chunks.append(shape.text + "\n")
        return "".join(chunks)
    except Exception as e:
        return f"Error reading PPTX: {e}"
|
|
|
|
|
def add_to_index(text):
    """Embed *text* and rebuild the global FAISS index over all documents.

    Blank/whitespace-only text is ignored entirely. In the original code the
    rebuild ran unconditionally, so the very first call with blank text hit
    IndexError (np.array([]) has no second dimension).

    Side effects: appends to the module-level `documents` and
    `doc_embeddings` lists and replaces the module-level `index`.
    """
    global index, doc_embeddings, documents

    if not text.strip():
        return  # nothing to index; leave existing state untouched

    embedding = retrieve.encode([text])[0]
    doc_embeddings.append(embedding)
    documents.append(text)

    # Rebuild the flat L2 index from scratch (simple, fine for a demo).
    # FAISS requires float32 input, so cast defensively.
    embeddings_matrix = np.asarray(doc_embeddings, dtype=np.float32)
    index = faiss.IndexFlatL2(embeddings_matrix.shape[1])
    index.add(embeddings_matrix)
|
|
|
|
|
def load_document(file_path):
    """Extract text from a PDF/DOCX/PPTX file and add it to the index.

    Args:
        file_path: Path whose extension selects the extractor. The check is
            case-insensitive (the original rejected e.g. ".PDF").

    Returns:
        A success message, "Unsupported file format", or the extractor's
        "Error reading ..." string.
    """
    suffix = file_path.lower()
    if suffix.endswith('.pdf'):
        text = process_pdf(file_path)
    elif suffix.endswith('.docx'):
        text = process_docx(file_path)
    elif suffix.endswith('.pptx'):
        text = process_pptx(file_path)
    else:
        return "Unsupported file format"

    # NOTE(review): detecting extractor failure by substring is fragile —
    # a document that legitimately contains the word "Error" is rejected.
    # Kept for compatibility with the extractors' return convention.
    if isinstance(text, str) and "Error" not in text:
        add_to_index(text)
        return "Document loaded and indexed successfully."
    return text
|
|
|
|
|
def retrieve_docs(query, k=2):
    """Return up to *k* indexed document texts most similar to *query*.

    Args:
        query: Free-text query string.
        k: Maximum number of neighbours to retrieve (default 2).

    Returns:
        A list of document texts, or a single-element list with a status
        message when no index has been built yet.
    """
    # Explicit None check: the global starts as None and is replaced by a
    # FAISS index object once a document is added.
    if index is None:
        return ["Index not initialized. Please upload and process a document first."]
    query_embedding = retrieve.encode([query])
    # Never ask for more neighbours than we have documents; FAISS pads
    # missing results with -1, which would have made documents[i] wrap
    # around and silently return the last document.
    k = min(k, len(documents))
    distances, indices = index.search(np.asarray(query_embedding, dtype=np.float32), k)
    return [documents[i] for i in indices[0] if i >= 0]
|
|
|
|
|
def generate_response(retrieved_docs):
    """Produce a stub answer from the retrieved document texts.

    Joins the documents into one context string and echoes its first 500
    characters; returns a fixed message when nothing was retrieved.
    """
    if not retrieved_docs:
        return "No relevant documents found to generate a response."
    context = " ".join(retrieved_docs)
    # Truncate so the demo output stays readable for long documents.
    return f"Generated response based on retrieved docs:\n\n{context[:500]}..."
|
|
|
|
|
def rag_application(query, file):
    """Gradio handler: optionally index an uploaded file, then answer *query*.

    Args:
        query: The user's question.
        file: Uploaded file object with a `.name` path attribute, or None.

    Returns:
        (retrieved-docs summary, generated response) — on a failed upload,
        (load status message, "").
    """
    if file:
        load_result = load_document(file.name)
        # Bail out on ANY unsuccessful load. The original only checked for
        # "Error", so "Unsupported file format" slipped through and the
        # query ran against a stale or empty index.
        if load_result != "Document loaded and indexed successfully.":
            return load_result, ""

    retrieved_docs = retrieve_docs(query)
    docs_output = "\n".join(f"- {doc[:200]}..." for doc in retrieved_docs)

    response = generate_response(retrieved_docs)
    return docs_output, response
|
|
|
|
|
# Wire the RAG pipeline into a two-input / two-output Gradio web UI:
# a text box for the query and a file picker, returning the retrieved
# document snippets and the generated response.
iface = gr.Interface(
    fn=rag_application,
    inputs=["text", "file"],
    outputs=["text", "text"],
    title="RAG Application with Single File Upload",
    description="Upload a PDF, DOCX, or PPTX file and ask questions. The RAG application retrieves relevant documents and generates a response.",
)

iface.launch()
|
|