File size: 4,871 Bytes
43cd37c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# eval_Chroma_Embeddings.py
# Description: This script is used to evaluate the embeddings and chunking process for the ChromaDB model.
#
# Imports
import io
from typing import List
#
# External Imports
from chromadb import Documents, EmbeddingFunction, Embeddings
from chromadb.utils import embedding_functions
from chunking_evaluation import BaseChunker, rigorous_document_search
from chunking_evaluation import BaseChunker, GeneralEvaluation
from chunking_evaluation.evaluation_framework.base_evaluation import BaseEvaluation

#
# Local Imports
from App_Function_Libraries.Chunk_Lib import improved_chunking_process
from App_Function_Libraries.RAG.ChromaDB_Library import embedding_model, embedding_api_url
from App_Function_Libraries.RAG.Embeddings_Create import create_embeddings_batch, embedding_provider
from App_Function_Libraries.Utils.Utils import load_comprehensive_config
#
########################################################################################################################
#
# Functions:
import chardet
# FIXME


def detect_file_encoding(file_path):
    """Detect the text encoding of a file using chardet.

    Reads the whole file as bytes and runs chardet's detector exactly once.

    Args:
        file_path: Path to the file to probe.

    Returns:
        The detected encoding name as a str, or None when chardet cannot
        determine one.
    """
    with open(file_path, 'rb') as file:
        raw_data = file.read()
    # Detect once: the original ran chardet.detect() twice on the same
    # data (once for a debug print, once for the return), doubling the
    # potentially expensive analysis.
    result = chardet.detect(raw_data)
    return result['encoding']


class CustomEmbeddingFunction(EmbeddingFunction):
    """Chroma embedding function backed by the project's batch embedder."""

    def __call__(self, input: Documents) -> Embeddings:
        # Re-read the config on every call so edits take effect without a
        # restart.
        config = load_comprehensive_config()
        provider = config.get('Embeddings', 'embedding_provider', fallback='openai')
        model = config.get('Embeddings', 'embedding_model', fallback='text-embedding-3-small')
        api_url = config.get('Embeddings', 'api_url', fallback='')

        # Delegate the actual embedding work to the shared batch helper.
        return create_embeddings_batch(input, provider, model, api_url)


class CustomChunker(BaseChunker):
    """Adapter that plugs the project's chunking pipeline into the
    chunking_evaluation framework."""

    def __init__(self, chunk_options):
        # Options dict is forwarded verbatim to improved_chunking_process.
        self.chunk_options = chunk_options

    def split_text(self, text: str) -> List[str]:
        """Split *text* and return just the chunk strings."""
        return [
            piece['text']
            for piece in improved_chunking_process(text, self.chunk_options)
        ]

    def read_file(self, file_path: str) -> str:
        """Read a file after auto-detecting its encoding."""
        detected = detect_file_encoding(file_path)
        with open(file_path, 'r', encoding=detected) as handle:
            return handle.read()

def utf8_file_reader(file_path):
    """Read *file_path* as UTF-8 text and return its full contents."""
    # io.open is an alias of the builtin open() on Python 3.
    with io.open(file_path, mode='r', encoding='utf-8') as handle:
        contents = handle.read()
    return contents


class CustomEvaluation(BaseEvaluation):
    """Evaluation subclass whose chunk/metadata extraction maps every
    produced chunk back to its character span in the source corpus."""

    def _get_chunks_and_metadata(self, splitter):
        """Split every corpus with *splitter* and locate each chunk.

        Args:
            splitter: Object exposing read_file(path) and split_text(text)
                (e.g. CustomChunker).

        Returns:
            A (documents, metadatas) pair of parallel lists; each metadata
            dict holds "start_index", "end_index" and "corpus_id".

        Raises:
            Exception: If a produced chunk cannot be located in its corpus.
        """
        documents = []
        metadatas = []
        for corpus_id in self.corpus_list:
            # corpus_id doubles as the file path unless an explicit
            # id -> path mapping was supplied.
            corpus_path = corpus_id
            if self.corpora_id_paths is not None:
                corpus_path = self.corpora_id_paths[corpus_id]

            corpus = splitter.read_file(corpus_path)

            current_documents = splitter.split_text(corpus)
            current_metadatas = []
            for document in current_documents:
                try:
                    # An unlocatable chunk surfaces here (e.g. a failed
                    # search result that does not unpack into 3 values).
                    _, start_index, end_index = rigorous_document_search(corpus, document)
                except Exception as exc:
                    # Narrowed from a bare `except:` (which also swallowed
                    # KeyboardInterrupt/SystemExit) and chained so the
                    # original failure is preserved in the traceback.
                    print(f"Error in finding {document} in {corpus_id}")
                    raise Exception(f"Error in finding {document} in {corpus_id}") from exc
                current_metadatas.append({"start_index": start_index, "end_index": end_index, "corpus_id": corpus_id})
            documents.extend(current_documents)
            metadatas.extend(current_metadatas)
        return documents, metadatas


# Instantiate the custom chunker.
# Options consumed by improved_chunking_process (via CustomChunker).
chunk_options = {
    'method': 'words',       # word-based chunking
    'max_size': 400,         # maximum words per chunk
    'overlap': 200,          # words shared between consecutive chunks
    'adaptive': False,
    'multi_level': False,
    'language': 'english'
}
custom_chunker = CustomChunker(chunk_options)

# Instantiate the custom embedding function.
custom_ef = CustomEmbeddingFunction()


# Evaluation harness shipped with chunking_evaluation (general-purpose
# corpora). A redundant second `import chardet` that sat here has been
# dropped — chardet is already imported above.
evaluation = GeneralEvaluation()

def smart_file_reader(file_path):
    """Read a text file using its auto-detected encoding."""
    detected = detect_file_encoding(file_path)
    with io.open(file_path, mode='r', encoding=detected) as handle:
        return handle.read()

# Set the custom file reader
# (left disabled; smart_file_reader would replace the framework's default)
#evaluation._file_reader = smart_file_reader


# Generate Embedding results
# NOTE(review): this call and the "chunking" call below are identical —
# same chunker, same embedding function — so both produce the same kind of
# result; presumably one was meant to vary an argument. Confirm intent.
embedding_results = evaluation.run(custom_chunker, custom_ef)
print(f"Embedding Results:\n\t{embedding_results}")

# Generate Chunking results
chunk_results = evaluation.run(custom_chunker, custom_ef)
print(f"Chunking Results:\n\t{chunk_results}")

#
# End of File
########################################################################################################################