import os
import torch
import json
import gc
import time
from unsloth import FastLanguageModel
from transformers import TextIteratorStreamer
from threading import Thread

os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Lazily-loaded globals; populated by load_model() at the bottom of this file.
tokenizer = None
model = None

# Default model and generation settings; cfg holds the active, mutable copy.
default_cfg = {
    'model_name': "unsloth/gemma-2-9b-it-bnb-4bit",
    'dtype': None,
    'instruction': None,
    'inst_template': None,
    'chat_template': None,
    'max_length': 2400,
    'max_seq_length': 2048,
    'max_new_tokens': 512,
    'temperature': 0.9,
    'top_p': 0.95,
    'top_k': 40,
    'repetition_penalty': 1.2,
}
cfg = default_cfg.copy()
def load_model(model_name, dtype):
    global tokenizer, model, cfg
    # Skip reloading if the requested model and quantization are already active.
    if cfg['model_name'] == model_name and cfg['dtype'] == dtype:
        return
    # Release the previous model before loading a new one.
    del model
    del tokenizer
    model = None
    tokenizer = None
    gc.collect()
    torch.cuda.empty_cache()
    # Note: the compute dtype is fixed to bfloat16 here; the dtype argument only
    # selects 8-bit or 4-bit quantized loading.
    model, tokenizer = FastLanguageModel.from_pretrained(
        model_name,
        max_seq_length=cfg['max_seq_length'],
        dtype=torch.bfloat16,
        load_in_8bit=(dtype == '8bit'),
        load_in_4bit=(dtype == '4bit'),
    )
    FastLanguageModel.for_inference(model)
    cfg['model_name'] = model_name
    cfg['dtype'] = dtype
def clear_config():
    global cfg
    cfg = default_cfg.copy()
def set_config(model_name, dtype, instruction, inst_template, chat_template,
               max_new_tokens, temperature, top_p, top_k, repetition_penalty):
    global cfg
    load_model(model_name, dtype)
    cfg.update({
        'instruction': instruction,
        'inst_template': inst_template,
        'chat_template': chat_template,
        'max_new_tokens': int(max_new_tokens),
        'temperature': float(temperature),
        'top_p': float(top_p),
        'top_k': int(top_k),
        'repetition_penalty': float(repetition_penalty),
    })
    return 'done.'
def set_config_args(args):
    global cfg
    load_model(args['model_name'], args['dtype'])
    cfg.update(args)
    return 'done.'
def chatinterface_to_messages(message, history):
    global cfg
    messages = []
    # A system-style instruction is injected as a leading user/assistant turn.
    if cfg['instruction']:
        messages.append({'role': 'user', 'content': cfg['instruction']})
        messages.append({'role': 'assistant', 'content': 'I understand.'})
    # Gradio ChatInterface history is a list of (user, assistant) pairs.
    for user, assistant in history:
        if user:
            messages.append({'role': 'user', 'content': user})
        if assistant:
            messages.append({'role': 'assistant', 'content': assistant})
    if message:
        messages.append({'role': 'user', 'content': message})
    return messages
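# Hedged sketch (not part of the original Space): how Gradio-style (user, assistant)
# history pairs map onto the message list built above, assuming cfg['instruction']
# is unset. The helper name and sample strings are illustrative assumptions.
def _demo_history_to_messages():
    history = [("Hello", "Hi there, how can I help?")]
    # Expected: three dicts - user "Hello", the assistant reply, then user "Thanks".
    return chatinterface_to_messages("Thanks", history)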
def apply_template(message, history, args):
    global tokenizer, cfg
    if 'input' in args:
        message = args['input']
    if 'instruction' in args:
        cfg['instruction'] = args['instruction']
    if 'messages' in args:
        messages = args['messages']
    elif history:
        messages = chatinterface_to_messages(message, history)
    else:
        messages = []
    if cfg['chat_template']:
        tokenizer.chat_template = cfg['chat_template']
    if message:
        # An explicit instruction template bypasses the chat template entirely.
        if cfg['inst_template']:
            return cfg['inst_template'].format(instruction=cfg['instruction'], input=message)
        # Only build a fresh message list if one was not already derived from
        # args['messages'] or from the chat history above.
        if not messages:
            if cfg['instruction']:
                messages = [
                    {'role': 'user', 'content': cfg['instruction']},
                    {'role': 'assistant', 'content': 'I understand.'},
                    {'role': 'user', 'content': message},
                ]
            else:
                messages = [
                    {'role': 'user', 'content': message},
                ]
    return tokenizer.apply_chat_template(conversation=messages, add_generation_prompt=True, tokenize=False)
def chat(message=None, history=[], args={}):
    global tokenizer, model, cfg
    prompt = apply_template(message, history, args)
    inputs = tokenizer(prompt, return_tensors="pt",
                       padding=True, max_length=cfg['max_length'], truncation=True).to("cuda")
    # Stream tokens from a generation run in a background thread.
    streamer = TextIteratorStreamer(
        tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True,
    )
    generate_kwargs = dict(
        inputs,
        do_sample=True,
        streamer=streamer,
        num_beams=1,
    )
    # Only pass sampling parameters that are actually set in the config.
    for k in [
        'max_new_tokens',
        'temperature',
        'top_p',
        'top_k',
        'repetition_penalty',
    ]:
        if cfg[k]:
            generate_kwargs[k] = cfg[k]
    t = Thread(target=model.generate, kwargs=generate_kwargs)
    t.start()
    model_output = ""
    for new_text in streamer:
        model_output += new_text
        if 'fastapi' in args:
            # FastAPI callers want only the newly generated delta.
            yield new_text
        else:
            # Gradio callers want the full text generated so far.
            yield model_output
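# Hedged sketch (not part of the original Space): one way a caller might consume
# chat()'s generator. Passing 'fastapi' in args switches the yields from cumulative
# text (Gradio-style) to per-chunk deltas; the helper below is an illustrative
# assumption and expects a loaded model and a CUDA GPU.
def _demo_stream(prompt_text):
    chunks = []
    for delta in chat(prompt_text, history=[], args={'fastapi': True}):
        chunks.append(delta)
    # Joining the deltas reproduces the full completion.
    return "".join(chunks)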
def infer(message=None, history=[], args={}):
    global tokenizer, model, cfg
    prompt = apply_template(message, history, args)
    inputs = tokenizer(prompt, return_tensors="pt",
                       padding=True, max_length=cfg['max_length'], truncation=True).to("cuda")
    generate_kwargs = dict(
        inputs,
        do_sample=True,
        num_beams=1,
        use_cache=True,
    )
    # Only pass sampling parameters that are actually set in the config.
    for k in [
        'max_new_tokens',
        'temperature',
        'top_p',
        'top_k',
        'repetition_penalty',
    ]:
        if cfg[k]:
            generate_kwargs[k] = cfg[k]
    output_ids = model.generate(**generate_kwargs)
    # Strip the prompt tokens and decode only the newly generated part.
    return tokenizer.decode(output_ids.tolist()[0][inputs['input_ids'].size(1):], skip_special_tokens=True)
def numel(message=None, history=[], args={}):
    global tokenizer, model, cfg
    prompt = apply_template(message, history, args)
    model_inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    # Number of prompt tokens after templating (useful for context-length checks).
    return torch.numel(model_inputs['input_ids'])

# Load the default model in 4-bit at import time.
load_model(cfg['model_name'], '4bit')
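# Hedged usage sketch (not part of the original Space): configure generation and run
# one non-streaming and one streaming call. Running this needs a CUDA GPU and will
# download the default model; the instruction text and parameter values below are
# illustrative assumptions.
if __name__ == "__main__":
    set_config_args({
        'model_name': cfg['model_name'],
        'dtype': '4bit',
        'instruction': 'You are a concise assistant.',
        'max_new_tokens': 128,
    })
    # Non-streaming: infer() returns the whole completion as a string.
    print(infer('Say hello in one sentence.'))
    # Streaming: without 'fastapi' in args, each yield is the full text so far.
    final = ""
    for partial in chat('Tell me a short joke.'):
        final = partial
    print(final)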