import gradio as gr
import json
import pandas as pd
import config
from vocab import load_tokener
from utils.character_util import iter_vocab
from utils.log_util import logger
from utils.compression_util import tokenize_corpus, unit_convertor
from functools import lru_cache


@lru_cache
def tokenize(text, tokenizer_name, color_num=5):
    """
    Tokenize `text` with the given tokenizer and return the highlighted token
    spans plus a per-token detail table.
    """
    logger.info("param=" + json.dumps({"text": text, "tokenizer_type": tokenizer_name}, ensure_ascii=False))
    pos_tokens = []
    tokenizer = load_tokener(tokenizer_name)
    if config.ADD_SPECIAL_TOKEN:
        encoding = tokenizer.encode(text, add_special_tokens=True)
    else:
        encoding = tokenizer.encode(text, add_special_tokens=False)

    table = []

    for idx, token_id in enumerate(encoding):
        decode_text = tokenizer.decode([token_id])  # special tokens decode to the replacement character �, i.e. "\ufffd"
        pos_tokens.extend([(decode_text, str(idx % color_num))])

        # raw token (presumably the UTF-8 byte representation)
        token = tokenizer.convert_ids_to_tokens([token_id], skip_special_tokens=False)[0]
        if isinstance(token, bytes):
            try:
                token_str = token.decode("utf-8")
            except UnicodeDecodeError:
                token_str = token.decode("utf-8", errors="ignore")
                # gpt_35_turbo tokens frequently fail to decode; log them here
                logger.error(f"{idx}: decode_error: " + json.dumps(
                    {"tokenizer_type": tokenizer_name, "token": str(token), "token_str": token_str},
                    ensure_ascii=False))

            token_bytes = token
            # json_dumps = json.dumps(token_str)
        elif isinstance(token, str):
            token_str = token
            token_bytes = bytes(token_str, "utf-8")
            # json_dumps = json.dumps(token_str)
        else:
            logger.error(f"{idx}: wrong type for token {token_id} {type(token)} " +
                         json.dumps({"text": text, "tokenizer_type": tokenizer_name}, ensure_ascii=False))
            token_str = token
            token_bytes = token
            # continue

        # TODO: gpt3.5_turbo bug: only the id and text are correct, while the token and
        #  UTF-8 bytes are wrong, which suggests convert_ids_to_tokens is at fault.
        table.append(
            {"TokenID": token_id,
             "Token": token_str,  # string decoded from UTF-8; some tokenizers (e.g. llama) show raw bytes as <0xE7>
             "Text": decode_text,
             # "Bytes": token_bytes,  # the Gradio frontend renders bytes back as a string, e.g. b'\xe4\xb8\xad' still shows as "中", hence str(token_bytes)
             "UTF8 Bytes": str(token_bytes),
             # "Unicode": json_dumps  # ASCII characters are shown as-is, non-ASCII as Unicode escapes
             }
        )

    table_df = pd.DataFrame(table)
    logger.info(f"tokenizer_type={tokenizer_name}, Tokens={table[:4]}")
    # print(table_df)

    return gr.update(value=pos_tokens, label=f"Tokens: {len(encoding)}"), table_df


@lru_cache
def tokenize_pair(text, tokenizer_type_1, tokenizer_type_2):
    """
    Callback for input_text.change: tokenize the same text with two tokenizers.
    """
    pos_tokens_1, table_df_1 = tokenize(text, tokenizer_type_1)
    pos_tokens_2, table_df_2 = tokenize(text, tokenizer_type_2)
    return pos_tokens_1, table_df_1, pos_tokens_2, table_df_2


@lru_cache
def basic_count(tokenizer_name):
    stats = iter_vocab(tokenizer_name)
    return stats['vocab_size'], f'{stats["中文token数"]}'
    # return tokenizer.vocab_size, f'{stats["中文汉字数"]["中文单字"]}/{stats["中文汉字数"]["中文多字"]}'


def get_compress_rate(tokenizer_type, all_corpus, unit):
    tokenizer = load_tokener(tokenizer_type)
    compress_rate_stats = tokenize_corpus(tokenizer, all_corpus)
    compress_rate = unit_convertor(compress_rate_stats, unit)
    return compress_rate


# def get_all_compress_rate(corpuses, unit):
#     stats = {}
#     for lang in corpuses:
#         print("###" * 10 + lang)
#         for tokenizer_name in tokenizers:
#             tokenizer = load_tokener(tokenizer_name)
#             stat = tokenize_corpus(tokenizer, [lang])
#             stats[tokenizer_name] = stat
#     pprint(stats)


@lru_cache
def get_overlap_token_size(tokenizer_type_1, tokenizer_type_2):
    tokenizer1 = load_tokener(tokenizer_type_1)
    tokenizer2 = load_tokener(tokenizer_type_2)

    vocab_set_1 = tokenizer1.get_vocab().keys()
    vocab_set_2 = tokenizer2.get_vocab().keys()

    # one vocab may hold bytes tokens and the other str tokens; normalize to bytes before intersecting
    token1 = next(iter(vocab_set_1))
    token2 = next(iter(vocab_set_2))
    if type(token1) != type(token2):
        if isinstance(token1, str):
            vocab_set_1 = set([token.encode("utf-8") for token in vocab_set_1])
        if isinstance(token2, str):
            vocab_set_2 = set([token.encode("utf-8") for token in vocab_set_2])

    overlap_tokens = vocab_set_1 & vocab_set_2
    overlap_token_size = len(overlap_tokens)
    logger.info(
        f"{overlap_token_size} OverlapTokens of {tokenizer_type_1} {tokenizer_type_2}: {list(overlap_tokens)[:10]}")
    return overlap_token_size, overlap_token_size


default_user_input = r"""“We apologize for any inconvenience and concern this may have caused to our customers and all concerned. We pray for the rest of the souls of those who lost their lives aboard the Japanese Coast Guard's equipment and extend our condolences to the bereaved families,” he said.
Steenvliegen of oevervliegen[2] (Plecoptera) zijn een kleine orde van gevleugelde insecten. Steenvliegen zijn te herkennen aan hun slanke, langwerpige lichaamsvorm en de doorzichtige vleugels die in rust plat op de rug worden gehouden.
def load_image_file(file, mode='RGB'):
    im = PIL.Image.open(file)
    if mode:
        im = im.convert(mode)
    return np.array(im)
\section{The expected number of intervening \mbox{H\,{\sc i}} absorbers}\label{section:expected_number}
\begin{equation}\label{equation:expected_number}
    \mu = \iint{f(N_{\rm HI},X)\,\mathrm{d}X\,\mathrm{d}N_{\rm HI}},
\end{equation}
Eerder noemde De Meij Oud en Nieuw "een soort oorlogsgebied". En hij heeft dan ook geen zin in de nieuwjaarsnacht. "Als je weet dat er collega's gewond gaan raken, kan je niet meer zeggen: het is mooi politiewerk en we gaan naar een spannende nacht. Het zijn gewoon risico's die je niet wil lopen."
华为发布Mate60手机
ラグビーワールドカップ2023フランス"""

default_tokenizer_type_1 = "dutch_llama_tokenizer"
# default_tokenizer_type_2 = "internlm_chat_7b"
default_tokenizer_type_2 = "mistral_7b"


def on_load(url_params, request: gr.Request):
    """
    Page onLoad handler: read the input text and tokenizer names from the URL
    parameters, falling back to the configured defaults.
    """
    text = None
    tokenizer_type_1 = None
    tokenizer_type_2 = None
    try:
        url_params = json.loads(url_params)
    except Exception:
        url_params = {}
    if request:
        logger.info(str(request.headers))
        client_ip = request.client.host
        # local_ip = socket.gethostbyname(socket.gethostbyname(""))
        # headers = request.kwargs['headers']
        # if headers and 'x-forwarded-for' in headers:
        #     x_forwarded_for = headers['x-forwarded-for']
        #     client_ip = x_forwarded_for.split(' ')[0] if x_forwarded_for else ""
        # if "referer" in request.headers:  # does not work on huggingface-space
        #     url_params = parse_qs(urlparse(request.headers["referer"]).query)
        #     url_params = {k: v[0] for k, v in url_params.items() if len(v) > 0}
        tokenizer_type_1 = url_params.get("tokenizer1", config.default_tokenizer_type_1)
        tokenizer_type_2 = url_params.get("tokenizer2", config.default_tokenizer_type_2)
        text = url_params.get("text", config.default_user_input)
        logger.info(f"client_ip: {client_ip}; params: {url_params}")
    return text, tokenizer_type_1, tokenizer_type_2


def compress_rate_unit_change(unit):
    return gr.update(label=f"Compress Rate: {unit}"), gr.update(label=f"Compress Rate: {unit}")


def test_coding():
    bytes1 = b'\xe4\xb8\xad'
    print(bytes1)  # b'\xe4\xb8\xad'


if __name__ == "__main__":
    print(get_overlap_token_size("gpt_35_turbo", "gpt_4"))
    # print(basic_count("internlm_chat_7b"))
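

# --- Illustrative sketch (not part of the original module) -------------------
# A minimal example of how tokenize_pair could be wired into a Gradio Blocks UI,
# matching the "input_text.change" hint in its docstring. The component names,
# layout, and dropdown choices below are assumptions for demonstration only; the
# real application builds its UI elsewhere and fills the dropdowns from the
# vocab registry.
def build_demo_ui():
    with gr.Blocks() as demo:
        input_text = gr.Textbox(value=default_user_input, lines=5, label="Input Text")
        with gr.Row():
            tokenizer_1 = gr.Dropdown(
                choices=[default_tokenizer_type_1, default_tokenizer_type_2],  # placeholder choices
                value=default_tokenizer_type_1, label="Tokenizer 1")
            tokenizer_2 = gr.Dropdown(
                choices=[default_tokenizer_type_1, default_tokenizer_type_2],  # placeholder choices
                value=default_tokenizer_type_2, label="Tokenizer 2")
        with gr.Row():
            output_tokens_1 = gr.HighlightedText(label="Tokens")
            output_tokens_2 = gr.HighlightedText(label="Tokens")
        with gr.Row():
            output_table_1 = gr.Dataframe(label="Token Table 1")
            output_table_2 = gr.Dataframe(label="Token Table 2")
        # re-tokenize with both tokenizers whenever the input text changes
        input_text.change(
            tokenize_pair,
            inputs=[input_text, tokenizer_1, tokenizer_2],
            outputs=[output_tokens_1, output_table_1, output_tokens_2, output_table_2],
        )
    return demo
    # usage: build_demo_ui().launch()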