import spaces import torch import torch.nn.functional as F from torch import Tensor from transformers import AutoTokenizer, AutoModel import threading import queue import gradio as gr import os title = """ # 👋🏻Welcome to 🙋🏻‍♂️Tonic's 🐣e5-mistral🛌🏻Embeddings """ description = """ You can use this ZeroGPU Space to test out the current model [intfloat/e5-mistral-7b-instruct](https://huggingface.co/intfloat/e5-mistral-7b-instruct). 🐣e5-mistral🛌🏻 has a larger context🪟window, a different prompting/return🛠️mechanism and generally better results than other embedding models. use it via API to create embeddings or try out the sentence similarity to see how various optimization parameters affect performance. You can also use 🐣e5-mistral🛌🏻 by cloning this space. 🧬🔬🔍 Simply click here: Duplicate Space Join us : 🌟TeamTonic🌟 is always making cool demos! Join our active builder's🛠️community 👻 [![Join us on Discord](https://img.shields.io/discord/1109943800132010065?label=Discord&logo=discord&style=flat-square)](https://discord.gg/GWpVpekp) On 🤗Huggingface: [TeamTonic](https://huggingface.co/TeamTonic) & [MultiTransformer](https://huggingface.co/MultiTransformer) On 🌐Github: [Polytonic](https://github.com/tonic-ai) & contribute to 🌟 [Poly](https://github.com/tonic-ai/poly) 🤗Big thanks to Yuvi Sharma and all the folks at huggingface for the community grant 🤗 """ os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:30' device = torch.device("cuda" if torch.cuda.is_available() else "cpu") tasks = { 'ArguAna': 'Given a claim, find documents that refute the claim', 'ClimateFEVER': 'Given a claim about climate change, retrieve documents that support or refute the claim', 'DBPedia': 'Given a query, retrieve relevant entity descriptions from DBPedia', 'FEVER': 'Given a claim, retrieve documents that support or refute the claim', 'FiQA2018': 'Given a financial question, retrieve user replies that best answer the question', 'HotpotQA': 'Given a multi-hop question, retrieve documents that can help answer the question', 'MSMARCO': 'Given a web search query, retrieve relevant passages that answer the query', 'NFCorpus': 'Given a question, retrieve relevant documents that best answer the question', 'NQ': 'Given a question, retrieve Wikipedia passages that answer the question', 'QuoraRetrieval': 'Given a question, retrieve questions that are semantically equivalent to the given question', 'SCIDOCS': 'Given a scientific paper title, retrieve paper abstracts that are cited by the given paper', 'SciFact': 'Given a scientific claim, retrieve documents that support or refute the claim', 'Touche2020': 'Given a question, retrieve detailed and persuasive arguments that answer the question', 'TRECCOVID': 'Given a query on COVID-19, retrieve documents that answer the query', } # Global queue for embedding requests embedding_request_queue = queue.Queue() embedding_response_queue = queue.Queue() tokenizer = AutoTokenizer.from_pretrained('intfloat/e5-mistral-7b-instruct') model = AutoModel.from_pretrained('intfloat/e5-mistral-7b-instruct', torch_dtype=torch.float16, device_map=device) def last_token_pool(last_hidden_states: Tensor, attention_mask: Tensor) -> Tensor: left_padding = (attention_mask[:, -1].sum() == attention_mask.shape[0]) if left_padding: return last_hidden_states[:, -1] else: sequence_lengths = attention_mask.sum(dim=1) - 1 batch_size = last_hidden_states.shape[0] return last_hidden_states[torch.arange(batch_size, device=last_hidden_states.device), sequence_lengths] def clear_cuda_cache(): torch.cuda.empty_cache() def free_memory(*args): for arg in args: del arg def load_corpus_from_json(file_path): with open(file_path, 'r') as file: data = json.load(file) return data def embedding_worker(): while True: # Wait for an item in the queue item = embedding_request_queue.get() if item is None: break selected_task, input_text = item embeddings = compute_embeddings(selected_task, input_text) formatted_response = format_response(embeddings) embedding_response_queue.put(formatted_response) embedding_request_queue.task_done() clear_cuda_cache() threading.Thread(target=embedding_worker, daemon=True).start() @spaces.GPU def compute_embeddings(selected_task, input_text): try: task_description = tasks[selected_task] except KeyError: print(f"Selected task not found: {selected_task}") return f"Error: Task '{selected_task}' not found. Please select a valid task." max_length = 2048 processed_texts = [f'Instruct: {task_description}\nQuery: {input_text}'] batch_dict = tokenizer(processed_texts, max_length=max_length - 1, return_attention_mask=False, padding=False, truncation=True) batch_dict['input_ids'] = [input_ids + [tokenizer.eos_token_id] for input_ids in batch_dict['input_ids']] batch_dict = tokenizer.pad(batch_dict, padding=True, return_attention_mask=True, return_tensors='pt') batch_dict = {k: v.to(device) for k, v in batch_dict.items()} outputs = model(**batch_dict) embeddings = last_token_pool(outputs.last_hidden_state, batch_dict['attention_mask']) embeddings = F.normalize(embeddings, p=2, dim=1) embeddings_list = embeddings.detach().cpu().numpy().tolist() clear_cuda_cache() return embeddings_list @spaces.GPU def decode_embedding(embedding_str): try: embedding = [float(num) for num in embedding_str.split(',')] embedding_tensor = torch.tensor(embedding, dtype=torch.float16, device=device) decoded_embedding = tokenizer.decode(embedding_tensor[0], skip_special_tokens=True) return decoded_embedding.cpu().numpy().tolist() except Exception as e: return f"Error in decoding: {str(e)}" @spaces.GPU def compute_similarity(selected_task, sentence1, sentence2, extra_sentence1, extra_sentence2): try: task_description = tasks[selected_task] except KeyError: print(f"Selected task not found: {selected_task}") return f"Error: Task '{selected_task}' not found. Please select a valid task." # Compute embeddings for each sentence embeddings1 = compute_embeddings(selected_task, sentence1) embeddings2 = compute_embeddings(selected_task, sentence2) embeddings3 = compute_embeddings(selected_task, extra_sentence1) embeddings4 = compute_embeddings(selected_task, extra_sentence2) # Convert embeddings to tensors embeddings_tensor1 = torch.tensor(embeddings1).to(device).half() embeddings_tensor2 = torch.tensor(embeddings2).to(device).half() embeddings_tensor3 = torch.tensor(embeddings3).to(device).half() embeddings_tensor4 = torch.tensor(embeddings4).to(device).half() # Compute cosine similarity similarity1 = compute_cosine_similarity(embeddings1, embeddings2) similarity2 = compute_cosine_similarity(embeddings1, embeddings3) similarity3 = compute_cosine_similarity(embeddings1, embeddings4) # Free memory free_memory(embeddings1, embeddings2, embeddings3, embeddings4) similarity_scores = {"Similarity 1-2": similarity1, "Similarity 1-3": similarity2, "Similarity 1-4": similarity3} clear_cuda_cache() return similarity_scores @spaces.GPU def compute_cosine_similarity(emb1, emb2): tensor1 = torch.tensor(emb1).to(device).half() tensor2 = torch.tensor(emb2).to(device).half() similarity = F.cosine_similarity(tensor1, tensor2).item() free_memory(tensor1, tensor2) clear_cuda_cache() return similarity @spaces.GPU def compute_embeddings_batch(input_texts): max_length = 2042 processed_texts = [f'Instruct: {task_description}\nQuery: {text}' for text in input_texts] batch_dict = tokenizer(processed_texts, max_length=max_length - 1, return_attention_mask=False, padding=False, truncation=True) batch_dict['input_ids'] = [input_ids + [tokenizer.eos_token_id] for input_ids in batch_dict['input_ids']] batch_dict = tokenizer.pad(batch_dict, padding=True, return_attention_mask=True, return_tensors='pt') batch_dict = {k: v.to(device) for k, v in batch_dict.items()} outputs = model(**batch_dict) embeddings = last_token_pool(outputs.last_hidden_state, batch_dict['attention_mask']) embeddings = F.normalize(embeddings, p=2, dim=1) clear_cuda_cache() return embeddings.detach().cpu().numpy() def semantic_search(query_embedding, corpus_embeddings, top_k=5): scores = np.dot(corpus_embeddings, query_embedding.T).flatten() top_k_indices = np.argsort(scores)[::-1][:top_k] return top_k_indices, scores[top_k_indices] def search_similar_sentences(input_question, corpus_sentences, corpus_embeddings): question_embedding = compute_embeddings_batch([input_question])[0] top_k_indices, top_k_scores = semantic_search(question_embedding, corpus_embeddings) results = [(corpus_sentences[i], top_k_scores[i]) for i in top_k_indices] return results # openai response object formatting def format_response(embeddings): return { "data": [ { "embedding": embeddings, "index": 0, "object": "embedding" } ], "model": "e5-mistral", "object": "list", "usage": { "prompt_tokens": 17, "total_tokens": 17 } } def generate_and_format_embeddings(selected_task, input_text): embedding_request_queue.put((selected_task, input_text)) response = embedding_response_queue.get() embedding_response_queue.task_done() clear_cuda_cache() return response def app_interface(): corpus_sentences = [] corpus_embeddings = [] with gr.Blocks() as demo: gr.Markdown(title) gr.Markdown(description) with gr.Row(): task_dropdown = gr.Dropdown(list(tasks.keys()), label="Select a Task", value=list(tasks.keys())[0]) with gr.Tab("Embedding Generation"): input_text_box = gr.Textbox(label="📖Input Text") compute_button = gr.Button("Try🐣🛌🏻e5") output_display = gr.Textbox(label="🐣e5-mistral🛌🏻 Embeddings") compute_button.click( fn=compute_embeddings, inputs=[task_dropdown, input_text_box], outputs=output_display ) with gr.Tab("Sentence Similarity"): sentence1_box = gr.Textbox(label="'Focus Sentence' - The 'Subject'") sentence2_box = gr.Textbox(label="'Input Sentence' - 1") extra_sentence1_box = gr.Textbox(label="'Input Sentence' - 2") extra_sentence2_box = gr.Textbox(label="'Input Sentence' - 3") similarity_button = gr.Button("Compute Similarity") similarity_output = gr.Textbox(label="🐣e5-mistral🛌🏻 Similarity Scores") similarity_button.click( fn=compute_similarity, inputs=[task_dropdown, sentence1_box, sentence2_box, extra_sentence1_box, extra_sentence2_box], outputs=similarity_output ) with gr.Tab("Load Corpus"): json_uploader = gr.File(label="Upload JSON File") load_corpus_button = gr.Button("Load Corpus") corpus_status = gr.Textbox(label="Corpus Status", value="Corpus not loaded") def load_corpus(file_info): if file_info is None: return "No file uploaded. Please upload a JSON file." try: global corpus_sentences, corpus_embeddings corpus_sentences = load_corpus_from_json(file_info['name']) corpus_embeddings = compute_embeddings_batch(corpus_sentences) return "Corpus loaded successfully with {} sentences.".format(len(corpus_sentences)) except Exception as e: return "Error loading corpus: {}".format(e) load_corpus_button.click( fn=load_corpus, inputs=json_uploader, outputs=corpus_status ) with gr.Tab("Semantic Search"): input_question_box = gr.Textbox(label="Enter your question") search_button = gr.Button("Search") search_results_output = gr.Textbox(label="Search Results") def perform_search(input_question): if not corpus_sentences or not corpus_embeddings: return "Corpus is not loaded. Please load a corpus first." return search_similar_sentences(input_question, corpus_sentences, corpus_embeddings) search_button.click( fn=perform_search, inputs=input_question_box, outputs=search_results_output ) with gr.Tab("Connector-like Embeddings"): with gr.Row(): input_text_box_connector = gr.Textbox(label="Input Text", placeholder="Enter text or array of texts") model_dropdown_connector = gr.Dropdown(label="Model", choices=["ArguAna", "ClimateFEVER", "DBPedia", "FEVER", "FiQA2018", "HotpotQA", "MSMARCO", "NFCorpus", "NQ", "QuoraRetrieval", "SCIDOCS", "SciFact", "Touche2020", "TRECCOVID"], value="text-embedding-ada-002") encoding_format_connector = gr.Radio(label="Encoding Format", choices=["float", "base64"], value="float") user_connector = gr.Textbox(label="User", placeholder="Enter user identifier (optional)") submit_button_connector = gr.Button("Generate Embeddings") output_display_connector = gr.JSON(label="Embeddings Output") submit_button_connector.click( fn=generate_and_format_embeddings, inputs=[model_dropdown_connector, input_text_box_connector], outputs=output_display_connector ) # with gr.Tab("Decode Embedding"): # embedding_input = gr.Textbox(label="Enter Embedding (comma-separated floats)") # decode_button = gr.Button("Decode") # decoded_output = gr.Textbox(label="Decoded Embedding") # # decode_button.click( # fn=decode_embedding, # inputs=embedding_input, # outputs=decoded_output # ) with gr.Row(): with gr.Column(): input_text_box with gr.Column(): compute_button output_display return demo app_interface().queue() app_interface().launch()