|
|
|
|
|
|
|
|
|
import logging
|
|
from typing import List, Dict, Any
|
|
|
|
import chromadb
|
|
from chromadb import Settings
|
|
from itertools import islice
|
|
import numpy as np
|
|
|
|
|
|
from App_Function_Libraries.Chunk_Lib import chunk_for_embedding, chunk_options
|
|
from App_Function_Libraries.DB.DB_Manager import get_unprocessed_media, mark_media_as_processed
|
|
from App_Function_Libraries.DB.SQLite_DB import process_chunks
|
|
from App_Function_Libraries.RAG.Embeddings_Create import create_embedding, create_embeddings_batch
|
|
from App_Function_Libraries.Summarization.Summarization_General_Lib import summarize
|
|
from App_Function_Libraries.Utils.Utils import get_database_path, ensure_directory_exists, \
|
|
load_comprehensive_config
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Module-level setup: logging, configuration, ChromaDB client and embedding settings.

# Configure logging once and create the logger used by this module.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger(__name__)

# Application configuration; every setting below is read from it.
config = load_comprehensive_config()

# Persistent ChromaDB store: resolve its directory, make sure it exists, open the client.
chroma_db_path = config.get(
    'Database',
    'chroma_db_path',
    fallback=get_database_path('chroma_db'),
)
ensure_directory_exists(chroma_db_path)
chroma_client = chromadb.PersistentClient(
    path=chroma_db_path,
    settings=Settings(anonymized_telemetry=False),
)

