#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
import time

import numpy as np
import pandas as pd
from transformers import (
    AutoTokenizer,
    AutoModel,
    AutoConfig,
    TFAutoModelForSequenceClassification,
)
from tensorflow import keras
from sklearn.model_selection import train_test_split

from .models import Models, ModelsByFamily  # noqa: F401
from .split_strategies import (  # noqa: F401
    SplitStrategy,
    SplitStrategies,
    RegexExpressions,
)
from .aggregation_strategies import (  # noqa: F401
    AggregationStrategy,
    AggregationStrategies,
)
from .helper import (
    get_features,
    softmax,
    remove_dir,
    make_dir,
    copy_dir,
)

AUTOSAVE_PATH = './ernie-autosave/'


def clean_autosave():
    remove_dir(AUTOSAVE_PATH)


class SentenceClassifier:
    def __init__(self,
                 model_name=Models.BertBaseUncased,
                 model_path=None,
                 max_length=64,
                 labels_no=2,
                 tokenizer_kwargs=None,
                 model_kwargs=None):
        self._loaded_data = False
        self._model_path = None

        if model_kwargs is None:
            model_kwargs = {}
        model_kwargs['num_labels'] = labels_no

        if tokenizer_kwargs is None:
            tokenizer_kwargs = {}
        # 'max_len' is deprecated in recent transformers releases;
        # 'model_max_length' is the supported key.
        tokenizer_kwargs['model_max_length'] = max_length

        if model_path is not None:
            self._load_local_model(model_path)
        else:
            self._load_remote_model(
                model_name, tokenizer_kwargs, model_kwargs)

    @property
    def model(self):
        return self._model

    @property
    def tokenizer(self):
        return self._tokenizer

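    # The underlying Hugging Face objects stay directly accessible through
    # the two properties above; an illustrative sketch:
    #     token_ids = classifier.tokenizer.encode('Hello world!')
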
    def load_dataset(self,
                     dataframe=None,
                     validation_split=0.1,
                     random_state=None,
                     stratify=None,
                     csv_path=None,
                     read_csv_kwargs=None):
        if dataframe is None and csv_path is None:
            raise ValueError(
                'Either a dataframe or a csv_path must be provided.')

        if csv_path is not None:
            if read_csv_kwargs is None:
                read_csv_kwargs = {}
            dataframe = pd.read_csv(csv_path, **read_csv_kwargs)

        # First column: sentences; second column: labels.
        sentences = list(dataframe[dataframe.columns[0]])
        labels = dataframe[dataframe.columns[1]].values

        (
            training_sentences,
            validation_sentences,
            training_labels,
            validation_labels,
        ) = train_test_split(
            sentences,
            labels,
            test_size=validation_split,
            shuffle=True,
            random_state=random_state,
            stratify=stratify,
        )

        self._training_features = get_features(
            self._tokenizer, training_sentences, training_labels)
        self._training_size = len(training_sentences)

        self._validation_features = get_features(
            self._tokenizer, validation_sentences, validation_labels)
        self._validation_size = len(validation_sentences)

        logging.info(f'training_size: {self._training_size}')
        logging.info(f'validation_size: {self._validation_size}')
        self._loaded_data = True

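    # Illustrative usage; load_dataset assumes a two-column layout with the
    # sentences in the first column and integer labels in the second:
    #     df = pd.DataFrame({'sentence': ['Great!', 'Awful.'],
    #                        'label': [1, 0]})
    #     classifier.load_dataset(df, validation_split=0.2)
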
    def fine_tune(self,
                  epochs=4,
                  learning_rate=2e-5,
                  epsilon=1e-8,
                  clipnorm=1.0,
                  optimizer_function=keras.optimizers.Adam,
                  optimizer_kwargs=None,
                  loss_function=keras.losses.SparseCategoricalCrossentropy,
                  loss_kwargs=None,
                  accuracy_function=keras.metrics.SparseCategoricalAccuracy,
                  accuracy_kwargs=None,
                  training_batch_size=32,
                  validation_batch_size=64,
                  **kwargs):
        if not self._loaded_data:
            raise RuntimeError(
                'Data has not been loaded; call load_dataset first.')

        if optimizer_kwargs is None:
            optimizer_kwargs = {
                'learning_rate': learning_rate,
                'epsilon': epsilon,
                'clipnorm': clipnorm,
            }
        optimizer = optimizer_function(**optimizer_kwargs)

        if loss_kwargs is None:
            loss_kwargs = {'from_logits': True}
        loss = loss_function(**loss_kwargs)

        if accuracy_kwargs is None:
            accuracy_kwargs = {'name': 'accuracy'}
        accuracy = accuracy_function(**accuracy_kwargs)

        self._model.compile(optimizer=optimizer,
                            loss=loss,
                            metrics=[accuracy])

        training_features = self._training_features.shuffle(
            self._training_size).batch(training_batch_size).repeat(-1)
        validation_features = self._validation_features.batch(
            validation_batch_size)

        # Fall back to a non-zero step count when the dataset is smaller
        # than the batch size.
        training_steps = self._training_size // training_batch_size
        if training_steps == 0:
            training_steps = self._training_size
        logging.info(f'training_steps: {training_steps}')

        validation_steps = self._validation_size // validation_batch_size
        if validation_steps == 0:
            validation_steps = self._validation_size
        logging.info(f'validation_steps: {validation_steps}')

        for _ in range(epochs):
            self._model.fit(training_features,
                            epochs=1,
                            validation_data=validation_features,
                            steps_per_epoch=training_steps,
                            validation_steps=validation_steps,
                            **kwargs)

        # The fine-tuned model does not expose the same input interface
        # after being exported and loaded again, so reload it right away.
        self._reload_model()

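    # Illustrative call; every hyperparameter shown is the default above:
    #     classifier.fine_tune(epochs=4,
    #                          learning_rate=2e-5,
    #                          training_batch_size=32,
    #                          validation_batch_size=64)
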
    def predict_one(self,
                    text,
                    split_strategy=None,
                    aggregation_strategy=None):
        return next(
            self.predict([text],
                         batch_size=1,
                         split_strategy=split_strategy,
                         aggregation_strategy=aggregation_strategy))

    def predict(self,
                texts,
                batch_size=32,
                split_strategy=None,
                aggregation_strategy=None):
        if split_strategy is None:
            yield from self._predict_batch(texts, batch_size)
        else:
            if aggregation_strategy is None:
                aggregation_strategy = AggregationStrategies.Mean
            split_indexes = [0]
            sentences = []
            for text in texts:
                new_sentences = split_strategy.split(text, self.tokenizer)
                # Note: a text that yields no sentences is skipped, so it
                # produces no prediction in the output.
                if not new_sentences:
                    continue
                split_indexes.append(split_indexes[-1] + len(new_sentences))
                sentences.extend(new_sentences)

            predictions = list(self._predict_batch(sentences, batch_size))
            for i, split_index in enumerate(split_indexes[:-1]):
                stop_index = split_indexes[i + 1]
                yield aggregation_strategy.aggregate(
                    predictions[split_index:stop_index])

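    # Illustrative usage for texts longer than the model's input size;
    # SplitStrategies and AggregationStrategies are imported above, but the
    # SentencesWithoutUrls member is an assumption about .split_strategies:
    #     probabilities = classifier.predict_one(
    #         long_text,
    #         split_strategy=SplitStrategies.SentencesWithoutUrls,
    #         aggregation_strategy=AggregationStrategies.Mean)
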
    def dump(self, path):
        if self._model_path:
            copy_dir(self._model_path, path)
        else:
            self._dump(path)

    def _dump(self, path):
        make_dir(path)
        make_dir(path + '/tokenizer')
        self._model.save_pretrained(path)
        self._tokenizer.save_pretrained(path + '/tokenizer')
        self._config.save_pretrained(path + '/tokenizer')

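    # Resulting dump layout: the model weights live at `path`, while the
    # tokenizer and config are stored under `path + '/tokenizer'`, which is
    # the layout _load_local_model tries first.
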
    def _predict_batch(self, sentences: list, batch_size: int):
        sentences_number = len(sentences)
        if batch_size > sentences_number:
            batch_size = sentences_number

        for i in range(0, sentences_number, batch_size):
            input_ids_list = []
            attention_mask_list = []
            stop_index = min(i + batch_size, sentences_number)

            for j in range(i, stop_index):
                features = self._tokenizer.encode_plus(
                    sentences[j],
                    add_special_tokens=True,
                    max_length=self._tokenizer.model_max_length,
                    truncation=True,
                )
                input_ids = self._list_to_padded_array(
                    features['input_ids'])
                attention_mask = self._list_to_padded_array(
                    features['attention_mask'])
                input_ids_list.append(input_ids)
                attention_mask_list.append(attention_mask)

            input_dict = {
                'input_ids': np.array(input_ids_list),
                'attention_mask': np.array(attention_mask_list),
            }
            logit_predictions = self._model.predict_on_batch(input_dict)
            yield from (softmax(logit_prediction)
                        for logit_prediction in logit_predictions[0])

    def _list_to_padded_array(self, items):
        array = np.array(items)
        # np.int was removed in NumPy 1.24; use an explicit dtype.
        padded_array = np.zeros(self._tokenizer.model_max_length,
                                dtype=np.int64)
        padded_array[:array.shape[0]] = array
        return padded_array

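    # E.g., for a BERT-style vocabulary and model_max_length == 8, the ids
    # [101, 2023, 102] are right-padded to [101, 2023, 102, 0, 0, 0, 0, 0].
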
    def _get_temporary_path(self, name=''):
        # A millisecond timestamp keeps autosave paths unique.
        return f'{AUTOSAVE_PATH}{name}/{int(round(time.time() * 1000))}'

    def _reload_model(self):
        self._model_path = self._get_temporary_path(
            name=self._get_model_family())
        self._dump(self._model_path)
        self._load_local_model(self._model_path)

    def _load_local_model(self, model_path):
        # Newer dumps keep the tokenizer and config in a `tokenizer`
        # subfolder; older models stored everything at the top level.
        try:
            self._tokenizer = AutoTokenizer.from_pretrained(
                model_path + '/tokenizer')
            self._config = AutoConfig.from_pretrained(
                model_path + '/tokenizer')
        except OSError:
            self._tokenizer = AutoTokenizer.from_pretrained(model_path)
            self._config = AutoConfig.from_pretrained(model_path)

        self._model = TFAutoModelForSequenceClassification.from_pretrained(
            model_path,
            from_pt=False
        )

    def _get_model_family(self):
        # E.g. a model named 'tf_bert_for_sequence_classification'
        # yields the family 'bert'.
        model_family = ''.join(self._model.name[2:].split('_')[:2])
        return model_family

    def _load_remote_model(self, model_name, tokenizer_kwargs, model_kwargs):
        do_lower_case = 'uncased' in model_name.lower()
        tokenizer_kwargs.update({'do_lower_case': do_lower_case})

        self._tokenizer = AutoTokenizer.from_pretrained(
            model_name, **tokenizer_kwargs)
        self._config = AutoConfig.from_pretrained(model_name)

        temporary_path = self._get_temporary_path()
        make_dir(temporary_path)

        # TensorFlow checkpoint
        try:
            self._model = \
                TFAutoModelForSequenceClassification.from_pretrained(
                    model_name,
                    from_pt=False
                )

        # PyTorch checkpoint
        except TypeError:
            try:
                self._model = \
                    TFAutoModelForSequenceClassification.from_pretrained(
                        model_name,
                        from_pt=True
                    )

            # Loading a TF model from a PyTorch checkpoint is not supported
            # when using a model identifier name, so save the weights
            # locally first.
            except OSError:
                model = AutoModel.from_pretrained(model_name)
                model.save_pretrained(temporary_path)
                self._model = \
                    TFAutoModelForSequenceClassification.from_pretrained(
                        temporary_path,
                        from_pt=True
                    )

        # Rebuild the model's last layer if the provided properties differ
        # from the pretrained configuration.
        clean_last_layer = False
        for key, value in model_kwargs.items():
            if not hasattr(self._model.config, key) \
                    or getattr(self._model.config, key) != value:
                clean_last_layer = True
                break

        if clean_last_layer:
            try:
                getattr(self._model, self._get_model_family()
                        ).save_pretrained(temporary_path)
                self._model = self._model.__class__.from_pretrained(
                    temporary_path,
                    from_pt=False,
                    **model_kwargs
                )

            # The model is itself the main layer
            except AttributeError:
                # TensorFlow checkpoint
                try:
                    self._model = self._model.__class__.from_pretrained(
                        model_name,
                        from_pt=False,
                        **model_kwargs
                    )

                # PyTorch checkpoint
                except (OSError, TypeError):
                    model = AutoModel.from_pretrained(model_name)
                    model.save_pretrained(temporary_path)
                    self._model = self._model.__class__.from_pretrained(
                        temporary_path,
                        from_pt=True,
                        **model_kwargs
                    )

        remove_dir(temporary_path)
        assert self._tokenizer and self._model
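
# Illustrative end-to-end usage, assuming this module is exposed through an
# `ernie` package (the dataframe is made up for the example):
#     from ernie import SentenceClassifier, Models
#     import pandas as pd
#
#     df = pd.DataFrame({'sentence': ['Loved it.', 'Hated it.'],
#                        'label': [1, 0]})
#     classifier = SentenceClassifier(model_name=Models.BertBaseUncased,
#                                     max_length=64, labels_no=2)
#     classifier.load_dataset(df, validation_split=0.2)
#     classifier.fine_tune(epochs=1)
#     print(classifier.predict_one('A pleasant surprise.'))
#     classifier.dump('./my-model')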