import logging
import random

from fastapi import UploadFile, status
from fastapi.responses import JSONResponse
from llama_index.core import Settings
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import (
    SentenceSplitter,
    SemanticSplitterNodeParser,
)
from llama_index.embeddings.openai import OpenAIEmbedding
from pinecone.grpc import PineconeGRPC as Pinecone

from config import PINECONE_CONFIG
from script.get_metadata import Metadata
from service.reader_v4 import upload_file

class Uploader:
    """Handles document ingestion: chunking uploaded files and de-duplicating them against Pinecone."""

    def __init__(self, reference, file: UploadFile):
        self.file = file
        self.reference = reference
        self.metadata = Metadata(reference)
    def check_existing_metadata(self, pinecone_index, title, random_vector):
        """Query Pinecone for any existing vector whose metadata title equals `title`."""
        try:
            result = pinecone_index.query(
                vector=random_vector,
                top_k=1,
                filter={
                    "title": {"$eq": title},
                },
            )
            return result["matches"]
        except Exception as e:
            return JSONResponse(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                content=f"Error checking existing metadata: {str(e)}",
            )
    async def process_documents(self):
        # Ingest documents
        documents_with_metadata, file_stream = await upload_file(self.reference, self.file)

        if isinstance(documents_with_metadata, JSONResponse):
            return documents_with_metadata  # Return the error response directly

        embed_model = OpenAIEmbedding(model="text-embedding-3-large")
        Settings.embed_model = embed_model
        # Set up the ingestion pipeline
        pipeline = IngestionPipeline(
            transformations=[
                SemanticSplitterNodeParser(
                    buffer_size=1,
                    breakpoint_percentile_threshold=95,
                    embed_model=embed_model,
                ),
            ]
        )

        # Run the pipeline
        try:
            nodes_with_metadata = pipeline.run(documents=documents_with_metadata)
            print("Pipeline processing completed with SemanticSplitter.")
            return nodes_with_metadata, file_stream
        except Exception as e:
            # If the semantic splitter fails, fall back to a fixed-size sentence splitter.
            logging.error(f"SemanticSplitterNodeParser failed: {e}. Falling back to SentenceSplitter.")
            try:
                sentence_splitter = SentenceSplitter(chunk_size=512)
                nodes_with_metadata = sentence_splitter.get_nodes_from_documents(
                    documents_with_metadata
                )
                print("Pipeline processing completed with SentenceSplitter fallback.")
                return nodes_with_metadata, file_stream
            except Exception as fallback_error:
                # Log the second error and return a JSONResponse for FastAPI
                logging.error(f"Error with SentenceSplitter fallback: {fallback_error}")
                return JSONResponse(
                    status_code=500,
                    content="An internal server error occurred during pipeline processing.",
                )
    def filter_document(self, documents):
        api_key = PINECONE_CONFIG.PINECONE_API_KEY
        client = Pinecone(api_key=api_key)
        pinecone_index = client.Index("test")

        # Random query vector used only to drive the metadata filter; its length
        # must match the dimension of the "test" index.
        random_vector = [random.uniform(0, 1) for _ in range(1536)]

        filtered_documents = []
        for doc in documents:
            result = self.check_existing_metadata(
                pinecone_index, doc.metadata["title"], random_vector
            )
            # check_existing_metadata returns a JSONResponse on failure; propagate it.
            if isinstance(result, JSONResponse):
                return result
            if len(result) == 0:
                filtered_documents.append(doc)

        return filtered_documents
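

# ---------------------------------------------------------------------------
# Usage sketch (an assumption, not part of the original service wiring): one
# way the Uploader could sit behind a FastAPI route. The app instance, route
# path, and the `reference` form field below are illustrative names only.
# ---------------------------------------------------------------------------
from fastapi import FastAPI, File, Form

app = FastAPI()


@app.post("/upload")
async def upload_endpoint(reference: str = Form(...), file: UploadFile = File(...)):
    uploader = Uploader(reference, file)
    result = await uploader.process_documents()
    # process_documents returns a JSONResponse on failure, otherwise (nodes, file_stream).
    if isinstance(result, JSONResponse):
        return result
    nodes_with_metadata, _file_stream = result
    # Drop nodes whose title already exists in the Pinecone index (assumes the
    # Metadata helper sets a "title" field on each node).
    new_nodes = uploader.filter_document(nodes_with_metadata)
    return {"ingested_nodes": len(new_nodes)}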