|
from typing import Dict, Iterable, List, Optional, Union |
|
|
|
import numpy as np |
|
import torch.distributed as dist |
|
|
|
from opencompass.models.base import BaseModel |
|
from opencompass.models.base_api import APITemplateParser |
|
from opencompass.utils.logging import get_logger |
|
from opencompass.utils.prompt import PromptList |
|
|
|
# A prompt handed to a model is either plain text or a structured PromptList.
PromptType = Union[PromptList, str]
|
|
|
|
|
class LLaMA2AccessoryModel(BaseModel):
    """LLaMA2-Accessory model wrapper.

    Project: https://github.com/Alpha-VLLM/LLaMA2-Accessory

    Args:
        tokenizer_only (bool): whether to load tokenizer only. In that mode
            ``self.model`` is never assigned, so only tokenizer-based
            methods such as :meth:`get_token_len` are usable.
        meta_template (dict): meta template for the model
        additional_stop_symbols (Iterable[str]): additional symbols that mark
            the end of generation, e.g. the "###" symbol for separating turns
            in the chat template. The iterable is copied into a tuple at
            construction time; ``None`` is treated as "no extra symbols".
        from_pretrained_kwargs: kwargs that will be passed to
            `accessory.MetaModel.from_pretrained` for model instantiation.
            ``max_seq_len`` is also read from here (default 4096).
    """

    def __init__(self,
                 tokenizer_only: bool = False,
                 meta_template: Optional[Dict] = None,
                 additional_stop_symbols: Iterable[str] = (),
                 **from_pretrained_kwargs):
        if tokenizer_only:
            self._load_tokenizer(from_pretrained_kwargs)
        else:
            self._load_model(from_pretrained_kwargs)

        # The argument is only promised to be an Iterable, so it may be a
        # one-shot generator. It is re-read on every `generate` call, hence
        # it is materialised once here; otherwise every call after the first
        # would silently run without the extra stop symbols.
        if additional_stop_symbols is None:
            additional_stop_symbols = ()
        self.additional_stop_symbols = tuple(additional_stop_symbols)
        self.max_seq_len = from_pretrained_kwargs.get('max_seq_len', 4096)
        self.template_parser = APITemplateParser(meta_template)
        self.logger = get_logger()

    def _load_model(self, from_pretrained_kwargs):
        """Instantiate the full model and take the tokenizer from it.

        Initialises the distributed environment first when it is not already
        initialised, then calls ``MetaModel.from_pretrained`` with the given
        kwargs plus ``mp_group`` set to the world process group.

        Args:
            from_pretrained_kwargs (dict): keyword arguments forwarded to
                ``MetaModel.from_pretrained``. The dict is not modified; any
                ``mp_group`` entry it contains is overridden for the call.
        """
        # Imported lazily so that importing this module does not require the
        # `accessory` package.
        from accessory.model.meta import MetaModel
        from accessory.util.misc import init_distributed_mode
        if not dist.is_initialized():
            init_distributed_mode()

        # Build a fresh kwargs dict instead of writing `mp_group` into the
        # one handed in by the caller.
        load_kwargs = {
            **from_pretrained_kwargs,
            'mp_group': dist.GroupMember.WORLD,
        }

        self.model = MetaModel.from_pretrained(**load_kwargs)
        self.tokenizer = self.model.tokenizer
        # Also set here (not only in __init__) so the logger exists whenever
        # this loader has run.
        self.logger = get_logger()

    def _load_tokenizer(self, from_pretrained_kwargs):
        """Load only the tokenizer, without instantiating the model.

        An explicit ``tokenizer_path`` entry takes precedence. Otherwise the
        path is probed from the last entry of ``pretrained_path``, which may
        be given as a single string or as a sequence of strings.

        Args:
            from_pretrained_kwargs (dict): must contain ``tokenizer_path`` or
                ``pretrained_path``.

        Raises:
            KeyError: if neither ``tokenizer_path`` nor ``pretrained_path``
                is present.
        """
        from accessory.model.tokenizer import (
            Tokenizer, probe_tokenizer_path_from_pretrained)
        if 'tokenizer_path' in from_pretrained_kwargs:
            tokenizer_path = from_pretrained_kwargs['tokenizer_path']
        else:
            pretrained_path = from_pretrained_kwargs['pretrained_path']
            if isinstance(pretrained_path, str):
                pretrained_path = [pretrained_path]
            tokenizer_path = probe_tokenizer_path_from_pretrained(
                pretrained_path[-1])

        self.tokenizer = Tokenizer(tokenizer_path)

    def generate(self, inputs: List[str], max_out_len: int) -> List[str]:
        """Generate completions for a batch of prompts.

        Delegates to ``self.model.generate`` with temperature ``0.`` and the
        stop symbols configured at construction time. Needs the full model,
        i.e. an instance created with ``tokenizer_only=False``.

        Args:
            inputs (List[str]): prompts to complete.
            max_out_len (int): forwarded as ``max_gen_len``.

        Returns:
            List[str]: whatever ``self.model.generate`` returns, unchanged.
        """
        results = self.model.generate(
            prompts=inputs,
            max_gen_len=max_out_len,
            temperature=0.,
            additional_stop_symbols=self.additional_stop_symbols)
        return results

    def get_ppl(self,
                inputs: List[str],
                mask_length: Optional[List[int]] = None) -> np.ndarray:
        """Compute perplexity for the given inputs.

        Delegates to ``self.model.evaluate_examples`` and returns its
        ``'ppl'`` entry. Needs the full model, i.e. an instance created with
        ``tokenizer_only=False``.

        Args:
            inputs (List[str]): texts to score.
            mask_length (Optional[List[int]]): not supported; must be None.

        Returns:
            np.ndarray: the ``'ppl'`` values as a float32 array (presumably
            one value per input; confirm against the accessory API).

        Raises:
            AssertionError: if ``mask_length`` is not None.
        """
        assert mask_length is None, 'mask_length is not supported'
        evaluation_results = self.model.evaluate_examples(examples=inputs)
        ppl = evaluation_results['ppl']
        return np.array(ppl, dtype=np.float32)

    def get_token_len(self, prompt: str) -> int:
        """Return the number of tokens the tokenizer produces for a prompt.

        Args:
            prompt (str): text to tokenize.

        Returns:
            int: length of the encoded sequence.
        """
        # NOTE(review): the two positional flags are presumably bos/eos, so
        # both special tokens would be counted; confirm against the
        # accessory Tokenizer.encode signature.
        return len(self.tokenizer.encode(prompt, True, True))
|
|