Spaces:
Running
Running
import copy | |
import json | |
import re | |
import tiktoken | |
import uuid | |
from curl_cffi import requests | |
from tclogger import logger | |
from constants.envs import PROXIES | |
from constants.headers import OPENAI_GET_HEADERS, OPENAI_POST_DATA | |
from constants.models import TOKEN_LIMIT_MAP, TOKEN_RESERVED | |
from messagers.message_outputer import OpenaiStreamOutputer | |
from networks.proof_worker import ProofWorker | |
class OpenaiRequester:
    """HTTP client for the anonymous ChatGPT backend (``backend-anon``) endpoints.

    Typical flow: ``auth()`` fetches the chat-requirements token and the
    proof-of-work parameters, then ``chat_completions()`` posts a conversation
    and returns the streaming response.
    """

    def __init__(self):
        self.init_requests_params()

    def init_requests_params(self):
        """Set the endpoint URLs, a fresh device id and the base request headers."""
        self.api_base = "https://chat.openai.com/backend-anon"
        self.api_me = f"{self.api_base}/me"
        self.api_models = f"{self.api_base}/models"
        self.api_chat_requirements = f"{self.api_base}/sentinel/chat-requirements"
        self.api_conversation = f"{self.api_base}/conversation"
        self.uuid = str(uuid.uuid4())
        # Deep-copy so the shared header template is never mutated.
        self.requests_headers = copy.deepcopy(OPENAI_GET_HEADERS)
        self.requests_headers.update({"Oai-Device-Id": self.uuid})

    def log_request(self, url, method="GET"):
        """Log the HTTP method and target URL of an outgoing request."""
        logger.note(f"> {method}:", end=" ")
        logger.mesg(f"{url}", end=" ")

    def log_response(
        self, res: "requests.Response", stream=False, iter_lines=False, verbose=False
    ):
        """Log the status code and, depending on the flags, the body of ``res``.

        Args:
            res: Response to log.
            stream: When false, the JSON body is logged. When true, the body is
                only read if ``iter_lines`` is also true.
            iter_lines: For streamed responses, iterate the event lines and log
                the incremental assistant text. This iterates ``res.iter_lines()``.
            verbose: Passed (negated) to ``logger.enter_quiet`` / ``exit_quiet``
                around the body logging.
        """
        status_code = res.status_code
        logger_func = logger.success if status_code == 200 else logger.warn
        logger_func(f"[{status_code}]")

        logger.enter_quiet(not verbose)
        try:
            if not stream:
                logger_func(res.json())
            elif iter_lines:
                self._log_stream_lines(res, logger_func)
        finally:
            # Always pair enter_quiet with exit_quiet: the previous early
            # return for `stream and not iter_lines` (and any exception while
            # reading the body) skipped the exit call.
            logger.exit_quiet(not verbose)

    def _log_stream_lines(self, res, logger_func):
        """Log the incremental assistant text of a streamed response."""
        # Each event carries the full text so far; start every stream at 0 so
        # an offset left over from a previous response cannot truncate output.
        self.content_offset = 0
        for raw_line in res.iter_lines():
            line = re.sub(r"^data:\s*", "", raw_line.decode("utf-8"))
            if re.match(r"^\[DONE\]", line):
                logger.success("\n[Finished]")
                break
            line = line.strip()
            if not line:
                continue
            try:
                data = json.loads(line, strict=False)
                message_role = data["message"]["author"]["role"]
                message_status = data["message"]["status"]
                if message_role == "assistant" and message_status == "in_progress":
                    content = data["message"]["content"]["parts"][0]
                    delta_content = content[self.content_offset :]
                    self.content_offset = len(content)
                    logger_func(delta_content, end="")
            except Exception as e:
                # Best-effort logging: a malformed event must not abort the loop.
                logger.warn(e)

    def get_models(self):
        """GET the models endpoint and log the response."""
        self.log_request(self.api_models)
        res = requests.get(
            self.api_models,
            headers=self.requests_headers,
            proxies=PROXIES,
            timeout=10,
            impersonate="chrome120",
        )
        self.log_response(res)

    def auth(self):
        """POST to the chat-requirements endpoint and store its token and proof-of-work data.

        Sets ``chat_requirements_token``, ``chat_requirements_seed`` and
        ``chat_requirements_difficulty`` on the instance.

        Raises:
            KeyError: If the reply lacks ``token`` or ``proofofwork`` fields.
        """
        self.log_request(self.api_chat_requirements, method="POST")
        res = requests.post(
            self.api_chat_requirements,
            headers=self.requests_headers,
            proxies=PROXIES,
            timeout=10,
            impersonate="chrome120",
        )
        # Log first so the status and body of a failed reply are recorded
        # before the field lookups below can raise.
        self.log_response(res)
        data = res.json()
        self.chat_requirements_token = data["token"]
        self.chat_requirements_seed = data["proofofwork"]["seed"]
        self.chat_requirements_difficulty = data["proofofwork"]["difficulty"]

    def transform_messages(self, messages: list[dict]):
        """Convert ``{"role", "content"}`` messages into the conversation payload shape.

        Roles other than ``system``, ``user`` and ``assistant`` are sent as
        ``system``.
        """
        known_roles = ("system", "user", "assistant")
        return [
            {
                "author": {
                    "role": message["role"] if message["role"] in known_roles else "system"
                },
                "content": {"content_type": "text", "parts": [message["content"]]},
                "metadata": {},
            }
            for message in messages
        ]

    def chat_completions(self, messages: list[dict], iter_lines=False, verbose=False):
        """POST a conversation and return the streaming response.

        Requires ``auth()`` to have been called first, since it reads the
        stored chat-requirements token, seed and difficulty.

        Args:
            messages: Chat messages as ``{"role", "content"}`` dicts.
            iter_lines: Forwarded to ``log_response``; when true the stream is
                iterated for logging before the response is returned.
            verbose: Forwarded to ``log_response``.

        Returns:
            The response object of the streamed POST request.
        """
        proof_token = ProofWorker().calc_proof_token(
            self.chat_requirements_seed, self.chat_requirements_difficulty
        )
        requests_headers = copy.deepcopy(self.requests_headers)
        requests_headers.update(
            {
                "Accept": "text/event-stream",
                "Openai-Sentinel-Chat-Requirements-Token": self.chat_requirements_token,
                "Openai-Sentinel-Proof-Token": proof_token,
            }
        )
        post_data = copy.deepcopy(OPENAI_POST_DATA)
        post_data.update(
            {
                "messages": self.transform_messages(messages),
                "websocket_request_id": str(uuid.uuid4()),
            }
        )
        self.log_request(self.api_conversation, method="POST")
        s = requests.Session()
        res = s.post(
            self.api_conversation,
            headers=requests_headers,
            json=post_data,
            proxies=PROXIES,
            timeout=10,
            impersonate="chrome120",
            stream=True,
        )
        self.log_response(res, stream=True, iter_lines=iter_lines, verbose=verbose)
        return res
