Spaces:
Sleeping
Sleeping
from typing import List, Optional, Tuple, Union | |
import numpy as np | |
from transformers import (AlbertModel, AlbertTokenizer, BartModel, | |
BartTokenizer, BertModel, BertTokenizer, | |
CamembertModel, CamembertTokenizer, CTRLModel, | |
CTRLTokenizer, DistilBertModel, DistilBertTokenizer, | |
GPT2Model, GPT2Tokenizer, LongformerModel, | |
LongformerTokenizer, OpenAIGPTModel, | |
OpenAIGPTTokenizer, PreTrainedModel, | |
PreTrainedTokenizer, RobertaModel, RobertaTokenizer, | |
TransfoXLModel, TransfoXLTokenizer, XLMModel, | |
XLMTokenizer, XLNetModel, XLNetTokenizer) | |
from extractive_summarizer.bert_parent import BertParent | |
from extractive_summarizer.cluster_features import ClusterFeatures | |
from extractive_summarizer.sentence_handler import SentenceHandler | |
class ModelProcessor(object):
    """
    Parent summarizer that embeds sentences with a transformer model and clusters the embeddings.

    Sentences come from ``sentence_handler``, embeddings from ``BertParent`` and the selection of
    summary sentences from ``ClusterFeatures``. Subclasses decide how the model is constructed.
    """

    # Maps the ``aggregate`` option of ``run_embeddings`` to the numpy reduction applied on axis 0.
    aggregate_map = {
        'mean': np.mean,
        'min': np.min,
        'median': np.median,
        'max': np.max,
    }

    def __init__(
        self,
        model: Optional[str] = 'bert-large-uncased',
        custom_model: "PreTrainedModel" = None,
        custom_tokenizer: "PreTrainedTokenizer" = None,
        hidden: Union[List[int], int] = -2,
        reduce_option: str = 'mean',
        sentence_handler: Optional["SentenceHandler"] = None,
        random_state: int = 12345,
        hidden_concat: bool = False,
        gpu_id: int = 0,
    ):
        """
        This is the parent Bert Summarizer model. New summarizers should inherit from this class.

        :param model: This parameter is associated with the inherit string parameters from the transformers library.
        :param custom_model: If you have a pre-trained model, you can add the model class here.
        :param custom_tokenizer: If you have a custom tokenizer, you can add the tokenizer here.
        :param hidden: This signifies which layer(s) of the BERT model you would like to use as embeddings.
        :param reduce_option: Given the output of the bert model, this param determines how you want to reduce results.
        :param sentence_handler: The handler to process sentences. If you want to use coreference, instantiate and
            pass a CoreferenceHandler instance. When None, a fresh ``SentenceHandler()`` is created for this instance.
        :param random_state: The random state to reproduce summarizations.
        :param hidden_concat: Whether or not to concat multiple hidden layers.
        :param gpu_id: GPU device index if CUDA is available.
        """
        # NOTE: this seeds numpy's *global* random generator, not a private one.
        np.random.seed(random_state)
        self.model = BertParent(model, custom_model, custom_tokenizer, gpu_id)
        self.hidden = hidden
        self.reduce_option = reduce_option
        # Build the default handler here rather than in the signature: a default written as
        # ``SentenceHandler()`` is evaluated once at definition time and shared by every instance.
        self.sentence_handler = sentence_handler if sentence_handler is not None else SentenceHandler()
        self.random_state = random_state
        self.hidden_concat = hidden_concat

    def cluster_runner(
        self,
        content: List[str],
        ratio: float = 0.2,
        algorithm: str = 'kmeans',
        use_first: bool = True,
        num_sentences: int = None
    ) -> Tuple[List[str], np.ndarray]:
        """
        Runs the cluster algorithm based on the hidden state. Returns both the embeddings and sentences.

        :param content: Content list of sentences.
        :param ratio: The ratio to use for clustering.
        :param algorithm: Type of algorithm to use for clustering.
        :param use_first: Return the first sentence in the output (helpful for news stories, etc).
        :param num_sentences: Number of sentences to use for summarization.
        :return: A tuple of summarized sentences and embeddings
        """
        hidden = self.model(
            content, self.hidden, self.reduce_option, hidden_concat=self.hidden_concat)
        hidden_args = ClusterFeatures(
            hidden, algorithm, random_state=self.random_state).cluster(ratio, num_sentences)

        if use_first:
            # Force index 0 (the first sentence) to lead the selection.
            # NOTE(review): when index 0 was not already selected this adds one entry, so the
            # result may hold one more sentence than ``num_sentences`` - confirm this is intended.
            if not hidden_args:
                hidden_args.append(0)
            elif hidden_args[0] != 0:
                hidden_args.insert(0, 0)

        sentences = [content[j] for j in hidden_args]
        embeddings = np.asarray([hidden[j] for j in hidden_args])
        return sentences, embeddings

    def __run_clusters(
        self,
        content: List[str],
        ratio: float = 0.2,
        algorithm: str = 'kmeans',
        use_first: bool = True,
        num_sentences: int = None
    ) -> List[str]:
        """
        Runs clusters and returns sentences.

        :param content: The content of sentences.
        :param ratio: Ratio to use for clustering.
        :param algorithm: Algorithm selection for clustering.
        :param use_first: Whether to use first sentence
        :param num_sentences: Number of sentences. Overrides ratio.
        :return: summarized sentences
        """
        sentences, _ = self.cluster_runner(
            content, ratio, algorithm, use_first, num_sentences)
        return sentences

    def __retrieve_summarized_embeddings(
        self,
        content: List[str],
        ratio: float = 0.2,
        algorithm: str = 'kmeans',
        use_first: bool = True,
        num_sentences: int = None
    ) -> np.ndarray:
        """
        Retrieves embeddings of the summarized sentences.

        :param content: The content of sentences.
        :param ratio: Ratio to use for clustering.
        :param algorithm: Algorithm selection for clustering.
        :param use_first: Whether to use first sentence
        :param num_sentences: Number of sentences. Overrides ratio.
        :return: Summarized embeddings
        """
        _, embeddings = self.cluster_runner(
            content, ratio, algorithm, use_first, num_sentences)
        return embeddings

    def calculate_elbow(
        self,
        body: str,
        algorithm: str = 'kmeans',
        min_length: int = 40,
        max_length: int = 600,
        k_max: int = None,
    ) -> List[float]:
        """
        Calculates elbow across the clusters.

        :param body: The input body to summarize.
        :param algorithm: The algorithm to use for clustering.
        :param min_length: The min length to use.
        :param max_length: The max length to use.
        :param k_max: The maximum number of clusters to search. Defaults to the sentence count minus one.
        :return: List of elbow inertia values.
        """
        sentences = self.sentence_handler(body, min_length, max_length)

        if k_max is None:
            k_max = len(sentences) - 1

        hidden = self.model(sentences, self.hidden,
                            self.reduce_option, hidden_concat=self.hidden_concat)
        elbow = ClusterFeatures(
            hidden, algorithm, random_state=self.random_state).calculate_elbow(k_max)
        return elbow

    def calculate_optimal_k(
        self,
        body: str,
        algorithm: str = 'kmeans',
        min_length: int = 40,
        max_length: int = 600,
        k_max: int = None,
    ):
        """
        Calculates the optimal Elbow K.

        :param body: The input body to summarize.
        :param algorithm: The algorithm to use for clustering.
        :param min_length: The min length to use.
        :param max_length: The max length to use.
        :param k_max: The maximum number of clusters to search. Defaults to the sentence count minus one.
        :return: Whatever ``ClusterFeatures.calculate_optimal_cluster`` returns for ``k_max``.
        """
        sentences = self.sentence_handler(body, min_length, max_length)

        if k_max is None:
            k_max = len(sentences) - 1

        hidden = self.model(sentences, self.hidden,
                            self.reduce_option, hidden_concat=self.hidden_concat)
        optimal_k = ClusterFeatures(
            hidden, algorithm, random_state=self.random_state).calculate_optimal_cluster(k_max)
        return optimal_k

    def run_embeddings(
        self,
        body: str,
        ratio: float = 0.2,
        min_length: int = 40,
        max_length: int = 600,
        use_first: bool = True,
        algorithm: str = 'kmeans',
        num_sentences: int = None,
        aggregate: str = None,
    ) -> Optional[np.ndarray]:
        """
        Preprocesses the sentences, runs the clusters to find the centroids, then combines the embeddings.

        :param body: The raw string body to process
        :param ratio: Ratio of sentences to use
        :param min_length: Minimum length of sentence candidates to utilize for the summary.
        :param max_length: Maximum length of sentence candidates to utilize for the summary
        :param use_first: Whether or not to use the first sentence
        :param algorithm: Which clustering algorithm to use. (kmeans, gmm)
        :param num_sentences: Number of sentences to use. Overrides ratio.
        :param aggregate: One of mean, median, max, min. Applied on zero axis
        :return: A summary embedding, or None when the sentence handler yields no sentences.
        """
        sentences = self.sentence_handler(body, min_length, max_length)
        if not sentences:
            return None

        embeddings = self.__retrieve_summarized_embeddings(
            sentences, ratio, algorithm, use_first, num_sentences)

        if aggregate is not None:
            # Kept as an assert so callers keep seeing AssertionError for a bad option; the
            # valid names are read from aggregate_map instead of a second hard-coded list.
            assert aggregate in self.aggregate_map, "aggregate must be mean, min, max, or median"
            embeddings = self.aggregate_map[aggregate](embeddings, axis=0)

        return embeddings

    def run(
        self,
        body: str,
        ratio: float = 0.2,
        min_length: int = 40,
        max_length: int = 600,
        use_first: bool = True,
        algorithm: str = 'kmeans',
        num_sentences: int = None,
        return_as_list: bool = False
    ) -> Union[List, str]:
        """
        Preprocesses the sentences, runs the clusters to find the centroids, then combines the sentences.

        :param body: The raw string body to process
        :param ratio: Ratio of sentences to use
        :param min_length: Minimum length of sentence candidates to utilize for the summary.
        :param max_length: Maximum length of sentence candidates to utilize for the summary
        :param use_first: Whether or not to use the first sentence
        :param algorithm: Which clustering algorithm to use. (kmeans, gmm)
        :param num_sentences: Number of sentences to use (overrides ratio).
        :param return_as_list: Whether or not to return sentences as list.
        :return: The summary sentences as a list, or joined with single spaces into one string.
        """
        sentences = self.sentence_handler(body, min_length, max_length)

        # With no candidate sentences clustering is skipped and the empty result is returned.
        if sentences:
            sentences = self.__run_clusters(
                sentences, ratio, algorithm, use_first, num_sentences)

        if return_as_list:
            return sentences
        return ' '.join(sentences)

    def __call__(
        self,
        body: str,
        ratio: float = 0.2,
        min_length: int = 40,
        max_length: int = 600,
        use_first: bool = True,
        algorithm: str = 'kmeans',
        num_sentences: int = None,
        return_as_list: bool = False,
    ) -> Union[List, str]:
        """
        (utility that wraps around the run function)
        Preprocesses the sentences, runs the clusters to find the centroids, then combines the sentences.

        :param body: The raw string body to process.
        :param ratio: Ratio of sentences to use.
        :param min_length: Minimum length of sentence candidates to utilize for the summary.
        :param max_length: Maximum length of sentence candidates to utilize for the summary.
        :param use_first: Whether or not to use the first sentence.
        :param algorithm: Which clustering algorithm to use. (kmeans, gmm)
        :param num_sentences: Number of sentences to use (overrides ratio).
        :param return_as_list: Whether or not to return sentences as list.
        :return: A summary string, or a list of sentences when ``return_as_list`` is True.
        """
        return self.run(
            body, ratio, min_length, max_length, algorithm=algorithm, use_first=use_first,
            num_sentences=num_sentences, return_as_list=return_as_list
        )
