import numpy as np
from typing import List, Tuple, Dict
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer
import math
from functools import lru_cache
from concurrent.futures import ThreadPoolExecutor
import openai
from transformers import T5ForConditionalGeneration, T5Tokenizer
import torch
import re
import psycopg2
from psycopg2.extras import execute_values
import sqlite3
import logging
########################################################################################################################################################################################################################################
#
# RAG Chunking
# To fully integrate this chunking system, you'd need to:
#
# Create the UnvectorizedMediaChunks table in your SQLite database (a sketch of the DDL follows the imports below).
# Modify your document ingestion process to use chunk_and_store_unvectorized.
# Implement a background process that periodically calls vectorize_all_documents to process unvectorized chunks.
#
# This chunking is pretty weak and needs improvement.
# See notes for improvements #FIXME
import json
from typing import List, Dict, Any
from datetime import datetime
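# The helper below is a minimal sketch of the UnvectorizedMediaChunks table mentioned in the
# integration notes above. The schema is inferred from the INSERT/SELECT statements in this file
# and is an assumption; adapt column types, defaults, and indexes to your actual database.
def create_unvectorized_chunks_table(db_connection):
    """Create the UnvectorizedMediaChunks table (sketch) if it does not already exist."""
    cursor = db_connection.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS UnvectorizedMediaChunks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            media_id INTEGER NOT NULL,
            chunk_text TEXT NOT NULL,
            chunk_index INTEGER NOT NULL,
            start_char INTEGER,
            end_char INTEGER,
            chunk_type TEXT,
            metadata TEXT,
            is_processed BOOLEAN DEFAULT FALSE,
            last_modified TIMESTAMP
        )
    """)
    db_connection.commit()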
def chunk_and_store_unvectorized(
        db_connection,
        media_id: int,
        text: str,
        chunk_size: int = 1000,
        overlap: int = 100,
        chunk_type: str = 'fixed-length'
) -> List[int]:
    """Chunk the given text and store the chunks in UnvectorizedMediaChunks, returning their row ids."""
    chunks = create_chunks(text, chunk_size, overlap)
    return store_unvectorized_chunks(db_connection, media_id, chunks, chunk_type)
def create_chunks(text: str, chunk_size: int, overlap: int) -> List[Dict[str, Any]]:
    """Split text into overlapping, word-based chunks with approximate character offsets."""
    words = text.split()
    chunks = []
    search_pos = 0  # search cursor so repeated words map to the correct occurrence
    for i in range(0, len(words), chunk_size - overlap):
        chunk_text = ' '.join(words[i:i + chunk_size])
        start_char = text.find(words[i], search_pos)
        # end_char is approximate: the join above normalizes runs of whitespace to single spaces.
        end_char = start_char + len(chunk_text)
        chunks.append({
            'text': chunk_text,
            'start_char': start_char,
            'end_char': end_char,
            'index': len(chunks)
        })
        search_pos = start_char + 1
    return chunks
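# A quick worked example of the stepping behaviour (step = chunk_size - overlap), using toy values;
# the trailing one-word chunk illustrates one of the weaknesses noted above:
#   create_chunks("alpha bravo charlie delta echo", chunk_size=3, overlap=1)
#   -> [{'text': 'alpha bravo charlie', 'start_char': 0,  'end_char': 19, 'index': 0},
#       {'text': 'charlie delta echo',  'start_char': 12, 'end_char': 30, 'index': 1},
#       {'text': 'echo',                'start_char': 26, 'end_char': 30, 'index': 2}]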
def store_unvectorized_chunks(
        db_connection,
        media_id: int,
        chunks: List[Dict[str, Any]],
        chunk_type: str
) -> List[int]:
    """Insert chunks into UnvectorizedMediaChunks and return the new row ids."""
    cursor = db_connection.cursor()
    chunk_ids = []
    for chunk in chunks:
        cursor.execute("""
            INSERT INTO UnvectorizedMediaChunks
            (media_id, chunk_text, chunk_index, start_char, end_char, chunk_type, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (
            media_id,
            chunk['text'],
            chunk['index'],
            chunk['start_char'],
            chunk['end_char'],
            chunk_type,
            json.dumps({'length': len(chunk['text'])})  # Example metadata
        ))
        chunk_ids.append(cursor.lastrowid)
    db_connection.commit()
    return chunk_ids
def get_unvectorized_chunks(
        db_connection,
        media_id: int,
        limit: int = 100,
        offset: int = 0
) -> List[Dict[str, Any]]:
    """Fetch up to `limit` unprocessed chunks for a media item, ordered by chunk_index."""
    cursor = db_connection.cursor()
    cursor.execute("""
        SELECT id, chunk_text, chunk_index, start_char, end_char, chunk_type, metadata
        FROM UnvectorizedMediaChunks
        WHERE media_id = ? AND is_processed = FALSE
        ORDER BY chunk_index
        LIMIT ? OFFSET ?
    """, (media_id, limit, offset))
    return [
        {
            'id': row[0],
            'text': row[1],
            'index': row[2],
            'start_char': row[3],
            'end_char': row[4],
            'type': row[5],
            'metadata': json.loads(row[6])
        }
        for row in cursor.fetchall()
    ]
def mark_chunks_as_processed(db_connection, chunk_ids: List[int]):
    """Flag the given chunk ids as processed and stamp last_modified."""
    cursor = db_connection.cursor()
    cursor.executemany("""
        UPDATE UnvectorizedMediaChunks
        SET is_processed = TRUE, last_modified = ?
        WHERE id = ?
    """, [(datetime.now().isoformat(), chunk_id) for chunk_id in chunk_ids])
    db_connection.commit()
# Usage example
def process_media_chunks(db_connection, media_id: int, text: str):
    chunk_ids = chunk_and_store_unvectorized(db_connection, media_id, text)
    print(f"Stored {len(chunk_ids)} unvectorized chunks for media_id {media_id}")
    # Later, when you want to process these chunks:
    unprocessed_chunks = get_unvectorized_chunks(db_connection, media_id)
    # Process chunks (e.g., vectorize them)
    # ...
    # After processing, mark them as processed
    mark_chunks_as_processed(db_connection, [chunk['id'] for chunk in unprocessed_chunks])
###########################################################################################################################################################################################################
#
# RAG System
# To use this updated RAG system in your existing application:
#
# Install required packages:
#   pip install sentence-transformers psycopg2-binary scikit-learn transformers torch
# Set up PostgreSQL with pgvector:
#
# Install PostgreSQL and the pgvector extension.
# Create a new database for vector storage.
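#   For example (illustrative names; assumes the pgvector extension is installed on the server
#   and that your account has sufficient privileges):
#     createdb your_db_name
#     psql -d your_db_name -c "CREATE EXTENSION IF NOT EXISTS vector;"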
#
# Update your main application to use the RAG system:
#
# Import the RAGSystem class from this new file.
# Initialize the RAG system with your SQLite and PostgreSQL configurations.
# Use the vectorize_all_documents method to initially vectorize your existing documents.
#
#
# Modify your existing PDF_Ingestion_Lib.py and Book_Ingestion_Lib.py:
#
# After successfully ingesting a document into SQLite, call the vectorization method from the RAG system.
# Example modification for ingest_text_file in Book_Ingestion_Lib.py:
# from RAG_Library import RAGSystem
#
# # Initialize RAG system (do this once in your main application)
# rag_system = RAGSystem(sqlite_path, pg_config)
#
# def ingest_text_file(file_path, title=None, author=None, keywords=None):
#     try:
#         # ... (existing code)
#
#         # Add the text file to the database
#         doc_id = add_media_with_keywords(
#             url=file_path,
#             title=title,
#             media_type='document',
#             content=content,
#             keywords=keywords,
#             prompt='No prompt for text files',
#             summary='No summary for text files',
#             transcription_model='None',
#             author=author,
#             ingestion_date=datetime.now().strftime('%Y-%m-%d')
#         )
#
#         # Vectorize the newly added document
#         rag_system.vectorize_document(doc_id, content)
#
#         return f"Text file '{title}' by {author} ingested and vectorized successfully."
#     except Exception as e:
#         logging.error(f"Error ingesting text file: {str(e)}")
#         return f"Error ingesting text file: {str(e)}"
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Constants
EMBEDDING_MODEL = 'all-MiniLM-L6-v2'
VECTOR_DIM = 384  # Dimension of the chosen embedding model
class RAGSystem:
    def __init__(self, sqlite_path: str, pg_config: Dict[str, str], cache_size: int = 100):
        self.sqlite_path = sqlite_path
        self.pg_config = pg_config
        self.model = SentenceTransformer(EMBEDDING_MODEL)
        self.cache_size = cache_size
        self._init_postgres()
    def _init_postgres(self):
        """Ensure the pgvector extension and the chunk-vector table exist."""
        with psycopg2.connect(**self.pg_config) as conn:
            with conn.cursor() as cur:
                # Requires sufficient privileges; run once as a superuser if this fails.
                cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
                cur.execute(f"""
                    CREATE TABLE IF NOT EXISTS document_vectors (
                        id SERIAL PRIMARY KEY,
                        document_id INTEGER NOT NULL,
                        chunk_index INTEGER NOT NULL,
                        vector vector({VECTOR_DIM}),
                        metadata JSONB,
                        UNIQUE (document_id, chunk_index)
                    )
                """)
            conn.commit()
    def _get_embedding(self, text: str) -> np.ndarray:
        return self.model.encode([text])[0]
    def _store_chunk_vector(self, doc_id: int, chunk: Dict[str, Any]):
        """Embed one chunk and upsert its vector into PostgreSQL."""
        vector = self._get_embedding(chunk['text'])
        # Serialize to pgvector's '[x,y,...]' text format so no special adapter is required.
        vector_literal = '[' + ','.join(str(float(x)) for x in vector) + ']'
        with psycopg2.connect(**self.pg_config) as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    INSERT INTO document_vectors (document_id, chunk_index, vector, metadata)
                    VALUES (%s, %s, %s, %s)
                    ON CONFLICT (document_id, chunk_index)
                    DO UPDATE SET vector = EXCLUDED.vector, metadata = EXCLUDED.metadata
                """, (doc_id, chunk['index'], vector_literal, json.dumps(chunk)))
            conn.commit()
    def vectorize_document(self, doc_id: int, content: str):
        """Chunk a document's content and store an embedding for each chunk."""
        for chunk in create_chunks(content, chunk_size=1000, overlap=100):
            self._store_chunk_vector(doc_id, chunk)
    def vectorize_all_documents(self):
        """Embed every unprocessed chunk recorded in SQLite and mark it as processed."""
        with sqlite3.connect(self.sqlite_path) as sqlite_conn:
            cur = sqlite_conn.cursor()
            cur.execute("SELECT DISTINCT media_id FROM UnvectorizedMediaChunks WHERE is_processed = FALSE")
            for (media_id,) in cur.fetchall():
                chunks = get_unvectorized_chunks(sqlite_conn, media_id)
                for chunk in chunks:
                    self._store_chunk_vector(media_id, chunk)
                mark_chunks_as_processed(sqlite_conn, [c['id'] for c in chunks])
    def semantic_search(self, query: str, top_k: int = 5) -> List[Tuple[int, float]]:
        """Return (document_id, cosine similarity) pairs for the best-matching documents."""
        query_vector = self._get_embedding(query)
        query_literal = '[' + ','.join(str(float(x)) for x in query_vector) + ']'
        with psycopg2.connect(**self.pg_config) as conn:
            with conn.cursor() as cur:
                # <=> is pgvector's cosine-distance operator; 1 - distance is cosine similarity.
                cur.execute("""
                    SELECT document_id, MAX(1 - (vector <=> %s::vector)) AS similarity
                    FROM document_vectors
                    GROUP BY document_id
                    ORDER BY similarity DESC
                    LIMIT %s
                """, (query_literal, top_k))
                return cur.fetchall()
    def get_document_content(self, doc_id: int) -> str:
        with sqlite3.connect(self.sqlite_path) as conn:
            cur = conn.cursor()
            cur.execute("SELECT content FROM media WHERE id = ?", (doc_id,))
            result = cur.fetchone()
            return result[0] if result else ""
    def bm25_search(self, query: str, top_k: int = 5) -> List[Tuple[int, float]]:
        """BM25-style scoring approximated over TF-IDF weights (not true BM25 over raw term counts)."""
        with sqlite3.connect(self.sqlite_path) as conn:
            cur = conn.cursor()
            cur.execute("SELECT id, content FROM media")
            documents = cur.fetchall()
        vectorizer = TfidfVectorizer(use_idf=True)
        tfidf_matrix = vectorizer.fit_transform([doc[1] for doc in documents])
        query_vector = vectorizer.transform([query])
        doc_lengths = np.asarray(tfidf_matrix.sum(axis=1)).ravel()
        avg_doc_length = np.mean(doc_lengths)
        k1, b = 1.5, 0.75
        scores = []
        for i in range(tfidf_matrix.shape[0]):
            # Element-wise product keeps only terms shared by the query and this document.
            overlap = query_vector.multiply(tfidf_matrix[i]).toarray().ravel()
            score = np.sum(
                ((k1 + 1) * overlap) /
                (k1 * (1 - b + b * doc_lengths[i] / avg_doc_length) + overlap)
            )
            scores.append((documents[i][0], score))
        return sorted(scores, key=lambda x: x[1], reverse=True)[:top_k]
    def combine_search_results(self, bm25_results: List[Tuple[int, float]], vector_results: List[Tuple[int, float]],
                               alpha: float = 0.5) -> List[Tuple[int, float]]:
        """Blend BM25 and vector scores as a weighted sum; alpha weights the BM25 side."""
        combined_scores: Dict[int, float] = {}
        for doc_id, score in bm25_results:
            combined_scores[doc_id] = combined_scores.get(doc_id, 0.0) + alpha * score
        for doc_id, score in vector_results:
            combined_scores[doc_id] = combined_scores.get(doc_id, 0.0) + (1 - alpha) * score
        return sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)
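    # For example, with alpha = 0.5 a document scoring 2.0 under BM25 and 0.8 under vector search
    # combines to 0.5 * 2.0 + 0.5 * 0.8 = 1.4. The two score scales are not normalized here, so
    # BM25 tends to dominate; min-max normalizing each result list first would be a reasonable refinement.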
    def expand_query(self, query: str) -> str:
        """Append a T5-generated expansion to the query (the model is reloaded on every call; cache it for production use)."""
        model = T5ForConditionalGeneration.from_pretrained("t5-small")
        tokenizer = T5Tokenizer.from_pretrained("t5-small")
        input_text = f"expand query: {query}"
        input_ids = tokenizer.encode(input_text, return_tensors="pt")
        outputs = model.generate(input_ids, max_length=50, num_return_sequences=1)
        expanded_query = tokenizer.decode(outputs[0], skip_special_tokens=True)
        return f"{query} {expanded_query}"
    def cross_encoder_rerank(self, query: str, initial_results: List[Tuple[int, float]],
                             top_k: int = 5) -> List[Tuple[int, float]]:
        """Rerank the top candidates with a cross-encoder (loaded on every call)."""
        from sentence_transformers import CrossEncoder
        model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
        candidates = initial_results[:top_k * 2]
        pairs = [[query, self.get_document_content(doc_id)] for doc_id, _ in candidates]
        scores = model.predict(pairs)
        reranked = sorted(zip(candidates, scores), key=lambda x: x[1], reverse=True)
        return [(doc_id, float(score)) for (doc_id, _), score in reranked[:top_k]]
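    # A minimal sketch of HyDE (Hypothetical Document Embeddings) for the currently unimplemented
    # use_hyde path in rag_query below: ask an LLM for a hypothetical answer, then run semantic
    # search on that answer instead of the raw query. The model name and the assumption that an
    # OPENAI_API_KEY is set in the environment are illustrative, not part of the original code.
    def hyde_search(self, query: str, top_k: int = 5) -> List[Tuple[int, float]]:
        """Embed an LLM-generated hypothetical answer and use it for vector search (HyDE sketch)."""
        client = openai.OpenAI()  # reads OPENAI_API_KEY from the environment
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": f"Write a short passage that answers: {query}"}],
            max_tokens=200,
        )
        hypothetical_doc = response.choices[0].message.content
        return self.semantic_search(hypothetical_doc, top_k)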
    def rag_query(self, query: str, search_type: str = 'combined', top_k: int = 5, use_hyde: bool = False,
                  rerank: bool = False, expand: bool = False) -> List[Dict[str, Any]]:
        try:
            if expand:
                query = self.expand_query(query)
            if use_hyde:
                # HyDE is not wired into this method yet; fall back to the requested search type.
                logger.warning("use_hyde requested but not implemented here; using '%s' search.", search_type)
            if search_type == 'vector':
                results = self.semantic_search(query, top_k)
            elif search_type == 'bm25':
                results = self.bm25_search(query, top_k)
            elif search_type == 'combined':
                bm25_results = self.bm25_search(query, top_k)
                vector_results = self.semantic_search(query, top_k)
                results = self.combine_search_results(bm25_results, vector_results)[:top_k]
            else:
                raise ValueError("Invalid search type. Choose 'vector', 'bm25', or 'combined'.")
            if rerank:
                results = self.cross_encoder_rerank(query, results, top_k)
            enriched_results = []
            for doc_id, score in results:
                content = self.get_document_content(doc_id)
                enriched_results.append({
                    "document_id": doc_id,
                    "score": score,
                    "content": content[:500]  # Truncate content for brevity
                })
            return enriched_results
        except Exception as e:
            logger.error(f"An error occurred during RAG query: {str(e)}")
            return []
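# A minimal sketch of the background process mentioned in the notes at the top of this file:
# it simply calls vectorize_all_documents on a fixed interval in a daemon thread. The interval
# and the threading approach are assumptions; a cron job or task queue would work just as well.
import threading
import time
def start_background_vectorization(rag_system: "RAGSystem", interval_seconds: int = 3600) -> threading.Thread:
    """Periodically vectorize any unprocessed chunks in a daemon thread."""
    def _loop():
        while True:
            try:
                rag_system.vectorize_all_documents()
            except Exception as e:
                logger.error(f"Background vectorization failed: {str(e)}")
            time.sleep(interval_seconds)
    thread = threading.Thread(target=_loop, daemon=True)
    thread.start()
    return thread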
# Example usage
if __name__ == "__main__":
    sqlite_path = "path/to/your/sqlite/database.db"
    pg_config = {
        "dbname": "your_db_name",
        "user": "your_username",
        "password": "your_password",
        "host": "localhost"
    }
    rag_system = RAGSystem(sqlite_path, pg_config)
    # Vectorize all documents (run this once or periodically)
    rag_system.vectorize_all_documents()
    # Example query
    query = "programming concepts for beginners"
    results = rag_system.rag_query(query, search_type='combined', expand=True, rerank=True)
    print(f"Search results for query: '{query}'\n")
    for i, result in enumerate(results, 1):
        print(f"Result {i}:")
        print(f"Document ID: {result['document_id']}")
        print(f"Score: {result['score']:.4f}")
        print(f"Content snippet: {result['content']}")
        print("---")