class OpenaiStreamer:
    """Token-checked chat wrapper that re-emits the backend stream via the message outputer."""

    def __init__(self):
        self.model = "gpt-3.5-turbo"
        self.message_outputer = OpenaiStreamOutputer(
            owned_by="openai", model="gpt-3.5-turbo"
        )
        self.tokenizer = tiktoken.get_encoding("cl100k_base")

    def count_tokens(self, messages: list[dict]):
        """Return the summed token count of every message's ``content``."""
        token_count = sum(
            len(self.tokenizer.encode(message["content"])) for message in messages
        )
        logger.note(f"Prompt Token Count: {token_count}")
        return token_count

    def check_token_limit(self, messages: list[dict]):
        """Return True if the prompt fits the model limit minus the reserved tokens.

        Raises:
            ValueError: If the prompt tokens plus ``TOKEN_RESERVED`` reach or
                exceed the limit configured for ``self.model``.
        """
        token_limit = TOKEN_LIMIT_MAP[self.model]
        token_count = self.count_tokens(messages)
        token_redundancy = int(token_limit - TOKEN_RESERVED - token_count)
        if token_redundancy <= 0:
            # State the comparison actually performed; the old text compared
            # the count with the raw limit and ignored the reserved budget.
            raise ValueError(
                f"Prompt exceeded token limit: "
                f"{token_count} + {TOKEN_RESERVED} (reserved) >= {token_limit}"
            )
        return True

    def chat_response(self, messages: list[dict], iter_lines=False, verbose=False):
        """Check the token budget, authenticate, and return the streamed response.

        Args:
            messages: Chat messages as ``{"role", "content"}`` dicts.
            iter_lines: Forwarded to ``OpenaiRequester.chat_completions``.
            verbose: When false, the logger is put in quiet mode during auth.

        Raises:
            ValueError: If the prompt exceeds the token budget.
        """
        self.check_token_limit(messages)
        requester = OpenaiRequester()
        logger.enter_quiet(not verbose)
        try:
            requester.auth()
        finally:
            # Leave quiet mode even if auth raises, so a failed request does
            # not keep the logger silenced.
            logger.exit_quiet(not verbose)
        return requester.chat_completions(
            messages=messages, iter_lines=iter_lines, verbose=verbose
        )

    def chat_return_generator(
        self, stream_response: "requests.Response", verbose=False
    ):
        """Yield one outputer-formatted chunk per relevant event line of the stream.

        Each in-progress assistant event carries the full text so far, so only
        the part after the previously seen length is emitted. Events from other
        roles or statuses, and events with empty text, are skipped. A line that
        fails to parse is logged and still produces an empty "Completions"
        chunk. A "Finished" chunk is emitted for ``[DONE]``, or once after the
        loop if the stream ended without it.
        """
        content_offset = 0
        is_finished = False

        for raw_line in stream_response.iter_lines():
            line = re.sub(r"^data:\s*", "", raw_line.decode("utf-8")).strip()
            if not line:
                continue

            delta_content = ""
            if re.match(r"^\[DONE\]", line):
                content_type = "Finished"
                logger.success("\n[Finished]")
                is_finished = True
            else:
                content_type = "Completions"
                try:
                    data = json.loads(line, strict=False)
                    message_role = data["message"]["author"]["role"]
                    message_status = data["message"]["status"]
                    if message_role != "assistant" or message_status != "in_progress":
                        continue
                    content = data["message"]["content"]["parts"][0]
                    if not len(content):
                        continue
                    delta_content = content[content_offset:]
                    content_offset = len(content)
                    if verbose:
                        logger.success(delta_content, end="")
                except Exception as e:
                    # Best effort: report the bad event and keep streaming.
                    logger.warn(e)

            yield self.message_outputer.output(
                content=delta_content, content_type=content_type
            )

        if not is_finished:
            yield self.message_outputer.output(content="", content_type="Finished")

    def chat_return_dict(self, stream_response: "requests.Response"):
        """Consume the whole stream and return a single non-streaming result dict.

        The result is a shallow copy of the outputer's ``default_data`` with a
        single ``choices`` entry holding the concatenated, stripped assistant
        text.
        """
        final_output = self.message_outputer.default_data.copy()
        final_output["choices"] = [
            {
                "index": 0,
                "finish_reason": "stop",
                "message": {"role": "assistant", "content": ""},
            }
        ]
        pieces = []
        for item in self.chat_return_generator(stream_response):
            try:
                delta = json.loads(item)["choices"][0]["delta"]
                delta_content = delta.get("content", "")
                if delta_content:
                    pieces.append(delta_content)
            except Exception as e:
                logger.warn(e)
        final_output["choices"][0]["message"]["content"] = "".join(pieces).strip()
        return final_output
if __name__ == "__main__":
    # Manual smoke test: send a short multi-turn conversation and log the
    # streamed reply line by line.
    demo_messages = [
        {
            "role": "system",
            "content": "You are an LLM developed by NiansuhAI.\nYour name is Niansuh-Copilot.",
        },
        {"role": "user", "content": "Hello, what is your role?"},
        {"role": "assistant", "content": "I am an LLM."},
        {"role": "user", "content": "What is your name?"},
    ]
    OpenaiStreamer().chat_response(
        messages=demo_messages, iter_lines=True, verbose=True
    )

    # python -m networks.openai_streamer