# Bot_Test/script/document_uploader.py
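"""Document upload pipeline.

Parses an uploaded file via ``service.reader_v4.upload_file``, splits the
result into nodes with a semantic splitter (falling back to a plain sentence
splitter on failure), and filters out documents whose title already exists in
the Pinecone index.
"""
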
import logging
import random

from fastapi import UploadFile, status
from fastapi.responses import JSONResponse
from llama_index.core import Settings
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import (
    SemanticSplitterNodeParser,
    SentenceSplitter,
)
from llama_index.embeddings.openai import OpenAIEmbedding
from pinecone.grpc import PineconeGRPC as Pinecone

from config import PINECONE_CONFIG
from script.get_metadata import Metadata
from service.reader_v4 import upload_file


class Uploader:
    def __init__(self, reference, file: UploadFile):
        self.file = file
        self.reference = reference
        self.metadata = Metadata(reference)

    def check_existing_metadata(self, pinecone_index, title, random_vector):
        """Return the matches for a metadata-only existence check.

        Pinecone queries require a vector, so a random probe vector is passed
        purely to satisfy the API; the metadata filter on ``title`` does the
        actual work.
        """
        try:
            result = pinecone_index.query(
                vector=random_vector,
                top_k=1,
                filter={
                    "title": {"$eq": title},
                },
            )
            return result["matches"]
        except Exception as e:
            return JSONResponse(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                content=f"Error checking existing metadata: {str(e)}",
            )
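
    # Example (a sketch; the probe length is illustrative and must match the
    # index dimension):
    #
    #     probe = [random.uniform(0, 1) for _ in range(1536)]
    #     matches = self.check_existing_metadata(index, "Some Title", probe)
    #     if not matches:
    #         ...  # title is not indexed yet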

    async def process_documents(self):
        """Parse the uploaded file and split it into nodes.

        Returns ``(nodes, file_stream)`` on success, or a ``JSONResponse``
        describing the error.
        """
        # Ingest the document; upload_file returns a JSONResponse on parse errors
        documents_with_metadata, file_stream = await upload_file(
            self.reference, self.file
        )
        if isinstance(documents_with_metadata, JSONResponse):
            return documents_with_metadata  # Propagate the error response directly

        embed_model = OpenAIEmbedding(model="text-embedding-3-large")
        Settings.embed_model = embed_model

        # Set up the ingestion pipeline with a semantic splitter
        pipeline = IngestionPipeline(
            transformations=[
                SemanticSplitterNodeParser(
                    buffer_size=1,
                    breakpoint_percentile_threshold=95,
                    embed_model=embed_model,
                ),
            ]
        )

        # Run the pipeline
        try:
            nodes_with_metadata = pipeline.run(documents=documents_with_metadata)
            print("Pipeline processing completed with SemanticSplitter.")
            return nodes_with_metadata, file_stream
        except Exception as e:
            logging.error(f"SemanticSplitter failed, trying fallback: {e}")
            try:
                # If the first method fails, fall back to the sentence splitter
                sentence_splitter = SentenceSplitter(chunk_size=512)
                nodes_with_metadata = sentence_splitter.get_nodes_from_documents(
                    documents_with_metadata
                )
                print("Pipeline processing completed with SentenceSplitter fallback.")
                return nodes_with_metadata, file_stream
            except Exception as fallback_error:
                # Log the second error and return a JSONResponse for FastAPI
                logging.error(f"Error with SentenceSplitter fallback: {fallback_error}")
                return JSONResponse(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    content="An internal server error occurred during pipeline processing.",
                )
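
    # Note on process_documents: SemanticSplitterNodeParser embeds adjacent
    # sentence groups to find breakpoints, so the primary path makes embedding
    # API calls during splitting; the SentenceSplitter fallback is purely local.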

    def filter_document(self, documents):
        """Drop documents whose title already exists in the Pinecone index."""
        api_key = PINECONE_CONFIG.PINECONE_API_KEY
        client = Pinecone(api_key=api_key)
        pinecone_index = client.Index("test")

        # Random probe vector; its length must match the index dimension
        random_vector = [random.uniform(0, 1) for _ in range(1536)]

        filtered_documents = []
        for doc in documents:
            result = self.check_existing_metadata(
                pinecone_index, doc.metadata["title"], random_vector
            )
            if isinstance(result, JSONResponse):
                return result  # Propagate the query error to the caller
            if len(result) == 0:
                filtered_documents.append(doc)
        return filtered_documents
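

# Example wiring (a minimal sketch, not part of the original module): a FastAPI
# route that runs the full flow. The route path and the shape of ``reference``
# are assumptions for illustration only.
#
#     from fastapi import APIRouter, UploadFile
#
#     router = APIRouter()
#
#     @router.post("/upload")
#     async def upload(file: UploadFile):
#         uploader = Uploader(reference={"title": file.filename}, file=file)
#         result = await uploader.process_documents()
#         if isinstance(result, JSONResponse):
#             return result
#         nodes, file_stream = result
#         new_nodes = uploader.filter_document(nodes)
#         return {"new_documents": len(new_nodes)}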