Spaces:
Running
Running
import math | |
import random | |
import threading | |
import chromadb | |
import numpy as np | |
import posthog | |
from chromadb.config import Settings | |
from chromadb.utils import embedding_functions | |
import extensions.superboogav2.parameters as parameters | |
from modules.logging_colors import logger | |
from modules.text_generation import decode, encode | |
# Intercept calls to posthog | |
posthog.capture = lambda *args, **kwargs: None | |
embedder = embedding_functions.SentenceTransformerEmbeddingFunction("sentence-transformers/all-mpnet-base-v2") | |
class Info: | |
def __init__(self, start_index, text_with_context, distance, id): | |
self.text_with_context = text_with_context | |
self.start_index = start_index | |
self.distance = distance | |
self.id = id | |
def calculate_distance(self, other_info): | |
if parameters.get_new_dist_strategy() == parameters.DIST_MIN_STRATEGY: | |
# Min | |
return min(self.distance, other_info.distance) | |
elif parameters.get_new_dist_strategy() == parameters.DIST_HARMONIC_STRATEGY: | |
# Harmonic mean | |
return 2 * (self.distance * other_info.distance) / (self.distance + other_info.distance) | |
elif parameters.get_new_dist_strategy() == parameters.DIST_GEOMETRIC_STRATEGY: | |
# Geometric mean | |
return (self.distance * other_info.distance) ** 0.5 | |
elif parameters.get_new_dist_strategy() == parameters.DIST_ARITHMETIC_STRATEGY: | |
# Arithmetic mean | |
return (self.distance + other_info.distance) / 2 | |
else: # Min is default | |
return min(self.distance, other_info.distance) | |
def merge_with(self, other_info): | |
s1 = self.text_with_context | |
s2 = other_info.text_with_context | |
s1_start = self.start_index | |
s2_start = other_info.start_index | |
new_dist = self.calculate_distance(other_info) | |
if self.should_merge(s1, s2, s1_start, s2_start): | |
if s1_start <= s2_start: | |
if s1_start + len(s1) >= s2_start + len(s2): # if s1 completely covers s2 | |
return Info(s1_start, s1, new_dist, self.id) | |
else: | |
overlap = max(0, s1_start + len(s1) - s2_start) | |
return Info(s1_start, s1 + s2[overlap:], new_dist, self.id) | |
else: | |
if s2_start + len(s2) >= s1_start + len(s1): # if s2 completely covers s1 | |
return Info(s2_start, s2, new_dist, other_info.id) | |
else: | |
overlap = max(0, s2_start + len(s2) - s1_start) | |
return Info(s2_start, s2 + s1[overlap:], new_dist, other_info.id) | |
return None | |
def should_merge(s1, s2, s1_start, s2_start): | |
# Check if s1 and s2 are adjacent or overlapping | |
s1_end = s1_start + len(s1) | |
s2_end = s2_start + len(s2) | |
return not (s1_end < s2_start or s2_end < s1_start) | |
class ChromaCollector(): | |
def __init__(self): | |
name = ''.join(random.choice('ab') for _ in range(10)) | |
self.name = name | |
self.chroma_client = chromadb.Client(Settings(anonymized_telemetry=False)) | |
self.collection = self.chroma_client.create_collection(name=name, embedding_function=embedder) | |
self.ids = [] | |
self.id_to_info = {} | |
self.embeddings_cache = {} | |
self.lock = threading.Lock() # Locking so the server doesn't break. | |
def add(self, texts: list[str], texts_with_context: list[str], starting_indices: list[int], metadatas: list[dict] = None): | |
with self.lock: | |
assert metadatas is None or len(metadatas) == len(texts), "metadatas must be None or have the same length as texts" | |
if len(texts) == 0: | |
return | |
new_ids = self._get_new_ids(len(texts)) | |
(existing_texts, existing_embeddings, existing_ids, existing_metas), \ | |
(non_existing_texts, non_existing_ids, non_existing_metas) = self._split_texts_by_cache_hit(texts, new_ids, metadatas) | |
# If there are any already existing texts, add them all at once. | |
if existing_texts: | |
logger.info(f'Adding {len(existing_embeddings)} cached embeddings.') | |
args = {'embeddings': existing_embeddings, 'documents': existing_texts, 'ids': existing_ids} | |
if metadatas is not None: | |
args['metadatas'] = existing_metas | |
self.collection.add(**args) | |
# If there are any non-existing texts, compute their embeddings all at once. Each call to embed has significant overhead. | |
if non_existing_texts: | |
non_existing_embeddings = embedder(non_existing_texts) | |
for text, embedding in zip(non_existing_texts, non_existing_embeddings): | |
self.embeddings_cache[text] = embedding | |
logger.info(f'Adding {len(non_existing_embeddings)} new embeddings.') | |
args = {'embeddings': non_existing_embeddings, 'documents': non_existing_texts, 'ids': non_existing_ids} | |
if metadatas is not None: | |
args['metadatas'] = non_existing_metas | |
self.collection.add(**args) | |
# Create a dictionary that maps each ID to its context and starting index | |
new_info = { | |
id_: {'text_with_context': context, 'start_index': start_index} | |
for id_, context, start_index in zip(new_ids, texts_with_context, starting_indices) | |
} | |
self.id_to_info.update(new_info) | |
self.ids.extend(new_ids) | |
def _split_texts_by_cache_hit(self, texts: list[str], new_ids: list[str], metadatas: list[dict]): | |
existing_texts, non_existing_texts = [], [] | |
existing_embeddings = [] | |
existing_ids, non_existing_ids = [], [] | |
existing_metas, non_existing_metas = [], [] | |
for i, text in enumerate(texts): | |
id_ = new_ids[i] | |
metadata = metadatas[i] if metadatas is not None else None | |
embedding = self.embeddings_cache.get(text) | |
if embedding: | |
existing_texts.append(text) | |
existing_embeddings.append(embedding) | |
existing_ids.append(id_) | |
existing_metas.append(metadata) | |
else: | |
non_existing_texts.append(text) | |
non_existing_ids.append(id_) | |
non_existing_metas.append(metadata) | |
return (existing_texts, existing_embeddings, existing_ids, existing_metas), \ | |
(non_existing_texts, non_existing_ids, non_existing_metas) | |
def _get_new_ids(self, num_new_ids: int): | |
if self.ids: | |
max_existing_id = max(int(id_) for id_ in self.ids) | |
else: | |
max_existing_id = -1 | |
return [str(i + max_existing_id + 1) for i in range(num_new_ids)] | |
def _find_min_max_start_index(self): | |
max_index, min_index = 0, float('inf') | |
for _, val in self.id_to_info.items(): | |
if val['start_index'] > max_index: | |
max_index = val['start_index'] | |
if val['start_index'] < min_index: | |
min_index = val['start_index'] | |
return min_index, max_index | |
# NB: Does not make sense to weigh excerpts from different documents. | |
# But let's say that's the user's problem. Perfect world scenario: | |
# Apply time weighing to different documents. For each document, then, add | |
# separate time weighing. | |
def _apply_sigmoid_time_weighing(self, infos: list[Info], document_len: int, time_steepness: float, time_power: float): | |
def sigmoid(x): | |
return 1 / (1 + np.exp(-x)) | |
weights = sigmoid(time_steepness * np.linspace(-10, 10, document_len)) | |
# Scale to [0,time_power] and shift it up to [1-time_power, 1] | |
weights = weights - min(weights) | |
weights = weights * (time_power / max(weights)) | |
weights = weights + (1 - time_power) | |
# Reverse the weights | |
weights = weights[::-1] | |
for info in infos: | |
index = info.start_index | |
info.distance *= weights[index] | |
def _filter_outliers_by_median_distance(self, infos: list[Info], significant_level: float): | |
# Ensure there are infos to filter | |
if not infos: | |
return [] | |
# Find info with minimum distance | |
min_info = min(infos, key=lambda x: x.distance) | |
# Calculate median distance among infos | |
median_distance = np.median([inf.distance for inf in infos]) | |
# Filter out infos that have a distance significantly greater than the median | |
filtered_infos = [inf for inf in infos if inf.distance <= significant_level * median_distance] | |
# Always include the info with minimum distance | |
if min_info not in filtered_infos: | |
filtered_infos.append(min_info) | |
return filtered_infos | |
def _merge_infos(self, infos: list[Info]): | |
merged_infos = [] | |
current_info = infos[0] | |
for next_info in infos[1:]: | |
merged = current_info.merge_with(next_info) | |
if merged is not None: | |
current_info = merged | |
else: | |
merged_infos.append(current_info) | |
current_info = next_info | |
merged_infos.append(current_info) | |
return merged_infos | |
# Main function for retrieving chunks by distance. It performs merging, time weighing, and mean filtering. | |
def _get_documents_ids_distances(self, search_strings: list[str], n_results: int): | |
n_results = min(len(self.ids), n_results) | |
if n_results == 0: | |
return [], [], [] | |
if isinstance(search_strings, str): | |
search_strings = [search_strings] | |
infos = [] | |
min_start_index, max_start_index = self._find_min_max_start_index() | |
for search_string in search_strings: | |
result = self.collection.query(query_texts=search_string, n_results=math.ceil(n_results / len(search_strings)), include=['distances']) | |
curr_infos = [Info(start_index=self.id_to_info[id]['start_index'], | |
text_with_context=self.id_to_info[id]['text_with_context'], | |
distance=distance, id=id) | |
for id, distance in zip(result['ids'][0], result['distances'][0])] | |
self._apply_sigmoid_time_weighing(infos=curr_infos, document_len=max_start_index - min_start_index + 1, time_steepness=parameters.get_time_steepness(), time_power=parameters.get_time_power()) | |
curr_infos = self._filter_outliers_by_median_distance(curr_infos, parameters.get_significant_level()) | |
infos.extend(curr_infos) | |
infos.sort(key=lambda x: x.start_index) | |
infos = self._merge_infos(infos) | |
texts_with_context = [inf.text_with_context for inf in infos] | |
ids = [inf.id for inf in infos] | |
distances = [inf.distance for inf in infos] | |
return texts_with_context, ids, distances | |
# Get chunks by similarity | |
def get(self, search_strings: list[str], n_results: int) -> list[str]: | |
with self.lock: | |
documents, _, _ = self._get_documents_ids_distances(search_strings, n_results) | |
return documents | |
# Get ids by similarity | |
def get_ids(self, search_strings: list[str], n_results: int) -> list[str]: | |
with self.lock: | |
_, ids, _ = self._get_documents_ids_distances(search_strings, n_results) | |
return ids | |
# Cutoff token count | |
def _get_documents_up_to_token_count(self, documents: list[str], max_token_count: int): | |
# TODO: Move to caller; We add delimiters there which might go over the limit. | |
current_token_count = 0 | |
return_documents = [] | |
for doc in documents: | |
doc_tokens = encode(doc)[0] | |
doc_token_count = len(doc_tokens) | |
if current_token_count + doc_token_count > max_token_count: | |
# If adding this document would exceed the max token count, | |
# truncate the document to fit within the limit. | |
remaining_tokens = max_token_count - current_token_count | |
truncated_doc = decode(doc_tokens[:remaining_tokens], skip_special_tokens=True) | |
return_documents.append(truncated_doc) | |
break | |
else: | |
return_documents.append(doc) | |
current_token_count += doc_token_count | |
return return_documents | |
# Get chunks by similarity and then sort by ids | |
def get_sorted_by_ids(self, search_strings: list[str], n_results: int, max_token_count: int) -> list[str]: | |
with self.lock: | |
documents, ids, _ = self._get_documents_ids_distances(search_strings, n_results) | |
sorted_docs = [x for _, x in sorted(zip(ids, documents))] | |
return self._get_documents_up_to_token_count(sorted_docs, max_token_count) | |
# Get chunks by similarity and then sort by distance (lowest distance is last). | |
def get_sorted_by_dist(self, search_strings: list[str], n_results: int, max_token_count: int) -> list[str]: | |
with self.lock: | |
documents, _, distances = self._get_documents_ids_distances(search_strings, n_results) | |
sorted_docs = [doc for doc, _ in sorted(zip(documents, distances), key=lambda x: x[1])] # sorted lowest -> highest | |
# If a document is truncated or competely skipped, it would be with high distance. | |
return_documents = self._get_documents_up_to_token_count(sorted_docs, max_token_count) | |
return_documents.reverse() # highest -> lowest | |
return return_documents | |
def delete(self, ids_to_delete: list[str], where: dict): | |
with self.lock: | |
ids_to_delete = self.collection.get(ids=ids_to_delete, where=where)['ids'] | |
self.collection.delete(ids=ids_to_delete, where=where) | |
# Remove the deleted ids from self.ids and self.id_to_info | |
ids_set = set(ids_to_delete) | |
self.ids = [id_ for id_ in self.ids if id_ not in ids_set] | |
for id_ in ids_to_delete: | |
self.id_to_info.pop(id_, None) | |
logger.info(f'Successfully deleted {len(ids_to_delete)} records from chromaDB.') | |
def clear(self): | |
with self.lock: | |
self.chroma_client.reset() | |
self.ids = [] | |
self.chroma_client.delete_collection(name=self.name) | |
self.collection = self.chroma_client.create_collection(name=self.name, embedding_function=embedder) | |
logger.info('Successfully cleared all records and reset chromaDB.') | |
def make_collector(): | |
return ChromaCollector() | |