Spaces:
Running
Running
# Requirements | |
# scikit-learn umap-learn | |
from itertools import chain | |
from typing import List, Dict | |
from App_Function_Libraries.RAG.ChromaDB_Library import store_in_chroma, create_embedding, vector_search, chroma_client | |
from App_Function_Libraries.Chunk_Lib import improved_chunking_process, recursive_summarize_chunks | |
import logging | |
from sklearn.mixture import GaussianMixture | |
import umap | |
from nltk.corpus import wordnet | |
# Logging setup | |
logging.basicConfig(filename='raptor.log', level=logging.DEBUG) | |
# FIXME | |
MAX_LEVELS = 3 | |
def log_and_summarize(text, prompt):
    """Log a short preview of the request, then delegate to the placeholder summarizer.

    Args:
        text: Text to summarize; only its first 100 characters are logged.
        prompt: Prompt forwarded unchanged to ``dummy_summarize``.

    Returns:
        Whatever ``dummy_summarize(text, prompt)`` returns.
    """
    preview = text[:100]
    logging.debug(f"Summarizing text: {preview} with prompt: {prompt}")
    summary = dummy_summarize(text, prompt)
    return summary
# 1. Data Preparation | |
def prepare_data(content: str, media_id: int, chunk_options: dict):
    """Split ``content`` into chunks and embed the text of each chunk.

    Args:
        content: Raw document text.
        media_id: Currently unused by this function.
        chunk_options: Passed straight through to ``improved_chunking_process``.

    Returns:
        A ``(chunks, embeddings)`` tuple; embeddings are in chunk order.
    """
    chunks = improved_chunking_process(content, chunk_options)
    embeddings = []
    for chunk in chunks:
        embeddings.append(create_embedding(chunk['text']))
    return chunks, embeddings
# 2. Recursive Summarization | |
def recursive_summarization(chunks, summarize_func, custom_prompt):
    """Run ``recursive_summarize_chunks`` over the text of every chunk.

    Args:
        chunks: Iterable of dicts, each with a ``'text'`` key.
        summarize_func: Summarizer forwarded to ``recursive_summarize_chunks``.
        custom_prompt: Prompt forwarded to ``recursive_summarize_chunks``.

    Returns:
        The result of ``recursive_summarize_chunks``.
    """
    texts = [chunk['text'] for chunk in chunks]
    return recursive_summarize_chunks(
        texts,
        summarize_func=summarize_func,
        custom_prompt=custom_prompt,
    )
# Initial gen | |
# 3. Tree Organization | |
#def build_tree_structure(chunks, embeddings, collection_name, level=0): | |
# if len(chunks) <= 1: | |
# return chunks # Base case: if chunks are small enough, return as is | |
# Recursive case: cluster and summarize | |
# summarized_chunks = recursive_summarization(chunks, summarize_func=dummy_summarize, custom_prompt="Summarize:") | |
# new_chunks, new_embeddings = prepare_data(' '.join(summarized_chunks), media_id, chunk_options) | |
# Store in ChromaDB | |
# ids = [f"{media_id}_L{level}_chunk_{i}" for i in range(len(new_chunks))] | |
# store_in_chroma(collection_name, [chunk['text'] for chunk in new_chunks], new_embeddings, ids) | |
# Recursively build tree | |
# return build_tree_structure(new_chunks, new_embeddings, collection_name, level+1) | |
# Second iteration | |
def build_tree_structure(chunks, collection_name, level=0, parent_id=None):
    """Cluster chunks, store one summary per cluster, then recurse into each cluster.

    Args:
        chunks: Items to organise. Each item is either a dict with a ``'text'``
            key (what the pipeline passes in) or a plain string (what the
            recursive calls below pass in).
        collection_name: Collection the summaries are written to via
            ``store_in_chroma``.
        level: Depth of this call; recursion stops once it reaches
            ``MAX_LEVELS``.
        parent_id: ID of the cluster being subdivided, or ``None`` at the root.
            It prefixes the IDs generated here so that sibling subtrees do not
            produce identical IDs.

    Returns:
        None. Results are persisted through ``store_in_chroma``.
    """
    # The recursive calls hand over raw strings, so accept both shapes.
    texts = [chunk['text'] if isinstance(chunk, dict) else chunk for chunk in chunks]
    if not texts:
        return

    # Dynamic clustering
    clustered_texts = dynamic_clustering(texts)

    # Summarize each cluster and collect what has to be stored at this level.
    id_prefix = parent_id or collection_name
    ids = []
    embeddings = []
    summaries = []
    populated_clusters = []
    for cluster_id, cluster_texts in clustered_texts.items():
        if not cluster_texts:
            # dynamic_clustering creates a key for every mixture component,
            # including components that received no texts.
            continue
        summary = dummy_summarize(' '.join(cluster_texts), custom_prompt="Summarize:")
        node_id = f"{id_prefix}_L{level}_C{cluster_id}"
        ids.append(node_id)
        embeddings.append(create_embedding(summary))
        summaries.append(summary)
        populated_clusters.append((node_id, cluster_texts))

    if summaries:
        store_in_chroma(collection_name, summaries, embeddings, ids)

    # Recursively build tree structure if necessary
    if level < MAX_LEVELS:
        for node_id, cluster_texts in populated_clusters:
            # A single text cannot be clustered any further.
            if len(cluster_texts) > 1:
                build_tree_structure(cluster_texts, collection_name, level + 1, parent_id=node_id)