class Summarizer(ModelProcessor):
    """
    Main Bert Summarizer: forwards its arguments unchanged to ``ModelProcessor``.
    """

    def __init__(
        self,
        model: str = 'bert-large-uncased',
        custom_model: PreTrainedModel = None,
        custom_tokenizer: PreTrainedTokenizer = None,
        hidden: Union[List[int], int] = -2,
        reduce_option: str = 'mean',
        sentence_handler: Optional[SentenceHandler] = None,
        random_state: int = 12345,
        hidden_concat: bool = False,
        gpu_id: int = 0,
    ):
        """
        This is the main Bert Summarizer class.

        :param model: This parameter is associated with the inherit string parameters from the transformers library.
        :param custom_model: If you have a pre-trained model, you can add the model class here.
        :param custom_tokenizer: If you have a custom tokenizer, you can add the tokenizer here.
        :param hidden: This signifies which layer of the BERT model you would like to use as embeddings.
        :param reduce_option: Given the output of the bert model, this param determines how you want to reduce results.
        :param sentence_handler: The handler to process sentences. When None, a fresh ``SentenceHandler()`` is
            created for this instance.
        :param random_state: The random state to reproduce summarizations.
        :param hidden_concat: Whether or not to concat multiple hidden layers.
        :param gpu_id: GPU device index if CUDA is available.
        """
        # Resolve the default here: a ``SentenceHandler()`` default in the signature would be
        # built once at definition time and shared by every summarizer instance.
        if sentence_handler is None:
            sentence_handler = SentenceHandler()

        super().__init__(
            model, custom_model, custom_tokenizer, hidden, reduce_option,
            sentence_handler, random_state, hidden_concat, gpu_id
        )
