# Functions for creating text embeddings via Hugging Face models, the
# OpenAI API, or a local embedding server. Includes lazy model loading,
# rate limiting, and retry with exponential backoff.

import logging
import time
from functools import wraps
from threading import Lock, Timer
from typing import List

import requests
import torch
from transformers import AutoTokenizer, AutoModel

from App_Function_Libraries.LLM_API_Calls import get_openai_embeddings
from App_Function_Libraries.Utils.Utils import load_comprehensive_config

# Load embedding configuration from the comprehensive config file.
loaded_config = load_comprehensive_config()
embedding_provider = loaded_config['Embeddings']['embedding_provider']
embedding_model = loaded_config['Embeddings']['embedding_model']
embedding_api_url = loaded_config['Embeddings']['embedding_api_url']
embedding_api_key = loaded_config['Embeddings']['embedding_api_key']

# Chunking parameters for splitting long documents before embedding.
# Note: if the config loader returns raw ConfigParser values, these are
# strings and may need int() conversion at the point of use.
chunk_size = loaded_config['Embeddings']['chunk_size']
overlap = loaded_config['Embeddings']['overlap']


class HuggingFaceEmbedder:
    """Wraps a Hugging Face model, loading it lazily on first use and
    unloading it after a period of inactivity to free (GPU) memory."""

    def __init__(self, model_name, timeout_seconds=120):
        self.model_name = model_name
        self.tokenizer = None
        self.model = None
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.timeout_seconds = timeout_seconds  # idle time before auto-unload
        self.last_used_time = 0
        self.unload_timer = None

    def load_model(self):
        # Load lazily on first use; every call refreshes the idle timer.
        if self.model is None:
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            self.model = AutoModel.from_pretrained(self.model_name)
            self.model.to(self.device)
        self.last_used_time = time.time()
        self.reset_timer()

    def unload_model(self):
        # Drop the model and tokenizer and return GPU memory to the pool.
        if self.model is not None:
            del self.model
            del self.tokenizer
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            self.model = None
            self.tokenizer = None
        if self.unload_timer:
            self.unload_timer.cancel()

    def reset_timer(self):
        # (Re)arm the idle-unload timer.
        if self.unload_timer:
            self.unload_timer.cancel()
        self.unload_timer = Timer(self.timeout_seconds, self.unload_model)
        self.unload_timer.daemon = True  # don't block interpreter exit
        self.unload_timer.start()

    def create_embeddings(self, texts):
        self.load_model()
        inputs = self.tokenizer(texts, return_tensors="pt", padding=True, truncation=True, max_length=512)
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        with torch.no_grad():
            outputs = self.model(**inputs)
        # Mean-pool the final hidden states over the token dimension to get
        # one fixed-size vector per input text.
        embeddings = outputs.last_hidden_state.mean(dim=1)
        return embeddings.cpu().numpy()
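
# Usage sketch (illustrative; the model name below is an example, not the
# configured default): weights load on the first call and are unloaded
# automatically after `timeout_seconds` of inactivity.
#
#     embedder = HuggingFaceEmbedder("sentence-transformers/all-MiniLM-L6-v2")
#     vectors = embedder.create_embeddings(["first text", "second text"])
#     # vectors has shape (2, hidden_size)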

# Module-level singleton, created on the first Hugging Face request and
# replaced whenever a different model is requested.
huggingface_embedder = None


class RateLimiter:
    """Decorator that enforces at most `max_calls` calls per `period`
    seconds (sliding window), sleeping callers once the window is full."""

    def __init__(self, max_calls, period):
        self.max_calls = max_calls
        self.period = period
        self.calls = []  # timestamps of recent calls
        self.lock = Lock()

    def __call__(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            with self.lock:
                now = time.time()
                # Keep only the timestamps still inside the sliding window.
                self.calls = [call for call in self.calls if call > now - self.period]
                if len(self.calls) >= self.max_calls:
                    # Sleep until the oldest call ages out of the window.
                    sleep_time = self.calls[0] - (now - self.period)
                    time.sleep(sleep_time)
                self.calls.append(time.time())
            return func(*args, **kwargs)
        return wrapper
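
# Usage sketch (illustrative; `fetch_page` is a hypothetical function, not
# part of this module): allow at most 10 calls per 60-second window.
#
#     @RateLimiter(max_calls=10, period=60)
#     def fetch_page(url):
#         return requests.get(url, timeout=30)
#
# Because the limiter sleeps while holding its lock, concurrent callers
# queue behind one another once the window is full.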


def exponential_backoff(max_retries=5, base_delay=1):
    """Decorator factory: retry the wrapped function up to `max_retries`
    times, doubling the delay after each failed attempt."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise  # out of retries; propagate the last error
                    delay = base_delay * (2 ** attempt)
                    logging.warning(f"Attempt {attempt + 1} failed. Retrying in {delay} seconds. Error: {str(e)}")
                    time.sleep(delay)
        return wrapper
    return decorator
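
# Sketch of how the two decorators compose (the function name below is
# hypothetical): @exponential_backoff wraps the rate-limited function, so
# every retry attempt is itself subject to the rate limit. With the
# defaults, retry delays grow as 1s, 2s, 4s, 8s.
#
#     @exponential_backoff(max_retries=3, base_delay=1)
#     @RateLimiter(max_calls=10, period=60)
#     def call_embedding_api(payload):
#         ...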


@exponential_backoff()
@RateLimiter(max_calls=50, period=60)
def create_embeddings_batch(texts: List[str], provider: str, model: str, api_url: str,
                            timeout_seconds: int = 300) -> List[List[float]]:
    """Create embeddings for a batch of texts using the given provider.

    `timeout_seconds` controls how long an idle Hugging Face model stays
    loaded, and is reused as the HTTP timeout for the local provider.
    """
    global huggingface_embedder

    if provider.lower() == 'huggingface':
        # Create (or swap) the shared embedder if the requested model differs.
        if huggingface_embedder is None or huggingface_embedder.model_name != model:
            if huggingface_embedder is not None:
                huggingface_embedder.unload_model()
            huggingface_embedder = HuggingFaceEmbedder(model, timeout_seconds)
        embeddings = huggingface_embedder.create_embeddings(texts).tolist()
        return embeddings

    elif provider.lower() == 'openai':
        logging.debug(f"Creating embeddings for {len(texts)} texts using OpenAI API")
        return [create_openai_embedding(text, model) for text in texts]

    elif provider.lower() == 'local':
        response = requests.post(
            api_url,
            json={"texts": texts, "model": model},
            headers={"Authorization": f"Bearer {embedding_api_key}"},
            timeout=timeout_seconds,  # avoid hanging indefinitely on the local API
        )
        if response.status_code == 200:
            return response.json()['embeddings']
        else:
            raise Exception(f"Error from local embedding API ({response.status_code}): {response.text}")

    else:
        raise ValueError(f"Unsupported embedding provider: {provider}")


def create_embedding(text: str, provider: str, model: str, api_url: str) -> List[float]:
    # Convenience wrapper: embed a single string via the batch API.
    return create_embeddings_batch([text], provider, model, api_url)[0]


def create_stella_embeddings(text: str) -> List[float]:
    """Embed a single text with the Stella model (local provider) or fall
    back to the configured OpenAI model."""
    if embedding_provider == 'local':
        # Note: the model and tokenizer are reloaded on every call; for
        # heavy use, cache them (e.g. via HuggingFaceEmbedder above).
        tokenizer = AutoTokenizer.from_pretrained("dunzhang/stella_en_400M_v5")
        model = AutoModel.from_pretrained("dunzhang/stella_en_400M_v5")

        inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)

        with torch.no_grad():
            outputs = model(**inputs)

        # Mean-pool the final hidden states to a single vector.
        embeddings = outputs.last_hidden_state.mean(dim=1)
        return embeddings[0].tolist()
    elif embedding_provider == 'openai':
        return get_openai_embeddings(text, embedding_model)
    else:
        raise ValueError(f"Unsupported embedding provider: {embedding_provider}")


def create_openai_embedding(text: str, model: str) -> List[float]:
    # Thin wrapper around the shared OpenAI embeddings helper.
    embedding = get_openai_embeddings(text, model)
    return embedding
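

if __name__ == "__main__":
    # Minimal smoke test (a sketch): embeds one sentence using the provider,
    # model, and API URL from the loaded config. Assumes the configured
    # provider is reachable and any required credentials are valid.
    sample_text = "The quick brown fox jumps over the lazy dog."
    vector = create_embedding(sample_text, embedding_provider, embedding_model, embedding_api_url)
    print(f"Got embedding of dimension {len(vector)}")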