import os

import gradio as gr
import faiss
import numpy as np
import openai
from sentence_transformers import SentenceTransformer
from nltk.tokenize import sent_tokenize
import nltk

# Download the required NLTK sentence-tokenizer data
nltk.download('punkt')
nltk.download('punkt_tab')
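# NOTE: 'punkt_tab' is the tokenizer data used by newer NLTK releases;
# 'punkt' covers older ones. Downloading both keeps the app portable.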
# Paths to your files
faiss_path = "manual_chunked_faiss_index_500.bin"
manual_path = "ubuntu_manual.txt"

# Load the Ubuntu manual from a .txt file
try:
    with open(manual_path, "r", encoding="utf-8") as file:
        full_text = file.read()
except FileNotFoundError:
    raise FileNotFoundError(f"The file {manual_path} was not found.")
# Chunk the text into pieces of at most chunk_size words, on sentence boundaries
def chunk_text(text, chunk_size=500):
    sentences = sent_tokenize(text)
    chunks = []
    current_chunk = []
    current_len = 0
    for sentence in sentences:
        sentence_len = len(sentence.split())
        # Track the running word count so each chunk stays near chunk_size words
        if current_len + sentence_len <= chunk_size:
            current_chunk.append(sentence)
            current_len += sentence_len
        else:
            chunks.append(" ".join(current_chunk))
            current_chunk = [sentence]
            current_len = sentence_len
    if current_chunk:
        chunks.append(" ".join(current_chunk))
    return chunks
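# NOTE: these chunks must line up with the ones used to build the FAISS index;
# if the index was built with different chunking, the retrieved indices will
# point at the wrong text.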
# Apply chunking to the entire text
manual_chunks = chunk_text(full_text, chunk_size=500)
# Load your prebuilt FAISS index
try:
    index = faiss.read_index(faiss_path)
except Exception as e:
    raise RuntimeError(f"Failed to load FAISS index: {e}")
# Tokenizer used to approximate token counts when truncating prompts
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("microsoft/MiniLM-L12-H384-uncased")

# Embedding model -- must be the same model the FAISS index was built with
embedding_model = SentenceTransformer('microsoft/MiniLM-L12-H384-uncased')
# OpenAI API key | |
openai.api_key = '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' | |
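# Set the key before launching, e.g.:
#   export OPENAI_API_KEY="sk-..."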
# Create embeddings for a list of texts
def embed_text(text_list):
    return np.array(embedding_model.encode(text_list), dtype=np.float32)
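# NOTE: FAISS expects a contiguous 2D float32 array, hence the explicit dtype.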
# Retrieve the k most relevant chunks for a user query
def retrieve_chunks(query, k=5):
    query_embedding = embed_text([query])
    try:
        distances, indices = index.search(query_embedding, k)
        print("Distances:", distances)
        print("Indices:", indices)
    except Exception as e:
        raise RuntimeError(f"FAISS search failed: {e}")
    # FAISS pads with -1 when fewer than k neighbors are found, so filter
    # for indices that actually fall inside manual_chunks
    valid_indices = [i for i in indices[0] if 0 <= i < len(manual_chunks)]
    if not valid_indices:
        return [], distances, indices
    relevant_chunks = [manual_chunks[i] for i in valid_indices]
    return relevant_chunks, distances, indices
# Truncate long inputs to at most max_length tokens (returns token IDs, not text)
def truncate_input(text, max_length=16385):
    tokens = tokenizer.encode(text, truncation=True, max_length=max_length, return_tensors="pt")
    return tokens
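# NOTE: the MiniLM tokenizer only approximates gpt-3.5-turbo's tokenization;
# for exact token counts, OpenAI's tiktoken library would be the safer choice.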
# Perform RAG: retrieve relevant chunks, then generate a response
def rag_response(query, k=5, max_tokens=150):
    try:
        relevant_chunks, distances, indices = retrieve_chunks(query, k=k)
        if not relevant_chunks:
            return "Sorry, I couldn't find relevant information.", distances, indices

        # Combine the query with the retrieved chunks
        augmented_input = query + "\n\n" + "\n\n".join(relevant_chunks)

        # Truncate the input if it exceeds gpt-3.5-turbo's 16,385-token context
        # window, leaving room for the completion itself
        input_tokens = tokenizer.encode(augmented_input, return_tensors="pt")
        max_input_tokens = 16385 - max_tokens
        if input_tokens.shape[1] > max_input_tokens:
            augmented_input = tokenizer.decode(
                input_tokens[0, :max_input_tokens], skip_special_tokens=True
            )

        # Generate the response (legacy openai<1.0 ChatCompletion API)
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": augmented_input}
            ],
            max_tokens=max_tokens,
            temperature=0.7
        )
        generated_text = response.choices[0].message['content'].strip()
        return generated_text, distances, indices
    except Exception as e:
        return f"An error occurred: {e}", [], []
# Gradio Interface
def format_output(response, distances, indices):
    return f"Response: {response}\n\nDistances: {distances}\n\nIndices: {indices}"

# Wrapper so the (response, distances, indices) tuple from rag_response
# maps onto the interface's single text output
def chat(query):
    return format_output(*rag_response(query))

iface = gr.Interface(
    fn=chat,
    inputs="text",
    outputs="text",
    title="RAG Chatbot with FAISS and GPT-3.5",
    description="Ask me anything!",
)
if __name__ == "__main__":
    iface.launch()