class TransformerSummarizer(ModelProcessor):
    """
    Another type of Summarizer class to choose keyword based model and tokenizer
    """

    # Maps the ``transformer_type`` keyword to its (model class, tokenizer class) pair.
    # All of these classes are imported unconditionally at the top of the file, so they are
    # registered here once instead of being re-inserted on every construction.
    MODEL_DICT = {
        'Bert': (BertModel, BertTokenizer),
        'OpenAIGPT': (OpenAIGPTModel, OpenAIGPTTokenizer),
        'GPT2': (GPT2Model, GPT2Tokenizer),
        'CTRL': (CTRLModel, CTRLTokenizer),
        'TransfoXL': (TransfoXLModel, TransfoXLTokenizer),
        'XLNet': (XLNetModel, XLNetTokenizer),
        'XLM': (XLMModel, XLMTokenizer),
        'DistilBert': (DistilBertModel, DistilBertTokenizer),
        'Roberta': (RobertaModel, RobertaTokenizer),
        'Albert': (AlbertModel, AlbertTokenizer),
        'Camembert': (CamembertModel, CamembertTokenizer),
        'Bart': (BartModel, BartTokenizer),
        'Longformer': (LongformerModel, LongformerTokenizer),
    }

    def __init__(
        self,
        transformer_type: str = 'Bert',
        transformer_model_key: str = 'bert-base-uncased',
        transformer_tokenizer_key: str = None,
        hidden: Union[List[int], int] = -2,
        reduce_option: str = 'mean',
        sentence_handler: Optional[SentenceHandler] = None,
        random_state: int = 12345,
        hidden_concat: bool = False,
        gpu_id: int = 0,
    ):
        """
        :param transformer_type: The Transformer type, such as Bert, GPT2, DistilBert, etc. Must be a key of
            ``MODEL_DICT``.
        :param transformer_model_key: The transformer model key. This is the directory for the model.
        :param transformer_tokenizer_key: The transformer tokenizer key. This is the tokenizer directory.
            Falls back to ``transformer_model_key`` when None.
        :param hidden: The hidden output layers to use for the summarization.
        :param reduce_option: The reduce option, such as mean, max, min, median, etc.
        :param sentence_handler: The sentence handler class to process the raw text. When None, a fresh
            ``SentenceHandler()`` is created for this instance.
        :param random_state: The random state to use.
        :param hidden_concat: Deprecated hidden concat option.
        :param gpu_id: GPU device index if CUDA is available.
        :raises KeyError: If ``transformer_type`` is not a key of ``MODEL_DICT``.
        """
        try:
            model_clz, tokenizer_clz = self.MODEL_DICT[transformer_type]
        except KeyError:
            # Same exception type as the plain dict lookup, but it names the valid options.
            raise KeyError(
                "Unknown transformer_type {!r}; expected one of {}".format(
                    transformer_type, sorted(self.MODEL_DICT))
            ) from None

        # Hidden states are requested from the model; the parent class selects layers via ``hidden``.
        model = model_clz.from_pretrained(
            transformer_model_key, output_hidden_states=True)

        tokenizer_key = transformer_tokenizer_key if transformer_tokenizer_key is not None else transformer_model_key
        tokenizer = tokenizer_clz.from_pretrained(tokenizer_key)

        # Resolve the default here: a ``SentenceHandler()`` default in the signature would be
        # built once at definition time and shared by every summarizer instance.
        if sentence_handler is None:
            sentence_handler = SentenceHandler()

        super().__init__(
            None, model, tokenizer, hidden, reduce_option, sentence_handler, random_state, hidden_concat, gpu_id
        )