e5 / app.py
Tonic's picture
Update app.py
89a387a verified
raw
history blame
14.7 kB
import spaces
import torch
import torch.nn.functional as F
from torch import Tensor
from transformers import AutoTokenizer, AutoModel
import threading
import queue
import gradio as gr
import os
title = """
# 👋🏻Welcome to 🙋🏻‍♂️Tonic's 🐣e5-mistral🛌🏻Embeddings """
description = """
You can use this ZeroGPU Space to test out the current model [intfloat/e5-mistral-7b-instruct](https://huggingface.co/intfloat/e5-mistral-7b-instruct). 🐣e5-mistral🛌🏻 has a larger context🪟window, a different prompting/return🛠️mechanism and generally better results than other embedding models. use it via API to create embeddings or try out the sentence similarity to see how various optimization parameters affect performance.
You can also use 🐣e5-mistral🛌🏻 by cloning this space. 🧬🔬🔍 Simply click here: <a style="display:inline-block" href="https://huggingface.co/spaces/Tonic/e5?duplicate=true"><img src="https://img.shields.io/badge/-Duplicate%20Space-blue?labelColor=white&style=flat&logo=&logoWidth=14" alt="Duplicate Space"></a></h3>
Join us : 🌟TeamTonic🌟 is always making cool demos! Join our active builder's🛠️community 👻 [![Join us on Discord](https://img.shields.io/discord/1109943800132010065?label=Discord&logo=discord&style=flat-square)](https://discord.gg/GWpVpekp) On 🤗Huggingface: [TeamTonic](https://huggingface.co/TeamTonic) & [MultiTransformer](https://huggingface.co/MultiTransformer) On 🌐Github: [Polytonic](https://github.com/tonic-ai) & contribute to 🌟 [Poly](https://github.com/tonic-ai/poly) 🤗Big thanks to Yuvi Sharma and all the folks at huggingface for the community grant 🤗
"""
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:30'
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tasks = {
'ArguAna': 'Given a claim, find documents that refute the claim',
'ClimateFEVER': 'Given a claim about climate change, retrieve documents that support or refute the claim',
'DBPedia': 'Given a query, retrieve relevant entity descriptions from DBPedia',
'FEVER': 'Given a claim, retrieve documents that support or refute the claim',
'FiQA2018': 'Given a financial question, retrieve user replies that best answer the question',
'HotpotQA': 'Given a multi-hop question, retrieve documents that can help answer the question',
'MSMARCO': 'Given a web search query, retrieve relevant passages that answer the query',
'NFCorpus': 'Given a question, retrieve relevant documents that best answer the question',
'NQ': 'Given a question, retrieve Wikipedia passages that answer the question',
'QuoraRetrieval': 'Given a question, retrieve questions that are semantically equivalent to the given question',
'SCIDOCS': 'Given a scientific paper title, retrieve paper abstracts that are cited by the given paper',
'SciFact': 'Given a scientific claim, retrieve documents that support or refute the claim',
'Touche2020': 'Given a question, retrieve detailed and persuasive arguments that answer the question',
'TRECCOVID': 'Given a query on COVID-19, retrieve documents that answer the query',
}
# Global queue for embedding requests
embedding_request_queue = queue.Queue()
embedding_response_queue = queue.Queue()
tokenizer = AutoTokenizer.from_pretrained('intfloat/e5-mistral-7b-instruct')
model = AutoModel.from_pretrained('intfloat/e5-mistral-7b-instruct', torch_dtype=torch.float16, device_map=device)
def last_token_pool(last_hidden_states: Tensor, attention_mask: Tensor) -> Tensor:
left_padding = (attention_mask[:, -1].sum() == attention_mask.shape[0])
if left_padding:
return last_hidden_states[:, -1]
else:
sequence_lengths = attention_mask.sum(dim=1) - 1
batch_size = last_hidden_states.shape[0]
return last_hidden_states[torch.arange(batch_size, device=last_hidden_states.device), sequence_lengths]
def clear_cuda_cache():
torch.cuda.empty_cache()
def free_memory(*args):
for arg in args:
del arg
def load_corpus_from_json(file_path):
with open(file_path, 'r') as file:
data = json.load(file)
return data
def embedding_worker():
while True:
# Wait for an item in the queue
item = embedding_request_queue.get()
if item is None:
break
selected_task, input_text = item
embeddings = compute_embeddings(selected_task, input_text)
formatted_response = format_response(embeddings)
embedding_response_queue.put(formatted_response)
embedding_request_queue.task_done()
clear_cuda_cache()
threading.Thread(target=embedding_worker, daemon=True).start()
@spaces.GPU
def compute_embeddings(selected_task, input_text):
try:
task_description = tasks[selected_task]
except KeyError:
print(f"Selected task not found: {selected_task}")
return f"Error: Task '{selected_task}' not found. Please select a valid task."
max_length = 2048
processed_texts = [f'Instruct: {task_description}\nQuery: {input_text}']
batch_dict = tokenizer(processed_texts, max_length=max_length - 1, return_attention_mask=False, padding=False, truncation=True)
batch_dict['input_ids'] = [input_ids + [tokenizer.eos_token_id] for input_ids in batch_dict['input_ids']]
batch_dict = tokenizer.pad(batch_dict, padding=True, return_attention_mask=True, return_tensors='pt')
batch_dict = {k: v.to(device) for k, v in batch_dict.items()}
outputs = model(**batch_dict)
embeddings = last_token_pool(outputs.last_hidden_state, batch_dict['attention_mask'])
embeddings = F.normalize(embeddings, p=2, dim=1)
embeddings_list = embeddings.detach().cpu().numpy().tolist()
clear_cuda_cache()
return embeddings_list
@spaces.GPU
def compute_similarity(selected_task, sentence1, sentence2, extra_sentence1, extra_sentence2):
try:
task_description = tasks[selected_task]
except KeyError:
print(f"Selected task not found: {selected_task}")
return f"Error: Task '{selected_task}' not found. Please select a valid task."
# Compute embeddings for each sentence
embeddings1 = compute_embeddings(selected_task, sentence1)
embeddings2 = compute_embeddings(selected_task, sentence2)
embeddings3 = compute_embeddings(selected_task, extra_sentence1)
embeddings4 = compute_embeddings(selected_task, extra_sentence2)
# Convert embeddings to tensors
embeddings_tensor1 = torch.tensor(embeddings1).to(device).half()
embeddings_tensor2 = torch.tensor(embeddings2).to(device).half()
embeddings_tensor3 = torch.tensor(embeddings3).to(device).half()
embeddings_tensor4 = torch.tensor(embeddings4).to(device).half()
# Compute cosine similarity
similarity1 = compute_cosine_similarity(embeddings1, embeddings2)
similarity2 = compute_cosine_similarity(embeddings1, embeddings3)
similarity3 = compute_cosine_similarity(embeddings1, embeddings4)
# Free memory
free_memory(embeddings1, embeddings2, embeddings3, embeddings4)
similarity_scores = {"Similarity 1-2": similarity1, "Similarity 1-3": similarity2, "Similarity 1-4": similarity3}
clear_cuda_cache()
return similarity_scores
@spaces.GPU
def compute_cosine_similarity(emb1, emb2):
tensor1 = torch.tensor(emb1).to(device).half()
tensor2 = torch.tensor(emb2).to(device).half()
similarity = F.cosine_similarity(tensor1, tensor2).item()
free_memory(tensor1, tensor2)
clear_cuda_cache()
return similarity
@spaces.GPU
def compute_embeddings_batch(input_texts):
max_length = 2042
processed_texts = [f'Instruct: {task_description}\nQuery: {text}' for text in input_texts]
batch_dict = tokenizer(processed_texts, max_length=max_length - 1, return_attention_mask=False, padding=False, truncation=True)
batch_dict['input_ids'] = [input_ids + [tokenizer.eos_token_id] for input_ids in batch_dict['input_ids']]
batch_dict = tokenizer.pad(batch_dict, padding=True, return_attention_mask=True, return_tensors='pt')
batch_dict = {k: v.to(device) for k, v in batch_dict.items()}
outputs = model(**batch_dict)
embeddings = last_token_pool(outputs.last_hidden_state, batch_dict['attention_mask'])
embeddings = F.normalize(embeddings, p=2, dim=1)
clear_cuda_cache()
return embeddings.detach().cpu().numpy()
def semantic_search(query_embedding, corpus_embeddings, top_k=5):
scores = np.dot(corpus_embeddings, query_embedding.T).flatten()
top_k_indices = np.argsort(scores)[::-1][:top_k]
return top_k_indices, scores[top_k_indices]
def search_similar_sentences(input_question, corpus_sentences, corpus_embeddings):
question_embedding = compute_embeddings_batch([input_question])[0]
top_k_indices, top_k_scores = semantic_search(question_embedding, corpus_embeddings)
results = [(corpus_sentences[i], top_k_scores[i]) for i in top_k_indices]
return results
# openai response object formatting
def format_response(embeddings):
return {
"data": [
{
"embedding": embeddings,
"index": 0,
"object": "embedding"
}
],
"model": "e5-mistral",
"object": "list",
"usage": {
"prompt_tokens": 17,
"total_tokens": 17
}
}
def generate_and_format_embeddings(selected_task, input_text):
embedding_request_queue.put((selected_task, input_text))
response = embedding_response_queue.get()
embedding_response_queue.task_done()
clear_cuda_cache()
return response
def app_interface():
corpus_sentences = []
corpus_embeddings = []
with gr.Blocks() as demo:
gr.Markdown(title)
gr.Markdown(description)
with gr.Row():
task_dropdown = gr.Dropdown(list(tasks.keys()), label="Select a Task", value=list(tasks.keys())[0])
with gr.Tab("Embedding Generation"):
input_text_box = gr.Textbox(label="📖Input Text")
compute_button = gr.Button("Try🐣🛌🏻e5")
output_display = gr.Textbox(label="🐣e5-mistral🛌🏻 Embeddings")
compute_button.click(
fn=compute_embeddings,
inputs=[task_dropdown, input_text_box],
outputs=output_display
)
with gr.Tab("Sentence Similarity"):
sentence1_box = gr.Textbox(label="'Focus Sentence' - The 'Subject'")
sentence2_box = gr.Textbox(label="'Input Sentence' - 1")
extra_sentence1_box = gr.Textbox(label="'Input Sentence' - 2")
extra_sentence2_box = gr.Textbox(label="'Input Sentence' - 3")
similarity_button = gr.Button("Compute Similarity")
similarity_output = gr.Textbox(label="🐣e5-mistral🛌🏻 Similarity Scores")
similarity_button.click(
fn=compute_similarity,
inputs=[task_dropdown, sentence1_box, sentence2_box, extra_sentence1_box, extra_sentence2_box],
outputs=similarity_output
)
with gr.Tab("Load Corpus"):
json_uploader = gr.File(label="Upload JSON File")
load_corpus_button = gr.Button("Load Corpus")
corpus_status = gr.Textbox(label="Corpus Status", value="Corpus not loaded")
def load_corpus(file_info):
if file_info is None:
return "No file uploaded. Please upload a JSON file."
try:
global corpus_sentences, corpus_embeddings
corpus_sentences = load_corpus_from_json(file_info['name'])
corpus_embeddings = compute_embeddings_batch(corpus_sentences)
return "Corpus loaded successfully with {} sentences.".format(len(corpus_sentences))
except Exception as e:
return "Error loading corpus: {}".format(e)
load_corpus_button.click(
fn=load_corpus,
inputs=json_uploader,
outputs=corpus_status
)
with gr.Tab("Semantic Search"):
input_question_box = gr.Textbox(label="Enter your question")
search_button = gr.Button("Search")
search_results_output = gr.Textbox(label="Search Results")
def perform_search(input_question):
if not corpus_sentences or not corpus_embeddings:
return "Corpus is not loaded. Please load a corpus first."
return search_similar_sentences(input_question, corpus_sentences, corpus_embeddings)
search_button.click(
fn=perform_search,
inputs=input_question_box,
outputs=search_results_output
)
with gr.Tab("Connector-like Embeddings"):
with gr.Row():
input_text_box_connector = gr.Textbox(label="Input Text", placeholder="Enter text or array of texts")
model_dropdown_connector = gr.Dropdown(label="Model", choices=["ArguAna", "ClimateFEVER", "DBPedia", "FEVER", "FiQA2018", "HotpotQA", "MSMARCO", "NFCorpus", "NQ", "QuoraRetrieval", "SCIDOCS", "SciFact", "Touche2020", "TRECCOVID"], value="text-embedding-ada-002")
encoding_format_connector = gr.Radio(label="Encoding Format", choices=["float", "base64"], value="float")
user_connector = gr.Textbox(label="User", placeholder="Enter user identifier (optional)")
submit_button_connector = gr.Button("Generate Embeddings")
output_display_connector = gr.JSON(label="Embeddings Output")
submit_button_connector.click(
fn=generate_and_format_embeddings,
inputs=[model_dropdown_connector, input_text_box_connector],
outputs=output_display_connector
)
with gr.Row():
with gr.Column():
input_text_box
with gr.Column():
compute_button
output_display
return demo
app_interface().queue()
app_interface().launch()