import os from typing import Dict, List, Optional, Union import numpy as np import torch import transformers from opencompass.models.base import BaseModel from opencompass.models.base_api import APITemplateParser from opencompass.registry import MODELS from opencompass.utils.logging import get_logger from opencompass.utils.prompt import PromptList PromptType = Union[PromptList, str] class MultiTokenEOSCriteria(transformers.StoppingCriteria): """Criteria to stop on the specified multi-token sequence.""" def __init__( self, sequence: str, tokenizer: transformers.PreTrainedTokenizer, batch_size: int, ): self.done_tracker = [False] * batch_size self.sequence = sequence self.sequence_ids = tokenizer.encode(sequence, add_special_tokens=False) self.sequence_id_len = len(self.sequence_ids) self.tokenizer = tokenizer def __call__(self, input_ids, scores, **kwargs) -> bool: # compare the last len(stop) tokens lookback_ids_batch = input_ids[:, -self.sequence_id_len:] lookback_tokens_batch = self.tokenizer.batch_decode(lookback_ids_batch) for i, done in enumerate(self.done_tracker): if done: continue self.done_tracker[i] = self.sequence in lookback_tokens_batch[i] return False not in self.done_tracker @MODELS.register_module() class HuggingFace(BaseModel): """Model wrapper around HuggingFace models. Args: path (str): The name or path to HuggingFace's model. hf_cache_dir: Set the cache dir to HF model cache dir. If None, it will use the env variable HF_MODEL_HUB. Defaults to None. max_seq_len (int): The maximum length of the input sequence. Defaults to 2048. tokenizer_path (str): The path to the tokenizer. Defaults to None. tokenizer_kwargs (dict): Keyword arguments for the tokenizer. Defaults to {}. peft_path (str, optional): The name or path to the HuggingFace's PEFT model. If None, the original model will not be converted to PEFT. Defaults to None. tokenizer_only (bool): If True, only the tokenizer will be initialized. Defaults to False. model_kwargs (dict): Keyword arguments for the model, used in loader. Defaults to dict(device_map='auto'). meta_template (Dict, optional): The model's meta prompt template if needed, in case the requirement of injecting or wrapping of any meta instructions. extract_pred_after_decode (bool): Whether to extract the prediction string from the decoded output string, instead of extract the prediction tokens before decoding. Defaults to False. batch_padding (bool): If False, inference with be performed in for-loop without batch padding. pad_token_id (int): The id of the padding token. Defaults to None. Use (#vocab + pad_token_id) if get negative value. mode (str, optional): The method of input truncation when input length exceeds max_seq_len. 'mid' represents the part of input to truncate. Defaults to 'none'. use_fastchat_template (str, optional): Whether to use fastchat to get the conversation template. If True, fastchat needs to be implemented first. Defaults to False. end_str (str, optional): Whether to trim generated strings with end_str if the model has special ending strings that are not handled well. Defaults to None. Note: About ``extract_pred_after_decode``: Commonly, we should extract the the prediction tokens before decoding. But for some tokenizers using ``sentencepiece``, like LLaMA, this behavior may change the number of whitespaces, which is harmful for Python programming tasks. """ def __init__(self, path: str, hf_cache_dir: Optional[str] = None, max_seq_len: int = 2048, tokenizer_path: Optional[str] = None, tokenizer_kwargs: dict = dict(), peft_path: Optional[str] = None, tokenizer_only: bool = False, model_kwargs: dict = dict(device_map='auto'), generation_kwargs: dict = dict(), meta_template: Optional[Dict] = None, extract_pred_after_decode: bool = False, batch_padding: bool = False, pad_token_id: Optional[int] = None, mode: str = 'none', use_fastchat_template: bool = False, end_str: Optional[str] = None): super().__init__(path=path, max_seq_len=max_seq_len, tokenizer_only=tokenizer_only, meta_template=meta_template) if hf_cache_dir is None: hf_cache_dir = os.getenv('HF_MODEL_HUB', None) self.logger = get_logger() self.pad_token_id = pad_token_id assert mode in ['none', 'mid'] self.mode = mode self._load_tokenizer(path=path, tokenizer_path=tokenizer_path, tokenizer_kwargs=tokenizer_kwargs) self.batch_padding = batch_padding self.extract_pred_after_decode = extract_pred_after_decode if not tokenizer_only: self._load_model(path=path, model_kwargs=model_kwargs, peft_path=peft_path) self.generation_kwargs = generation_kwargs self.use_fastchat_template = use_fastchat_template self.end_str = end_str def _load_tokenizer(self, path: str, tokenizer_path: Optional[str], tokenizer_kwargs: dict): from transformers import AutoTokenizer self.tokenizer = AutoTokenizer.from_pretrained( tokenizer_path if tokenizer_path else path, **tokenizer_kwargs) # A patch for some models without pad_token_id if self.pad_token_id is not None: if self.pad_token_id < 0: self.pad_token_id += self.tokenizer.vocab_size if self.tokenizer.pad_token_id is None: self.logger.debug(f'Using {self.pad_token_id} as pad_token_id') elif self.tokenizer.pad_token_id != self.pad_token_id: self.logger.warning( 'pad_token_id is not consistent with the tokenizer. Using ' f'{self.pad_token_id} as pad_token_id') self.tokenizer.pad_token_id = self.pad_token_id elif self.tokenizer.pad_token_id is None: self.logger.warning('pad_token_id is not set for the tokenizer.') if self.tokenizer.eos_token is not None: self.logger.warning( f'Using eos_token_id {self.tokenizer.eos_token} ' 'as pad_token_id.') self.tokenizer.pad_token = self.tokenizer.eos_token else: from transformers.generation import GenerationConfig gcfg = GenerationConfig.from_pretrained(path) if gcfg.pad_token_id is not None: self.logger.warning( f'Using pad_token_id {gcfg.pad_token_id} ' 'as pad_token_id.') self.tokenizer.pad_token_id = gcfg.pad_token_id else: raise ValueError( 'pad_token_id is not set for this tokenizer. Try to ' 'set pad_token_id via passing ' '`pad_token_id={PAD_TOKEN_ID}` in model_cfg.') # A patch for llama when batch_padding = True if 'decapoda-research/llama' in path or \ (tokenizer_path and 'decapoda-research/llama' in tokenizer_path): self.logger.warning('We set new pad_token_id for LLaMA model') # keep consistent with official LLaMA repo # https://github.com/google/sentencepiece/blob/master/python/sentencepiece_python_module_example.ipynb # noqa self.tokenizer.bos_token = '' self.tokenizer.eos_token = '' self.tokenizer.pad_token_id = 0 def _set_model_kwargs_torch_dtype(self, model_kwargs): if 'torch_dtype' not in model_kwargs: torch_dtype = torch.float16 else: torch_dtype = { 'torch.float16': torch.float16, 'torch.bfloat16': torch.bfloat16, 'torch.float': torch.float, 'auto': 'auto', 'None': None }.get(model_kwargs['torch_dtype']) self.logger.debug(f'HF using torch_dtype: {torch_dtype}') if torch_dtype is not None: model_kwargs['torch_dtype'] = torch_dtype def _load_model(self, path: str, model_kwargs: dict, peft_path: Optional[str] = None): from transformers import AutoModel, AutoModelForCausalLM self._set_model_kwargs_torch_dtype(model_kwargs) try: self.model = AutoModelForCausalLM.from_pretrained( path, **model_kwargs) except ValueError: self.model = AutoModel.from_pretrained(path, **model_kwargs) if peft_path is not None: from peft import PeftModel self.model = PeftModel.from_pretrained(self.model, peft_path, is_trainable=False) self.model.eval() self.model.generation_config.do_sample = False # A patch for llama when batch_padding = True if 'decapoda-research/llama' in path: self.model.config.bos_token_id = 1 self.model.config.eos_token_id = 2 self.model.config.pad_token_id = self.tokenizer.pad_token_id def generate(self, inputs: List[str], max_out_len: int, min_out_len: Optional[int] = None, stopping_criteria: List[str] = [], **kwargs) -> List[str]: """Generate results given a list of inputs. Args: inputs (List[str]): A list of strings. max_out_len (int): The maximum length of the output. min_out_len (Optional[int]): The minimum length of the output. Returns: List[str]: A list of generated strings. """ generation_kwargs = kwargs.copy() generation_kwargs.update(self.generation_kwargs) if self.batch_padding and len(inputs) > 1: return self._batch_generate(inputs=inputs, max_out_len=max_out_len, min_out_len=min_out_len, stopping_criteria=stopping_criteria, **generation_kwargs) else: return sum( (self._single_generate(inputs=[input_], max_out_len=max_out_len, min_out_len=min_out_len, stopping_criteria=stopping_criteria, **generation_kwargs) for input_ in inputs), []) def _batch_generate(self, inputs: List[str], max_out_len: int, min_out_len: Optional[int] = None, stopping_criteria: List[str] = [], **kwargs) -> List[str]: """Support for batch prompts inference. Args: inputs (List[str]): A list of strings. max_out_len (int): The maximum length of the output. Returns: List[str]: A list of generated strings. """ if self.extract_pred_after_decode: prompt_lens = [len(input_) for input_ in inputs] if self.use_fastchat_template: try: from fastchat.model import get_conversation_template except ModuleNotFoundError: raise ModuleNotFoundError( 'Fastchat is not implemented. You can use ' '\'pip install "fschat[model_worker,webui]"\' ' 'to implement fastchat.') for i in range(len(inputs)): conv = get_conversation_template('vicuna') conv.append_message(conv.roles[0], inputs[i]) conv.append_message(conv.roles[1], None) inputs[i] = conv.get_prompt() # step-1: tokenize the input with batch_encode_plus tokens = self.tokenizer.batch_encode_plus(inputs, padding=True, truncation=True, max_length=self.max_seq_len - max_out_len) tokens = { k: torch.tensor(np.array(tokens[k]), device=self.model.device) for k in tokens if k in ['input_ids', 'attention_mask'] } if stopping_criteria: # Construct huggingface stopping criteria if self.tokenizer.eos_token is not None: stopping_criteria = stopping_criteria + [ self.tokenizer.eos_token ] stopping_criteria = transformers.StoppingCriteriaList([ *[ MultiTokenEOSCriteria(sequence, self.tokenizer, tokens['input_ids'].shape[0]) for sequence in stopping_criteria ], ]) kwargs['stopping_criteria'] = stopping_criteria if min_out_len is not None: kwargs['min_new_tokens'] = min_out_len # step-2: conduct model forward to generate output outputs = self.model.generate(**tokens, max_new_tokens=max_out_len, **kwargs) if not self.extract_pred_after_decode: outputs = outputs[:, tokens['input_ids'].shape[1]:] decodeds = self.tokenizer.batch_decode(outputs, skip_special_tokens=True) if self.extract_pred_after_decode: decodeds = [ token[len_:] for token, len_ in zip(decodeds, prompt_lens) ] if self.end_str: decodeds = [token.split(self.end_str)[0] for token in decodeds] return decodeds def _single_generate(self, inputs: List[str], max_out_len: int, min_out_len: Optional[int] = None, stopping_criteria: List[str] = [], **kwargs) -> List[str]: """Support for single prompt inference. Args: inputs (List[str]): A list of strings. max_out_len (int): The maximum length of the output. Returns: List[str]: A list of generated strings. """ if self.extract_pred_after_decode: prompt_lens = [len(input_) for input_ in inputs] if self.use_fastchat_template: try: from fastchat.model import get_conversation_template except ModuleNotFoundError: raise ModuleNotFoundError( 'Fastchat is not implemented. You can use ' '\'pip install "fschat[model_worker,webui]"\' ' 'to implement fastchat.') conv = get_conversation_template('vicuna') conv.append_message(conv.roles[0], inputs[0]) conv.append_message(conv.roles[1], None) inputs = [conv.get_prompt()] if self.mode == 'mid': input_ids = self.tokenizer(inputs, truncation=False)['input_ids'] input_ids = torch.tensor(input_ids, device=self.model.device) if len(input_ids[0]) > self.max_seq_len - max_out_len: half = int((self.max_seq_len - max_out_len) / 2) inputs = [ self.tokenizer.decode(input_ids[0][:half], skip_special_tokens=True) + self.tokenizer.decode(input_ids[0][-half:], skip_special_tokens=True) ] input_ids = self.tokenizer(inputs, truncation=True, max_length=self.max_seq_len - max_out_len)['input_ids'] input_ids = torch.tensor(input_ids, device=self.model.device) if stopping_criteria: # Construct huggingface stopping criteria if self.tokenizer.eos_token is not None: stopping_criteria = stopping_criteria + [ self.tokenizer.eos_token ] stopping_criteria = transformers.StoppingCriteriaList([ *[ MultiTokenEOSCriteria(sequence, self.tokenizer, input_ids.shape[0]) for sequence in stopping_criteria ], ]) kwargs['stopping_criteria'] = stopping_criteria if min_out_len is not None: kwargs['min_new_tokens'] = min_out_len # To accommodate the PeftModel, parameters should be passed in # key-value format for generate. outputs = self.model.generate(input_ids=input_ids, max_new_tokens=max_out_len, **kwargs) if not self.extract_pred_after_decode: outputs = outputs[:, input_ids.shape[1]:] decodeds = self.tokenizer.batch_decode(outputs, skip_special_tokens=True) if self.extract_pred_after_decode: decodeds = [ token[len_:] for token, len_ in zip(decodeds, prompt_lens) ] if self.end_str: decodeds = [token.split(self.end_str)[0] for token in decodeds] return decodeds def get_logits(self, inputs: List[str]): if self.batch_padding and len(inputs) > 1: # batch inference tokens = self.tokenizer(inputs, padding=True, truncation=True, max_length=self.max_seq_len) tokens = { k: torch.tensor(np.array(tokens[k]), device=self.model.device) for k in tokens if k in ['input_ids', 'attention_mask'] } outputs = self.model(**tokens) else: input_ids = self.tokenizer( inputs, padding=False, truncation=True, max_length=self.max_seq_len)['input_ids'] input_ids = torch.tensor(input_ids, device=self.model.device) tokens = {'input_ids': input_ids} outputs = self.model(input_ids) return outputs[0], {'tokens': tokens} def get_ppl(self, inputs: List[str], mask_length: Optional[List[int]] = None) -> List[float]: """Get perplexity scores given a list of inputs. Args: inputs (List[str]): A list of strings. mask_length (Optional[List[int]]): A list of mask lengths. If provided, the perplexity scores will be calculated with the first mask_length[i] tokens masked out. It's okay to skip its implementation if advanced features in PPLInfernecer is not needed. Returns: List[float]: A list of perplexity scores. """ if self.batch_padding and len(inputs) > 1: assert self.tokenizer.pad_token return self._get_ppl(inputs, mask_length=mask_length) else: return np.concatenate([ self._get_ppl(inputs=[text], mask_length=mask_length) for text in inputs ]) def _get_ppl(self, inputs: List[str], mask_length: Optional[List[int]] = None) -> List[float]: """Get perplexity scores given a list of inputs. Args: inputs (List[str]): A list of strings. mask_length (Optional[List[int]]): A list of mask lengths. If provided, the perplexity scores will be calculated with the first mask_length[i] tokens masked out. It's okay to skip its implementation if advanced features in PPLInfernecer is not needed. Returns: List[float]: A list of perplexity scores. """ outputs, inputs = self.get_logits(inputs) shift_logits = outputs[..., :-1, :].contiguous().float() shift_labels = inputs['tokens']['input_ids'][..., 1:].contiguous() loss_fct = torch.nn.CrossEntropyLoss( reduction='none', ignore_index=self.tokenizer.pad_token_id) loss = loss_fct(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1)).view(shift_labels.size()) if mask_length is not None: mask = torch.zeros_like(shift_labels) # [batch,seqlen] for i in range(len(mask)): for j in range(mask_length[i] - 1, len(mask[i])): mask[i][j] = 1 loss = loss * mask lens = (inputs['tokens']['input_ids'] != self.tokenizer.pad_token_id).sum(-1).cpu().numpy() if mask_length is not None: lens -= np.array(mask_length) ce_loss = loss.float().sum(-1).cpu().detach().numpy() / lens return ce_loss def get_loglikelihood( self, inputs: List[str], conts: List[str], mask_length: Optional[List[int]] = None) -> List[float]: """Get loglikelihood scores given a list of inputs. Args: inputs (List[str]): A list of strings. conts (List[str]): A list of strings: slices after the space. NOT SUPPORT mask_length YET! mask_length (Optional[List[int]]): A list of mask lengths. If provided, the perplexity scores will be calculated with the first mask_length[i] tokens masked out. It's okay to skip its implementation if advanced features in PPLInfernecer is not needed. Returns: List[float]: A list of loglikelihood scores. """ assert mask_length is None, 'Not support mask_length yet.' if self.batch_padding and len(inputs) > 1: assert self.tokenizer.pad_token return self._get_loglikelihood(inputs, conts) else: return np.concatenate([ self._get_loglikelihood(inputs=[inputs[idx]], conts=[conts[idx]]) for idx in range(len(inputs)) ]) def _get_loglikelihood(self, inputs: str, conts: str) -> float: """Get loglikelihood scores given input string and continuation string. Args: inputs (str): string. conts (str): strings: slices after the space. Returns: float: loglikelihood scores. """ input_tokenizer_out = self.tokenizer(inputs, padding=True, truncation=False, return_length=True, return_tensors='pt').to( self.model.device) input_ids = input_tokenizer_out['input_ids'][:, :self.max_seq_len] input_length = input_tokenizer_out['length'] context_ids = [ self.tokenizer(inputs[i].replace(conts[i], ''), padding=False, truncation=True, max_length=self.max_seq_len)['input_ids'] for i in range(len(inputs)) ] # forward outputs = self.model(input_ids)['logits'] outputs = torch.nn.functional.log_softmax(outputs, dim=-1) # calculate loglikelihood answer = np.zeros(len(inputs)) for i in range(len(inputs)): if self.tokenizer.padding_side == 'right': cont_ids = input_ids[i, len(context_ids[i]):input_length[i]] logits = outputs[i, len(context_ids[i]) - 1:input_length[i] - 1, :] # noqa else: cont_ids = input_ids[i, len(context_ids[i]) - input_length[i]:] logits = outputs[i, len(context_ids[i]) - input_length[i] - 1:-1] # Reducing the dimension will lead to a wrong outcome logits_gather = torch.gather( logits.unsqueeze(0), 2, cont_ids.unsqueeze(0).unsqueeze(-1)) # [1, seq] # Answer: sum the likelihood of each token in continuation answer[i] = float(logits_gather.detach().cpu().sum()) return answer def get_mink_percent(self, inputs: List[str], k: int = 20) -> List[float]: """https://swj0419.github.io/detect-pretrain.github.io/""" if self.batch_padding and len(inputs) > 1: assert self.tokenizer.pad_token return self._get_mink_percent(inputs, k=k) else: return np.concatenate([ self._get_mink_percent(inputs=[text], k=k) for text in inputs ]) def _get_mink_percent(self, inputs: List[str], k: int = 20) -> List[float]: outputs, inputs = self.get_logits(inputs) shift_logits = outputs[:, :-1, :].contiguous().float() shift_labels = inputs['tokens']['input_ids'][:, 1:].contiguous() loss_fct = torch.nn.CrossEntropyLoss( reduction='none', ignore_index=self.tokenizer.pad_token_id) loss = loss_fct(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1)).view(shift_labels.size()) lens = (inputs['tokens']['input_ids'] != self.tokenizer.pad_token_id).sum(-1).cpu().numpy() mink_percent = [] for nloss, nlen in zip(loss, lens): nlen = int(nlen) minklen = max(nlen * k // 100, 1) nloss = torch.topk(loss[-nlen:], minklen, dim=-1)[0] nloss = -nloss.float().mean().cpu().detach().numpy() mink_percent.append(nloss) return np.array(mink_percent) def get_token_len(self, prompt: str) -> int: """Get lengths of the tokenized strings. Args: prompt (str): Input string. Returns: int: Length of the input tokens """ return len(self.tokenizer.encode(prompt)) @MODELS.register_module() class HuggingFaceCausalLM(HuggingFace): """Model wrapper around HuggingFace CausalLM. Args: path (str): The name or path to HuggingFace's model. hf_cache_dir: Set the cache dir to HF model cache dir. If None, it will use the env variable HF_MODEL_HUB. Defaults to None. max_seq_len (int): The maximum length of the input sequence. Defaults to 2048. tokenizer_path (str): The path to the tokenizer. Defaults to None. tokenizer_kwargs (dict): Keyword arguments for the tokenizer. Defaults to {}. peft_path (str, optional): The name or path to the HuggingFace's PEFT model. If None, the original model will not be converted to PEFT. Defaults to None. tokenizer_only (bool): If True, only the tokenizer will be initialized. Defaults to False. model_kwargs (dict): Keyword arguments for the model, used in loader. Defaults to dict(device_map='auto'). meta_template (Dict, optional): The model's meta prompt template if needed, in case the requirement of injecting or wrapping of any meta instructions. batch_padding (bool): If False, inference with be performed in for-loop without batch padding. """ def _load_model(self, path: str, model_kwargs: dict, peft_path: Optional[str] = None): from transformers import AutoModelForCausalLM self._set_model_kwargs_torch_dtype(model_kwargs) self.model = AutoModelForCausalLM.from_pretrained(path, **model_kwargs) if peft_path is not None: from peft import PeftModel self.model = PeftModel.from_pretrained(self.model, peft_path, is_trainable=False) self.model.eval() self.model.generation_config.do_sample = False class HuggingFaceChatGLM3(HuggingFace): """Model wrapper around HuggingFace's ChatGLM3. Details available in `https://huggingface.co/THUDM/chatglm3-6b`. model.chat() is used for inference. """ def __init__(self, path: str, hf_cache_dir: Optional[str] = None, max_seq_len: int = 2048, tokenizer_path: Optional[str] = None, tokenizer_kwargs: dict = dict(), peft_path: Optional[str] = None, tokenizer_only: bool = False, model_kwargs: dict = dict(device_map='auto'), generation_kwargs: dict = dict(), meta_template: Optional[Dict] = None, extract_pred_after_decode: bool = False, batch_padding: bool = False, pad_token_id: Optional[int] = None, mode: str = 'none', num_extra_tokens: int = 50): super().__init__(path=path, hf_cache_dir=hf_cache_dir, max_seq_len=max_seq_len, tokenizer_path=tokenizer_path, tokenizer_kwargs=tokenizer_kwargs, peft_path=peft_path, tokenizer_only=tokenizer_only, generation_kwargs=generation_kwargs, model_kwargs=model_kwargs, meta_template=meta_template, extract_pred_after_decode=extract_pred_after_decode, batch_padding=batch_padding, pad_token_id=pad_token_id, mode=mode) self.template_parser = APITemplateParser(meta_template) # used to compensate for #tokens occupied by sth like system prompt self.num_extra_tokens = num_extra_tokens def generate(self, inputs: List[str or PromptList], max_out_len: int = 512, skip_overlength=False, **kwargs) -> str: """Generate response from input prompt. Args: inputs (list): input prompt max_out_len (int): max output length """ generation_kwargs = kwargs.copy() generation_kwargs.update(self.generation_kwargs) responses = [] for _input in inputs: assert isinstance(_input, (str, PromptList)) if isinstance(_input, str): history = [{'role': 'user', 'content': _input}] else: history = [] for item in _input: msg = { 'content': item['prompt'], 'role': { 'HUMAN': 'user', 'BOT': 'assistant', 'SYSTEM': 'system', }[item['role'].upper()] } history.append(msg) user_content = history[-1]['content'] history = history[:-1] if skip_overlength: # The model will report the following error # if the sequence length is greater than the maximum length: # "Input length of input_ids is {INPUT_IDS}, # but `max_length` is set to 8192. # This can lead to unexpected behavior. # You should consider increasing `max_new_tokens`." # The following hardcode can fix this exception. len_user_content = len(self.tokenizer.encode(user_content)) if len_user_content > 8192: responses.append('') continue response, history = self.model.chat(self.tokenizer, user_content, history=history, max_new_tokens=max_out_len, **generation_kwargs) # response will be dict sometime if isinstance(response, dict): response = response.get('content', '') responses.append(response) return responses def get_token_len(self, prompt: str) -> int: return len(self.tokenizer.encode(prompt)) + self.num_extra_tokens