# Dummy summarize function (replace with actual summarization) | |
def dummy_summarize(text, custom_prompt, temp=None, system_prompt=None):
    """Placeholder summarizer that hands the input text back unchanged.

    Args:
        text: Text that would be summarized.
        custom_prompt: Ignored by this placeholder.
        temp: Ignored by this placeholder.
        system_prompt: Ignored by this placeholder.

    Returns:
        ``text`` itself.
    """
    # Replace this with an actual call to a summarization model (like GPT-3.5-turbo)
    summary = text
    return summary
# 4. Retrieval | |
def raptor_retrieve(query, collection_name, level=0):
    """Return the top five ``vector_search`` hits for ``query``.

    Args:
        query: Query passed to ``vector_search``.
        collection_name: Collection to search.
        level: Currently unused by this function.

    Returns:
        Whatever ``vector_search(collection_name, query, k=5)`` returns.
    """
    return vector_search(collection_name, query, k=5)
# Main function integrating RAPTOR | |
def raptor_pipeline(media_id, content, chunk_options):
    """Run the demo RAPTOR flow: chunk, build the summary tree, then query it.

    Args:
        media_id: Identifier used to derive the collection name
            ``media_<media_id>_raptor``.
        content: Document text to index.
        chunk_options: Chunking options forwarded to ``prepare_data``.

    Returns:
        None. The retrieval result is printed.
    """
    collection_name = f"media_{media_id}_raptor"

    # Step 1: Prepare Data
    chunks, _embeddings = prepare_data(content, media_id, chunk_options)

    # Step 2: Build Tree
    # build_tree_structure takes (chunks, collection_name, level=0) and embeds
    # the cluster summaries itself. Passing the chunk embeddings positionally
    # would bind them to collection_name and the name to level.
    build_tree_structure(chunks, collection_name)

    # Step 3: Retrieve Information
    query = "Your query here"
    result = raptor_retrieve(query, collection_name)
    print(result)
# Example usage | |
content = "Your long document content here"
chunk_options = {
    'method': 'sentences',
    'max_size': 300,
    'overlap': 50,
}
# Kept at module level: the collapsed-tree example further down reads media_id.
media_id = 1

# Only run the demo when this file is executed directly, so that importing the
# module does not start chunking, embedding and storing data.
# NOTE(review): this call still sits above the definition of
# dynamic_clustering, so as a script it runs before that function exists.
# Move the demo to the bottom of the file.
if __name__ == "__main__":
    raptor_pipeline(media_id, content, chunk_options)
