Spaces:
Sleeping
Sleeping
import os | |
import json | |
import random | |
import uuid | |
from flask import Flask, request, jsonify, session, render_template | |
from flask_cors import CORS | |
from datetime import datetime | |
from elo_rank import EloRank | |
# Flask application; CORS is enabled with credentials support so a front end
# on another origin can send the session cookie along with its requests.
app = Flask(__name__)
CORS(app, supports_credentials=True)

# Flask signs the session cookie with this key. Prefer supplying it through
# the SECRET_KEY environment variable; the literal is only a fallback so that
# existing deployments without the variable keep working.
app.secret_key = os.environ.get('SECRET_KEY', 'supersecretkey')
app.config['SESSION_COOKIE_SECURE'] = False  # do not require HTTPS for the cookie (development setting)
app.config['SESSION_COOKIE_HTTPONLY'] = True  # keep JavaScript from reading the cookie
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'  # SameSite policy applied to the session cookie

base_dir = os.path.dirname(os.path.abspath(__file__))
# NOTE(review): the second component of each join is absolute, so
# os.path.join discards base_dir and these are effectively '/app/data' and
# '/app/results'. Confirm whether paths relative to base_dir were intended.
DATA_DIR = os.path.join(base_dir, '/app/data')
RESULTS_DIR = os.path.join(base_dir, '/app/results')

# Single Elo rating system shared by all requests.
elo_rank_system = EloRank()

# Models taking part in the ranking. Each name doubles as the key under which
# a test item stores that model's audio path.
models = ['output_path_4o', 'output_path_miniomni', 'output_path_speechgpt', 'output_path_funaudio', 'output_path_4o_cascade', 'output_path_4o_llama_omni']
for model in models:
    elo_rank_system.add_model(model)
import os | |
def print_directory_structure(start_path, indent=''):
    """Print the directory tree rooted at ``start_path``, one entry per line.

    Directories are printed with a folder marker and a trailing slash and are
    then descended into recursively; every other entry is printed with a file
    marker.

    Args:
        start_path: Directory whose contents are listed.
        indent: Prefix printed before each entry; one more space is appended
            for every level of nesting.
    """
    for entry_name in os.listdir(start_path):
        entry_path = os.path.join(start_path, entry_name)
        if not os.path.isdir(entry_path):
            print(f"{indent}📄 {entry_name}")
            continue
        print(f"{indent}📁 {entry_name}/")
        print_directory_structure(entry_path, indent + ' ')
def load_test_data(task):
    """Load the test items for ``task`` and rewrite their audio paths.

    Reads ``<DATA_DIR>/<task>.json`` and prefixes the ``input_path`` entry and
    the entry of every model in ``models`` with ``/app/static/audio`` in each
    item. The file is presumably a JSON array of objects; that shape is not
    validated here.

    Args:
        task: Task name, used as the base name of the JSON file. It must not
            contain path components.

    Returns:
        The list of test items with rewritten audio paths, or an empty list
        when ``task`` is not a plain file name or no file exists for it.

    Raises:
        KeyError: If an item lacks ``input_path`` or one of the model keys.
    """
    filename = f"{task}.json"
    # The task name is supplied by the client; refuse anything that would
    # resolve outside DATA_DIR (e.g. "../secret").
    if os.path.basename(filename) != filename:
        return []

    try:
        with open(os.path.join(DATA_DIR, filename), "r", encoding='utf-8') as f:
            test_data = json.load(f)
    except FileNotFoundError:
        # Return plain data, not a Flask response: this is a helper, and the
        # caller treats a falsy result as "no test data available".
        return []

    # Point every audio path at the static audio folder.
    for item in test_data:
        for key in ('input_path', *models):
            item[key] = f"/app/static/audio{item[key]}"
    return test_data
