import logging
from itertools import chain
from typing import List, Dict

import umap
from nltk.corpus import wordnet
from sklearn.mixture import GaussianMixture

from App_Function_Libraries.RAG.ChromaDB_Library import store_in_chroma, create_embedding, vector_search, chroma_client
from App_Function_Libraries.Chunk_Lib import improved_chunking_process, recursive_summarize_chunks

logging.basicConfig(filename='raptor.log', level=logging.DEBUG)

# Maximum depth of the RAPTOR tree (levels run from 0 through MAX_LEVELS).
MAX_LEVELS = 3

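# --- Summarization helpers ---
# NOTE: dummy_summarize (defined below) is a pass-through stand-in so the
# pipeline runs end-to-end without an LLM; swap in a real summarizer for
# actual RAPTOR summaries.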
def log_and_summarize(text, prompt):
    # Log a preview of the text, then delegate to the (stub) summarizer.
    logging.debug(f"Summarizing text: {text[:100]} with prompt: {prompt}")
    return dummy_summarize(text, prompt)

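# --- Step 1: chunk the source document and embed each chunk ---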
def prepare_data(content: str, media_id: int, chunk_options: dict):
    chunks = improved_chunking_process(content, chunk_options)
    embeddings = [create_embedding(chunk['text']) for chunk in chunks]
    return chunks, embeddings

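# --- Step 2: recursively summarize the chunks ---
# Wraps Chunk_Lib's recursive_summarize_chunks over the raw chunk texts.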
def recursive_summarization(chunks, summarize_func, custom_prompt):
    summarized_chunks = recursive_summarize_chunks(
        [chunk['text'] for chunk in chunks],
        summarize_func=summarize_func,
        custom_prompt=custom_prompt
    )
    return summarized_chunks

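# --- Step 3: build the RAPTOR tree ---
# Each level clusters the current texts, summarizes each cluster, stores the
# summaries with ids encoding level and cluster, and recurses until MAX_LEVELS.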
def build_tree_structure(chunks, collection_name, level=0):
    # Cluster the texts at this level.
    clustered_texts = dynamic_clustering([chunk['text'] for chunk in chunks])

    # Summarize each cluster.
    summarized_clusters = {}
    for cluster_id, cluster_texts in clustered_texts.items():
        summary = dummy_summarize(' '.join(cluster_texts), custom_prompt="Summarize:")
        summarized_clusters[cluster_id] = summary

    # Embed the summaries and build ids that encode level and cluster.
    ids = []
    embeddings = []
    summaries = []
    for cluster_id, summary in summarized_clusters.items():
        ids.append(f"{collection_name}_L{level}_C{cluster_id}")
        embeddings.append(create_embedding(summary))
        summaries.append(summary)

    # Store each level in its own collection so the per-level retrieval
    # functions below can query it directly.
    store_in_chroma(f"{collection_name}_L{level}", summaries, embeddings, ids)

    # Recurse on each cluster's texts (re-wrapped as chunk dicts) until the
    # maximum depth is reached. The parallel variant further down recurses on
    # the cluster summaries instead.
    if level < MAX_LEVELS:
        for cluster_id, cluster_texts in clustered_texts.items():
            build_tree_structure([{'text': text} for text in cluster_texts],
                                 collection_name, level + 1)

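# Placeholder summarizer: returns the input unchanged so the pipeline is
# runnable without an LLM backend.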
def dummy_summarize(text, custom_prompt, temp=None, system_prompt=None):
    return text

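# Basic retrieval: search a single level's collection of the tree.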
def raptor_retrieve(query, collection_name, level=0):
    results = vector_search(f"{collection_name}_L{level}", query, k=5)
    return results

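# End-to-end pipeline: chunk + embed, build the tree, then run a sample query.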
def raptor_pipeline(media_id, content, chunk_options):
    collection_name = f"media_{media_id}_raptor"

    chunks, embeddings = prepare_data(content, media_id, chunk_options)

    # build_tree_structure takes (chunks, collection_name, level); per-level
    # embeddings are recomputed from the cluster summaries as the tree grows.
    build_tree_structure(chunks, collection_name)

    query = "Your query here"
    result = raptor_retrieve(query, collection_name)
    print(result)

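# Example usage: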
content = "Your long document content here"
chunk_options = {
    'method': 'sentences',
    'max_size': 300,
    'overlap': 50
}
media_id = 1
raptor_pipeline(media_id, content, chunk_options)

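# Cluster texts by embedding them, reducing dimensionality with UMAP, then
# fitting Gaussian mixtures with 2-9 components and keeping the best by BIC.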
def dynamic_clustering(texts, n_components=2):
    # Fewer than three texts cannot support multiple clusters.
    if len(texts) < 3:
        return {0: list(texts)}

    # Embed the texts.
    embeddings = [create_embedding(text) for text in texts]

    # Reduce dimensionality to make the mixture model tractable.
    reducer = umap.UMAP(n_components=n_components)
    reduced_embeddings = reducer.fit_transform(embeddings)

    # Pick the number of clusters by minimizing BIC over candidate GMMs.
    best_gmm = None
    best_bic = float('inf')
    for n in range(2, min(10, len(texts))):
        gmm = GaussianMixture(n_components=n, covariance_type='full')
        gmm.fit(reduced_embeddings)
        bic = gmm.bic(reduced_embeddings)
        if bic < best_bic:
            best_bic = bic
            best_gmm = gmm

    # Assign each text to its cluster.
    cluster_labels = best_gmm.predict(reduced_embeddings)
    clustered_texts = {i: [] for i in range(best_gmm.n_components)}
    for label, text in zip(cluster_labels, texts):
        clustered_texts[label].append(text)

    return clustered_texts

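# Tree-traversal retrieval: query each level's collection from the top down,
# accumulating results. This is a simplified traversal that searches whole
# levels rather than following parent-to-child links between nodes.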
def tree_traversal_retrieve(query, collection_name, max_depth=3):
    logging.info(f"Starting tree traversal for query: {query}")
    results = []
    current_level = 0

    while current_level <= max_depth:
        documents = vector_search(f"{collection_name}_L{current_level}", query, k=5)
        if not documents:
            break
        results.extend(documents)
        current_level += 1

    logging.info(f"Tree traversal completed with {len(results)} results")
    return results

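# Collapsed-tree retrieval: search every level at once and rank the combined
# results. Assumes vector_search returns a 'relevance' score per result.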
def collapsed_tree_retrieve(query, collection_name):
    # MAX_LEVELS + 1 because levels run from 0 through MAX_LEVELS inclusive.
    all_layers = [f"{collection_name}_L{level}" for level in range(MAX_LEVELS + 1)]
    all_results = []

    for layer in all_layers:
        all_results.extend(vector_search(layer, query, k=5))

    sorted_results = sorted(all_results, key=lambda x: x['relevance'], reverse=True)
    return sorted_results[:5]

query = "Your broad query here"
results = collapsed_tree_retrieve(query, collection_name=f"media_{media_id}_raptor")
print(results)

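# --- Optimization: parallel embedding with joblib ---
# Embeds summaries across all CPU cores; the tree-building variant below is
# renamed build_tree_structure_parallel to avoid shadowing the definition above.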
from joblib import Parallel, delayed


def parallel_process_chunks(chunks):
    return Parallel(n_jobs=-1)(delayed(create_embedding)(chunk['text']) for chunk in chunks)

def build_tree_structure_parallel(chunks, collection_name, level=0):
    clustered_texts = dynamic_clustering([chunk['text'] for chunk in chunks])

    summarized_clusters = {}
    for cluster_id, cluster_texts in clustered_texts.items():
        summary = dummy_summarize(' '.join(cluster_texts), custom_prompt="Summarize:")
        summarized_clusters[cluster_id] = summary

    # Embed all cluster summaries in parallel.
    embeddings = parallel_process_chunks([{'text': summary} for summary in summarized_clusters.values()])

    ids = [f"{collection_name}_L{level}_C{cluster_id}" for cluster_id in summarized_clusters.keys()]
    store_in_chroma(f"{collection_name}_L{level}", list(summarized_clusters.values()), embeddings, ids)

    # Recurse on the summaries themselves, re-wrapped as chunk dicts.
    if len(summarized_clusters) > 1 and level < MAX_LEVELS:
        build_tree_structure_parallel([{'text': summary} for summary in summarized_clusters.values()],
                                      collection_name, level + 1)

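# --- Optimization: asynchronous embedding with asyncio ---
# create_embedding is synchronous, so it is pushed onto a worker thread via
# asyncio.to_thread (Python 3.9+) to avoid blocking the event loop.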
import asyncio


async def async_create_embedding(text):
    # Run the blocking embedding call in a worker thread.
    return await asyncio.to_thread(create_embedding, text)


async def build_tree_structure_async(chunks, collection_name, level=0):
    clustered_texts = dynamic_clustering([chunk['text'] for chunk in chunks])

    # Summarize each cluster before embedding.
    summarized_clusters = {}
    for cluster_id, cluster_texts in clustered_texts.items():
        summary = dummy_summarize(' '.join(cluster_texts), custom_prompt="Summarize:")
        summarized_clusters[cluster_id] = summary

    # Embed all summaries concurrently.
    embeddings = await asyncio.gather(*[async_create_embedding(summary) for summary in summarized_clusters.values()])

    ids = [f"{collection_name}_L{level}_C{cluster_id}" for cluster_id in summarized_clusters.keys()]
    store_in_chroma(f"{collection_name}_L{level}", list(summarized_clusters.values()), embeddings, ids)

    if len(summarized_clusters) > 1 and level < MAX_LEVELS:
        await build_tree_structure_async([{'text': summary} for summary in summarized_clusters.values()],
                                         collection_name, level + 1)

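# --- User feedback loop ---
# Show the top results, collect the ones the user marks relevant, and use
# their text to refine the query for a second retrieval pass.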
def get_user_feedback(results):
    print("Please review the following results:")
    for i, result in enumerate(results):
        print(f"{i + 1}: {result['text'][:100]}...")

    feedback = input("Enter the numbers of the results that were relevant (comma-separated): ")
    if not feedback.strip():
        return []
    relevant_indices = [int(i.strip()) - 1 for i in feedback.split(",")]
    return relevant_indices

def raptor_pipeline_with_feedback(media_id, content, chunk_options):
    # Assumes the RAPTOR tree for this media_id has already been built
    # (see raptor_pipeline above).
    query = "Your query here"
    initial_results = tree_traversal_retrieve(query, collection_name=f"media_{media_id}_raptor")
    relevant_indices = get_user_feedback(initial_results)

    if relevant_indices:
        relevant_results = [initial_results[i] for i in relevant_indices]
        refined_query = " ".join([res['text'] for res in relevant_results])
        try:
            final_results = tree_traversal_retrieve(refined_query, collection_name=f"media_{media_id}_raptor")
        except Exception as e:
            logging.error(f"Error during retrieval: {str(e)}")
            raise
        print("Refined Results:", final_results)
    else:
        print("No relevant results were found in the initial search.")

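# --- Active learning: ask for feedback only on low-confidence results ---
# Assumes each result carries a 'confidence' score from the vector search.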
def identify_uncertain_results(results):
    threshold = 0.5
    uncertain_results = [res for res in results if res['confidence'] < threshold]
    return uncertain_results

def raptor_pipeline_with_active_learning(media_id, content, chunk_options):
    # Assumes the RAPTOR tree for this media_id has already been built.
    query = "Your query here"
    initial_results = tree_traversal_retrieve(query, collection_name=f"media_{media_id}_raptor")
    uncertain_results = identify_uncertain_results(initial_results)

    if uncertain_results:
        print("The following results are uncertain. Please provide feedback:")
        feedback_indices = get_user_feedback(uncertain_results)

        refined_query = " ".join([uncertain_results[i]['text'] for i in feedback_indices])
        final_results = tree_traversal_retrieve(refined_query, collection_name=f"media_{media_id}_raptor")
        print("Refined Results:", final_results)
    else:
        print("No uncertain results were found.")

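# --- Query expansion ---
# Synonym expansion via WordNet (requires a one-time nltk.download('wordnet')).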
def expand_query_with_synonyms(query):
    words = query.split()
    expanded_query = []
    for word in words:
        # Collect every lemma from every WordNet synset of the word.
        synonyms = wordnet.synsets(word)
        lemmas = set(chain.from_iterable([syn.lemma_names() for syn in synonyms]))
        expanded_query.append(" ".join(lemmas))
    return " ".join(expanded_query)

def contextual_query_expansion(query, context):
    # Placeholder: some_contextual_model is not defined here; plug in any
    # model that can suggest context-aware expansion terms.
    expanded_terms = some_contextual_model.get_expansions(query, context)
    return query + " " + " ".join(expanded_terms)

def raptor_pipeline_with_query_expansion(media_id, content, chunk_options):
    # Assumes the RAPTOR tree for this media_id has already been built.
    query = "Your initial query"
    expanded_query = expand_query_with_synonyms(query)
    initial_results = tree_traversal_retrieve(expanded_query, collection_name=f"media_{media_id}_raptor")
    print(initial_results)

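# --- Summaries with citations ---
# Retrieval results carry their source metadata, so a summary can cite,
# footnote, or hyperlink the documents it draws from.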
def generate_summary_with_citations(query: str, collection_name: str):
    results = vector_search_with_citation(collection_name, query)

    # dummy_summarize stands in for a real summarizer here.
    summary = dummy_summarize(" ".join(res['text'] for res in results), custom_prompt="Summarize:")

    sources = list(set(res['source'] for res in results))
    return f"{summary}\n\nCitations:\n" + "\n".join(sources)

def vector_search_with_citation(collection_name: str, query: str, k: int = 10) -> List[Dict[str, str]]:
    query_embedding = create_embedding(query)
    collection = chroma_client.get_collection(name=collection_name)
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=k
    )
    # Chroma returns one list per query embedding; take the first (only) one.
    return [{'text': doc, 'source': meta['source']}
            for doc, meta in zip(results['documents'][0], results['metadatas'][0])]

def generate_summary_with_footnotes(query: str, collection_name: str):
    results = vector_search_with_citation(collection_name, query)
    summary_parts = []
    citations = []

    for i, res in enumerate(results):
        summary_parts.append(f"{res['text']} [{i + 1}]")
        citations.append(f"[{i + 1}] {res['source']}")

    return " ".join(summary_parts) + "\n\nFootnotes:\n" + "\n".join(citations)

def generate_summary_with_hyperlinks(query: str, collection_name: str):
    results = vector_search_with_citation(collection_name, query)
    summary_parts = []
    for res in results:
        summary_parts.append(f'<a href="{res["source"]}">{res["text"][:100]}...</a>')
    return " ".join(summary_parts)