# | |
# | |
################################################################################################################### | |
# | |
# Additions: | |
def dynamic_clustering(texts, n_components=2):
    """Group texts by fitting Gaussian mixtures on UMAP-reduced embeddings.

    The number of clusters is picked by the lowest BIC over the candidate
    mixture sizes.

    Args:
        texts: Strings to cluster.
        n_components: Target dimensionality of the UMAP reduction.

    Returns:
        Dict mapping an integer cluster id to the list of texts assigned to it
        (input order is kept inside each list). Empty input yields ``{}``.
        Inputs with at most ``n_components + 1`` texts are returned as the
        single cluster ``{0: [...]}`` without embedding anything.
    """
    texts = list(texts)
    if not texts:
        return {}
    # Too few points to reduce to n_components dimensions and then split into
    # at least two groups; treat them as one cluster instead of failing.
    if len(texts) <= n_components + 1:
        return {0: texts}

    # Step 1: Convert text to embeddings
    embeddings = [create_embedding(text) for text in texts]

    # Step 2: Dimensionality reduction (UMAP)
    reducer = umap.UMAP(n_components=n_components)
    reduced_embeddings = reducer.fit_transform(embeddings)

    # Step 3: Find optimal number of clusters using BIC
    best_gmm = None
    best_bic = float('inf')
    # A mixture cannot be fitted with more components than samples, so cap the
    # original 2..9 search range at the number of texts.
    max_clusters = min(9, len(texts))
    for n in range(2, max_clusters + 1):
        gmm = GaussianMixture(n_components=n, covariance_type='full')
        gmm.fit(reduced_embeddings)
        bic = gmm.bic(reduced_embeddings)
        if bic < best_bic:
            best_bic = bic
            best_gmm = gmm

    if best_gmm is None:
        # No candidate mixture size was available.
        return {0: texts}

    # Step 4: Cluster the reduced embeddings
    cluster_labels = best_gmm.predict(reduced_embeddings)
    clustered_texts = {i: [] for i in range(best_gmm.n_components)}
    for label, text in zip(cluster_labels, texts):
        # predict() yields NumPy integers; use plain ints as dict keys.
        clustered_texts[int(label)].append(text)
    return clustered_texts
def tree_traversal_retrieve(query, collection_name, max_depth=3):
    """Search level by level, using the IDs of each level's hits as the next nodes.

    Starts from ``<collection_name>_L0`` and, for every node of the current
    level, collects the top five ``vector_search`` hits. Stops after
    ``max_depth + 1`` levels or as soon as a level yields no hits.

    Args:
        query: Query passed to ``vector_search``.
        collection_name: Base name the root node name is derived from.
        max_depth: Highest level index that is still searched.

    Returns:
        All hits gathered over every visited level, in visiting order.
    """
    logging.info(f"Starting tree traversal for query: {query}")
    results = []
    frontier = [collection_name + '_L0']
    depth = 0
    while frontier and depth <= max_depth:
        discovered = []
        for node_id in frontier:
            documents = vector_search(node_id, query, k=5)
            results.extend(documents)
            # Assuming your doc structure includes an 'id' field
            for doc in documents:
                discovered.append(doc['id'])
        frontier = discovered
        depth += 1
    logging.info(f"Tree traversal completed with {len(results)} results")
    return results
def collapsed_tree_retrieve(query, collection_name):
    """Search every level's collection and return the five most relevant hits.

    Queries ``<collection_name>_L<level>`` for each level below ``MAX_LEVELS``,
    pools the hits and ranks them by their ``'relevance'`` value.

    NOTE(review): build_tree_structure in this file writes all levels into the
    single ``collection_name`` collection; confirm that the per-level
    collections queried here are actually populated somewhere.

    Args:
        query: Query passed to ``vector_search``.
        collection_name: Base name the per-level collection names derive from.

    Returns:
        At most five hits, highest ``'relevance'`` first.
    """
    candidates = []
    for level in range(MAX_LEVELS):
        layer = f"{collection_name}_L{level}"
        candidates.extend(vector_search(layer, query, k=5))
    # Sort and rank results by relevance (assuming 'relevance' is a key)
    candidates.sort(key=lambda hit: hit['relevance'], reverse=True)
    return candidates[:5]  # Return top 5 results
# Test collapsed tree retrieval
# Guarded so that importing this module does not fire a retrieval.
if __name__ == "__main__":
    query = "Your broad query here"
    results = collapsed_tree_retrieve(query, collection_name=f"media_{media_id}_raptor")
    print(results)
# Parallel processing | |
# pip install joblib | |
from joblib import Parallel, delayed | |
def parallel_process_chunks(chunks):
    """Embed the text of every chunk through a joblib ``Parallel`` pool.

    Args:
        chunks: Iterable of dicts, each with a ``'text'`` key.

    Returns:
        The list produced by ``Parallel``: one ``create_embedding`` result per
        chunk.
    """
    embed_jobs = (delayed(create_embedding)(chunk['text']) for chunk in chunks)
    worker_pool = Parallel(n_jobs=-1)  # -1: use all available workers
    return worker_pool(embed_jobs)