def save_result(task, username, result_data, session_id):
    """Append one result record to the user's JSON Lines result file.

    The record is written to
    ``<RESULTS_DIR>/<task>_<username>_<session_id>.jsonl``. Path separators in
    the three name parts are replaced with ``_`` so the file always lands
    inside ``RESULTS_DIR``, and the directory is created if it does not exist.

    Args:
        task: Task name, used in the file name.
        username: User name, used in the file name.
        result_data: Dict describing the result. It is modified in place: the
            current rating of every model is stored under ``elo_scores`` and
            the current local time (ISO 8601) under ``timestamp``.
        session_id: Identifier of the test run, used in the file name.
    """
    def _safe(part):
        # Neutralise path separators coming from client-supplied values.
        return str(part).replace('/', '_').replace('\\', '_')

    # The directory may not exist yet, depending on how the app was started.
    os.makedirs(RESULTS_DIR, exist_ok=True)
    file_name = f"{_safe(task)}_{_safe(username)}_{_safe(session_id)}.jsonl"
    file_path = os.path.join(RESULTS_DIR, file_name)

    # Snapshot the rating of every model alongside the result.
    result_data['elo_scores'] = {model: elo_rank_system.get_rating(model) for model in models}
    result_data['timestamp'] = datetime.now().isoformat()

    with open(file_path, "a", encoding='utf-8') as f:
        f.write(json.dumps(result_data) + "\n")
def start_test():
    """Start a test run for the user and task named in the JSON request body.

    Expects a JSON object with ``task`` and ``username``. The task's test
    items are loaded and shuffled, and the run state (task, username, items,
    index 0 and a fresh UUID session id) is stored in the Flask session.

    Returns:
        A JSON response with ``message``, ``total_tests`` and the
        ``task_description`` of the first item (empty string if absent), or a
        400 response when the body is invalid or no test data is available.
    """
    # NOTE(review): no route decorator is visible on this function; confirm
    # how it is registered with the app.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    task = payload.get('task')
    username = payload.get('username')
    if not task or not username:
        return jsonify({"message": "Both 'task' and 'username' are required"}), 400

    # Load the test data
    test_data = load_test_data(task)
    if isinstance(test_data, tuple):
        # A tuple here is a ready-made (response, status) error pair, not
        # test items; pass it through instead of trying to shuffle it.
        return test_data
    if not test_data:
        return jsonify({"message": "No test data available"}), 400

    # Shuffle test data for the user
    random.shuffle(test_data)

    # Unique id for this run; it keeps result file names from colliding.
    session_id = str(uuid.uuid4())

    # NOTE(review): the whole item list is kept in the session. With Flask's
    # default cookie-backed session this may exceed browser cookie size
    # limits for large tasks; confirm the session backend in use.
    session['task'] = task
    session['username'] = username
    session['test_data'] = test_data
    session['current_index'] = 0
    session['session_id'] = session_id

    task_description = test_data[0].get('task_description', '')
    return jsonify({"message": "Test started", "total_tests": len(test_data), "task_description": task_description})
def next_test():
    """Serve the next test item together with two models to compare.

    Returns:
        * 400 with "Session data missing" when the session holds no run state.
        * 200 with "Test completed" once every item has been served.
        * 500 with "Error selecting models" when the Elo system does not
          yield exactly two models.
        * Otherwise a JSON object with the item's text and input path, the
          two model names and their audio paths. The chosen pair is stored in
          the session and the item index is advanced before responding.
    """
    # NOTE(review): no route decorator is visible on this function; confirm
    # how it is registered with the app.
    has_run_state = 'current_index' in session and 'test_data' in session
    if not has_run_state:
        # Convert the session to a plain dict so it prints cleanly.
        print("Debug Session:", dict(session))
        return jsonify({"message": "Session data missing"}), 400

    position = session['current_index']
    items = session['test_data']
    if position >= len(items):
        # Every item has already been served.
        return jsonify({"message": "Test completed"}), 200

    # Let the Elo system pick the pair of models for this round.
    selected_models = elo_rank_system.sample_next_match()
    if not (selected_models and len(selected_models) == 2):
        return jsonify({"message": "Error selecting models"}), 500

    current_test = items[position]
    # Remember the pair and advance the index before building the response.
    session['selected_models'] = selected_models
    session['current_index'] = session['current_index'] + 1

    first_model = selected_models[0]
    second_model = selected_models[1]
    response_body = {
        "text": current_test["text"],
        "input_path": current_test["input_path"],
        "model_a": first_model,
        "model_b": second_model,
        "audio_a": current_test[first_model],
        "audio_b": current_test[second_model]
    }
    return jsonify(response_body)
