import random
import logging
from datasets import load_dataset, Dataset
from sentence_transformers import (
    SentenceTransformer,
    SentenceTransformerTrainer,
    SentenceTransformerTrainingArguments,
    SentenceTransformerModelCardData,
)
from typing import Any, Dict, Iterable
import torch
from torch import nn
from sentence_transformers.losses import MultipleNegativesRankingLoss, MultipleNegativesSymmetricRankingLoss
from sentence_transformers import util
from sentence_transformers.training_args import BatchSamplers
from sentence_transformers.evaluation import InformationRetrievalEvaluator

# Emit timestamped INFO-level log records.
logging.basicConfig(
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s - %(message)s",
)

# 1. Load a model to finetune with 2. (Optional) model card data
card_data = SentenceTransformerModelCardData(
    language="en",
    license="apache-2.0",
    model_name="MPNet base trained on Natural Questions pairs",
)
model = SentenceTransformer("microsoft/mpnet-base", model_card_data=card_data)
model_name = "mpnet-base-natural-questions-mnsrl"

# 3. Load a dataset to finetune on
dataset = load_dataset("sentence-transformers/natural-questions", split="train")
# Give every row an integer "id" equal to its position in the full dataset.
dataset = dataset.add_column("id", range(len(dataset)))

# The first 90,000 rows are used for training, the remainder for evaluation.
split_at = 90_000
train_dataset: Dataset = dataset.select(range(split_at))
eval_dataset: Dataset = dataset.select(range(split_at, len(dataset)))
# 4. Define a loss function
class ImprovedContrastiveLoss(nn.Module):
    """In-batch contrastive loss over (query, document) pairs with an enlarged partition function.

    For pair ``i`` the positive is ``(q_i, d_i)``. The partition function sums
    the temperature-scaled cosine similarities of:

    * ``q_i`` against every document in the batch,
    * every query in the batch against ``d_i``,
    * ``q_i`` against every *other* query,
    * ``d_i`` against every *other* document.

    The loss is the mean negative log of the positive term divided by that
    partition function.

    Args:
        model: The model used to embed each column of the batch.
        temperature: Divisor applied to the cosine similarities.
    """

    def __init__(self, model: SentenceTransformer, temperature: float = 0.01):
        super().__init__()
        self.model = model
        self.temperature = temperature

    def forward(self, sentence_features: Iterable[Dict[str, torch.Tensor]], labels: torch.Tensor = None) -> torch.Tensor:
        """Compute the loss for one batch.

        Args:
            sentence_features: Tokenized inputs, one entry per dataset column.
                The first entry is embedded as the queries and the second as
                the documents; any further entries are embedded but not used
                in the loss.
            labels: Unused.

        Returns:
            A scalar tensor with the mean loss over the batch.
        """
        # Get the embeddings for each sentence in the batch
        embeddings = [self.model(features)["sentence_embedding"] for features in sentence_features]
        query_embeddings, doc_embeddings = embeddings[0], embeddings[1]

        # Temperature-scaled similarities, kept as logits. Working in log space
        # avoids exponentiating values as low as -2 / temperature, which
        # underflows to 0 and turns the loss into inf/NaN.
        logits_q_d = util.cos_sim(query_embeddings, doc_embeddings) / self.temperature
        logits_q_q = util.cos_sim(query_embeddings, query_embeddings) / self.temperature
        logits_d_d = util.cos_sim(doc_embeddings, doc_embeddings) / self.temperature

        # Ensure the diagonal (self-similarity) is not considered in negative samples:
        # -inf contributes exp(-inf) = 0 to the partition function.
        diagonal = torch.eye(logits_q_d.size(0), device=logits_q_d.device).bool()
        logits_q_q = logits_q_q.masked_fill(diagonal, float("-inf"))
        logits_d_d = logits_d_d.masked_fill(diagonal, float("-inf"))

        # Row i collects every term of the partition function for pair i:
        # row i of q-d, column i of q-d, row i of q-q and column i of d-d.
        partition_logits = torch.cat(
            [logits_q_d, logits_q_d.t(), logits_q_q, logits_d_d.t()],
            dim=1,
        )
        log_partition = torch.logsumexp(partition_logits, dim=1)

        # -log(exp(pos) / Z) == log(Z) - pos
        return (log_partition - logits_q_d.diag()).mean()

    def get_config_dict(self) -> Dict[str, Any]:
        """Return the hyperparameters of this loss."""
        return {"temperature": self.temperature}


# loss = ImprovedContrastiveLoss(model)
loss = MultipleNegativesSymmetricRankingLoss(model)
# 5. (Optional) Specify training arguments
args = SentenceTransformerTrainingArguments(
    # Required parameter:
    output_dir=f"models/{model_name}",
    # Optional training parameters:
    num_train_epochs=1,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    learning_rate=2e-5,
    warmup_ratio=0.1,
    fp16=False,  # Set to False if you get an error that your GPU can't run on FP16
    bf16=True,  # Set to True if you have a GPU that supports BF16
    batch_sampler=BatchSamplers.NO_DUPLICATES,  # MultipleNegativesRankingLoss benefits from no duplicate samples in a batch
    # Optional tracking/debugging parameters:
    eval_strategy="steps",
    eval_steps=100,
    save_strategy="steps",
    save_steps=100,
    save_total_limit=2,
    logging_steps=100,
    logging_first_step=True,
    run_name=model_name,  # Will be used in W&B if `wandb` is installed
)

# 6. (Optional) Create an evaluator & evaluate the base model
# A reduced corpus (the first 20,000 answers plus every evaluation answer),
# but only the evaluation queries.
# Read each column once up front instead of materializing a full row per id.
eval_ids = list(eval_dataset["id"])
answers = list(dataset["answer"])
queries = dict(zip(eval_ids, eval_dataset["query"]))
# Ids are used as row positions in `dataset`, exactly as `dataset[cid]` did.
corpus = {cid: answers[cid] for cid in [*range(20_000), *eval_ids]}
# Each query's only relevant document is the answer that shares its id.
relevant_docs = {qid: {qid} for qid in eval_ids}
dev_evaluator = InformationRetrievalEvaluator(
    corpus=corpus,
    queries=queries,
    relevant_docs=relevant_docs,
    show_progress_bar=True,
    name="natural-questions-dev",
)
dev_evaluator(model)

# 7. Create a trainer & train
# The "id" column is dropped so only the remaining columns reach the loss.
trainer = SentenceTransformerTrainer(
    model=model,
    args=args,
    train_dataset=train_dataset.remove_columns("id"),
    eval_dataset=eval_dataset.remove_columns("id"),
    loss=loss,
    evaluator=dev_evaluator,
)
trainer.train()

# (Optional) Evaluate the trained model on the evaluator after training
dev_evaluator(model)

# 8. Save the trained model
model.save_pretrained(f"models/{model_name}/final")

# 9. (Optional) Push it to the Hugging Face Hub
model.push_to_hub(model_name)