def build_tree_structure(chunks, collection_name, level=0):
    """Cluster chunks, store one summary per cluster, then recurse on the summaries.

    Embeddings for the summaries are computed with ``parallel_process_chunks``.

    Args:
        chunks: Items to organise; each is a dict with a ``'text'`` key or a
            plain string.
        collection_name: Collection the summaries are written to via
            ``store_in_chroma``.
        level: Depth of this call; recursion stops once it reaches
            ``MAX_LEVELS`` or a level collapses to a single summary.

    Returns:
        None. Results are persisted through ``store_in_chroma``.
    """
    texts = [chunk['text'] if isinstance(chunk, dict) else chunk for chunk in chunks]
    if not texts:
        return

    clustered_texts = dynamic_clustering(texts)

    # Clusters that received no texts have nothing to summarize or store.
    summarized_clusters = {
        cluster_id: dummy_summarize(' '.join(cluster_texts), custom_prompt="Summarize:")
        for cluster_id, cluster_texts in clustered_texts.items()
        if cluster_texts
    }
    if not summarized_clusters:
        return

    summaries = list(summarized_clusters.values())
    summary_chunks = [{'text': summary} for summary in summaries]

    # Parallel processing of embeddings
    embeddings = parallel_process_chunks(summary_chunks)
    ids = [f"{collection_name}_L{level}_C{cluster_id}" for cluster_id in summarized_clusters]
    store_in_chroma(collection_name, summaries, embeddings, ids)

    if len(summarized_clusters) > 1 and level < MAX_LEVELS:
        # Recurse with chunk-shaped dicts; passing the bare summary strings
        # would break the chunk['text'] lookup at the top of this function.
        build_tree_structure(summary_chunks, collection_name, level + 1)
# Asynchronous processing | |
import asyncio | |
async def async_create_embedding(text):
    """Create an embedding for ``text`` without blocking the event loop.

    Args:
        text: Text to embed.

    Returns:
        The value produced by ``create_embedding(text)``.
    """
    # If create_embedding has been turned into a coroutine function, await it.
    if asyncio.iscoroutinefunction(create_embedding):
        return await create_embedding(text)
    # Otherwise it is a blocking call: run it in the default executor so that
    # concurrent tasks (e.g. under asyncio.gather) can make progress.
    # NOTE(review): assumes create_embedding may be called from worker threads.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, create_embedding, text)
async def build_tree_structure_async(chunks, collection_name, level=0):
    """Async variant of the tree build: embed the cluster summaries concurrently.

    Args:
        chunks: Items to organise; each is a dict with a ``'text'`` key or a
            plain string.
        collection_name: Collection the summaries are written to via
            ``store_in_chroma``.
        level: Depth of this call; recursion stops once it reaches
            ``MAX_LEVELS`` or a level collapses to a single summary.

    Returns:
        None. Results are persisted through ``store_in_chroma``.
    """
    texts = [chunk['text'] if isinstance(chunk, dict) else chunk for chunk in chunks]
    if not texts:
        return

    clustered_texts = dynamic_clustering(texts)

    # Summaries must be text produced by the summarizer, not embeddings;
    # embeddings are computed from the summaries in the gather() below.
    summarized_clusters = {
        cluster_id: dummy_summarize(' '.join(cluster_texts), custom_prompt="Summarize:")
        for cluster_id, cluster_texts in clustered_texts.items()
        if cluster_texts
    }
    if not summarized_clusters:
        return

    summaries = list(summarized_clusters.values())
    embeddings = await asyncio.gather(*[async_create_embedding(summary) for summary in summaries])
    ids = [f"{collection_name}_L{level}_C{cluster_id}" for cluster_id in summarized_clusters]
    store_in_chroma(collection_name, summaries, embeddings, ids)

    if len(summarized_clusters) > 1 and level < MAX_LEVELS:
        # Recurse with chunk-shaped dicts so the chunk['text'] lookup works.
        next_chunks = [{'text': summary} for summary in summaries]
        await build_tree_structure_async(next_chunks, collection_name, level + 1)
