import json
import os
import re

from sklearn.metrics import accuracy_score, precision_recall_fscore_support

from src.envs import EVAL_RESULTS_PATH


def parse_first_word(answer):
    # Extract the first word and check if it's 'yes' or 'no'
    first_word = re.split(r'[\s,\.]', answer.lower())[0]
    if first_word.startswith('yes'):
        return 'yes'
    elif first_word.startswith('no'):
        return 'no'
    else:
        return None


def compute_metrics(true_labels, predicted_labels):
    # Filter out invalid (non yes/no) answers
    valid_indices = [i for i, label in enumerate(predicted_labels) if label in ['yes', 'no']]
    filtered_true_labels = [true_labels[i] for i in valid_indices]
    filtered_predicted_labels = [predicted_labels[i] for i in valid_indices]

    # Calculate metrics on the filtered answers
    accuracy = accuracy_score(filtered_true_labels, filtered_predicted_labels)
    precision, recall, f1_score, _ = precision_recall_fscore_support(
        filtered_true_labels, filtered_predicted_labels, average='binary', pos_label='yes')
    yes_ratio = filtered_predicted_labels.count('yes') / len(filtered_predicted_labels) if filtered_predicted_labels else 0

    return {
        "Accuracy": accuracy,
        "Precision": precision,
        "Recall": recall,
        "F1 Score": f1_score,
        "Yes Ratio": yes_ratio
    }


def aggregate_metrics(directory_path):
    metrics_data = {"random": {"true": [], "pred": [], "invalid": []},
                    "popular": {"true": [], "pred": [], "invalid": []},
                    "adversarial": {"true": [], "pred": [], "invalid": []}}

    # Process each file in the directory
    for filename in os.listdir(directory_path):
        if filename.endswith(".json"):
            file_path = os.path.join(directory_path, filename)
            with open(file_path, 'r') as f:
                data = json.load(f)
                question_type = filename.split('_')[0]
                if question_type in metrics_data:
                    for entry in data[next(iter(data))]:
                        first_word = parse_first_word(entry['predicted_answer'])
                        if first_word is None:
                            metrics_data[question_type]["invalid"].append(entry['predicted_answer'])
                        metrics_data[question_type]["true"].append(entry['ground_truth_answer'].lower())
                        metrics_data[question_type]["pred"].append(first_word if first_word else entry['predicted_answer'].lower())

    results = {}
    for q_type, data in metrics_data.items():
        result = compute_metrics(data["true"], data["pred"])
        result["Non-Binary Responses Count"] = len(data["invalid"])
        result["Non-Binary Responses"] = data["invalid"]
        results[q_type] = result

    return results


def transform_format(data, model_name):
    # Define the new format's base structure
    transformed_data = {
        "config": {
            "model_name": model_name
        },
        "results": {}
    }

    # Mapping of old keys to new keys
    key_mapping = {
        "Accuracy": "accuracy",
        "Precision": "precision",
        "Recall": "recall",
        "F1 Score": "f1_score",
        "Yes Ratio": "yes_percentage"
    }

    # Iterate over each item in the original data
    for model_type, metrics in data.items():
        for old_key, new_suffix in key_mapping.items():
            # Build the flattened key, e.g. "random_accuracy"
            new_key = f"{model_type}_{new_suffix}"
            # Assign the corresponding value to the new key in the results dictionary
            transformed_data["results"][new_key] = {
                new_key: round(metrics[old_key], 4) if isinstance(metrics[old_key], float) else metrics[old_key]
            }

    return transformed_data


def calculate_metrics(json_output_directory, model_name):
    final_metrics = aggregate_metrics(json_output_directory)
    transformed_metrics = transform_format(final_metrics, model_name)

    # Write the transformed metrics to a results file
    results_path = os.path.join(EVAL_RESULTS_PATH, '3d-pope', model_name)
    if not os.path.exists(results_path):
        os.makedirs(results_path)
    with open(os.path.join(results_path, 'results.json'), 'w') as f:
        json.dump(transformed_metrics, f, indent=4)

    print(json.dumps(final_metrics, indent=4))
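

if __name__ == "__main__":
    # Minimal usage sketch (assumptions: the directory path and model name below
    # are hypothetical placeholders, not values defined elsewhere in this repo).
    # The directory is expected to hold files named like "random_*.json",
    # "popular_*.json", or "adversarial_*.json", each containing entries with
    # 'predicted_answer' and 'ground_truth_answer' fields.
    calculate_metrics("outputs/pope_answers", "example-model")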