from threading import Thread from typing import Iterator, List, Tuple import torch from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer model_id = 'meta-llama/Llama-2-7b-chat-hf' # model_id = 'codellama/CodeLlama-13b-Instruct-hf' if torch.cuda.is_available(): model = AutoModelForCausalLM.from_pretrained( model_id, torch_dtype=torch.float16, device_map='auto', cache_dir='models', token="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" ) else: model = None tokenizer = AutoTokenizer.from_pretrained(model_id, cache_dir='models') def get_prompt(message: str, chat_history: List[Tuple[str, str]], system_prompt: str) -> str: texts = [f'[INST] <>\n{system_prompt}\n<>\n\n'] # The first user input is _not_ stripped do_strip = False for user_input, response in chat_history: user_input = user_input.strip() if do_strip else user_input do_strip = True texts.append(f'{user_input} [/INST] {response.strip()} [INST] ') message = message.strip() if do_strip else message texts.append(f'{message} [/INST]') return ''.join(texts) def get_input_token_length(message: str, chat_history: List[Tuple[str, str]], system_prompt: str) -> int: prompt = get_prompt(message, chat_history, system_prompt) input_ids = tokenizer([prompt], return_tensors='np', add_special_tokens=False)['input_ids'] return input_ids.shape[-1] def run(message: str, chat_history: List[Tuple[str, str]], system_prompt: str, max_new_tokens: int = 1024, temperature: float = 0.8, top_p: float = 0.95, top_k: int = 50) -> Iterator[str]: prompt = get_prompt(message, chat_history, system_prompt) inputs = tokenizer([prompt], return_tensors='pt', add_special_tokens=False).to('cuda') streamer = TextIteratorStreamer(tokenizer, timeout=10., skip_prompt=True, skip_special_tokens=True) generate_kwargs = dict( inputs, streamer=streamer, max_new_tokens=max_new_tokens, do_sample=True, top_p=top_p, top_k=top_k, temperature=temperature, num_beams=1, ) t = Thread(target=model.generate, kwargs=generate_kwargs) t.start() outputs = [] for text in streamer: outputs.append(text) yield ''.join(outputs)