# Roleplay_LLM_Arena / useapi.py
import asyncio
import httpx
import json
import requests
import math
import os
client = httpx.AsyncClient()
# Request URLs, assembled from environment variables
recommand_base_url = "https://" + os.getenv("recommand_base_url")
chat_url = "https://" + os.getenv("chat_url")
model_url = "https://" + os.getenv("model_url")
character_url = "https://" + os.getenv("character_url")
avatar_url = "https://" + os.getenv("avatar_url")
image_url = "https://" + os.getenv("image_url")
auth = os.getenv("auth")
moment_url = os.getenv("moment_url")
# Build request headers with the mapped language code and auth token
def create_headers(language):
    # Map display language names to API language codes
language_mapping = {
'Chinese': 'zh',
'English': 'en',
'Japanese': 'ja',
'Korean': 'ko'
}
    # Look up the language code, defaulting to 'zh' if the language is unknown
language_code = language_mapping.get(language, 'zh')
return {
'X-Refresh-Token': '',
'X-Language': language_code,
'accept-language': '',
'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
'Authorization': auth,
'Accept': '*/*',
'Connection': 'keep-alive',
'X-nsfw': '1'
}
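# GET the recommended character list and build avatar URLs from character ids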
def recommand_character(language):
response = requests.get(character_url, headers=create_headers(language))
json_data = response.json()
characters = [{
"name": item["name"],
"_id": item["_id"],
"avatar_url": str(avatar_url + item['_id'] + "_avatar.webp")
} for item in json_data['data']]
return characters
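# Build the avatar image URL for a character id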
def id_to_avatar(char_id):
return str(avatar_url + char_id + "_avatar.webp")
# GET the model list and expand it into a weighted pool of model names
def get_models():
class ModelStorage:
def __init__(self):
self.models = []
def add_models(self, models):
for model_info in models:
                # Skip 'mythomax-13b'; every other model enters the weighted pool
if model_info['model'] not in ['mythomax-13b']:
if model_info['model'] in ['gemma-2-9b', 'llama-3.1-8b']:
weight = 12 # Assign a low weight to reduce their frequency
else:
weight = int(math.ceil(25 / (model_info['price'] + 0.5)))
self.models.extend([model_info['model']] * weight)
model_storage = ModelStorage()
    # Fetch the model list JSON from model_url
response = requests.get(model_url)
if response.status_code == 200:
data = response.json()
        # Add the returned models to the weighted pool
model_storage.add_models(data['data'])
return model_storage.models
# Parse the recommendation JSON and attach avatar/image URLs and opening lines
async def extract_recommand(data, language):
result = []
for item in data["data"]:
opening = await get_moment_opening(item["_id"], language)
result.append({
"character_id": item["character_id"],
"avatar_url": str(avatar_url + item["character_id"] + "_avatar.webp"),
"_id": item["_id"],
"image_url": str(image_url + item["_id"] + "_large.webp"),
"description": item["description"],
"name": item["title"],
"opening": opening
})
return result
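# Fetch the opening line of a moment; returns None on a non-200 response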
async def get_moment_opening(moment_id, language):
url = f"{moment_url}{moment_id}"
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=create_headers(language))
if response.status_code == 200:
data = response.json()
return data['data']['opening']
return None
# Call the recommendation API for a character
async def recommand(char_id, language):
recommand_url = str(recommand_base_url + char_id + "?num=20&offset=0")
async with httpx.AsyncClient() as client:
response = await client.get(recommand_url, headers=create_headers(language))
json_data = response.json()
return await extract_recommand(json_data, language)
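# Stream one chat completion. Pushes (request_name, "header"/"content"/"end", data)
# tuples onto the shared queue and returns the session id from the response headers.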
async def fetch_stream(query, model, moment_id, session_id, bio, request_name, queue, language):
payload = {"query": query, "model": model, "bio": bio, "moment_id": moment_id}
if session_id:
payload["session_id"] = session_id
async with client.stream(
"POST", chat_url, json=payload, headers=create_headers(language)
) as response:
        # Capture the response headers and extract the session id
if response.status_code != 200:
            await queue.put((request_name, "content", "An error occurred!"))
await queue.put((request_name, "end", None))
return
response_headers = dict(response.headers)
session_id = response_headers.get("x-session-id")
await queue.put((request_name, "header", response_headers))
        # Stream the response body chunk by chunk
async for chunk in response.aiter_bytes():
await queue.put((request_name, "content", chunk.decode()))
        # Signal that this stream has finished
await queue.put((request_name, "end", None))
return session_id
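# Run two chat streams concurrently and interleave their output: yields one dict
# with both response headers, then incremental content dicts keyed by requestA/requestB.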
async def combine_streams(
query_a,
query_b,
model_a,
model_b,
moment_id_a,
moment_id_b,
session_id_a,
session_id_b,
bio_a,
bio_b,
language
):
queue = asyncio.Queue()
task_a = asyncio.create_task(
fetch_stream(
query_a, model_a, moment_id_a, session_id_a, bio_a, "requestA", queue, language
)
)
task_b = asyncio.create_task(
fetch_stream(
query_b, model_b, moment_id_b, session_id_b, bio_b, "requestB", queue, language
)
)
headers = {}
content = {"requestA": "", "requestB": ""}
active_streams = 2
while active_streams > 0:
request_name, data_type, data = await queue.get()
if data_type == "header":
headers[f"{request_name}_header"] = data
if len(headers) == 2:
yield headers
elif data_type == "content":
content[request_name] = data.strip()
if content["requestA"] or content["requestB"]:
yield content
content = {"requestA": "", "requestB": ""}
elif data_type == "end":
active_streams -= 1
session_id_a = await task_a
session_id_b = await task_b
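# Illustrative usage sketch (assumption: the environment variables above are set and
# the endpoints are reachable). The model names, moment ids, and bios below are
# placeholders for demonstration only, not values taken from the real API.
async def _demo():
    async for update in combine_streams(
        query_a="Hello!",
        query_b="Hello!",
        model_a="example-model-a",   # placeholder model name
        model_b="example-model-b",   # placeholder model name
        moment_id_a="moment_a_id",   # placeholder moment id
        moment_id_b="moment_b_id",   # placeholder moment id
        session_id_a=None,
        session_id_b=None,
        bio_a="",
        bio_b="",
        language="English",
    ):
        print(update)

if __name__ == "__main__":
    asyncio.run(_demo())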