"""Model wrappers around OpenAI, Groq, SambaNova, and Hugging Face chat/completion backends."""

import dataclasses
import json
import os
from typing import List, Literal, Optional, Union

import openai
import requests
from groq import Groq
from tenacity import (
    retry,
    stop_after_attempt,
    wait_random_exponential,
)

MessageRole = Literal["system", "user", "assistant"]


@dataclasses.dataclass()
class Message:
    role: MessageRole
    content: str


def message_to_str(message: Message) -> str:
    return f"{message.role}: {message.content}"


def messages_to_str(messages: List[Message]) -> str:
    return "\n".join([message_to_str(message) for message in messages])


@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
def gpt_completion(
    model: str,
    prompt: str,
    max_tokens: int = 1024,
    stop_strs: Optional[List[str]] = None,
    temperature: float = 0.0,
    num_comps: int = 1,
) -> Union[List[str], str]:
    """Call the legacy (openai<1.0) Completion API, retrying with exponential backoff."""
    response = openai.Completion.create(
        model=model,
        prompt=prompt,
        temperature=temperature,
        max_tokens=max_tokens,
        top_p=1,
        frequency_penalty=0.0,
        presence_penalty=0.0,
        stop=stop_strs,
        n=num_comps,
    )
    if num_comps == 1:
        return response.choices[0].text

    return [choice.text for choice in response.choices]


@retry(wait=wait_random_exponential(min=1, max=180), stop=stop_after_attempt(6))
def gpt_chat(
    model: str,
    messages: List[Message],
    max_tokens: int = 1024,
    temperature: float = 0.0,
    num_comps: int = 1,
) -> Union[List[str], str]:
    """Call the legacy (openai<1.0) ChatCompletion API, retrying with exponential backoff."""
    response = openai.ChatCompletion.create(
        model=model,
        messages=[dataclasses.asdict(message) for message in messages],
        max_tokens=max_tokens,
        temperature=temperature,
        top_p=1,
        frequency_penalty=0.0,
        presence_penalty=0.0,
        n=num_comps,
    )
    if num_comps == 1:
        return response.choices[0].message.content

    return [choice.message.content for choice in response.choices]
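

# Illustrative example only: calling gpt_chat directly. This assumes OPENAI_API_KEY
# is set and the legacy openai<1.0 SDK is installed (openai.ChatCompletion was
# removed in the 1.x client).
#
#   reply = gpt_chat(
#       model="gpt-3.5-turbo",
#       messages=[Message(role="user", content="Say hello in one word.")],
#       temperature=0.0,
#   )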


class ModelBase:
    """Common interface: chat models implement generate_chat; completion models implement generate."""

    def __init__(self, name: str):
        self.name = name
        self.is_chat = False

    def __repr__(self) -> str:
        return f'{self.name}'

    def generate_chat(self, messages: List[Message], max_tokens: int = 1024, temperature: float = 0.2, num_comps: int = 1) -> Union[List[str], str]:
        raise NotImplementedError

    def generate(self, prompt: str, max_tokens: int = 1024, stop_strs: Optional[List[str]] = None, temperature: float = 0.0, num_comps: int = 1) -> Union[List[str], str]:
        raise NotImplementedError


class GroqBase:
    """Chat wrapper around the Groq API (reads GROQ_API_KEY from the environment)."""

    def __init__(self):
        self.is_chat = True
        self.client = Groq(
            api_key=os.environ.get("GROQ_API_KEY"),
        )

    def generate_chat(self, messages: List[Message], max_tokens: int = 1024, temperature: float = 0.2, num_comps: int = 1) -> Union[List[str], str]:
        resps = []
        for _ in range(num_comps):
            # max_tokens and temperature are forwarded; the Groq client mirrors the
            # OpenAI chat-completions signature.
            chat_completion = self.client.chat.completions.create(
                messages=[dataclasses.asdict(message) for message in messages],
                model="llama3-8b-8192",
                max_tokens=max_tokens,
                temperature=temperature,
            )

            response_text = chat_completion.choices[0].message.content
            resps.append(response_text)

        if num_comps == 1:
            return resps[0]
        else:
            return resps


class Samba:
    """Chat wrapper around a SambaNova streaming chat-completion endpoint."""

    def __init__(self):
        self.is_chat = True

    def generate_chat(self, messages: List[Message], max_tokens: int = 1024, temperature: float = 0.2, num_comps: int = 1) -> Union[List[str], str]:
        resps = []

        for _ in range(num_comps):
            payload = {
                # Message dataclasses must be converted to plain dicts to be JSON-serializable.
                "inputs": [dataclasses.asdict(message) for message in messages],
                "params": {
                    "max_tokens_allowed_in_completion": {"type": "int", "value": 500},
                    "min_token_capacity_for_completion": {"type": "int", "value": 2},
                    "skip_special_token": {"type": "bool", "value": True},
                    "stop_sequences": {"type": "list", "value": ["[INST]", "[INST]", "[/INST]", "[/INST]"]}
                },
                "model": "llama3-8b"
            }
            url = "kjddazcq2e2wzvzv.snova.ai"
            key = "bGlnaHRuaW5nOlUyM3pMcFlHY3dmVzRzUGFy"

            headers = {
                "Authorization": f"Basic {key}",
                "Content-Type": "application/json"
            }
            post_response = requests.post(f'https://{url}/api/v1/chat/completion', json=payload, headers=headers, stream=True)

            # Accumulate the server-sent-event stream into a single completion string.
            response_text = ""
            for line in post_response.iter_lines():
                if line.startswith(b"data: "):
                    data_str = line.decode('utf-8')[6:]
                    try:
                        line_json = json.loads(data_str)
                        content = line_json['0'].get("stream_token", "")
                        if content:
                            response_text += content
                    except json.JSONDecodeError:
                        # Skip keep-alive or malformed stream lines.
                        pass
            resps.append(response_text)

        if num_comps == 1:
            return resps[0]
        else:
            return resps


class GPTChat(ModelBase):
    def __init__(self, model_name: str):
        super().__init__(model_name)
        self.is_chat = True

    def generate_chat(self, messages: List[Message], max_tokens: int = 1024, temperature: float = 0.2, num_comps: int = 1) -> Union[List[str], str]:
        return gpt_chat(self.name, messages, max_tokens, temperature, num_comps)


class GPT4(GPTChat):
    def __init__(self):
        super().__init__("gpt-4")


class GPT4o(GPTChat):
    def __init__(self):
        super().__init__("gpt-4o")


class GPT35(GPTChat):
    def __init__(self):
        super().__init__("gpt-3.5-turbo")


class GPTDavinci(ModelBase):
    def __init__(self, model_name: str):
        # Use the base initializer so name and is_chat are both set.
        super().__init__(model_name)

    def generate(self, prompt: str, max_tokens: int = 1024, stop_strs: Optional[List[str]] = None, temperature: float = 0.0, num_comps: int = 1) -> Union[List[str], str]:
        return gpt_completion(self.name, prompt, max_tokens, stop_strs, temperature, num_comps)


class HFModelBase(ModelBase):
    """
    Base for Hugging Face chat models; subclasses implement prepare_prompt and extract_output.
    """

    def __init__(self, model_name: str, model, tokenizer, eos_token_id=None):
        self.name = model_name
        self.model = model
        self.tokenizer = tokenizer
        self.eos_token_id = eos_token_id if eos_token_id is not None else self.tokenizer.eos_token_id
        self.is_chat = True

    def generate_chat(self, messages: List[Message], max_tokens: int = 1024, temperature: float = 0.2, num_comps: int = 1) -> Union[List[str], str]:
        # Sampling requires a strictly positive temperature.
        if temperature < 0.0001:
            temperature = 0.0001

        prompt = self.prepare_prompt(messages)

        outputs = self.model.generate(
            prompt,
            max_new_tokens=min(
                max_tokens, self.model.config.max_position_embeddings),
            use_cache=True,
            do_sample=True,
            temperature=temperature,
            top_p=0.95,
            eos_token_id=self.eos_token_id,
            num_return_sequences=num_comps,
        )

        # Keep special tokens so extract_output can split on chat-template markers.
        outs = self.tokenizer.batch_decode(outputs, skip_special_tokens=False)
        assert isinstance(outs, list)
        for i, out in enumerate(outs):
            assert isinstance(out, str)
            outs[i] = self.extract_output(out)

        if len(outs) == 1:
            return outs[0]
        else:
            return outs

    def prepare_prompt(self, messages: List[Message]):
        raise NotImplementedError

    def extract_output(self, output: str) -> str:
        raise NotImplementedError


class StarChat(HFModelBase):
    def __init__(self):
        import torch
        from transformers import AutoModelForCausalLM, AutoTokenizer
        model = AutoModelForCausalLM.from_pretrained(
            "HuggingFaceH4/starchat-beta",
            torch_dtype=torch.bfloat16,
            device_map="auto",
        )
        tokenizer = AutoTokenizer.from_pretrained(
            "HuggingFaceH4/starchat-beta",
        )
        # 49155 is the id of the <|end|> token in the starchat-beta vocabulary.
        super().__init__("starchat", model, tokenizer, eos_token_id=49155)

    def prepare_prompt(self, messages: List[Message]):
        prompt = ""
        for i, message in enumerate(messages):
            prompt += f"<|{message.role}|>\n{message.content}\n<|end|>\n"
            if i == len(messages) - 1:
                prompt += "<|assistant|>\n"

        return self.tokenizer.encode(prompt, return_tensors="pt").to(self.model.device)

    def extract_output(self, output: str) -> str:
        out = output.split("<|assistant|>")[1]
        if out.endswith("<|end|>"):
            out = out[:-len("<|end|>")]

        return out


class CodeLlama(HFModelBase):
    B_INST, E_INST = "[INST]", "[/INST]"
    B_SYS, E_SYS = "<<SYS>>\n", "\n<</SYS>>\n\n"

    DEFAULT_SYSTEM_PROMPT = """\
You are a helpful, respectful and honest assistant. Always answer as helpfully as possible, while being safe. Your answers should not include any harmful, unethical, racist, sexist, toxic, dangerous, or illegal content. Please ensure that your responses are socially unbiased and positive in nature.

If a question does not make any sense, or is not factually coherent, explain why instead of answering something not correct. If you don't know the answer to a question, please don't share false information."""

    def __init__(self, version: Literal["34b", "13b", "7b"] = "34b"):
        import torch
        from transformers import AutoModelForCausalLM, AutoTokenizer
        tokenizer = AutoTokenizer.from_pretrained(
            f"codellama/CodeLlama-{version}-Instruct-hf",
            add_eos_token=True,
            add_bos_token=True,
            padding_side='left'
        )
        model = AutoModelForCausalLM.from_pretrained(
            f"codellama/CodeLlama-{version}-Instruct-hf",
            torch_dtype=torch.bfloat16,
            device_map="auto",
        )
        super().__init__("codellama", model, tokenizer)

    def prepare_prompt(self, messages: List[Message]):
        # Fold the system prompt into the first user message, per the Llama-2 chat format.
        if messages[0].role != "system":
            messages = [
                Message(role="system", content=self.DEFAULT_SYSTEM_PROMPT)
            ] + messages
        messages = [
            Message(role=messages[1].role, content=self.B_SYS +
                    messages[0].content + self.E_SYS + messages[1].content)
        ] + messages[2:]
        assert all([msg.role == "user" for msg in messages[::2]]) and all(
            [msg.role == "assistant" for msg in messages[1::2]]
        ), (
            "model only supports 'system', 'user' and 'assistant' roles, "
            "starting with 'system', then 'user' and alternating (u/a/u/a/u...)"
        )
        messages_tokens: List[int] = sum(
            [
                self.tokenizer.encode(
                    f"{self.B_INST} {(prompt.content).strip()} {self.E_INST} {(answer.content).strip()} ",
                )
                for prompt, answer in zip(
                    messages[::2],
                    messages[1::2],
                )
            ],
            [],
        )
        assert messages[-1].role == "user", f"Last message must be from user, got {messages[-1].role}"
        messages_tokens += self.tokenizer.encode(
            f"{self.B_INST} {(messages[-1].content).strip()} {self.E_INST}",
        )

        # Drop the trailing EOS token (appended because add_eos_token=True) so the model keeps generating.
        messages_tokens = messages_tokens[:-1]
        import torch
        return torch.tensor([messages_tokens]).to(self.model.device)

    def extract_output(self, output: str) -> str:
        out = output.split("[/INST]")[-1].split("</s>")[0].strip()
        return out
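

if __name__ == "__main__":
    # Minimal smoke test (illustrative sketch, not part of the original module).
    # Assumes OPENAI_API_KEY is set and the legacy openai<1.0 SDK is installed,
    # since gpt_chat above relies on openai.ChatCompletion.
    model = GPT35()
    reply = model.generate_chat(
        messages=[
            Message(role="system", content="You are a helpful assistant."),
            Message(role="user", content="Write a one-line Python hello world."),
        ],
        temperature=0.0,
    )
    print(reply)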