|
|
|
|
|
|
|
|
|
import logging
|
|
import os
|
|
import time
|
|
from functools import wraps
|
|
from threading import Lock, Timer
|
|
from typing import List
|
|
|
|
|
|
import numpy as np
|
|
import onnxruntime as ort
|
|
import requests
|
|
from transformers import AutoTokenizer, AutoModel
|
|
import torch
|
|
|
|
|
|
from App_Function_Libraries.LLM_API_Calls import get_openai_embeddings
|
|
from App_Function_Libraries.Utils.Utils import load_comprehensive_config
|
|
from App_Function_Libraries.Metrics.metrics_logger import log_counter, log_histogram
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Read the project configuration once, at import time, and keep a handle on
# the 'Embeddings' section that every setting below comes from.
loaded_config = load_comprehensive_config()
_embeddings_section = loaded_config['Embeddings']

# Provider / model selection and remote endpoint settings. These are plain
# subscript lookups, so no fallback value is supplied for them here.
embedding_provider = _embeddings_section['embedding_provider']
embedding_model = _embeddings_section['embedding_model']
embedding_api_url = _embeddings_section['embedding_api_url']
embedding_api_key = _embeddings_section['embedding_api_key']

# Directory for downloaded / cached model files; falls back to the bundled
# models folder when the section has no 'model_dir' entry.
model_dir = _embeddings_section.get('model_dir', './App_Function_Libraries/models/embedding_models/')

# Chunking parameters, read from the same section.
chunk_size = _embeddings_section['chunk_size']
overlap = _embeddings_section['overlap']

# Embedder instances created so far, keyed by model name.
embedding_models = {}
|
|
|
|
|
|
# Pinned Hugging Face revisions, keyed by model name. The embedders look a
# model up here and pass the hash as ``revision`` to ``from_pretrained``; a
# model that is not listed gets ``None`` (no pin).
# The stella key was previously misspelled ("setll"), so the lookup for
# "dunzhang/stella_en_400M_v5" never matched and that model was loaded
# unpinned even though it is loaded with trust_remote_code=True.
commit_hashes = {
    "jinaai/jina-embeddings-v3": "4be32c2f5d65b95e4bcce473545b7883ec8d2edd",
    "Alibaba-NLP/gte-large-en-v1.5": "104333d6af6f97649377c2afbde10a7704870c7b",
    "dunzhang/stella_en_400M_v5": "2aa5579fcae1c579de199a3866b6e514bbbf5d10",
}
|
|
|
|
class HuggingFaceEmbedder:
    """Embedding model loaded through ``transformers`` with idle auto-unload.

    The tokenizer and model are loaded on first use. Every ``load_model``
    call (re)starts a ``threading.Timer`` that calls ``unload_model`` after
    ``timeout_seconds``.
    """

    def __init__(self, model_name, cache_dir, timeout_seconds=30):
        """Store settings; nothing is downloaded or loaded here.

        Args:
            model_name: Model identifier passed to ``from_pretrained``.
            cache_dir: Passed to ``from_pretrained`` as ``cache_dir``.
            timeout_seconds: Interval of the unload timer.
        """
        self.model_name = model_name
        self.cache_dir = cache_dir
        self.tokenizer = None
        self.model = None
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.timeout_seconds = timeout_seconds
        self.last_used_time = 0
        self.unload_timer = None
        log_counter("huggingface_embedder_init", labels={"model_name": model_name})

    def load_model(self):
        """Load tokenizer and model if needed, then restart the unload timer."""
        log_counter("huggingface_model_load_attempt", labels={"model_name": self.model_name})
        start_time = time.time()

        if self.model is None:
            # Use the pinned revision when one is registered for this model.
            revision = commit_hashes.get(self.model_name, None)
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_name,
                trust_remote_code=True,
                cache_dir=self.cache_dir,
                revision=revision
            )
            self.model = AutoModel.from_pretrained(
                self.model_name,
                trust_remote_code=True,
                cache_dir=self.cache_dir,
                revision=revision
            )
            self.model.to(self.device)

        # Runs on every call, so each use pushes the unload further out.
        self.last_used_time = time.time()
        self.reset_timer()
        load_time = time.time() - start_time
        log_histogram("huggingface_model_load_duration", load_time, labels={"model_name": self.model_name})
        log_counter("huggingface_model_load_success", labels={"model_name": self.model_name})

    def unload_model(self):
        """Drop the model and tokenizer references and cancel the timer."""
        log_counter("huggingface_model_unload", labels={"model_name": self.model_name})
        had_model = self.model is not None
        self.model = None
        self.tokenizer = None
        if had_model and torch.cuda.is_available():
            torch.cuda.empty_cache()
        if self.unload_timer:
            self.unload_timer.cancel()

    def reset_timer(self):
        """Cancel any pending unload and schedule a new one."""
        if self.unload_timer:
            self.unload_timer.cancel()
        self.unload_timer = Timer(self.timeout_seconds, self.unload_model)
        # Daemon thread: a pending unload must not keep the interpreter alive
        # for up to ``timeout_seconds`` after the main thread has finished.
        self.unload_timer.daemon = True
        self.unload_timer.start()

    def _mean_pooled_embeddings(self, inputs):
        """Run the model on tokenized ``inputs`` and return a float32 numpy array."""
        with torch.no_grad():
            outputs = self.model(**inputs)
        # NOTE(review): the mean is taken over dim 1 of last_hidden_state with
        # no attention-mask weighting; with padding=True this presumably
        # includes padded positions - confirm that is intended.
        embeddings = outputs.last_hidden_state.mean(dim=1)
        return embeddings.cpu().float().numpy()

    def create_embeddings(self, texts):
        """Embed ``texts`` and return the mean-pooled vectors as a numpy array.

        Inputs are truncated to 512 tokens. If the forward pass fails with the
        "unsupported ScalarType BFloat16" RuntimeError, the model is converted
        to float32 and the pass is retried once.

        Raises:
            RuntimeError: Any other runtime error from the model (re-raised).
        """
        log_counter("huggingface_create_embeddings_attempt", labels={"model_name": self.model_name})
        start_time = time.time()
        self.load_model()

        inputs = self.tokenizer(
            texts,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=512
        )
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        try:
            result = self._mean_pooled_embeddings(inputs)
        except RuntimeError as e:
            if "Got unsupported ScalarType BFloat16" not in str(e):
                log_counter("huggingface_create_embeddings_failure", labels={"model_name": self.model_name})
                raise
            logging.warning("BFloat16 not supported. Falling back to float32.")
            self.model = self.model.float()
            result = self._mean_pooled_embeddings(inputs)

        # Success metrics are emitted for both the normal and the fallback
        # path (previously only the fallback path recorded them).
        embedding_time = time.time() - start_time
        log_histogram("huggingface_create_embeddings_duration", embedding_time,
                      labels={"model_name": self.model_name})
        log_counter("huggingface_create_embeddings_success", labels={"model_name": self.model_name})
        return result
