# eval_Chroma_Embeddings.py
# Description: Evaluates the embedding and chunking pipeline used with ChromaDB.
#
# Imports
import io
from typing import List
#
# External Imports
import chardet
from chromadb import Documents, EmbeddingFunction, Embeddings
from chromadb.utils import embedding_functions
from chunking_evaluation import BaseChunker, GeneralEvaluation, rigorous_document_search
from chunking_evaluation.evaluation_framework.base_evaluation import BaseEvaluation
#
# Local Imports
from App_Function_Libraries.Chunk_Lib import improved_chunking_process
from App_Function_Libraries.RAG.ChromaDB_Library import embedding_model, embedding_api_url
from App_Function_Libraries.RAG.Embeddings_Create import create_embeddings_batch, embedding_provider
from App_Function_Libraries.Utils.Utils import load_comprehensive_config
#
########################################################################################################################
#
# Functions:

# FIXME
def detect_file_encoding(file_path):
    """Detect a file's text encoding with chardet."""
    with open(file_path, 'rb') as file:
        raw_data = file.read()
    encoding = chardet.detect(raw_data)['encoding']
    print(f"Detected encoding for {file_path}: {encoding}")
    return encoding

class CustomEmbeddingFunction(EmbeddingFunction):
    """ChromaDB-compatible embedding function backed by create_embeddings_batch."""
    def __call__(self, input: Documents) -> Embeddings:
        # Load embedding settings from the comprehensive config on each call
        config = load_comprehensive_config()
        embedding_provider = config.get('Embeddings', 'embedding_provider', fallback='openai')
        embedding_model = config.get('Embeddings', 'embedding_model', fallback='text-embedding-3-small')
        embedding_api_url = config.get('Embeddings', 'api_url', fallback='')

        # Reuse the existing create_embeddings_batch function
        embeddings = create_embeddings_batch(input, embedding_provider, embedding_model, embedding_api_url)
        return embeddings
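
# A minimal, uncalled smoke test for the embedding function, included as a
# sketch only. It assumes the 'Embeddings' section of the loaded config is
# populated and the configured embedding backend is reachable; the test
# strings are placeholders.
def _embedding_function_smoke_test():
    ef = CustomEmbeddingFunction()
    vectors = ef(["first test sentence", "second test sentence"])
    # Each input document should map to one embedding vector.
    print(f"Got {len(vectors)} vectors of dimension {len(vectors[0])}")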

class CustomChunker(BaseChunker):
    """Chunker that delegates splitting to the existing improved_chunking_process."""
    def __init__(self, chunk_options):
        self.chunk_options = chunk_options

    def split_text(self, text: str) -> List[str]:
        # Reuse the existing improved_chunking_process function
        chunks = improved_chunking_process(text, self.chunk_options)
        return [chunk['text'] for chunk in chunks]

    def read_file(self, file_path: str) -> str:
        encoding = detect_file_encoding(file_path)
        with open(file_path, 'r', encoding=encoding) as file:
            return file.read()
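
# A quick, uncalled sanity check for the chunker: split a small placeholder
# text and report how many chunks come back. The resulting chunk count depends
# on improved_chunking_process, so the numbers are illustrative only.
def _chunker_smoke_test(options):
    chunker = CustomChunker(options)
    sample_text = "word " * 1000
    pieces = chunker.split_text(sample_text)
    if pieces:
        print(f"Produced {len(pieces)} chunks; first chunk length: {len(pieces[0])}")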

def utf8_file_reader(file_path):
    """Read a file assuming UTF-8 encoding."""
    with io.open(file_path, 'r', encoding='utf-8') as file:
        return file.read()

class CustomEvaluation(BaseEvaluation):
    """Evaluation that uses the splitter's own read_file so non-UTF-8 corpora load correctly."""
    def _get_chunks_and_metadata(self, splitter):
        documents = []
        metadatas = []
        for corpus_id in self.corpus_list:
            corpus_path = corpus_id
            if self.corpora_id_paths is not None:
                corpus_path = self.corpora_id_paths[corpus_id]

            corpus = splitter.read_file(corpus_path)

            current_documents = splitter.split_text(corpus)
            current_metadatas = []
            for document in current_documents:
                try:
                    _, start_index, end_index = rigorous_document_search(corpus, document)
                except Exception as e:
                    raise Exception(f"Error finding chunk {document!r} in {corpus_id}") from e
                current_metadatas.append({"start_index": start_index, "end_index": end_index, "corpus_id": corpus_id})
            documents.extend(current_documents)
            metadatas.extend(current_metadatas)
        return documents, metadatas
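
# Sketch of wiring CustomEvaluation to a local corpus. The constructor
# arguments (a questions CSV path plus a corpus-id -> file-path mapping) are
# assumed from chunking_evaluation's BaseEvaluation and may differ between
# library versions; the file paths below are placeholders.
#custom_eval = CustomEvaluation(
#    questions_csv_path="eval_questions.csv",
#    corpora_id_paths={"my_corpus": "corpora/my_corpus.txt"},
#)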

# Instantiate the custom chunker
chunk_options = {
    'method': 'words',
    'max_size': 400,
    'overlap': 200,
    'adaptive': False,
    'multi_level': False,
    'language': 'english'
}
custom_chunker = CustomChunker(chunk_options)

# Instantiate the custom embedding function
custom_ef = CustomEmbeddingFunction()

# Set up the general evaluation (covers both chunking and embedding quality)
evaluation = GeneralEvaluation()

def smart_file_reader(file_path):
    """Read a file using the encoding detected by chardet."""
    encoding = detect_file_encoding(file_path)
    with io.open(file_path, 'r', encoding=encoding) as file:
        return file.read()

# Optionally set a custom file reader on the evaluation (currently disabled)
#evaluation._file_reader = smart_file_reader

# Run the evaluation once; a single run exercises both the chunker and the
# embedding function and returns the retrieval-quality metrics for the pair.
results = evaluation.run(custom_chunker, custom_ef)
print(f"Evaluation Results:\n\t{results}")
#
# End of File
########################################################################################################################