import asyncio import httpx import json import requests import math import os client = httpx.AsyncClient() # 请求URL recommand_base_url = "https://" + os.getenv("recommand_base_url") chat_url = "https://" + os.getenv("chat_url") model_url = "https://" + os.getenv("model_url") character_url = "https://" + os.getenv("character_url") avatar_url = "https://" + os.getenv("avatar_url") image_url = "https://" + os.getenv("image_url") auth = os.getenv("auth") moment_url = os.getenv("moment_url") #headers def create_headers(language): # 映射 language_mapping = { 'Chinese': 'zh', 'English': 'en', 'Japanese': 'ja', 'Korean': 'ko' } # 获取对应的语言代码,如果不存在则默认为 'zh' language_code = language_mapping.get(language, 'zh') return { 'X-Refresh-Token': '', 'X-Language': language_code, 'accept-language': '', 'User-Agent': 'Apifox/1.0.0 (https://apifox.com)', 'Authorization': auth, 'Accept': '*/*', 'Connection': 'keep-alive', 'X-nsfw': '1' } def recommand_character(language): response = requests.get(character_url, headers=create_headers(language)) json_data = response.json() characters = [{ "name": item["name"], "_id": item["_id"], "avatar_url": str(avatar_url + item['_id'] + "_avatar.webp") } for item in json_data['data']] return characters def id_to_avatar(char_id): return str(avatar_url + char_id + "_avatar.webp") #GET模型列表 def get_models(): class ModelStorage: def __init__(self): self.models = [] def add_models(self, models): for model_info in models: # 过滤掉 'gpt-4o' 和 'gpt-4o-mini' if model_info['model'] not in ['mythomax-13b']: if model_info['model'] in ['gemma-2-9b', 'llama-3.1-8b']: weight = 12 # Assign a low weight to reduce their frequency else: weight = int(math.ceil(25 / (model_info['price'] + 0.5))) self.models.extend([model_info['model']] * weight) model_storage = ModelStorage() # 从指定的 URL 获取 JSON 数据 response = requests.get(model_url) if response.status_code == 200: data = response.json() # 添加模型到 self.models model_storage.add_models(data['data']) return model_storage.models #解析推荐json async def extract_recommand(data, language): result = [] for item in data["data"]: opening = await get_moment_opening(item["_id"], language) result.append({ "character_id": item["character_id"], "avatar_url": str(avatar_url + item["character_id"] + "_avatar.webp"), "_id": item["_id"], "image_url": str(image_url + item["_id"] + "_large.webp"), "description": item["description"], "name": item["title"], "opening": opening }) return result async def get_moment_opening(moment_id, language): url = f"{moment_url}{moment_id}" async with httpx.AsyncClient() as client: response = await client.get(url, headers=create_headers(language)) if response.status_code == 200: data = response.json() return data['data']['opening'] return None #请求推荐API async def recommand(char_id, language): recommand_url = str(recommand_base_url + char_id + "?num=20&offset=0") async with httpx.AsyncClient() as client: response = await client.get(recommand_url, headers=create_headers(language)) json_data = response.json() return await extract_recommand(json_data, language) async def fetch_stream(query, model, moment_id, session_id, bio, request_name, queue, language): payload = {"query": query, "model": model, "bio": bio, "moment_id": moment_id} if session_id: payload["session_id"] = session_id async with client.stream( "POST", chat_url, json=payload, headers=create_headers(language) ) as response: # 获取并返回 header if response.status_code != 200: await queue.put((request_name, "content", "Error Occur!")) await queue.put((request_name, "end", None)) return response_headers = dict(response.headers) session_id = response_headers.get("x-session-id") await queue.put((request_name, "header", response_headers)) # 流式处理响应内容 async for chunk in response.aiter_bytes(): await queue.put((request_name, "content", chunk.decode())) # 标记流结束 await queue.put((request_name, "end", None)) return session_id async def combine_streams( query_a, query_b, model_a, model_b, moment_id_a, moment_id_b, session_id_a, session_id_b, bio_a, bio_b, language ): queue = asyncio.Queue() task_a = asyncio.create_task( fetch_stream( query_a, model_a, moment_id_a, session_id_a, bio_a, "requestA", queue, language ) ) task_b = asyncio.create_task( fetch_stream( query_b, model_b, moment_id_b, session_id_b, bio_b, "requestB", queue, language ) ) headers = {} content = {"requestA": "", "requestB": ""} active_streams = 2 while active_streams > 0: request_name, data_type, data = await queue.get() if data_type == "header": headers[f"{request_name}_header"] = data if len(headers) == 2: yield headers elif data_type == "content": content[request_name] = data.strip() if content["requestA"] or content["requestB"]: yield content content = {"requestA": "", "requestB": ""} elif data_type == "end": active_streams -= 1 session_id_a = await task_a session_id_b = await task_b