# User feedback Loop | |
def get_user_feedback(results):
    """Show the results and ask the user which ones were relevant.

    Args:
        results: Sequence of dicts with a ``'text'`` key; the first 100
            characters of each are printed with a 1-based number.

    Returns:
        Zero-based indices of the selected results, in the order entered and
        without duplicates. Blank, non-numeric and out-of-range entries are
        skipped, so an empty answer yields ``[]``.
    """
    print("Please review the following results:")
    for i, result in enumerate(results):
        print(f"{i + 1}: {result['text'][:100]}...")

    feedback = input("Enter the numbers of the results that were relevant (comma-separated): ")

    relevant_indices = []
    for token in feedback.split(","):
        token = token.strip()
        if not token:
            # Empty answer or stray comma: nothing was selected here.
            continue
        try:
            index = int(token) - 1
        except ValueError:
            logging.warning(f"Ignoring non-numeric feedback entry: {token!r}")
            continue
        # Reject 0 and negatives as well: they would otherwise index from the end.
        if not 0 <= index < len(results):
            logging.warning(f"Ignoring out-of-range feedback entry: {token!r}")
            continue
        if index not in relevant_indices:
            relevant_indices.append(index)
    return relevant_indices
def raptor_pipeline_with_feedback(media_id, content, chunk_options):
    """Retrieve, ask the user which hits were relevant, then retrieve again.

    The second retrieval uses the concatenated text of the hits the user
    marked as relevant. ``content`` and ``chunk_options`` are not used by the
    steps shown here.

    Returns:
        None. Results are printed.
    """
    # ... Existing pipeline steps ...
    raptor_collection = f"media_{media_id}_raptor"
    query = "Your query here"
    initial_results = tree_traversal_retrieve(query, collection_name=raptor_collection)
    relevant_indices = get_user_feedback(initial_results)

    if not relevant_indices:
        print("No relevant results were found in the initial search.")
        return

    refined_query = " ".join(initial_results[i]['text'] for i in relevant_indices)
    try:
        final_results = tree_traversal_retrieve(refined_query, collection_name=raptor_collection)
    except Exception as e:
        # Log for the record, then let the caller see the failure.
        logging.error(f"Error during retrieval: {str(e)}")
        raise
    print("Refined Results:", final_results)
def identify_uncertain_results(results):
    """Return the results whose ``'confidence'`` is below 0.5.

    Args:
        results: Iterable of dicts with a ``'confidence'`` key.

    Returns:
        List of the low-confidence results, in their original order.
    """
    cutoff = 0.5  # Confidence threshold
    low_confidence = []
    for candidate in results:
        if candidate['confidence'] < cutoff:
            low_confidence.append(candidate)
    return low_confidence
def raptor_pipeline_with_active_learning(media_id, content, chunk_options):
    """Retrieve, ask for feedback on low-confidence hits, then retrieve again.

    The second retrieval uses the concatenated text of the uncertain hits the
    user selected. ``content`` and ``chunk_options`` are not used by the steps
    shown here.

    Returns:
        None. Results are printed.
    """
    # ... Existing pipeline steps ...
    raptor_collection = f"media_{media_id}_raptor"
    query = "Your query here"
    initial_results = tree_traversal_retrieve(query, collection_name=raptor_collection)
    uncertain_results = identify_uncertain_results(initial_results)

    if not uncertain_results:
        print("No uncertain results were found.")
        return

    print("The following results are uncertain. Please provide feedback:")
    feedback_indices = get_user_feedback(uncertain_results)
    # Use feedback to adjust retrieval or refine the query
    selected_texts = [uncertain_results[i]['text'] for i in feedback_indices]
    refined_query = " ".join(selected_texts)
    final_results = tree_traversal_retrieve(refined_query, collection_name=raptor_collection)
    print("Refined Results:", final_results)
# Query Expansion | |
def expand_query_with_synonyms(query):
    """Expand each query word with its WordNet lemma names.

    Args:
        query: Whitespace-separated query string.

    Returns:
        A string in which every original word is followed by its synonyms in
        sorted order. Words WordNet does not know are kept as they are, and an
        empty query yields ``""``.
    """
    expanded_query = []
    for word in query.split():
        synsets = wordnet.synsets(word)
        lemmas = set(chain.from_iterable(syn.lemma_names() for syn in synsets))
        # The word itself is emitted first below; don't repeat it.
        lemmas.discard(word)
        # Multi-word lemma names are joined with underscores; turn them back
        # into spaces. Sorting keeps the output stable between runs.
        synonyms = sorted(lemma.replace("_", " ") for lemma in lemmas)
        # Always keep the original term, even when no synonyms were found.
        expanded_query.append(" ".join([word, *synonyms]))
    return " ".join(expanded_query)