|
|
|
|
class ONNXEmbedder:
    """Embedding model run through ONNX Runtime with idle auto-unload.

    The tokenizer is loaded in ``__init__``; the inference session is created
    on first use from ``<onnx_model_dir>/<model_name>.onnx``. Every
    ``load_model`` call (re)starts a ``threading.Timer`` that calls
    ``unload_model`` after ``timeout_seconds``.
    """

    def __init__(self, model_name, onnx_model_dir, timeout_seconds=30):
        """Load the tokenizer and record where the ONNX file is expected.

        Args:
            model_name: Model identifier; also used to build the ``.onnx`` path.
            onnx_model_dir: Directory holding the ONNX file; also used as the
                tokenizer ``cache_dir``.
            timeout_seconds: Interval of the unload timer.
        """
        self.model_name = model_name
        self.model_path = os.path.join(onnx_model_dir, f"{model_name}.onnx")

        self.tokenizer = AutoTokenizer.from_pretrained(
            model_name,
            trust_remote_code=True,
            cache_dir=onnx_model_dir,
            revision=commit_hashes.get(model_name, None)
        )
        self.session = None
        self.timeout_seconds = timeout_seconds
        self.last_used_time = 0
        self.unload_timer = None
        self.device = "cpu"
        log_counter("onnx_embedder_init", labels={"model_name": model_name})

    def load_model(self):
        """Create the inference session if needed, then restart the unload timer.

        Raises:
            FileNotFoundError: The ONNX file does not exist at ``model_path``.
        """
        log_counter("onnx_model_load_attempt", labels={"model_name": self.model_name})
        start_time = time.time()
        if self.session is None:
            if not os.path.exists(self.model_path):
                raise FileNotFoundError(f"ONNX model not found at {self.model_path}")
            logging.info(f"Loading ONNX model from {self.model_path}")
            self.session = ort.InferenceSession(self.model_path)

        # Runs on every call, so each use pushes the unload further out.
        self.last_used_time = time.time()
        self.reset_timer()
        load_time = time.time() - start_time
        log_histogram("onnx_model_load_duration", load_time, labels={"model_name": self.model_name})
        log_counter("onnx_model_load_success", labels={"model_name": self.model_name})

    def unload_model(self):
        """Drop the inference session reference and cancel the timer."""
        log_counter("onnx_model_unload", labels={"model_name": self.model_name})
        if self.session is not None:
            logging.info("Unloading ONNX model to free resources.")
            self.session = None
        if self.unload_timer:
            self.unload_timer.cancel()

    def reset_timer(self):
        """Cancel any pending unload and schedule a new one."""
        if self.unload_timer:
            self.unload_timer.cancel()
        self.unload_timer = Timer(self.timeout_seconds, self.unload_model)
        # Daemon thread: a pending unload must not keep the interpreter alive
        # for up to ``timeout_seconds`` after the main thread has finished.
        self.unload_timer.daemon = True
        self.unload_timer.start()

    def create_embeddings(self, texts: List[str]) -> List[List[float]]:
        """Embed ``texts`` and return one mean-pooled vector per text.

        Inputs are truncated to 512 tokens. Only ``input_ids`` and
        ``attention_mask`` are fed to the session, and the first session
        output is averaged over axis 1.

        Raises:
            Exception: Any tokenizer or runtime error, after it is logged.
        """
        log_counter("onnx_create_embeddings_attempt", labels={"model_name": self.model_name})
        start_time = time.time()
        self.load_model()
        try:
            inputs = self.tokenizer(
                texts,
                return_tensors="np",
                padding=True,
                truncation=True,
                max_length=512
            )
            # Both arrays are cast to int64 before being handed to the session.
            ort_inputs = {
                "input_ids": inputs["input_ids"].astype(np.int64),
                "attention_mask": inputs["attention_mask"].astype(np.int64)
            }

            ort_outputs = self.session.run(None, ort_inputs)

            last_hidden_state = ort_outputs[0]
            embeddings = np.mean(last_hidden_state, axis=1)

            embedding_time = time.time() - start_time
            log_histogram("onnx_create_embeddings_duration", embedding_time, labels={"model_name": self.model_name})
            log_counter("onnx_create_embeddings_success", labels={"model_name": self.model_name})
            return embeddings.tolist()
        except Exception as e:
            log_counter("onnx_create_embeddings_failure", labels={"model_name": self.model_name})
            logging.error(f"Error creating embeddings with ONNX model: {str(e)}")
            raise
