import json
import os
import re

from sklearn.metrics import accuracy_score, precision_recall_fscore_support

from src.envs import EVAL_RESULTS_PATH


def parse_first_word(answer):
    """Map a free-form answer to 'yes' or 'no' based on its first word, or None if neither."""
    # Split on whitespace, commas, and periods; strip leading whitespace so the
    # first token is the actual first word of the answer.
    first_word = re.split(r'[\s,\.]+', answer.lower().strip())[0]
    if first_word.startswith('yes'):
        return 'yes'
    elif first_word.startswith('no'):
        return 'no'
    else:
        return None


def compute_metrics(true_labels, predicted_labels):
    """Compute accuracy, precision, recall, F1, and yes-ratio over the binary predictions."""
    # Score only the responses that were parsed into a binary yes/no label.
    valid_indices = [i for i, label in enumerate(predicted_labels) if label in ['yes', 'no']]
    filtered_true_labels = [true_labels[i] for i in valid_indices]
    filtered_predicted_labels = [predicted_labels[i] for i in valid_indices]

    # If every response was non-binary there is nothing to score; return zeros
    # instead of calling the sklearn scorers on empty inputs.
    if not filtered_predicted_labels:
        return {"Accuracy": 0.0, "Precision": 0.0, "Recall": 0.0, "F1 Score": 0.0, "Yes Ratio": 0.0}

    accuracy = accuracy_score(filtered_true_labels, filtered_predicted_labels)
    precision, recall, f1_score, _ = precision_recall_fscore_support(
        filtered_true_labels, filtered_predicted_labels, average='binary', pos_label='yes')

    yes_ratio = filtered_predicted_labels.count('yes') / len(filtered_predicted_labels)

    return {
        "Accuracy": accuracy,
        "Precision": precision,
        "Recall": recall,
        "F1 Score": f1_score,
        "Yes Ratio": yes_ratio
    }
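
# A quick hand-worked illustration of compute_metrics on toy labels (hypothetical
# inputs, not taken from any real evaluation run):
#   compute_metrics(["yes", "no"], ["yes", "yes"])
#   -> Accuracy 0.5, Precision 0.5, Recall 1.0, F1 Score ~0.667, Yes Ratio 1.0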


def aggregate_metrics(directory_path):
    """Read every JSON result file in directory_path and compute metrics per question type."""
    metrics_data = {"random": {"true": [], "pred": [], "invalid": []},
                    "popular": {"true": [], "pred": [], "invalid": []},
                    "adversarial": {"true": [], "pred": [], "invalid": []}}

    for filename in os.listdir(directory_path):
        if filename.endswith(".json"):
            file_path = os.path.join(directory_path, filename)
            with open(file_path, 'r') as f:
                data = json.load(f)

            # The question type (random/popular/adversarial) is encoded as the filename prefix.
            question_type = filename.split('_')[0]
            if question_type in metrics_data:
                # Each file is expected to contain a single top-level key whose value
                # is the list of question/answer entries.
                for entry in data[next(iter(data))]:
                    first_word = parse_first_word(entry['predicted_answer'])
                    if first_word is None:
                        metrics_data[question_type]["invalid"].append(entry['predicted_answer'])
                    metrics_data[question_type]["true"].append(entry['ground_truth_answer'].lower())
                    metrics_data[question_type]["pred"].append(first_word if first_word else entry['predicted_answer'].lower())

    results = {}
    for q_type, type_data in metrics_data.items():
        result = compute_metrics(type_data["true"], type_data["pred"])
        result["Non-Binary Responses Count"] = len(type_data["invalid"])
        result["Non-Binary Responses"] = type_data["invalid"]
        results[q_type] = result

    return results
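
# Sketch of the per-file layout aggregate_metrics expects. The filename prefix and the
# top-level key name below are hypothetical; only the entry fields ("predicted_answer",
# "ground_truth_answer") are actually read by the loop above.
#   random_pope_results.json:
#   {
#       "responses": [
#           {"predicted_answer": "Yes, there is a chair.", "ground_truth_answer": "Yes"},
#           {"predicted_answer": "No.", "ground_truth_answer": "No"}
#       ]
#   }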


def transform_format(data, model_name):
    """Reshape the raw metrics dictionary into the layout written to results.json."""
    transformed_data = {
        "config": {
            "model_name": model_name
        },
        "results": {}
    }

    # Map the human-readable metric names to the key suffixes used in the output file.
    key_mapping = {
        "Accuracy": "accuracy",
        "Precision": "precision",
        "Recall": "recall",
        "F1 Score": "f1_score",
        "Yes Ratio": "yes_percentage"
    }

    for question_type, metrics in data.items():
        for old_key, new_suffix in key_mapping.items():
            new_key = f"{question_type}_{new_suffix}"
            # Round float metrics to four decimal places; pass other values through unchanged.
            transformed_data["results"][new_key] = {
                new_key: round(metrics[old_key], 4) if isinstance(metrics[old_key], float) else metrics[old_key]
            }

    return transformed_data
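
# Resulting shape (illustrative model name and values only):
#   {
#       "config": {"model_name": "my-model"},
#       "results": {
#           "random_accuracy": {"random_accuracy": 0.8731},
#           "random_precision": {"random_precision": 0.9012},
#           ...
#       }
#   }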


def calculate_metrics(json_output_directory, model_name):
    """Aggregate the 3D-POPE results, write them to the leaderboard results file, and print them."""
    final_metrics = aggregate_metrics(json_output_directory)
    transformed_metrics = transform_format(final_metrics, model_name)

    results_path = os.path.join(EVAL_RESULTS_PATH, '3d-pope', model_name)
    os.makedirs(results_path, exist_ok=True)
    with open(os.path.join(results_path, 'results.json'), 'w') as f:
        json.dump(transformed_metrics, f, indent=4)
    print(json.dumps(final_metrics, indent=4))
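

if __name__ == "__main__":
    # Minimal usage sketch. The directory and model name below are placeholders for
    # illustration only; real callers pass their own evaluation output directory.
    calculate_metrics("path/to/json_outputs", "my-model")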