pimcore-ytest / src /vector_db.py
sergeipetrov's picture
Update src/vector_db.py
192f6d9 verified
raw
history blame
No virus
3.11 kB
from transformers import AutoConfig
from sentence_transformers import SentenceTransformer
import lancedb
import torch
import pyarrow as pa
import pandas as pd
import numpy as np
import tqdm
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
class VectorDB:
vector_column = "vector"
description_column = "description"
name_column = "name"
table_name = "pimcore_actions"
emb_model = ''
db_location = ''
def __init__(self, emb_model, db_location, actions_list_file_path, num_sub_vectors, batch_size):
self.retriever = SentenceTransformer(emb_model)
emb_config = AutoConfig.from_pretrained(emb_model)
emb_dimension = emb_config.hidden_size
assert emb_dimension % num_sub_vectors == 0, \
"Embedding size must be divisible by the num of sub vectors"
print('Model loaded...')
print(emb_model)
model = SentenceTransformer(emb_model)
model.eval()
if torch.backends.mps.is_available():
device = "mps"
elif torch.cuda.is_available():
device = "cuda"
else:
device = "cpu"
print(f"Device: {device}")
db = lancedb.connect(db_location)
schema = pa.schema(
[
pa.field(self.vector_column, pa.list_(pa.float32(), emb_dimension)),
pa.field(self.description_column, pa.string()),
pa.field(self.name_column, pa.string())
]
)
tbl = db.create_table(self.table_name, schema=schema, mode="overwrite")
df = pd.read_csv(actions_list_file_path)
sentences = df.values
print("Starting vector generation")
for i in tqdm.tqdm(range(0, int(np.ceil(len(sentences) / batch_size)))):
try:
batch = [sent for sent in sentences[i * batch_size:(i + 1) * batch_size] if len(sent) > 0]
to_encode = [entry[1] for entry in batch]
names = [entry[0] for entry in batch]
encoded = model.encode(to_encode, normalize_embeddings=True, device=device)
encoded = [list(vec) for vec in encoded]
df = pd.DataFrame({
self.vector_column: encoded,
self.description_column: to_encode,
self.name_column: names
})
tbl.add(df)
except:
print(f"batch {i} was skipped")
self.db = db
self.table = tbl
print("Vector generation done.")
# def get_embedding_db_as_pandas(self):
# db = lancedb.connect(self.db_location)
# tbl = db.open_table(self.table_name)
# return tbl.to_pandas()
def retrieve_prefiltered_hits(self, query, k):
query_vec = self.retriever.encode(query)
logger.info('encoded')
documents = self.table.search(query_vec, vector_column_name=self.vector_column).limit(k).to_list()
names = [doc[self.name_column] for doc in documents]
descriptions = [doc[self.description_column] for doc in documents]
logger.info('done topK lookup')
return names, descriptions