"""Flask backend for pairwise audio-model comparison tests with Elo ranking.

A user starts a test for a task, is served one item at a time together with
two models picked by the Elo system, and submits which of the two they
preferred. Each submission is appended to a per-session JSONL file and fed
back into the Elo ratings.
"""

import json
import os
import random
import uuid
from datetime import datetime

from flask import Flask, request, jsonify, session, render_template
from flask_cors import CORS

from elo_rank import EloRank

app = Flask(__name__)
CORS(app)
# Prefer a secret supplied through the environment. The literal fallback keeps
# existing setups working but should not be relied on outside development.
app.secret_key = os.environ.get('SECRET_KEY', 'supersecretkey')

DATA_DIR = './data'
RESULTS_DIR = './results'

# Instantiate the EloRank system.
elo_rank_system = EloRank()

# Models taking part in the Elo ranking. Each name is also the key under
# which a test item stores that model's output audio path.
models = ['output_path_4o', 'output_path_miniomni', 'output_path_speechgpt',
          'output_path_funaudio', 'output_path_4o_cascade',
          'output_path_4o_llama_omni']
for model in models:
    elo_rank_system.add_model(model)

# Every key of a test item that holds an audio path to be rewritten.
AUDIO_PATH_KEYS = ('input_path', *models)

# Characters that must never reach a file name built from client input.
_FORBIDDEN_PATH_CHARS = ('/', '\\', '\0')


def _safe_component(value):
    """Return ``value`` as text with path separators and NULs replaced by '_'."""
    text = str(value)
    for forbidden in _FORBIDDEN_PATH_CHARS:
        text = text.replace(forbidden, '_')
    return text


def _is_valid_task_name(task):
    """Return True if ``task`` is a non-empty string free of path separators."""
    return isinstance(task, str) and bool(task) and _safe_component(task) == task


def load_test_data(task):
    """Load the JSON file corresponding to the selected task.

    Args:
        task: Task name; ``<DATA_DIR>/<task>.json`` is read.

    Returns:
        The parsed test items, with every audio path prefixed so that it
        points into the Flask static folder.

    Raises:
        ValueError: If ``task`` is empty, not a string, or contains a path
            separator (which would allow escaping ``DATA_DIR``).
        FileNotFoundError: If no data file exists for ``task``.
        KeyError: If an item lacks one of the expected audio path keys.
    """
    if not _is_valid_task_name(task):
        raise ValueError(f"Invalid task name: {task!r}")

    with open(os.path.join(DATA_DIR, f"{task}.json"), "r", encoding='utf-8') as f:
        test_data = json.load(f)

    # Rewrite the audio paths so they point at the Flask static folder.
    for item in test_data:
        for key in AUDIO_PATH_KEYS:
            item[key] = f"/static/audio{item[key]}"

    return test_data


def save_result(task, username, result_data, session_id):
    """Append one result record to the user's per-session JSONL file.

    The current Elo rating of every model and an ISO timestamp are added to
    ``result_data`` (in place) before it is written.

    Args:
        task: Task name, used in the output file name.
        username: User name, used in the output file name.
        result_data: Dict describing the judged item and the user's choice.
        session_id: Unique id of the test session, used in the file name.
    """
    # Create the directory here as well, so saving works when the app is not
    # started through the ``__main__`` guard (e.g. under a WSGI server).
    os.makedirs(RESULTS_DIR, exist_ok=True)

    # The name parts originate from client input; strip path separators so the
    # file always lands inside RESULTS_DIR.
    file_name = "{}_{}_{}.jsonl".format(
        _safe_component(task),
        _safe_component(username),
        _safe_component(session_id),
    )
    file_path = os.path.join(RESULTS_DIR, file_name)

    # Collect the Elo score of every model.
    elo_scores = {model: elo_rank_system.get_rating(model) for model in models}

    # Attach the Elo scores and a timestamp to the result record.
    result_data['elo_scores'] = elo_scores
    result_data['timestamp'] = datetime.now().isoformat()

    with open(file_path, "a", encoding='utf-8') as f:
        f.write(json.dumps(result_data) + "\n")


