# rag_milvus / main.py
# januarevan's picture
# init
# 054de85
from io import BytesIO
from fastapi import FastAPI, Form, Depends, Request, File, UploadFile
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from pymilvus import connections
import os
import pypdf
from uuid import uuid4
from langchain.text_splitter import RecursiveCharacterTextSplitter
from pymilvus import MilvusClient, db, utility, Collection, CollectionSchema, FieldSchema, DataType
from sentence_transformers import SentenceTransformer
import torch
from milvus_singleton import MilvusClientSingleton
# Point the Hugging Face caches at a directory inside the container.
# NOTE(review): these variables are assigned after `sentence_transformers`
# has already been imported above -- confirm the libraries still honor them
# when set this late.
os.environ.update({
    'HF_HOME': '/app/cache',
    'HF_MODULES_CACHE': '/app/cache/hf_modules',
})

# Embedding model shared by every request. It is placed on the GPU when torch
# reports that CUDA is available and on the CPU otherwise.
embedding_model = SentenceTransformer(
    'Alibaba-NLP/gte-large-en-v1.5',
    cache_folder='/app/cache',
    device='cpu' if not torch.cuda.is_available() else 'cuda',
    trust_remote_code=True,
)

# Name of the Milvus collection used by the endpoints below.
collection_name = "rag"

# milvus_client = MilvusClientSingleton.get_instance(uri="/app/milvus_data/milvus_demo.db")
# Client bound to the database file at the given path.
milvus_client = MilvusClient(uri="/app/milvus_data/milvus_demo.db")
def document_to_embeddings(content:str) -> list:
    """Encode ``content`` with the module-level embedding model.

    Args:
        content: Text to embed.

    Returns:
        Whatever ``embedding_model.encode`` produces for ``content``.
        NOTE(review): the ``list`` annotation is not enforced here -- confirm
        the concrete return type against the model's ``encode`` API.
    """
    embedding = embedding_model.encode(content, show_progress_bar=True)
    return embedding
# ASGI application object that the route decorators below attach to.
app = FastAPI()

# Wide-open CORS policy: any origin, method and header is accepted.
# NOTE(review): wildcard origins are combined with allow_credentials=True;
# confirm against the FastAPI CORS documentation that this combination is
# intended before deploying.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],  # Replace with the list of allowed origins for production
)
def split_documents(document_data):
    """Split text into chunks using a recursive character splitter.

    The splitter is configured with ``chunk_size=512`` and
    ``chunk_overlap=10``.

    Args:
        document_data: Text to split.

    Returns:
        The result of ``RecursiveCharacterTextSplitter.split_text`` for
        ``document_data``.
    """
    return RecursiveCharacterTextSplitter(
        chunk_overlap=10,
        chunk_size=512,
    ).split_text(document_data)
def create_a_collection(milvus_client, collection_name):
    """Create the collection and index its vector field.

    The collection has three fields: a string primary key ``id``
    (max length 40), the chunk text ``content`` (max length 4096) and a
    1024-dimensional float ``vector``.

    Args:
        milvus_client: Client used to create both the collection and the
            index.
        collection_name: Name of the collection to create.
    """
    fields = [
        FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=40, is_primary=True),
        FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=4096),
        FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=1024),
    ]
    milvus_client.create_collection(
        collection_name=collection_name,
        schema=CollectionSchema(fields=fields),
    )

    # Build the index through the client that was passed in rather than
    # opening a second connection with a hard-coded database path, so the
    # index always lands in the same database as the collection.
    # The index is FLAT with the COSINE metric.
    index_params = milvus_client.prepare_index_params()
    index_params.add_index(
        field_name="vector",
        index_type="FLAT",
        metric_type="COSINE",
    )
    milvus_client.create_index(
        collection_name=collection_name,
        index_params=index_params,
    )
@app.get("/")
async def root():
    """Return a fixed greeting payload for ``GET /``."""
    greeting = {"message": "Hello World"}
    return greeting
@app.post("/insert")
async def insert(file: UploadFile = File(...)):
    """Ingest an uploaded PDF into the Milvus collection.

    The upload is parsed as a PDF, the text of all pages is concatenated,
    split into chunks, and each chunk is embedded and stored together with a
    freshly generated UUID. The collection is created first if it does not
    exist yet.

    Args:
        file: The uploaded PDF file.

    Returns:
        A ``JSONResponse``: status 200 with ``{"result": "good"}`` on
        success, 400 when no text could be extracted from the PDF, or 500
        with the error message when the Milvus insert fails.
    """
    raw_bytes = await file.read()
    if not milvus_client.has_collection(collection_name):
        create_a_collection(milvus_client, collection_name)

    reader = pypdf.PdfReader(BytesIO(raw_bytes))
    # `or ""` keeps the join safe if a page yields no text object.
    extracted_text = "".join(page.extract_text() or "" for page in reader.pages)

    chunks = split_documents(extracted_text)
    if not chunks:
        # Nothing to embed; report it instead of sending an empty batch on.
        return JSONResponse(
            status_code=400,
            content={"error": "No extractable text found in the uploaded PDF"},
        )

    data_objects = [
        {
            "id": str(uuid4()),
            "vector": document_to_embeddings(chunk),
            "content": chunk,
        }
        for chunk in chunks
    ]

    try:
        milvus_client.insert(collection_name=collection_name, data=data_objects)
    except Exception as e:
        # A response object is not an exception, so it must be returned;
        # raising it would fail with a TypeError and hide the real error.
        return JSONResponse(status_code=500, content={"error": str(e)})
    return JSONResponse(status_code=200, content={"result": 'good'})
class RAGRequest(BaseModel):
    """Request body accepted by the ``/rag`` endpoint."""

    # Question text that is embedded and used as the search query.
    question: str
@app.post("/rag")
async def rag(request: RAGRequest):
    """Return the stored chunks most similar to the question.

    The question is embedded and used as the query vector for a search over
    the collection; the ``content`` field of each hit is returned.

    Args:
        request: Request body carrying the ``question`` string.

    Returns:
        A ``JSONResponse``: status 200 with ``{"result": [...]}`` holding the
        content of up to five hits, 400 when the question is empty or only
        whitespace, or 500 with the error message when the search fails.
    """
    question = request.question
    if not question or not question.strip():
        return JSONResponse(status_code=400, content={"message": "Please provide a question!"})

    try:
        search_res = milvus_client.search(
            collection_name=collection_name,
            data=[document_to_embeddings(question)],
            limit=5,  # Return the five closest chunks
            output_fields=["content"],  # Return the text field
        )
        # One query vector was sent, so the hits are in the first result list.
        retrieved_chunks = [hit["entity"]["content"] for hit in search_res[0]]
        return JSONResponse(status_code=200, content={"result": retrieved_chunks})
    except Exception as e:
        # A failure here is server-side, not a malformed request, so report
        # 500 as the /insert endpoint does.
        return JSONResponse(status_code=500, content={"error": str(e)})