|
from typing import Dict, List, Any |
|
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline, StoppingCriteria, StoppingCriteriaList |
|
|
|
|
|
class EndpointHandler(): |
|
def __init__(self, path=""): |
|
tokenizer = AutoTokenizer.from_pretrained(path) |
|
tokenizer.pad_token = tokenizer.eos_token |
|
self.model = AutoModelForCausalLM.from_pretrained(path) |
|
self.tokenizer = tokenizer |
|
self.stopping_criteria = StoppingCriteriaList([StopAtPeriodCriteria(tokenizer)]) |
|
|
|
def __call__(self, data: Dict[str, Any]) -> List[Dict[str, Any]]: |
|
""" |
|
data args: |
|
inputs (:obj: `str`) |
|
kwargs |
|
Return: |
|
A :obj:`list` | `dict`: will be serialized and returned |
|
""" |
|
inputs = data.pop("inputs", data) |
|
additional_bad_words_ids = data.pop("additional_bad_words_ids", []) |
|
|
|
|
|
|
|
|
|
|
|
|
|
bad_words_ids = [[3070], [313, 334], [10456], [13], [1976, 441, 29889], [2087, 29885, 4430, 29889], [4920, 441], [4920, 441, 29889], [4920, 18054, 29889]] |
|
bad_words_ids.extend(additional_bad_words_ids) |
|
|
|
input_ids = self.tokenizer.encode(inputs, return_tensors="pt") |
|
max_generation_length = 75 |
|
max_input_length = 4092 - max_generation_length |
|
|
|
|
|
if input_ids.shape[1] > max_input_length: |
|
input_ids = input_ids[:, -max_input_length:] |
|
|
|
max_length = input_ids.shape[1] + max_generation_length |
|
|
|
generated_ids = self.model.generate( |
|
input_ids, |
|
max_length=max_length, |
|
bad_words_ids=bad_words_ids, |
|
temperature=0.7, |
|
top_k=40, |
|
do_sample=True, |
|
stopping_criteria=self.stopping_criteria, |
|
) |
|
|
|
generated_text = self.tokenizer.decode(generated_ids[0][input_ids.shape[1]:], skip_special_tokens=True) |
|
prediction = [{"generated_text": generated_text, "generated_ids": generated_ids[0][input_ids.shape[1]:].tolist()}] |
|
return prediction |
|
|
|
|
|
class StopAtPeriodCriteria(StoppingCriteria): |
|
def __init__(self, tokenizer): |
|
self.tokenizer = tokenizer |
|
|
|
def __call__(self, input_ids, scores, **kwargs): |
|
|
|
last_token_text = self.tokenizer.decode(input_ids[:, -1], skip_special_tokens=True) |
|
|
|
return '.' in last_token_text |