@app.route('/start_test', methods=['POST'])
def start_test():
    """Initiate the test for a user with the selected task.

    Expects a JSON body with ``task`` and ``username``. Responds with 400 if
    either is missing or the task name is invalid, and 404 if the task has no
    data file.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    task = data.get('task')
    username = data.get('username')

    if not task or not username:
        return jsonify({"message": "Both 'task' and 'username' are required"}), 400
    if not _is_valid_task_name(task):
        return jsonify({"message": "Invalid task name"}), 400

    # Load the test data.
    try:
        test_data = load_test_data(task)
    except FileNotFoundError:
        return jsonify({"message": "Unknown task"}), 404

    # Shuffle test data for the user.
    random.shuffle(test_data)

    # Generate a unique session ID so result files of different runs by the
    # same user never collide.
    session_id = str(uuid.uuid4())

    # NOTE(review): the whole test set is kept in the session. With Flask's
    # default cookie-backed session this can exceed browser cookie size
    # limits for large tasks - consider a server-side session store.
    session['task'] = task
    session['username'] = username
    session['test_data'] = test_data
    session['current_index'] = 0
    session['session_id'] = session_id

    # An empty data file yields an empty description instead of an IndexError.
    task_description = test_data[0].get('task_description', '') if test_data else ''

    return jsonify({"message": "Test started", "total_tests": len(test_data),
                    "task_description": task_description})


@app.route('/next_test', methods=['GET'])
def next_test():
    """Serve the next test item together with two models to compare."""
    if 'current_index' not in session or 'test_data' not in session:
        return jsonify({"message": "No active test found"}), 400

    current_index = session['current_index']
    test_data = session['test_data']

    if current_index >= len(test_data):
        # Return the "Test completed" message when all tests are done.
        return jsonify({"message": "Test completed"}), 200

    # Let the EloRank system pick the two models for this match.
    selected_models = elo_rank_system.sample_next_match()

    # Serve test data with the two selected models. The pair is remembered in
    # the session so that /submit_result knows which models were compared.
    current_test = test_data[current_index]
    session['selected_models'] = selected_models
    session['current_index'] += 1

    return jsonify({
        "text": current_test["text"],
        "input_path": current_test["input_path"],
        "model_a": selected_models[0],
        "model_b": selected_models[1],
        "audio_a": current_test[selected_models[0]],
        "audio_b": current_test[selected_models[1]]
    })


@app.route('/submit_result', methods=['POST'])
def submit_result():
    """Submit the user's result, save it, and update the Elo ratings.

    Expects a JSON body with ``chosen_model`` (``'A'`` or ``'B'``). Responds
    with 400 if there is no active test or ``chosen_model`` is missing.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    chosen_model = data.get('chosen_model')

    username = session.get('username')
    task = session.get('task')
    session_id = session.get('session_id')
    test_items = session.get('test_data')
    selected_models = session.get('selected_models')
    # Subtract one since the index is incremented right after serving an item.
    # Default to 0 so a missing session gives a clean 400, not a TypeError.
    current_index = session.get('current_index', 0) - 1

    no_active_test = (
        not username
        or not task
        or not test_items
        or not selected_models
        or current_index < 0
        or current_index >= len(test_items)
    )
    if no_active_test:
        return jsonify({"message": "No active test found"}), 400
    if chosen_model is None:
        return jsonify({"message": "'chosen_model' is required"}), 400

    # Retrieve the models that were served for this item.
    model_a = selected_models[0]
    model_b = selected_models[1]

    result = {
        "name": username,
        "chosen_model": chosen_model,
        "model_a": model_a,
        "model_b": model_b,
        "result": {
            model_a: 1 if chosen_model == 'A' else 0,
            model_b: 1 if chosen_model == 'B' else 0
        }
    }

    # Save the result for the current test; session_id keeps file names unique.
    test_data = test_items[current_index]
    result_data = {**test_data, **result}
    save_result(task, username, result_data, session_id)

    # Update the Elo ranking system. Only an explicit 'A' or 'B' counts as a
    # match outcome; any other value is saved above but must not be scored as
    # a win for model B.
    if chosen_model == 'A':
        elo_rank_system.record_match(model_a, model_b)
    elif chosen_model == 'B':
        elo_rank_system.record_match(model_b, model_a)

    return jsonify({"message": "Result submitted", "model_a": model_a,
                    "model_b": model_b, "chosen_model": chosen_model})


@app.route('/end_test', methods=['GET'])
def end_test():
    """End the test session by clearing all session state."""
    session.clear()
    return jsonify({"message": "Test completed"})


# Render the index.html page.
@app.route('/')
def index():
    """Serve the front-end page."""
    return render_template('index.html')


if __name__ == '__main__':
    os.makedirs(RESULTS_DIR, exist_ok=True)

    # NOTE(review): debug mode enables the interactive debugger, and binding
    # to 0.0.0.0 exposes the server to the local network. Debug stays on by
    # default for backward compatibility; set FLASK_DEBUG=0 to turn it off.
    debug_enabled = os.environ.get('FLASK_DEBUG', '1').lower() not in ('0', 'false', 'no')

    # Allow access from the local network.
    app.run(debug=debug_enabled, host="0.0.0.0", port=8080)