import os
import torch
import json
import gc
import time
from unsloth import FastLanguageModel
from transformers import TextIteratorStreamer
from threading import Thread

os.environ["TOKENIZERS_PARALLELISM"] = "false"

tokenizer = None
model = None

default_cfg = {
    'model_name': "unsloth/gemma-2-9b-it-bnb-4bit",
    'dtype': None,
    'instruction': None,
    'inst_template': None,
    'chat_template': None,
    'max_length': 2400,
    'max_seq_length': 2048,
    'max_new_tokens': 512,
    'temperature': 0.9,
    'top_p': 0.95,
    'top_k': 40,
    'repetition_penalty': 1.2,
}
cfg = default_cfg.copy()


def load_model(model_name, dtype):
    global tokenizer, model, cfg
    # Reuse the already-loaded model if nothing changed.
    if cfg['model_name'] == model_name and cfg['dtype'] == dtype:
        return
    # Free the previous model before loading a new one.
    del model
    del tokenizer
    model = None
    tokenizer = None
    gc.collect()
    torch.cuda.empty_cache()
    # The compute dtype is fixed to bfloat16; `dtype` only selects 8bit/4bit loading.
    model, tokenizer = FastLanguageModel.from_pretrained(
        model_name,
        max_seq_length=cfg['max_seq_length'],
        dtype=torch.bfloat16,
        load_in_8bit=(dtype == '8bit'),
        load_in_4bit=(dtype == '4bit'),
    )
    FastLanguageModel.for_inference(model)
    cfg['model_name'] = model_name
    cfg['dtype'] = dtype


def clear_config():
    global cfg
    cfg = default_cfg.copy()


def set_config(model_name, dtype, instruction, inst_template, chat_template,
               max_new_tokens, temperature, top_p, top_k, repetition_penalty):
    global cfg
    load_model(model_name, dtype)
    cfg.update({
        'instruction': instruction,
        'inst_template': inst_template,
        'chat_template': chat_template,
        'max_new_tokens': int(max_new_tokens),
        'temperature': float(temperature),
        'top_p': float(top_p),
        'top_k': int(top_k),
        'repetition_penalty': float(repetition_penalty),
    })
    return 'done.'


def set_config_args(args):
    global cfg
    load_model(args['model_name'], args['dtype'])
    cfg.update(args)
    return 'done.'


def chatinterface_to_messages(message, history):
    """Convert gradio ChatInterface (message, history) pairs into a messages list."""
    global cfg
    messages = []
    if cfg['instruction']:
        messages.append({'role': 'user', 'content': cfg['instruction']})
        messages.append({'role': 'assistant', 'content': 'I understand.'})
    for pair in history:
        [user, assistant] = pair
        if user:
            messages.append({'role': 'user', 'content': user})
        if assistant:
            messages.append({'role': 'assistant', 'content': assistant})
    if message:
        messages.append({'role': 'user', 'content': message})
    return messages


def apply_template(message, history, args):
    """Build the prompt string, either from an instruction template or the chat template."""
    global tokenizer, cfg
    if 'input' in args:
        message = args['input']
    if 'instruction' in args:
        cfg['instruction'] = args['instruction']
    if 'messages' in args:
        messages = args['messages']
    elif history:
        messages = chatinterface_to_messages(message, history)
    else:
        messages = []
    if cfg['chat_template']:
        tokenizer.chat_template = cfg['chat_template']
    if message:
        if cfg['inst_template']:
            return cfg['inst_template'].format(instruction=cfg['instruction'], input=message)
        if cfg['instruction']:
            messages = [
                {'role': 'user', 'content': cfg['instruction']},
                {'role': 'assistant', 'content': 'I understand.'},
                {'role': 'user', 'content': message},
            ]
        else:
            messages = [
                {'role': 'user', 'content': message},
            ]
    return tokenizer.apply_chat_template(conversation=messages, add_generation_prompt=True, tokenize=False)


def chat(message=None, history=[], args={}):
    """Stream a response token by token."""
    global tokenizer, model, cfg
    prompt = apply_template(message, history, args)
    inputs = tokenizer(prompt, return_tensors="pt", padding=True,
                       max_length=cfg['max_length'], truncation=True).to("cuda")
    streamer = TextIteratorStreamer(
        tokenizer,
        timeout=10.0,
        skip_prompt=True,
        skip_special_tokens=True,
    )
    generate_kwargs = dict(
        inputs,
        do_sample=True,
        streamer=streamer,
        num_beams=1,
    )
    for k in ['max_new_tokens', 'temperature', 'top_p', 'top_k', 'repetition_penalty']:
        if cfg[k]:
            generate_kwargs[k] = cfg[k]
    # Run generation in a background thread and consume the streamer here.
    t = Thread(target=model.generate, kwargs=generate_kwargs)
    t.start()
    model_output = ""
    for new_text in streamer:
        model_output += new_text
        if 'fastapi' in args:
            # fastapi expects only the newly generated delta
            yield new_text
        else:
            # gradio expects the full text accumulated so far
            yield model_output


def infer(message=None, history=[], args={}):
    """Generate a complete response in one call (no streaming)."""
    global tokenizer, model, cfg
    prompt = apply_template(message, history, args)
    inputs = tokenizer(prompt, return_tensors="pt", padding=True,
                       max_length=cfg['max_length'], truncation=True).to("cuda")
    generate_kwargs = dict(
        inputs,
        do_sample=True,
        num_beams=1,
        use_cache=True,
    )
    for k in ['max_new_tokens', 'temperature', 'top_p', 'top_k', 'repetition_penalty']:
        if cfg[k]:
            generate_kwargs[k] = cfg[k]
    output_ids = model.generate(**generate_kwargs)
    # Drop the prompt tokens and decode only the generated continuation.
    return tokenizer.decode(output_ids.tolist()[0][inputs['input_ids'].size(1):], skip_special_tokens=True)


def numel(message=None, history=[], args={}):
    """Return the number of prompt tokens for the given message/history."""
    global tokenizer, model, cfg
    prompt = apply_template(message, history, args)
    model_inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    return torch.numel(model_inputs['input_ids'])


load_model(cfg['model_name'], '4bit')
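

# Minimal usage sketch, not part of the original module. It assumes the script is
# run directly on a CUDA machine where the default 4-bit gemma-2 checkpoint can be
# loaded; the instruction and prompts below are illustrative placeholders.
if __name__ == "__main__":
    set_config_args({
        'model_name': cfg['model_name'],
        'dtype': '4bit',
        'instruction': 'You are a helpful assistant.',  # hypothetical instruction
    })
    # chat() is a generator; without 'fastapi' in args it yields the growing full text,
    # which is what gradio's ChatInterface expects.
    last = ""
    for partial in chat('Hello, who are you?'):
        last = partial
    print(last)
    # infer() returns the completed response in a single call instead of streaming.
    print(infer('Please answer again in one sentence.'))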