from llama_index.core import VectorStoreIndex
from llama_index.core import StorageContext
# from llama_index.core import Settings
from pinecone import Pinecone, ServerlessSpec
from llama_index.vector_stores.pinecone import PineconeVectorStore
from fastapi import HTTPException, status
from fastapi.responses import JSONResponse
from config import PINECONE_CONFIG
from math import ceil
import numpy as np
import logging


class IndexManager:
    def __init__(self, index_name: str = "multimodal-index"):
        self.vector_index = None
        self.index_name = index_name
        self.client = self._get_pinecone_client()
        self.pinecone_index = self._create_pinecone_index()

    def _get_pinecone_client(self):
        """Initialize and return the Pinecone client."""
        # api_key = os.getenv("PINECONE_API_KEY")
        api_key = PINECONE_CONFIG.PINECONE_API_KEY
        if not api_key:
            raise ValueError(
                "Pinecone API key is missing. Please set it in environment variables."
            )
        return Pinecone(api_key=api_key)

    def _create_pinecone_index(self):
        """Create the Pinecone index if it doesn't already exist."""
        if self.index_name not in self.client.list_indexes().names():
            self.client.create_index(
                name=self.index_name,
                dimension=3072,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
            )
        return self.client.Index(self.index_name)

    def _initialize_vector_store(self) -> StorageContext:
        """Initialize and return a storage context backed by the Pinecone index."""
        vector_store = PineconeVectorStore(pinecone_index=self.pinecone_index)
        return StorageContext.from_defaults(vector_store=vector_store)

    def build_indexes(self, nodes):
        """Build the vector index from nodes."""
        try:
            storage_context = self._initialize_vector_store()
            self.vector_index = VectorStoreIndex(nodes, storage_context=storage_context)
        except HTTPException as http_exc:
            # Re-raise HTTPExceptions so FastAPI handles them
            raise http_exc
        except Exception as e:
            logging.error("Error building index: %s", e)
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Error building indexes: {str(e)}",
            )

    def get_ids_from_query(self, input_vector, title):
        """Collect the IDs of all vectors whose metadata title matches `title`."""
        logging.info("Searching Pinecone for title: %s", title)
        new_ids = set()  # Accumulates IDs across query rounds
        while True:
            results = self.pinecone_index.query(
                vector=input_vector,
                top_k=10000,
                filter={
                    "title": {"$eq": f"{title}"},
                },
            )
            ids = {result["id"] for result in results["matches"]}
            # Stop once a round returns no IDs we haven't already collected
            if ids.issubset(new_ids):
                break
            new_ids.update(ids)
        return new_ids

    def get_all_ids_from_index(self, title):
        """Query the index with a random vector to enumerate IDs for a title."""
        num_dimensions = 3072  # Must match the index dimension
        input_vector = np.random.rand(num_dimensions).tolist()
        return self.get_ids_from_query(input_vector, title)

    def delete_vector_database(self, title):
        """Delete all vectors whose metadata title matches `title`, in batches."""
        try:
            batch_size = 1000
            all_ids = list(self.get_all_ids_from_index(title))
            # Split IDs into chunks of batch_size
            num_batches = ceil(len(all_ids) / batch_size)
            for i in range(num_batches):
                # Delete a batch of IDs
                batch_ids = all_ids[i * batch_size: (i + 1) * batch_size]
                self.pinecone_index.delete(ids=batch_ids)
                logging.info(
                    f"Deleted ids {i * batch_size} to {(i + 1) * batch_size} successfully"
                )
        except Exception as e:
            logging.error("Error deleting vectors: %s", e)
            return JSONResponse(
                status_code=500,
                content="An error occurred while deleting metadata",
            )

    def update_vector_database(self, current_reference, new_reference):
        """Replace the metadata of every vector matching the current reference."""
        all_ids = list(self.get_all_ids_from_index(current_reference["title"]))
        for vector_id in all_ids:
            self.pinecone_index.update(id=vector_id, set_metadata=new_reference)

    def load_existing_indexes(self):
        """Load an existing index from Pinecone."""
        try:
            client = self._get_pinecone_client()
            pinecone_index = client.Index(self.index_name)
            vector_store = PineconeVectorStore(pinecone_index=pinecone_index)
            vector_index = VectorStoreIndex.from_vector_store(vector_store)
            return vector_index
        except Exception as e:
            logging.error("Error loading existing indexes: %s", e)
            return JSONResponse(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                content=f"Error loading existing indexes: {str(e)}",
            )
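

# A minimal usage sketch (illustrative only, not part of the application flow):
# builds an index from a couple of nodes, then deletes the vectors stored under
# a document title. The TextNode construction and the "Sample Report" title are
# assumptions for demonstration; real nodes would come from the ingestion
# pipeline. It also assumes Settings.embed_model is configured with a
# 3072-dimensional embedding model to match the index created above.
if __name__ == "__main__":
    from llama_index.core.schema import TextNode

    manager = IndexManager(index_name="multimodal-index")

    # Hypothetical nodes carrying the "title" metadata the class filters on.
    nodes = [
        TextNode(text="First chunk of the document.", metadata={"title": "Sample Report"}),
        TextNode(text="Second chunk of the document.", metadata={"title": "Sample Report"}),
    ]
    manager.build_indexes(nodes)

    # Later, remove every vector stored under that title.
    manager.delete_vector_database("Sample Report")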