import os
from typing import Dict, List, Optional, Union
import numpy as np
import torch
import transformers
from opencompass.models.base import BaseModel
from opencompass.models.base_api import APITemplateParser
from opencompass.registry import MODELS
from opencompass.utils.logging import get_logger
from opencompass.utils.prompt import PromptList
PromptType = Union[PromptList, str]
class MultiTokenEOSCriteria(transformers.StoppingCriteria):
"""Criteria to stop on the specified multi-token sequence."""
def __init__(
self,
sequence: str,
tokenizer: transformers.PreTrainedTokenizer,
batch_size: int,
):
self.done_tracker = [False] * batch_size
self.sequence = sequence
self.sequence_ids = tokenizer.encode(sequence,
add_special_tokens=False)
self.sequence_id_len = len(self.sequence_ids)
self.tokenizer = tokenizer
def __call__(self, input_ids, scores, **kwargs) -> bool:
# compare the last len(stop) tokens
lookback_ids_batch = input_ids[:, -self.sequence_id_len:]
lookback_tokens_batch = self.tokenizer.batch_decode(lookback_ids_batch)
for i, done in enumerate(self.done_tracker):
if done:
continue
self.done_tracker[i] = self.sequence in lookback_tokens_batch[i]
return False not in self.done_tracker
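# Illustrative sketch (the stop string, batch size, and the `tokenizer` /
# `model` / `tokens` objects are placeholders): one criteria object is built
# per stop string and the set is wrapped in a transformers.StoppingCriteriaList,
# as done in HuggingFace._batch_generate / _single_generate below:
#   stop = transformers.StoppingCriteriaList(
#       [MultiTokenEOSCriteria('\n\n', tokenizer, batch_size=4)])
#   model.generate(**tokens, stopping_criteria=stop)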
@MODELS.register_module()
class HuggingFace(BaseModel):
"""Model wrapper around HuggingFace models.
Args:
path (str): The name or path to HuggingFace's model.
        hf_cache_dir (str, optional): The cache directory for HuggingFace
            models. If None, the env variable HF_MODEL_HUB is used.
            Defaults to None.
max_seq_len (int): The maximum length of the input sequence. Defaults
to 2048.
tokenizer_path (str): The path to the tokenizer. Defaults to None.
tokenizer_kwargs (dict): Keyword arguments for the tokenizer.
Defaults to {}.
peft_path (str, optional): The name or path to the HuggingFace's PEFT
model. If None, the original model will not be converted to PEFT.
Defaults to None.
tokenizer_only (bool): If True, only the tokenizer will be initialized.
Defaults to False.
model_kwargs (dict): Keyword arguments for the model, used in loader.
Defaults to dict(device_map='auto').
meta_template (Dict, optional): The model's meta prompt
template if needed, in case the requirement of injecting or
wrapping of any meta instructions.
        extract_pred_after_decode (bool): Whether to extract the prediction
            string from the decoded output string, instead of extracting the
            prediction tokens before decoding. Defaults to False.
        batch_padding (bool): If False, inference will be performed in a
            for-loop without batch padding.
        pad_token_id (int): The id of the padding token. Defaults to None. If
            a negative value is given, (#vocab + pad_token_id) is used.
        mode (str, optional): The method of input truncation when the input
            length exceeds max_seq_len. 'mid' truncates the middle of the
            input, keeping its head and tail. Defaults to 'none'.
        use_fastchat_template (bool, optional): Whether to use fastchat to
            get the conversation template. If True, fastchat needs to be
            installed first. Defaults to False.
        end_str (str, optional): If provided, the generated strings are
            trimmed at end_str; useful when the model emits special ending
            strings that are not handled well. Defaults to None.
Note:
        About ``extract_pred_after_decode``: Commonly, we should extract the
        prediction tokens before decoding. But for some tokenizers using
        ``sentencepiece``, like LLaMA, this behavior may change the number of
        whitespaces, which is harmful for Python programming tasks.
"""
def __init__(self,
path: str,
hf_cache_dir: Optional[str] = None,
max_seq_len: int = 2048,
tokenizer_path: Optional[str] = None,
tokenizer_kwargs: dict = dict(),
peft_path: Optional[str] = None,
tokenizer_only: bool = False,
model_kwargs: dict = dict(device_map='auto'),
generation_kwargs: dict = dict(),
meta_template: Optional[Dict] = None,
extract_pred_after_decode: bool = False,
batch_padding: bool = False,
pad_token_id: Optional[int] = None,
mode: str = 'none',
use_fastchat_template: bool = False,
end_str: Optional[str] = None):
super().__init__(path=path,
max_seq_len=max_seq_len,
tokenizer_only=tokenizer_only,
meta_template=meta_template)
if hf_cache_dir is None:
hf_cache_dir = os.getenv('HF_MODEL_HUB', None)
self.logger = get_logger()
self.pad_token_id = pad_token_id
assert mode in ['none', 'mid']
self.mode = mode
self._load_tokenizer(path=path,
tokenizer_path=tokenizer_path,
tokenizer_kwargs=tokenizer_kwargs)
self.batch_padding = batch_padding
self.extract_pred_after_decode = extract_pred_after_decode
if not tokenizer_only:
self._load_model(path=path,
model_kwargs=model_kwargs,
peft_path=peft_path)
self.generation_kwargs = generation_kwargs
self.use_fastchat_template = use_fastchat_template
self.end_str = end_str
def _load_tokenizer(self, path: str, tokenizer_path: Optional[str],
tokenizer_kwargs: dict):
from transformers import AutoTokenizer
self.tokenizer = AutoTokenizer.from_pretrained(
tokenizer_path if tokenizer_path else path, **tokenizer_kwargs)
# A patch for some models without pad_token_id
if self.pad_token_id is not None:
if self.pad_token_id < 0:
self.pad_token_id += self.tokenizer.vocab_size
if self.tokenizer.pad_token_id is None:
self.logger.debug(f'Using {self.pad_token_id} as pad_token_id')
elif self.tokenizer.pad_token_id != self.pad_token_id:
self.logger.warning(
'pad_token_id is not consistent with the tokenizer. Using '
f'{self.pad_token_id} as pad_token_id')
self.tokenizer.pad_token_id = self.pad_token_id
elif self.tokenizer.pad_token_id is None:
self.logger.warning('pad_token_id is not set for the tokenizer.')
if self.tokenizer.eos_token is not None:
self.logger.warning(
                    f'Using eos_token {self.tokenizer.eos_token} '
                    'as pad_token.')
self.tokenizer.pad_token = self.tokenizer.eos_token
else:
from transformers.generation import GenerationConfig
gcfg = GenerationConfig.from_pretrained(path)
if gcfg.pad_token_id is not None:
self.logger.warning(
f'Using pad_token_id {gcfg.pad_token_id} '
'as pad_token_id.')
self.tokenizer.pad_token_id = gcfg.pad_token_id
else:
raise ValueError(
'pad_token_id is not set for this tokenizer. Try to '
'set pad_token_id via passing '
'`pad_token_id={PAD_TOKEN_ID}` in model_cfg.')
# A patch for llama when batch_padding = True
if 'decapoda-research/llama' in path or \
(tokenizer_path and
'decapoda-research/llama' in tokenizer_path):
self.logger.warning('We set new pad_token_id for LLaMA model')
# keep consistent with official LLaMA repo
# https://github.com/google/sentencepiece/blob/master/python/sentencepiece_python_module_example.ipynb # noqa
self.tokenizer.bos_token = '<s>'
self.tokenizer.eos_token = '</s>'
self.tokenizer.pad_token_id = 0
def _set_model_kwargs_torch_dtype(self, model_kwargs):
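        # Maps the string dtype names accepted in configs ('torch.float16',
        # 'torch.bfloat16', 'torch.float', 'auto', 'None') to real torch
        # dtypes; when torch_dtype is absent, float16 is used by default.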
if 'torch_dtype' not in model_kwargs:
torch_dtype = torch.float16
else:
torch_dtype = {
'torch.float16': torch.float16,
'torch.bfloat16': torch.bfloat16,
'torch.float': torch.float,
'auto': 'auto',
'None': None
}.get(model_kwargs['torch_dtype'])
self.logger.debug(f'HF using torch_dtype: {torch_dtype}')
if torch_dtype is not None:
model_kwargs['torch_dtype'] = torch_dtype
def _load_model(self,
path: str,
model_kwargs: dict,
peft_path: Optional[str] = None):
from transformers import AutoModel, AutoModelForCausalLM
self._set_model_kwargs_torch_dtype(model_kwargs)
try:
self.model = AutoModelForCausalLM.from_pretrained(
path, **model_kwargs)
except ValueError:
self.model = AutoModel.from_pretrained(path, **model_kwargs)
if peft_path is not None:
from peft import PeftModel
self.model = PeftModel.from_pretrained(self.model,
peft_path,
is_trainable=False)
self.model.eval()
self.model.generation_config.do_sample = False
# A patch for llama when batch_padding = True
if 'decapoda-research/llama' in path:
self.model.config.bos_token_id = 1
self.model.config.eos_token_id = 2
self.model.config.pad_token_id = self.tokenizer.pad_token_id
def generate(self,
inputs: List[str],
max_out_len: int,
min_out_len: Optional[int] = None,
stopping_criteria: List[str] = [],
**kwargs) -> List[str]:
"""Generate results given a list of inputs.
Args:
inputs (List[str]): A list of strings.
max_out_len (int): The maximum length of the output.
min_out_len (Optional[int]): The minimum length of the output.
Returns:
List[str]: A list of generated strings.
"""
generation_kwargs = kwargs.copy()
generation_kwargs.update(self.generation_kwargs)
if self.batch_padding and len(inputs) > 1:
return self._batch_generate(inputs=inputs,
max_out_len=max_out_len,
min_out_len=min_out_len,
stopping_criteria=stopping_criteria,
**generation_kwargs)
else:
return sum(
(self._single_generate(inputs=[input_],
max_out_len=max_out_len,
min_out_len=min_out_len,
stopping_criteria=stopping_criteria,
**generation_kwargs)
for input_ in inputs), [])
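    # Illustrative call (prompt and lengths are placeholders): for a wrapper
    # instance `hf`, `hf.generate(['Q: 1+1=?\nA:'], max_out_len=32,
    # stopping_criteria=['\n'])` returns one generated string per prompt.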
def _batch_generate(self,
inputs: List[str],
max_out_len: int,
min_out_len: Optional[int] = None,
stopping_criteria: List[str] = [],
**kwargs) -> List[str]:
"""Support for batch prompts inference.
Args:
inputs (List[str]): A list of strings.
max_out_len (int): The maximum length of the output.
Returns:
List[str]: A list of generated strings.
"""
if self.extract_pred_after_decode:
prompt_lens = [len(input_) for input_ in inputs]
if self.use_fastchat_template:
try:
from fastchat.model import get_conversation_template
except ModuleNotFoundError:
raise ModuleNotFoundError(
                    'Fastchat is not installed. You can install it with '
                    '\'pip install "fschat[model_worker,webui]"\'.')
for i in range(len(inputs)):
conv = get_conversation_template('vicuna')
conv.append_message(conv.roles[0], inputs[i])
conv.append_message(conv.roles[1], None)
inputs[i] = conv.get_prompt()
# step-1: tokenize the input with batch_encode_plus
tokens = self.tokenizer.batch_encode_plus(inputs,
padding=True,
truncation=True,
max_length=self.max_seq_len -
max_out_len)
tokens = {
k: torch.tensor(np.array(tokens[k]), device=self.model.device)
for k in tokens if k in ['input_ids', 'attention_mask']
}
if stopping_criteria:
# Construct huggingface stopping criteria
if self.tokenizer.eos_token is not None:
stopping_criteria = stopping_criteria + [
self.tokenizer.eos_token
]
stopping_criteria = transformers.StoppingCriteriaList([
*[
MultiTokenEOSCriteria(sequence, self.tokenizer,
tokens['input_ids'].shape[0])
for sequence in stopping_criteria
],
])
kwargs['stopping_criteria'] = stopping_criteria
if min_out_len is not None:
kwargs['min_new_tokens'] = min_out_len
# step-2: conduct model forward to generate output
outputs = self.model.generate(**tokens,
max_new_tokens=max_out_len,
**kwargs)
if not self.extract_pred_after_decode:
outputs = outputs[:, tokens['input_ids'].shape[1]:]
decodeds = self.tokenizer.batch_decode(outputs,
skip_special_tokens=True)
if self.extract_pred_after_decode:
decodeds = [
token[len_:] for token, len_ in zip(decodeds, prompt_lens)
]
if self.end_str:
decodeds = [token.split(self.end_str)[0] for token in decodeds]
return decodeds
def _single_generate(self,
inputs: List[str],
max_out_len: int,
min_out_len: Optional[int] = None,
stopping_criteria: List[str] = [],
**kwargs) -> List[str]:
"""Support for single prompt inference.
Args:
inputs (List[str]): A list of strings.
max_out_len (int): The maximum length of the output.
Returns:
List[str]: A list of generated strings.
"""
if self.extract_pred_after_decode:
prompt_lens = [len(input_) for input_ in inputs]
if self.use_fastchat_template:
try:
from fastchat.model import get_conversation_template
except ModuleNotFoundError:
raise ModuleNotFoundError(
                    'Fastchat is not installed. You can install it with '
                    '\'pip install "fschat[model_worker,webui]"\'.')
conv = get_conversation_template('vicuna')
conv.append_message(conv.roles[0], inputs[0])
conv.append_message(conv.roles[1], None)
inputs = [conv.get_prompt()]
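        # 'mid' truncation: if the prompt would not fit together with
        # max_out_len new tokens, keep the first and last
        # (max_seq_len - max_out_len) / 2 tokens and drop the middle, so both
        # the beginning and the end of the prompt survive.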
if self.mode == 'mid':
input_ids = self.tokenizer(inputs, truncation=False)['input_ids']
input_ids = torch.tensor(input_ids, device=self.model.device)
if len(input_ids[0]) > self.max_seq_len - max_out_len:
half = int((self.max_seq_len - max_out_len) / 2)
inputs = [
self.tokenizer.decode(input_ids[0][:half],
skip_special_tokens=True) +
self.tokenizer.decode(input_ids[0][-half:],
skip_special_tokens=True)
]
input_ids = self.tokenizer(inputs,
truncation=True,
max_length=self.max_seq_len -
max_out_len)['input_ids']
input_ids = torch.tensor(input_ids, device=self.model.device)
if stopping_criteria:
# Construct huggingface stopping criteria
if self.tokenizer.eos_token is not None:
stopping_criteria = stopping_criteria + [
self.tokenizer.eos_token
]
stopping_criteria = transformers.StoppingCriteriaList([
*[
MultiTokenEOSCriteria(sequence, self.tokenizer,
input_ids.shape[0])
for sequence in stopping_criteria
],
])
kwargs['stopping_criteria'] = stopping_criteria
if min_out_len is not None:
kwargs['min_new_tokens'] = min_out_len
# To accommodate the PeftModel, parameters should be passed in
# key-value format for generate.
outputs = self.model.generate(input_ids=input_ids,
max_new_tokens=max_out_len,
**kwargs)
if not self.extract_pred_after_decode:
outputs = outputs[:, input_ids.shape[1]:]
decodeds = self.tokenizer.batch_decode(outputs,
skip_special_tokens=True)
if self.extract_pred_after_decode:
decodeds = [
token[len_:] for token, len_ in zip(decodeds, prompt_lens)
]
if self.end_str:
decodeds = [token.split(self.end_str)[0] for token in decodeds]
return decodeds
def get_logits(self, inputs: List[str]):
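        # Forward pass only (no generation): returns the raw logits together
        # with the tokenized inputs so that _get_ppl and _get_mink_percent
        # can align logits with labels and mask out padding.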
if self.batch_padding and len(inputs) > 1:
# batch inference
tokens = self.tokenizer(inputs,
padding=True,
truncation=True,
max_length=self.max_seq_len)
tokens = {
k: torch.tensor(np.array(tokens[k]), device=self.model.device)
for k in tokens if k in ['input_ids', 'attention_mask']
}
outputs = self.model(**tokens)
else:
input_ids = self.tokenizer(
inputs,
padding=False,
truncation=True,
max_length=self.max_seq_len)['input_ids']
input_ids = torch.tensor(input_ids, device=self.model.device)
tokens = {'input_ids': input_ids}
outputs = self.model(input_ids)
return outputs[0], {'tokens': tokens}
def get_ppl(self,
inputs: List[str],
mask_length: Optional[List[int]] = None) -> List[float]:
"""Get perplexity scores given a list of inputs.
Args:
inputs (List[str]): A list of strings.
mask_length (Optional[List[int]]): A list of mask lengths. If
provided, the perplexity scores will be calculated with the
first mask_length[i] tokens masked out. It's okay to skip
                its implementation if the advanced features of PPLInferencer
                are not needed.
Returns:
List[float]: A list of perplexity scores.
"""
if self.batch_padding and len(inputs) > 1:
assert self.tokenizer.pad_token
return self._get_ppl(inputs, mask_length=mask_length)
else:
return np.concatenate([
self._get_ppl(inputs=[text], mask_length=mask_length)
for text in inputs
])
def _get_ppl(self,
inputs: List[str],
mask_length: Optional[List[int]] = None) -> List[float]:
"""Get perplexity scores given a list of inputs.
Args:
inputs (List[str]): A list of strings.
mask_length (Optional[List[int]]): A list of mask lengths. If
provided, the perplexity scores will be calculated with the
first mask_length[i] tokens masked out. It's okay to skip
                its implementation if the advanced features of PPLInferencer
                are not needed.
Returns:
List[float]: A list of perplexity scores.
"""
outputs, inputs = self.get_logits(inputs)
shift_logits = outputs[..., :-1, :].contiguous().float()
shift_labels = inputs['tokens']['input_ids'][..., 1:].contiguous()
loss_fct = torch.nn.CrossEntropyLoss(
reduction='none', ignore_index=self.tokenizer.pad_token_id)
loss = loss_fct(shift_logits.view(-1, shift_logits.size(-1)),
shift_labels.view(-1)).view(shift_labels.size())
if mask_length is not None:
mask = torch.zeros_like(shift_labels) # [batch,seqlen]
for i in range(len(mask)):
for j in range(mask_length[i] - 1, len(mask[i])):
mask[i][j] = 1
loss = loss * mask
lens = (inputs['tokens']['input_ids'] !=
self.tokenizer.pad_token_id).sum(-1).cpu().numpy()
if mask_length is not None:
lens -= np.array(mask_length)
ce_loss = loss.float().sum(-1).cpu().detach().numpy() / lens
return ce_loss
def get_loglikelihood(
self,
inputs: List[str],
conts: List[str],
mask_length: Optional[List[int]] = None) -> List[float]:
"""Get loglikelihood scores given a list of inputs.
Args:
inputs (List[str]): A list of strings.
            conts (List[str]): A list of continuation strings: the slices
                after the context. NOTE: mask_length is NOT supported yet.
mask_length (Optional[List[int]]): A list of mask lengths. If
provided, the perplexity scores will be calculated with the
first mask_length[i] tokens masked out. It's okay to skip
                its implementation if the advanced features of PPLInferencer
                are not needed.
Returns:
List[float]: A list of loglikelihood scores.
"""
        assert mask_length is None, 'mask_length is not supported yet.'
if self.batch_padding and len(inputs) > 1:
assert self.tokenizer.pad_token
return self._get_loglikelihood(inputs, conts)
else:
return np.concatenate([
self._get_loglikelihood(inputs=[inputs[idx]],
conts=[conts[idx]])
for idx in range(len(inputs))
])
    def _get_loglikelihood(self, inputs: List[str],
                           conts: List[str]) -> List[float]:
        """Get loglikelihood scores given input strings and continuations.
        Args:
            inputs (List[str]): A list of input strings.
            conts (List[str]): A list of continuation strings: the slices
                after the context.
        Returns:
            List[float]: A list of loglikelihood scores.
"""
input_tokenizer_out = self.tokenizer(inputs,
padding=True,
truncation=False,
return_length=True,
return_tensors='pt').to(
self.model.device)
input_ids = input_tokenizer_out['input_ids'][:, :self.max_seq_len]
input_length = input_tokenizer_out['length']
context_ids = [
self.tokenizer(inputs[i].replace(conts[i], ''),
padding=False,
truncation=True,
max_length=self.max_seq_len)['input_ids']
for i in range(len(inputs))
]
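        # context_ids is the prompt with the continuation removed; its length
        # marks where the continuation starts, so the log-probs of the
        # continuation tokens can be gathered from the shifted logits below.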
# forward
outputs = self.model(input_ids)['logits']
outputs = torch.nn.functional.log_softmax(outputs, dim=-1)
# calculate loglikelihood
answer = np.zeros(len(inputs))
for i in range(len(inputs)):
if self.tokenizer.padding_side == 'right':
cont_ids = input_ids[i, len(context_ids[i]):input_length[i]]
logits = outputs[i,
len(context_ids[i]) - 1:input_length[i] -
1, :] # noqa
else:
cont_ids = input_ids[i, len(context_ids[i]) - input_length[i]:]
logits = outputs[i,
len(context_ids[i]) - input_length[i] - 1:-1]
# Reducing the dimension will lead to a wrong outcome
logits_gather = torch.gather(
logits.unsqueeze(0), 2,
cont_ids.unsqueeze(0).unsqueeze(-1)) # [1, seq]
# Answer: sum the likelihood of each token in continuation
answer[i] = float(logits_gather.detach().cpu().sum())
return answer
def get_mink_percent(self, inputs: List[str], k: int = 20) -> List[float]:
"""https://swj0419.github.io/detect-pretrain.github.io/"""
if self.batch_padding and len(inputs) > 1:
assert self.tokenizer.pad_token
return self._get_mink_percent(inputs, k=k)
else:
return np.concatenate([
self._get_mink_percent(inputs=[text], k=k) for text in inputs
])
def _get_mink_percent(self, inputs: List[str], k: int = 20) -> List[float]:
outputs, inputs = self.get_logits(inputs)
shift_logits = outputs[:, :-1, :].contiguous().float()
shift_labels = inputs['tokens']['input_ids'][:, 1:].contiguous()
loss_fct = torch.nn.CrossEntropyLoss(
reduction='none', ignore_index=self.tokenizer.pad_token_id)
loss = loss_fct(shift_logits.view(-1, shift_logits.size(-1)),
shift_labels.view(-1)).view(shift_labels.size())
lens = (inputs['tokens']['input_ids'] !=
self.tokenizer.pad_token_id).sum(-1).cpu().numpy()
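        # Min-k% prob (see the link above): for each sample, average the
        # log-probs of the k% least-likely tokens; texts seen during
        # pretraining tend to get higher (less negative) scores.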
mink_percent = []
for nloss, nlen in zip(loss, lens):
nlen = int(nlen)
minklen = max(nlen * k // 100, 1)
            # per-sample top-k over the sequence; pad positions carry zero
            # loss (ignore_index) and are not picked
            nloss = torch.topk(nloss, minklen, dim=-1)[0]
nloss = -nloss.float().mean().cpu().detach().numpy()
mink_percent.append(nloss)
return np.array(mink_percent)
def get_token_len(self, prompt: str) -> int:
"""Get lengths of the tokenized strings.
Args:
prompt (str): Input string.
Returns:
int: Length of the input tokens
"""
return len(self.tokenizer.encode(prompt))
@MODELS.register_module()
class HuggingFaceCausalLM(HuggingFace):
"""Model wrapper around HuggingFace CausalLM.
Args:
path (str): The name or path to HuggingFace's model.
        hf_cache_dir (str, optional): The cache directory for HuggingFace
            models. If None, the env variable HF_MODEL_HUB is used.
            Defaults to None.
max_seq_len (int): The maximum length of the input sequence. Defaults
to 2048.
tokenizer_path (str): The path to the tokenizer. Defaults to None.
tokenizer_kwargs (dict): Keyword arguments for the tokenizer.
Defaults to {}.
peft_path (str, optional): The name or path to the HuggingFace's PEFT
model. If None, the original model will not be converted to PEFT.
Defaults to None.
tokenizer_only (bool): If True, only the tokenizer will be initialized.
Defaults to False.
model_kwargs (dict): Keyword arguments for the model, used in loader.
Defaults to dict(device_map='auto').
meta_template (Dict, optional): The model's meta prompt
template if needed, in case the requirement of injecting or
wrapping of any meta instructions.
        batch_padding (bool): If False, inference will be performed in a
            for-loop without batch padding.
"""
def _load_model(self,
path: str,
model_kwargs: dict,
peft_path: Optional[str] = None):
from transformers import AutoModelForCausalLM
self._set_model_kwargs_torch_dtype(model_kwargs)
self.model = AutoModelForCausalLM.from_pretrained(path, **model_kwargs)
if peft_path is not None:
from peft import PeftModel
self.model = PeftModel.from_pretrained(self.model,
peft_path,
is_trainable=False)
self.model.eval()
self.model.generation_config.do_sample = False
class HuggingFaceChatGLM3(HuggingFace):
"""Model wrapper around HuggingFace's ChatGLM3. Details available in
`https://huggingface.co/THUDM/chatglm3-6b`.
model.chat() is used for inference.
"""
def __init__(self,
path: str,
hf_cache_dir: Optional[str] = None,
max_seq_len: int = 2048,
tokenizer_path: Optional[str] = None,
tokenizer_kwargs: dict = dict(),
peft_path: Optional[str] = None,
tokenizer_only: bool = False,
model_kwargs: dict = dict(device_map='auto'),
generation_kwargs: dict = dict(),
meta_template: Optional[Dict] = None,
extract_pred_after_decode: bool = False,
batch_padding: bool = False,
pad_token_id: Optional[int] = None,
mode: str = 'none',
num_extra_tokens: int = 50):
super().__init__(path=path,
hf_cache_dir=hf_cache_dir,
max_seq_len=max_seq_len,
tokenizer_path=tokenizer_path,
tokenizer_kwargs=tokenizer_kwargs,
peft_path=peft_path,
tokenizer_only=tokenizer_only,
generation_kwargs=generation_kwargs,
model_kwargs=model_kwargs,
meta_template=meta_template,
extract_pred_after_decode=extract_pred_after_decode,
batch_padding=batch_padding,
pad_token_id=pad_token_id,
mode=mode)
self.template_parser = APITemplateParser(meta_template)
        # used to compensate for the tokens occupied by e.g. the system prompt
self.num_extra_tokens = num_extra_tokens
    def generate(self,
                 inputs: List[PromptType],
                 max_out_len: int = 512,
                 skip_overlength=False,
                 **kwargs) -> List[str]:
"""Generate response from input prompt.
Args:
inputs (list): input prompt
max_out_len (int): max output length
"""
generation_kwargs = kwargs.copy()
generation_kwargs.update(self.generation_kwargs)
responses = []
for _input in inputs:
assert isinstance(_input, (str, PromptList))
if isinstance(_input, str):
history = [{'role': 'user', 'content': _input}]
else:
history = []
for item in _input:
msg = {
'content': item['prompt'],
'role': {
'HUMAN': 'user',
'BOT': 'assistant',
'SYSTEM': 'system',
}[item['role'].upper()]
}
history.append(msg)
user_content = history[-1]['content']
history = history[:-1]
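            # model.chat() takes the latest user message as the query and the
            # earlier turns as `history`, a list of {'role', 'content'} dicts
            # built above from the PromptList roles (HUMAN/BOT/SYSTEM).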
if skip_overlength:
# The model will report the following error
# if the sequence length is greater than the maximum length:
# "Input length of input_ids is {INPUT_IDS},
# but `max_length` is set to 8192.
# This can lead to unexpected behavior.
# You should consider increasing `max_new_tokens`."
                # The following hard-coded length check avoids this exception.
len_user_content = len(self.tokenizer.encode(user_content))
if len_user_content > 8192:
responses.append('')
continue
response, history = self.model.chat(self.tokenizer,
user_content,
history=history,
max_new_tokens=max_out_len,
**generation_kwargs)
            # response can sometimes be a dict
if isinstance(response, dict):
response = response.get('content', '')
responses.append(response)
return responses
def get_token_len(self, prompt: str) -> int:
return len(self.tokenizer.encode(prompt)) + self.num_extra_tokens