def contextual_query_expansion(query, context):
    """Append context-derived expansion terms to ``query``.

    Args:
        query: Original query string.
        context: Context handed to the expansion model.

    Returns:
        ``query``, a space, then the expansion terms joined by spaces.
    """
    # FIXME: Replace with actual contextual model
    # (`some_contextual_model` is not defined in this file)
    expanded_terms = some_contextual_model.get_expansions(query, context)
    suffix = " ".join(expanded_terms)
    return " ".join((query, suffix))
def raptor_pipeline_with_query_expansion(media_id, content, chunk_options):
    """Expand the query with synonyms and run a tree-traversal retrieval.

    ``content`` and ``chunk_options`` are not used by the steps shown here,
    and the retrieved results are not yet returned or printed.
    """
    # ... Existing pipeline steps ...
    expanded_query = expand_query_with_synonyms("Your initial query")
    initial_results = tree_traversal_retrieve(
        expanded_query,
        collection_name=f"media_{media_id}_raptor",
    )
    # ... Continue with feedback loop ...
def generate_summary_with_citations(query: str, collection_name: str):
    """Summarize the hits for ``query`` and append a list of their sources.

    Args:
        query: Query passed to ``vector_search_with_citation``.
        collection_name: Collection to search.

    Returns:
        The summary, a blank line, a ``Citations:`` header and one source per
        line. Each source is listed once, in order of first appearance.
    """
    results = vector_search_with_citation(collection_name, query)

    # FIXME: swap in the real summarizer. No `summarize` function exists in
    # this file, so use the same placeholder and call pattern as the tree builder.
    combined_text = ' '.join(res['text'] for res in results)
    summary = dummy_summarize(combined_text, custom_prompt="Summarize:")

    # Deduplicate sources while keeping a stable order (a set would shuffle it).
    sources = list(dict.fromkeys(res['source'] for res in results))
    return f"{summary}\n\nCitations:\n" + "\n".join(sources)
def vector_search_with_citation(collection_name: str, query: str, k: int = 10) -> List[Dict[str, str]]:
    """Query a collection and pair every returned document with its source.

    Args:
        collection_name: Name of the collection to query.
        query: Text that is embedded and used as the query.
        k: Maximum number of results requested.

    Returns:
        One ``{'text': ..., 'source': ...}`` dict per hit. ``'source'`` is
        ``"unknown"`` when a hit has no metadata or no ``'source'`` entry.
    """
    query_embedding = create_embedding(query)
    collection = chroma_client.get_collection(name=collection_name)
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=k
    )

    # query() returns one inner list per query embedding. A single embedding
    # was sent, so the hits are in row 0 of each field.
    documents = (results['documents'] or [[]])[0]
    metadatas = (results['metadatas'] or [[]])[0]

    # NOTE(review): the tree builders in this file call store_in_chroma without
    # metadata, so hits may come back with None metadata; don't index blindly.
    return [
        {'text': doc, 'source': (meta or {}).get('source', 'unknown')}
        for doc, meta in zip(documents, metadatas)
    ]
def generate_summary_with_footnotes(query: str, collection_name: str):
    """Join the hit texts, each tagged with a footnote number, and list the sources.

    Args:
        query: Query passed to ``vector_search_with_citation``.
        collection_name: Collection to search.

    Returns:
        The numbered texts joined by spaces, followed by a ``Footnotes:``
        section with one ``[n] source`` line per hit.
    """
    results = vector_search_with_citation(collection_name, query)
    numbered = list(enumerate(results, start=1))
    summary_parts = [f"{res['text']} [{number}]" for number, res in numbered]
    citations = [f"[{number}] {res['source']}" for number, res in numbered]
    body = " ".join(summary_parts)
    footnotes = "\n".join(citations)
    return body + "\n\nFootnotes:\n" + footnotes
def generate_summary_with_hyperlinks(query: str, collection_name: str):
    """Render every hit as an HTML link to its source.

    Args:
        query: Query passed to ``vector_search_with_citation``.
        collection_name: Collection to search.

    Returns:
        Space-separated ``<a>`` elements; each shows the first 100 characters
        of the hit text and links to the hit's source.
    """
    from html import escape

    results = vector_search_with_citation(collection_name, query)
    summary_parts = []
    for res in results:
        # Text and source come from stored documents; escape both so that
        # quotes or markup inside them cannot break out of the anchor tag.
        href = escape(str(res["source"]), quote=True)
        label = escape(str(res["text"])[:100])
        summary_parts.append(f'<a href="{href}">{label}...</a>')
    return " ".join(summary_parts)
# | |
# End of Additions | |
############################################3############################################3############################## |