Phi-3-small-8k-instruct / tokenization_phi3_small.py
nguyenbh's picture
Update tokenization_phi3_small.py (#29)
1535ae2 verified
raw
history blame
12.9 kB
# Adapted from https://huggingface.co/Qwen/Qwen-7B-Chat/blob/main/tokenization_qwen.py
import os
from typing import Collection, List, Optional, Dict, Set, Tuple, Union
from functools import cached_property
import base64
import requests
from transformers import PreTrainedTokenizer, AddedToken, AutoConfig
from transformers.models.auto.tokenization_auto import get_tokenizer_config
import tiktoken
"""
This tokenizer is almost identical to tiktoken.get_encoding("cl100k_base")
with a few additional special tokens to support the ChatML format.
TODO(bapatra): Right now, I do not save the special tokens to the vocab file.
Maybe in the future, that would be useful? Can add that support later.
"""
def _load_tiktoken_bpe(tiktoken_bpe_file: str) -> Dict[bytes, int]:
with open(tiktoken_bpe_file, "rb") as f:
contents = f.read()
return {
base64.b64decode(token): int(rank)
for token, rank in (line.split() for line in contents.splitlines() if line)
}
# On the megatron codebase, we pad vocabularies to ensure matrix multiplication is fast.
# this in turn causes some indices to be empty. We account for these empty indices by adding
# dummy tokens to the tokenizer.
EFFECTIVE_PADDED_VOCAB_SIZE = 100352
ACTUAL_VOCAB_SIZE = 100276
DUMMY_TOKENS = {
f"<|dummy_id_{11 + offset}|>": 100276 + offset
for offset in range(1, EFFECTIVE_PADDED_VOCAB_SIZE - ACTUAL_VOCAB_SIZE)
}
SPECIAL_TOKENS = {
# tiktoken.get_encoding("cl100k_base")._special_tokens
'<|endoftext|>': 100257,
'<|fim_prefix|>': 100258,
'<|fim_middle|>': 100259,
'<|fim_suffix|>': 100260,
# Special tokens for post-training
"<|system|>": 100261,
"<|user|>": 100262,
"<|assistant|>": 100263,
# Dummy unused tokens
"<|dummy_id_0|>": 100264,
"<|dummy_id_1|>": 100265,
# Special tokens for post-training continued
"<|end|>": 100266,
# Some dummy tokens, so that tokenization is contiguous and does not cause issues
# Note that the 100256th token of tiktoken.get_encoding("cl100k_base") does not
# actually map to anything. So we use a dummy token here.
"<|dummy_id_2|>": 100256,
# Likewise, tokens from 100267 to 100275 are also unused
"<|dummy_id_3|>": 100267,
"<|dummy_id_4|>": 100268,
"<|dummy_id_5|>": 100269,
"<|dummy_id_6|>": 100270,
"<|dummy_id_7|>": 100271,
"<|dummy_id_8|>": 100272,
"<|dummy_id_9|>": 100273,
"<|dummy_id_10|>": 100274,
"<|dummy_id_11|>": 100275,
# The final end of prompt token
# (unused, but present as a part of tiktoken.get_encoding("cl100k_base")._special_tokens)
'<|endofprompt|>': 100276,
# Dummy tokens to account for padding of the tokenizer
# We pad to ensure tensor cores are used for vocab multiplication
**DUMMY_TOKENS
}
class Phi3SmallTokenizer(PreTrainedTokenizer):
vocab_files_names = {
"vocab_file": "cl100k_base.tiktoken"
}
model_input_names: List[str] = ["input_ids", "attention_mask"]
padding_side = "left"
def __init__(
self,
vocab_file: Optional[str] = None,
errors: str = "replace",
**kwargs
) -> None:
# PreTrainedTokenizer's init calls _add_tokens, which in turn checks
# if the token is present in `self.special_tokens``. Hence instantiating it here.
# The way Qwen gets around this is by checking against SPECIAL_TOKENS
# But I think it's better to check against the objects own `special_tokens`
# in case we eventually want to allow the tokenizer to have special tokens.
self.special_tokens = SPECIAL_TOKENS
super().__init__(**kwargs)
self.errors = errors
try:
base = tiktoken.get_encoding("cl100k_base")
# This deals with the scenario where user has restricted internet access
# and thus fails to download the tokenizer file from https://openaipublic.blob.core.windows.net/encodings/cl100k_base.tiktoken
# It is assumed that user should be able to access files on huggingface hub.
except requests.RequestException:
import hashlib
from transformers.utils import cached_file
cached_tokenizer_path = cached_file(
"microsoft/Phi-3-small-8k-instruct",
"cl100k_base.tiktoken",
_raise_exceptions_for_gated_repo=False,
_raise_exceptions_for_missing_entries=False,
_raise_exceptions_for_connection_errors=False
)
tiktoken_cache_dir = os.path.dirname(cached_tokenizer_path)
tiktoken_cache_path = os.path.join(
tiktoken_cache_dir,
hashlib.sha1("https://openaipublic.blob.core.windows.net/encodings/cl100k_base.tiktoken".encode()).hexdigest()
)
if not os.path.exists(tiktoken_cache_path):
os.rename(cached_tokenizer_path, tiktoken_cache_path)
os.environ["TIKTOKEN_CACHE_DIR"] = tiktoken_cache_dir
base = tiktoken.get_encoding("cl100k_base")
if vocab_file is None:
self.mergeable_ranks: Dict[bytes, int] = base._mergeable_ranks
else:
self.mergeable_ranks = _load_tiktoken_bpe(vocab_file)
self.pat_str = base._pat_str
enc = tiktoken.Encoding(
name="phi3small",
pat_str=self.pat_str,
mergeable_ranks=self.mergeable_ranks,
special_tokens=self.special_tokens,
)
self.tokenizer = enc
self.decoder: Dict[int, bytes] = {
v: k for k, v in self.mergeable_ranks.items()
}
self.decoder.update({v: k for k, v in self.special_tokens.items()})
self.eod_id = self.tokenizer.eot_token
self._eos_token = self._convert_id_to_token(self.eod_id)
# Setting the bos_token to be the same as the eos_token
# Note that this is **not** the correct thing to do, and is done
# just so that some of the downstream libraries do not break.
self._bos_token = self._eos_token
# Assign the special tokens to class variables
self.system_id = self.special_tokens["<|system|>"]
self.user_id = self.special_tokens["<|user|>"]
self.assistant_id = self.special_tokens["<|assistant|>"]
self.end_id = self.special_tokens["<|end|>"]
@cached_property
def dummy_token_indices(self) -> List[int]:
# There are some additional special tokens in the cl100k_base tokenizer
# that we do not use. Hence, we also consider them to be dummy tokens.
additional_tokens = [
"<|fim_prefix|>",
"<|fim_middle|>",
"<|fim_suffix|>",
"<|endofprompt|>"
]
dummy_token_indices = [index for token, index in self.special_tokens.items() if "dummy_id" in token]
dummy_token_indices.extend([self.special_tokens[token] for token in additional_tokens])
return sorted(dummy_token_indices)
def __getstate__(self):
state = self.__dict__.copy()
del state["tokenizer"]
return state
def __setstate__(self, state):
self.__dict__ = state
enc = tiktoken.Encoding(
name="cl100k_im",
pat_str=self.pat_str,
mergeable_ranks=self.mergeable_ranks,
special_tokens=self.special_tokens,
)
self.tokenizer = enc
def __len__(self):
return self.tokenizer.n_vocab
@classmethod
def from_pretrained(
cls,
pretrained_model_name_or_path: Union[str, os.PathLike],
*init_inputs,
**kwargs,
):
cls_kwargs = kwargs
# First try to load from the tokenization config if it exists
tokenization_config = get_tokenizer_config(pretrained_model_name_or_path, **kwargs)
if tokenization_config:
cls_kwargs = {
**tokenization_config,
**cls_kwargs
}
else:
config = AutoConfig.from_pretrained(pretrained_model_name_or_path, trust_remote_code=True)
cls_kwargs["model_max_length"] = config.max_position_embeddings
return cls(**cls_kwargs)
def get_vocab(self) -> Dict[Union[str, bytes], int]:
return {**self.mergeable_ranks, **self.special_tokens}
def convert_tokens_to_ids(
self,
tokens: Union[bytes, str, List[Union[bytes, str]]]
) -> Union[int, List[int]]:
ids = []
if isinstance(tokens, (str, bytes)):
if tokens in self.special_tokens:
return self.special_tokens[tokens]
else:
return self.mergeable_ranks.get(tokens)
ids: List[int] = []
for token in tokens:
ids.append(self.convert_tokens_to_ids(token))
return ids
def _add_tokens(
self,
new_tokens: Union[List[str], List[AddedToken]],
special_tokens: bool = False,
) -> int:
if not special_tokens and new_tokens:
raise ValueError("Only special tokens can be added to this tokenizer")
for token in new_tokens:
surface_form = token.content if isinstance(token, AddedToken) else token
if surface_form not in self.special_tokens:
raise ValueError(
"For now, we do not support unknown special tokens\n"
"In the future, if there is a need for this, we can add special tokens to the tokenizer\n"
"starting from rank 100261 - 100263 and then 100266 - 100275.\n"
"And finally, we can re-construct the enc object back\n"
)
return 0
def save_vocabulary(self, save_directory: str, **kwargs) -> Tuple[str]:
file_path = os.path.join(save_directory, "cl100k_base.tiktoken")
with open(file_path, "w") as f:
for token, rank in self.mergeable_ranks.items():
line = base64.b64encode(token).decode("utf-8") + " " + str(rank) + "\n"
f.write(line)
return (file_path,)
def tokenize(
self,
text: str,
allowed_special: Union[Set, str] = "all",
disallowed_special: Union[Collection, str] = (),
**kwargs
) -> List[Union[bytes, str]]:
tokens: List[Union[bytes, str]] = []
for token_id in self.tokenizer.encode(
text, allowed_special=allowed_special, disallowed_special=disallowed_special
):
tokens.append(self.decoder[token_id])
return tokens
def convert_tokens_to_string(self, tokens: List[Union[bytes, str]]) -> str:
"""
Converts a sequence of tokens in a single string.
"""
text = ""
temp = b""
for t in tokens:
if isinstance(t, str):
if temp:
text += temp.decode("utf-8", errors=self.errors)
temp = b""
text += t
elif isinstance(t, bytes):
temp += t
else:
raise TypeError("token should only be of type types or str")
if temp:
text += temp.decode("utf-8", errors=self.errors)
return text
@property
def vocab_size(self):
return self.tokenizer.n_vocab
@property
def eos_token_id(self) -> int:
return self.eod_id
def _convert_id_to_token(self, index: int) -> Union[bytes, str]:
"""Converts an id to a token, special tokens included"""
if index in self.decoder:
return self.decoder[index]
raise ValueError("unknown ids")
def _convert_token_to_id(self, token: Union[bytes, str]) -> int:
"""Converts a token to an id using the vocab, special tokens included"""
if token in self.special_tokens:
return self.special_tokens[token]
if token in self.mergeable_ranks:
return self.mergeable_ranks[token]
raise ValueError("unknown token")
def _tokenize(self, text: str, **kwargs):
"""
Converts a string in a sequence of tokens (string), using the tokenizer. Split in words for word-based
vocabulary or sub-words for sub-word-based vocabularies (BPE/SentencePieces/WordPieces).
Do NOT take care of added tokens.
"""
raise NotImplementedError
def _decode(
self,
token_ids: Union[int, List[int]],
skip_special_tokens: bool = False,
errors: str = None,
**kwargs,
) -> str:
if isinstance(token_ids, int):
token_ids = [token_ids]
if skip_special_tokens:
token_ids = [i for i in token_ids if i < self.eod_id]
return self.tokenizer.decode(token_ids, errors=errors or self.errors)