|
|
|
|
class RateLimiter:
    """Decorator allowing at most ``max_calls`` calls per ``period`` seconds.

    Call timestamps are kept in a list. When the window is full, the caller
    sleeps until the oldest recorded call leaves the window.
    """

    def __init__(self, max_calls, period):
        """
        Args:
            max_calls: Number of calls allowed inside one window.
            period: Window length in seconds.
        """
        self.max_calls = max_calls
        self.period = period
        self.calls = []
        self.lock = Lock()

    def __call__(self, func):
        """Return ``func`` wrapped so that its calls are rate limited."""
        # wraps() keeps the decorated function's name and docstring, so
        # decorators stacked on top of this one see the real metadata.
        @wraps(func)
        def wrapper(*args, **kwargs):
            with self.lock:
                now = time.time()
                # Keep only the timestamps still inside the window.
                self.calls = [call for call in self.calls if call > now - self.period]
                if len(self.calls) >= self.max_calls:
                    # Time left until the oldest call leaves the window. The
                    # lock stays held while sleeping, so other callers wait too.
                    sleep_time = self.calls[0] - (now - self.period)
                    time.sleep(sleep_time)
                self.calls.append(time.time())
            # The wrapped function itself runs outside the lock.
            return func(*args, **kwargs)
        return wrapper