# Embedding settings, each read from the 'Embeddings' section with its own fallback.
embedding_provider, embedding_model, embedding_api_key, embedding_api_url = (
    config.get('Embeddings', option, fallback=default)
    for option, default in (
        ('embedding_provider', 'openai'),
        ('embedding_model', 'text-embedding-3-small'),
        ('api_key', ''),
        ('api_url', ''),
    )
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def preprocess_all_content(database, create_contextualized=True, api_name="gpt-3.5-turbo"):
    """
    Process and store every media item reported as unprocessed.

    Each row returned by ``get_unprocessed_media`` is unpacked as
    ``(media_id, content, media_type, file_name)``. The item is handed to
    ``process_and_store_content`` with embeddings enabled and is then marked
    as processed. A failure on one item is logged and does not stop the loop.

    Args:
        database: Database handle forwarded to the DB helpers.
        create_contextualized: Forwarded to ``process_and_store_content``.
        api_name: Forwarded to ``process_and_store_content``.
    """
    pending = get_unprocessed_media(db=database)
    pending_count = len(pending)

    for position, (media_id, content, media_type, file_name) in enumerate(pending, 1):
        # The same "<type>_<id>" label names the collection and backs up a missing file name.
        type_and_id = f"{media_type}_{media_id}"

        logger.info(f"Processing media {position} of {pending_count}: ID {media_id}, Type {media_type}")

        try:
            call_arguments = {
                "database": database,
                "content": content,
                "collection_name": type_and_id,
                "media_id": media_id,
                "file_name": file_name or type_and_id,
                "create_embeddings": True,
                "create_contextualized": create_contextualized,
                "api_name": api_name,
            }
            process_and_store_content(**call_arguments)

            # Record completion in the database.
            mark_media_as_processed(database, media_id)

            logger.info(f"Successfully processed media ID {media_id}")
        except Exception as e:
            # Best effort: report the failure and continue with the next item.
            logger.error(f"Error processing media ID {media_id}: {str(e)}")

    logger.info("Finished preprocessing all unprocessed content")
|
|
|
|
|
|
def batched(iterable, n):
    """
    Yield successive lists of up to ``n`` items taken from ``iterable``.

    Every batch holds ``n`` items except possibly the last, which holds
    whatever remains. Nothing is yielded for an empty iterable.
    """
    source = iter(iterable)
    # iter(callable, sentinel) keeps pulling batches until an empty list
    # signals that the source is exhausted.
    for group in iter(lambda: list(islice(source, n)), []):
        yield group
|
|
|
|
|
|
def situate_context(api_name, doc_content: str, chunk_content: str) -> str:
    """
    Request a short context that situates a chunk within its document.

    Two prompt strings are built: one wrapping the document in
    ``<document>`` tags and one wrapping the chunk in ``<chunk>`` tags
    followed by the instruction text. Both are passed to ``summarize``.

    Args:
        api_name: Forwarded to ``summarize`` as its third positional argument.
        doc_content: Full document text.
        chunk_content: Text of the chunk to situate.

    Returns:
        The value returned by ``summarize``.
    """
    # NOTE(review): prompt lines are emitted flush-left, exactly as they appear
    # in the source listing - confirm against the intended prompt layout.
    document_block = (
        "\n"
        "<document>\n"
        f"{doc_content}\n"
        "</document>\n"
    )

    instruction_block = (
        "\n"
        "\n\n\n\n\n\n"
        "Here is the chunk we want to situate within the whole document\n"
        "<chunk>\n"
        f"{chunk_content}\n"
        "</chunk>\n"
        "\n"
        "Please give a short succinct context to situate this chunk within the overall document for the purposes of improving search retrieval of the chunk.\n"
        "Answer only with the succinct context and nothing else.\n"
    )

    return summarize(instruction_block, document_block, api_name, api_key=None, temp=0, system_message=None)
|
|
|
|
|
|
|
|
def _resolve_embedding_settings(provider, model, api_url):
    """Fill unset embedding settings from the module-level configuration values."""
    return (
        provider or embedding_provider,
        model or embedding_model,
        api_url or embedding_api_url,
    )


def process_and_store_content(database, content: str, collection_name: str, media_id: int, file_name: str,
                              create_embeddings: bool = True, create_contextualized: bool = True, api_name: str = "gpt-3.5-turbo",
                              chunk_options = None, embedding_provider: str = None,
                              embedding_model: str = None, embedding_api_url: str = None):
    """
    Chunk a media item's content, persist the chunks and optionally embed them.

    Steps, in order: chunk the content, hand the chunks to ``process_chunks``,
    optionally build (contextualized) texts and embeddings and store them via
    ``store_in_chroma``, mark the media as processed, and refresh its row in
    the ``media_fts`` table.

    Args:
        database: Database handle; must provide ``execute_query``.
        content: Full text of the media item.
        collection_name: ChromaDB collection that receives the embeddings.
        media_id: Identifier of the media item; used in chunk ids and metadata.
        file_name: Name forwarded to ``chunk_for_embedding``.
        create_embeddings: When false, the embedding/ChromaDB step is skipped.
        create_contextualized: When true, each chunk is extended with a
            contextual summary obtained from ``situate_context``.
        api_name: Forwarded to ``situate_context``.
        chunk_options: Forwarded unchanged to ``chunk_for_embedding``.
        embedding_provider: Embedding provider; when unset, the module-level
            configured provider is used.
        embedding_model: Embedding model; when unset, the module-level
            configured model is used.
        embedding_api_url: Embedding API URL; when unset, the module-level
            configured URL is used.

    Raises:
        Exception: Any error raised along the way is logged and re-raised.
    """
    try:
        logger.info(f"Processing content for media_id {media_id} in collection {collection_name}")

        chunks = chunk_for_embedding(content, file_name, chunk_options)

        # Persist the raw chunks before any embedding work.
        process_chunks(database, chunks, media_id)

        if create_embeddings:
            # The parameters shadow the module-level settings, so unset values
            # are resolved through a helper that can still see the globals.
            embedding_provider, embedding_model, embedding_api_url = _resolve_embedding_settings(
                embedding_provider, embedding_model, embedding_api_url
            )

            contextualized_chunks = []
            contextual_summaries = []
            for chunk in chunks:
                chunk_text = chunk['text']
                if create_contextualized:
                    context = situate_context(api_name, content, chunk_text)
                    contextualized_chunks.append(f"{chunk_text}\n\nContextual Summary: {context}")
                    # Keep the summary itself rather than re-parsing it out of
                    # the combined text, which fails if it contains the marker.
                    contextual_summaries.append(str(context))
                else:
                    contextualized_chunks.append(chunk_text)
                    contextual_summaries.append("")

            embeddings = create_embeddings_batch(contextualized_chunks, embedding_provider, embedding_model, embedding_api_url)

            total_chunks = len(chunks)
            ids = [f"{media_id}_chunk_{i}" for i in range(1, total_chunks + 1)]
            metadatas = [{
                "media_id": str(media_id),
                "chunk_index": i,
                "total_chunks": total_chunks,
                "start_index": int(chunk['metadata']['start_index']),
                "end_index": int(chunk['metadata']['end_index']),
                "file_name": str(chunk['metadata']['file_name']),
                "relative_position": float(chunk['metadata']['relative_position']),
                "contextualized": create_contextualized,
                "original_text": chunk['text'],
                "contextual_summary": summary,
                # vector_search reads these two keys to pick the query embedder.
                "embedding_model": str(embedding_model),
                "embedding_provider": str(embedding_provider),
            } for i, (chunk, summary) in enumerate(zip(chunks, contextual_summaries), 1)]

            store_in_chroma(collection_name, contextualized_chunks, embeddings, ids, metadatas)

        # Mark the media as processed.
        mark_media_as_processed(database, media_id)

        # Refresh the full-text-search row for this media item.
        database.execute_query(
            "INSERT OR REPLACE INTO media_fts (rowid, title, content) SELECT id, title, content FROM Media WHERE id = ?",
            (media_id,)
        )

        logger.info(f"Finished processing and storing content for media_id {media_id}")

    except Exception as e:
        logger.error(f"Error in process_and_store_content for media_id {media_id}: {str(e)}")
        raise
|
|
|
|
|
|
|
|
|
|
|
|
def check_embedding_status(selected_item, item_mapping):
    """
    Report whether an embedding exists for the selected item.

    The item id is looked up in ``item_mapping`` and the document
    ``doc_<item_id>`` is fetched from the ``all_content_embeddings``
    collection.

    Args:
        selected_item: Display label of the item; the title is taken as the
            text before the last ``" ("``.
        item_mapping: Mapping from display label to item id.

    Returns:
        A ``(status, details)`` tuple of strings. ``details`` is empty unless
        an embedding was found, in which case it holds a preview of the first
        50 embedding values and the stored metadata. Errors are reported in
        ``status`` rather than raised.
    """
    if not selected_item:
        return "Please select an item", ""

    try:
        item_id = item_mapping.get(selected_item)
        if item_id is None:
            return f"Invalid item selected: {selected_item}", ""

        item_title = selected_item.rsplit(' (', 1)[0]
        collection = chroma_client.get_or_create_collection(name="all_content_embeddings")

        result = collection.get(ids=[f"doc_{item_id}"], include=["embeddings", "metadatas"])
        logging.info(f"ChromaDB result for item '{item_title}' (ID: {item_id}): {result}")

        if not result['ids']:
            return f"No embedding found for item '{item_title}' (ID: {item_id})", ""

        # Use None/len() checks instead of truthiness: embeddings may come back
        # as numpy arrays, whose truth value is ambiguous and raises ValueError.
        embeddings = result['embeddings']
        if (embeddings is None or len(embeddings) == 0
                or embeddings[0] is None or len(embeddings[0]) == 0):
            return f"Embedding data missing for item '{item_title}' (ID: {item_id})", ""

        embedding = embeddings[0]
        metadata = result['metadatas'][0] if result['metadatas'] else {}
        embedding_preview = str(embedding[:50])
        status = f"Embedding exists for item '{item_title}' (ID: {item_id})"
        return status, f"First 50 elements of embedding:\n{embedding_preview}\n\nMetadata: {metadata}"

    except Exception as e:
        logging.error(f"Error in check_embedding_status: {str(e)}")
        return f"Error processing item: {selected_item}. Details: {str(e)}", ""
|
|
|
|
def reset_chroma_collection(collection_name: str):
    """
    Replace a ChromaDB collection with a fresh, empty one of the same name.

    The collection is deleted if possible and then created again. A failed
    delete (for example because the collection does not exist yet) no longer
    prevents the collection from being created. Errors are logged, not raised.

    Args:
        collection_name: Name of the collection to reset.
    """
    try:
        try:
            chroma_client.delete_collection(collection_name)
        except Exception as e:
            # Presumably the collection did not exist; the exception type for
            # that case depends on the chromadb version, so it is not narrowed.
            # If the delete failed for another reason, create_collection below
            # fails too and is reported by the outer handler.
            logging.info(f"Could not delete ChromaDB collection '{collection_name}' before reset: {str(e)}")
        chroma_client.create_collection(collection_name)
        logging.info(f"Reset ChromaDB collection: {collection_name}")
    except Exception as e:
        logging.error(f"Error resetting ChromaDB collection: {str(e)}")
|
|
|
|
|
|
|
|
def store_in_chroma(collection_name: str, texts: List[str], embeddings: Any, ids: List[str],
                    metadatas: List[Dict[str, Any]]):
    """
    Upsert documents with their embeddings into a ChromaDB collection.

    The collection is fetched, or created when it cannot be fetched. If it
    already holds embeddings of a different dimension, it is deleted and
    recreated before the upsert. After the upsert, every id is read back to
    verify that an embedding was stored.

    Args:
        collection_name: Target collection name.
        texts: Document texts, parallel to ``ids``.
        embeddings: A list of vectors or a numpy array (converted to a list).
        ids: Document ids, parallel to ``texts``.
        metadatas: Metadata dicts, parallel to ``ids``.

    Returns:
        The collection that received the data.

    Raises:
        TypeError: If ``embeddings`` is neither a list nor a numpy array.
        ValueError: If ``embeddings`` is empty, or an id has no stored
            embedding after the upsert.
        Exception: Any ChromaDB error is logged and re-raised.
    """
    if isinstance(embeddings, np.ndarray):
        embeddings = embeddings.tolist()
    elif not isinstance(embeddings, list):
        raise TypeError("Embeddings must be either a list or a numpy array")

    if not embeddings:
        raise ValueError("No embeddings provided")

    embedding_dim = len(embeddings[0])

    logging.info(f"Storing embeddings in ChromaDB - Collection: {collection_name}")
    logging.info(f"Number of embeddings: {len(embeddings)}, Dimension: {embedding_dim}")

    try:
        # Only the lookup itself is treated as "collection not found"; errors
        # from the dimension check below propagate instead of triggering a
        # create_collection call on a collection that already exists.
        try:
            collection = chroma_client.get_collection(name=collection_name)
        except Exception:
            logging.info(f"Collection '{collection_name}' not found. Creating new collection")
            collection = chroma_client.create_collection(name=collection_name)
        else:
            logging.info(f"Existing collection '{collection_name}' found")

            existing_embeddings = collection.get(limit=1, include=['embeddings'])['embeddings']
            # len() instead of truthiness: embeddings may be returned as a
            # numpy array, whose truth value is ambiguous.
            if existing_embeddings is not None and len(existing_embeddings) > 0:
                existing_dim = len(existing_embeddings[0])
                if existing_dim != embedding_dim:
                    logging.warning(f"Embedding dimension mismatch. Existing: {existing_dim}, New: {embedding_dim}")
                    logging.warning("Deleting existing collection and creating a new one")
                    chroma_client.delete_collection(name=collection_name)
                    collection = chroma_client.create_collection(name=collection_name)
            else:
                logging.info("No existing embeddings in the collection")

        collection.upsert(
            documents=texts,
            embeddings=embeddings,
            ids=ids,
            metadatas=metadatas
        )
        logging.info(f"Successfully upserted {len(embeddings)} embeddings")

        results = collection.get(ids=ids, include=["documents", "embeddings", "metadatas"])

        # Match results by id rather than by position: the result is not
        # assumed to mirror the order of `ids`, and a missing id would shift
        # or shorten the returned lists.
        position_by_id = {stored_id: pos for pos, stored_id in enumerate(results['ids'])}
        stored_embeddings = results['embeddings']
        for doc_id in ids:
            pos = position_by_id.get(doc_id)
            if pos is None or stored_embeddings is None or stored_embeddings[pos] is None:
                raise ValueError(f"Failed to store embedding for {doc_id}")
            logging.debug(f"Embedding stored successfully for {doc_id}")
            logging.debug(f"Stored document preview: {results['documents'][pos][:100]}...")
            logging.debug(f"Stored metadata: {results['metadatas'][pos]}")

        logging.info("Successfully stored and verified all embeddings in ChromaDB")

    except Exception as e:
        logging.error(f"Error in store_in_chroma: {str(e)}")
        raise

    return collection
|
|
|
|
|
|
|
|
|
|
def vector_search(collection_name: str, query: str, k: int = 10) -> List[Dict[str, Any]]:
    """
    Return the ``k`` documents of a collection that are closest to ``query``.

    Up to ten stored metadata entries are sampled to find the most common
    ``embedding_model`` and ``embedding_provider``, which are then used to
    embed the query. When the sampled metadata lacks either value, the
    module-level configured setting is used instead and a warning is logged.

    Args:
        collection_name: Name of the ChromaDB collection to search.
        query: Query text to embed.
        k: Number of results requested from ChromaDB.

    Returns:
        A list of ``{"content": document, "metadata": metadata}`` dicts; empty
        when the query yields no documents.

    Raises:
        ValueError: If the collection returns no metadata at all.
        Exception: Any other error is logged with traceback and re-raised.
    """
    # Local import keeps this block self-contained.
    from collections import Counter

    try:
        collection = chroma_client.get_collection(name=collection_name)

        sample_results = collection.get(limit=10, include=["metadatas"])
        if not sample_results['metadatas']:
            raise ValueError("No metadata found in the collection")

        # Skip entries without metadata so .get() is never called on None.
        sampled = [metadata for metadata in sample_results['metadatas'] if metadata]
        sampled_models = [m.get('embedding_model') for m in sampled if m.get('embedding_model')]
        sampled_providers = [m.get('embedding_provider') for m in sampled if m.get('embedding_provider')]

        if sampled_models and sampled_providers:
            model_name = Counter(sampled_models).most_common(1)[0][0]
            provider_name = Counter(sampled_providers).most_common(1)[0][0]
        else:
            # Collections stored without model/provider metadata are searched
            # with the configured defaults instead of being rejected.
            logging.warning("Embedding model or provider information not found in metadata; "
                            "falling back to configured embedding settings")
            model_name = Counter(sampled_models).most_common(1)[0][0] if sampled_models else embedding_model
            provider_name = Counter(sampled_providers).most_common(1)[0][0] if sampled_providers else embedding_provider

        logging.info(f"Using embedding model: {model_name} from provider: {provider_name}")

        query_embedding = create_embedding(query, provider_name, model_name, embedding_api_url)

        # ChromaDB is given a plain list, as in the storage path.
        if isinstance(query_embedding, np.ndarray):
            query_embedding = query_embedding.tolist()

        results = collection.query(
            query_embeddings=[query_embedding],
            n_results=k,
            include=["documents", "metadatas"]
        )

        if not results['documents'][0]:
            logging.warning("No results found for the query")
            return []

        return [{"content": doc, "metadata": meta} for doc, meta in zip(results['documents'][0], results['metadatas'][0])]
    except Exception as e:
        logging.error(f"Error in vector_search: {str(e)}", exc_info=True)
        raise
|
|
|
|
|
|
def schedule_embedding(media_id: int, content: str, media_name: str):
    """
    Chunk, embed and store a media item in the ``all_content_embeddings`` collection.

    The content is chunked with the module's ``chunk_options`` and embedded
    with the module-level embedding settings. Chunk ids have the form
    ``<media_id>_chunk_<i>`` with ``i`` starting at 0. Errors are logged and
    not raised.

    Args:
        media_id: Identifier of the media item.
        content: Full text to chunk and embed.
        media_name: Name passed to ``chunk_for_embedding`` and stored as
            ``file_name`` in each chunk's metadata.
    """
    try:
        chunks = chunk_for_embedding(content, media_name, chunk_options)
        texts = [chunk['text'] for chunk in chunks]
        embeddings = create_embeddings_batch(texts, embedding_provider, embedding_model, embedding_api_url)

        total_chunks = len(chunks)
        ids = [f"{media_id}_chunk_{i}" for i in range(total_chunks)]
        metadatas = [{
            "media_id": str(media_id),
            "chunk_index": i,
            "total_chunks": total_chunks,
            # Cast to plain Python numbers, matching process_and_store_content.
            "start_index": int(chunk['metadata']['start_index']),
            "end_index": int(chunk['metadata']['end_index']),
            "file_name": media_name,
            "relative_position": float(chunk['metadata']['relative_position']),
            # vector_search reads these two keys to pick the query embedder.
            "embedding_model": str(embedding_model),
            "embedding_provider": str(embedding_provider),
        } for i, chunk in enumerate(chunks)]

        store_in_chroma("all_content_embeddings", texts, embeddings, ids, metadatas)

    except Exception as e:
        logging.error(f"Error scheduling embedding for media_id {media_id}: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|