|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""Tokenization Fast class for InternLM.""" |
|
import os |
|
from shutil import copyfile |
|
from typing import Any, Dict, Optional, Tuple |
|
|
|
from tokenizers import Tokenizer, decoders, normalizers, processors |
|
from tokenizers.models import BPE |
|
from transformers.convert_slow_tokenizer import ( |
|
SLOW_TO_FAST_CONVERTERS, |
|
SentencePieceExtractor, |
|
SpmConverter, |
|
) |
|
from transformers.tokenization_utils_fast import PreTrainedTokenizerFast |
|
from transformers.utils import logging |
|
|
|
from .tokenization_internlm2 import InternLM2Tokenizer |
|
|
|
logger = logging.get_logger(__name__) |
|
|
|
VOCAB_FILES_NAMES = {"vocab_file": "./tokenizer.model"} |
|
|
|
|
|
|
|
class InternLM2Converter(SpmConverter): |
|
""" |
|
Fast tokenizer converter for InternLM2. |
|
""" |
|
|
|
handle_byte_fallback = True |
|
|
|
def vocab(self, proto): |
|
vocab = [ |
|
("<unk>", 0.0), |
|
("<s>", 0.0), |
|
("</s>", 0.0), |
|
] |
|
vocab += [(piece.piece, piece.score) for piece in proto.pieces[3:]] |
|
return vocab |
|
|
|
def unk_id(self, proto): |
|
unk_id = 0 |
|
return unk_id |
|
|
|
def decoder(self, replacement, add_prefix_space): |
|
decoders_sequence = [ |
|
decoders.Replace("▁", " "), |
|
decoders.ByteFallback(), |
|
decoders.Fuse(), |
|
] |
|
if self.proto.normalizer_spec.add_dummy_prefix: |
|
decoders_sequence.append(decoders.Strip(content=" ", left=1)) |
|
return decoders.Sequence(decoders_sequence) |
|
|
|
def tokenizer(self, proto): |
|
model_type = proto.trainer_spec.model_type |
|
vocab_scores = self.vocab(proto) |
|
|
|
added_tokens = self.original_tokenizer.added_tokens_decoder |
|
for i in range(len(vocab_scores)): |
|
_, score = vocab_scores[i] |
|
if i in added_tokens: |
|
vocab_scores[i] = (added_tokens[i].content, score) |
|
if model_type == 1: |
|
raise RuntimeError("InternLM2 is supposed to be a BPE model!") |
|
|
|
elif model_type == 2: |
|
_, merges = SentencePieceExtractor(self.original_tokenizer.vocab_file).extract(vocab_scores) |
|
bpe_vocab = {word: i for i, (word, _score) in enumerate(vocab_scores)} |
|
tokenizer = Tokenizer( |
|
BPE(bpe_vocab, merges, unk_token=proto.trainer_spec.unk_piece, fuse_unk=True, byte_fallback=True) |
|
) |
|
tokenizer.add_special_tokens([added_token for index, added_token in added_tokens.items()]) |
|
else: |
|
raise Exception( |
|
"You're trying to run a `Unigram` model but you're file was trained with a different algorithm" |
|
) |
|
|
|
return tokenizer |
|
|
|
def normalizer(self, proto): |
|
normalizers_list = [] |
|
if proto.normalizer_spec.add_dummy_prefix: |
|
normalizers_list.append(normalizers.Prepend(prepend="▁")) |
|
normalizers_list.append(normalizers.Replace(pattern=" ", content="▁")) |
|
return normalizers.Sequence(normalizers_list) |
|
|
|
def pre_tokenizer(self, replacement, add_prefix_space): |
|
return None |
|
|
|
|
|
SLOW_TO_FAST_CONVERTERS["InternLM2Tokenizer"] = InternLM2Converter |
|
|
|
|
|
|
|
class InternLM2TokenizerFast(PreTrainedTokenizerFast): |
|
""" |
|
Fast tokenizer for InternLM2. |
|
""" |
|
|
|
vocab_files_names = VOCAB_FILES_NAMES |
|
slow_tokenizer_class = InternLM2Tokenizer |
|
padding_side = "left" |
|
model_input_names = ["input_ids", "attention_mask"] |
|
_auto_class = "AutoTokenizer" |
|
|
|
def __init__( |
|
self, |
|
vocab_file, |
|
unk_token="<unk>", |
|
bos_token="<s>", |
|
eos_token="</s>", |
|
pad_token="</s>", |
|
sp_model_kwargs: Optional[Dict[str, Any]] = None, |
|
add_bos_token=True, |
|
add_eos_token=False, |
|
decode_with_prefix_space=False, |
|
clean_up_tokenization_spaces=False, |
|
**kwargs, |
|
): |
|
super().__init__( |
|
vocab_file=vocab_file, |
|
unk_token=unk_token, |
|
bos_token=bos_token, |
|
eos_token=eos_token, |
|
pad_token=pad_token, |
|
sp_model_kwargs=sp_model_kwargs, |
|
add_bos_token=add_bos_token, |
|
add_eos_token=add_eos_token, |
|
decode_with_prefix_space=decode_with_prefix_space, |
|
clean_up_tokenization_spaces=clean_up_tokenization_spaces, |
|
**kwargs, |
|
) |
|
self._add_bos_token = add_bos_token |
|
self._add_eos_token = add_eos_token |
|
self.update_post_processor() |
|
self.vocab_file = vocab_file |
|
|
|
@property |
|
def can_save_slow_tokenizer(self) -> bool: |
|
return os.path.isfile(self.vocab_file) if self.vocab_file else False |
|
|
|
def update_post_processor(self): |
|
""" |
|
Updates the underlying post processor with the current `bos_token` and `eos_token`. |
|
""" |
|
bos = self.bos_token |
|
bos_token_id = self.bos_token_id |
|
if bos is None and self.add_bos_token: |
|
raise ValueError("add_bos_token = True but bos_token = None") |
|
|
|
eos = self.eos_token |
|
eos_token_id = self.eos_token_id |
|
if eos is None and self.add_eos_token: |
|
raise ValueError("add_eos_token = True but eos_token = None") |
|
|
|
single = f"{(bos+':0 ') if self.add_bos_token else ''}$A:0{(' '+eos+':0') if self.add_eos_token else ''}" |
|
pair = ( |
|
f"{single}{(' '+bos+':1') if self.add_bos_token else ''} $B:1{(' '+eos+':1') if self.add_eos_token else ''}" |
|
) |
|
|
|
special_tokens = [] |
|
if self.add_bos_token: |
|
special_tokens.append((bos, bos_token_id)) |
|
if self.add_eos_token: |
|
special_tokens.append((eos, eos_token_id)) |
|
self._tokenizer.post_processor = processors.TemplateProcessing( |
|
single=single, pair=pair, special_tokens=special_tokens |
|
) |
|
|
|
@property |
|
def add_eos_token(self): |
|
return self._add_eos_token |
|
|
|
@property |
|
def add_bos_token(self): |
|
return self._add_bos_token |
|
|
|
@add_eos_token.setter |
|
def add_eos_token(self, value): |
|
self._add_eos_token = value |
|
self.update_post_processor() |
|
|
|
@add_bos_token.setter |
|
def add_bos_token(self, value): |
|
self._add_bos_token = value |
|
self.update_post_processor() |
|
|
|
def save_vocabulary(self, save_directory: str, filename_prefix: Optional[str] = None) -> Tuple[str]: |
|
if not self.can_save_slow_tokenizer: |
|
raise ValueError( |
|
"Your fast tokenizer does not have the necessary information to save the vocabulary for a slow " |
|
"tokenizer." |
|
) |
|
|
|
if not os.path.isdir(save_directory): |
|
logger.error(f"Vocabulary path ({save_directory}) should be a directory") |
|
return |
|
out_vocab_file = os.path.join( |
|
save_directory, (filename_prefix + "-" if filename_prefix else "") + VOCAB_FILES_NAMES["vocab_file"] |
|
) |
|
|
|
if os.path.abspath(self.vocab_file) != os.path.abspath(out_vocab_file): |
|
copyfile(self.vocab_file, out_vocab_file) |
|
|
|
return (out_vocab_file,) |
|
|