|
|
|
|
def exponential_backoff(max_retries=5, base_delay=1):
    """Build a decorator that retries a failing call with doubling delays.

    The wrapped function is attempted up to ``max_retries`` times. After a
    failed attempt number ``k`` (0-based) the wrapper logs a warning and
    sleeps ``base_delay * 2 ** k`` seconds; the exception from the final
    attempt is re-raised unchanged.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            final_attempt = max_retries - 1
            attempt = 0
            while attempt < max_retries:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    # Out of retries: let the last error reach the caller.
                    if attempt == final_attempt:
                        raise
                    delay = base_delay * (2 ** attempt)
                    logging.warning(f"Attempt {attempt + 1} failed. Retrying in {delay} seconds. Error: {str(e)}")
                    time.sleep(delay)
                attempt += 1

        return wrapper

    return decorator
|
|
|
|
@exponential_backoff()
@RateLimiter(max_calls=50, period=60)
def create_embeddings_batch(texts: List[str],
                            provider: str,
                            model: str,
                            api_url: str,
                            timeout_seconds: int = 300
                            ) -> List[List[float]]:
    """Create embeddings for ``texts`` with the selected provider.

    Providers (matched case-insensitively):
      * ``huggingface`` - uses a cached embedder from ``embedding_models``,
        creating it on first use (ONNX for "dunzhang/stella_en_400M_v5",
        transformers otherwise).
      * ``openai`` - one ``create_openai_embedding`` call per text.
      * ``local`` - POSTs ``{"texts", "model"}`` to ``api_url`` and returns
        the ``embeddings`` field of the JSON response.

    Args:
        texts: Texts to embed.
        provider: Provider name, see above.
        model: Model name handed to the provider.
        api_url: Endpoint used by the ``local`` provider only.
        timeout_seconds: Idle-unload timeout for newly created embedders;
            also used as the HTTP timeout of the ``local`` request.

    Returns:
        Whatever the provider returns for the batch, one embedding per text.

    Raises:
        ValueError: Unknown provider.
        Exception: Any provider error, after it is counted and logged. Because
            of the decorators, every exception (including the ValueError)
            goes through the retry/backoff wrapper before reaching the caller.
    """
    log_counter("create_embeddings_batch_attempt", labels={"provider": provider, "model": model})
    start_time = time.time()

    try:
        provider_key = provider.lower()

        if provider_key == 'huggingface':
            # The module-level cache is only mutated, never rebound.
            if model not in embedding_models:
                if model == "dunzhang/stella_en_400M_v5":
                    embedding_models[model] = ONNXEmbedder(model, model_dir, timeout_seconds)
                else:
                    embedding_models[model] = HuggingFaceEmbedder(model, model_dir, timeout_seconds)
            embeddings = embedding_models[model].create_embeddings(texts)

        elif provider_key == 'openai':
            logging.debug(f"Creating embeddings for {len(texts)} texts using OpenAI API")
            embeddings = [create_openai_embedding(text, model) for text in texts]

        elif provider_key == 'local':
            response = requests.post(
                api_url,
                json={"texts": texts, "model": model},
                headers={"Authorization": f"Bearer {embedding_api_key}"},
                # Without a timeout a stalled server would block this call forever.
                timeout=timeout_seconds
            )
            if response.status_code != 200:
                raise Exception(f"Error from local API: {response.text}")
            embeddings = response.json()['embeddings']

        else:
            raise ValueError(f"Unsupported embedding provider: {provider}")

        # Success metrics are recorded only once the embeddings exist, so the
        # duration covers the real work and failures are not counted as
        # successes (they used to be logged before the provider call).
        embedding_time = time.time() - start_time
        log_histogram("create_embeddings_batch_duration", embedding_time,
                      labels={"provider": provider, "model": model})
        log_counter("create_embeddings_batch_success", labels={"provider": provider, "model": model})
        return embeddings
    except Exception as e:
        log_counter("create_embeddings_batch_error", labels={"provider": provider, "model": model, "error": str(e)})
        logging.error(f"Error in create_embeddings_batch: {str(e)}")
        raise
|
|
|
|
def create_embedding(text: str, provider: str, model: str, api_url: str) -> List[float]:
    """Embed a single text by sending it through ``create_embeddings_batch``.

    The text is wrapped in a one-element batch and the first result is
    returned. A numpy array result is converted to a plain list; any other
    result type is returned as received. Attempt, duration and success
    metrics are recorded around the call.
    """
    log_counter("create_embedding_attempt", labels={"provider": provider, "model": model})
    started_at = time.time()

    batch_result = create_embeddings_batch([text], provider, model, api_url)
    vector = batch_result[0]
    if isinstance(vector, np.ndarray):
        vector = vector.tolist()

    elapsed = time.time() - started_at
    log_histogram("create_embedding_duration", elapsed, labels={"provider": provider, "model": model})
    log_counter("create_embedding_success", labels={"provider": provider, "model": model})
    return vector
|
|
|
|
def create_openai_embedding(text: str, model: str) -> List[float]:
    """Embed one text via ``get_openai_embeddings`` and record metrics.

    An attempt counter is logged before the call; the call duration and a
    success counter are logged after it returns. The result of
    ``get_openai_embeddings`` is returned unchanged.
    """
    log_counter("create_openai_embedding_attempt", labels={"model": model})
    started_at = time.time()

    vector = get_openai_embeddings(text, model)

    elapsed = time.time() - started_at
    log_histogram("create_openai_embedding_duration", elapsed, labels={"model": model})
    log_counter("create_openai_embedding_success", labels={"model": model})
    return vector
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|