def submit_result():
    """Record the user's choice for the most recently served test item.

    Expects a JSON object whose ``chosen_model`` is ``'A'`` or ``'B'``. The
    result is merged with the served item, saved through ``save_result`` and
    reported to the Elo system.

    Returns:
        A JSON response echoing ``model_a``, ``model_b`` and
        ``chosen_model``; 400 when ``chosen_model`` is invalid or when the
        session holds no active test.
    """
    # NOTE(review): no route decorator is visible on this function; confirm
    # how it is registered with the app.
    payload = request.get_json(silent=True)
    chosen_model = payload.get('chosen_model') if isinstance(payload, dict) else None
    if chosen_model not in ('A', 'B'):
        # Anything else would be silently counted as a win for model B below.
        return jsonify({"message": "chosen_model must be 'A' or 'B'"}), 400

    username = session.get('username')
    task = session.get('task')
    served_count = session.get('current_index')
    session_id = session.get('session_id')
    selected_models = session.get('selected_models')
    test_items = session.get('test_data')

    # The index is advanced when an item is served, so the item being judged
    # is the one just before it. Reject the request if any run state is missing.
    if (
        not username
        or not task
        or served_count is None
        or served_count < 1
        or not selected_models
        or not test_items
        or served_count > len(test_items)
    ):
        return jsonify({"message": "No active test found"}), 400
    current_index = served_count - 1

    model_a = selected_models[0]
    model_b = selected_models[1]

    result = {
        "name": username,
        "chosen_model": chosen_model,
        "model_a": model_a,
        "model_b": model_b,
        "result": {
            model_a: 1 if chosen_model == 'A' else 0,
            model_b: 1 if chosen_model == 'B' else 0
        }
    }

    # Store the judged item together with the outcome; the session id in the
    # file name keeps separate runs of the same user apart.
    result_data = {**test_items[current_index], **result}
    save_result(task, username, result_data, session_id)

    # Update the Elo system; the chosen model is passed first
    # (presumably winner, loser; confirm against EloRank.record_match).
    if chosen_model == 'A':
        elo_rank_system.record_match(model_a, model_b)
    else:
        elo_rank_system.record_match(model_b, model_a)

    return jsonify({"message": "Result submitted", "model_a": model_a, "model_b": model_b, "chosen_model": chosen_model})
def end_test():
    """Discard everything stored in the session and confirm completion."""
    # NOTE(review): no route decorator is visible on this function; confirm
    # how it is registered with the app.
    session.clear()
    completion_notice = {"message": "Test completed"}
    return jsonify(completion_notice)
def index():
    """Render the ``index.html`` page."""
    # NOTE(review): no route decorator is visible on this function; confirm
    # how it is registered with the app.
    page = render_template('index.html')
    return page
if __name__ == '__main__':
    # Make sure the results directory exists. exist_ok avoids the
    # check-then-create race of a separate os.path.exists() test.
    os.makedirs(RESULTS_DIR, exist_ok=True)
    # Listen on all interfaces so machines on the local network can connect.
    # NOTE(review): debug=True turns on the Werkzeug interactive debugger,
    # which permits code execution from the browser. Combined with
    # host="0.0.0.0" this must only be run on a trusted network.
    app.run(host="0.0.0.0", debug=True, port=8080)