Madhavan Iyengar
add submission capability
ab76bab
import json
import os
import json
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import re
from src.envs import EVAL_RESULTS_PATH
def parse_first_word(answer):
    """Classify a free-form answer as 'yes' or 'no' based on its first word.

    The answer is stripped of surrounding whitespace and lower-cased; the text
    up to the first whitespace character, comma, or period is taken as the
    first word. Matching is by prefix, so e.g. "Yes!" maps to 'yes' and
    "nope" maps to 'no'.

    Args:
        answer: The raw answer string.

    Returns:
        'yes' if the first word starts with "yes", 'no' if it starts with
        "no", otherwise None.
    """
    # Strip before splitting: a leading space or newline would otherwise make
    # the split yield an empty first token, and a valid answer such as
    # " Yes, it is" would be reported as unparseable.
    first_word = re.split(r'[\s,\.]', answer.strip().lower())[0]
    if first_word.startswith('yes'):
        return 'yes'
    if first_word.startswith('no'):
        return 'no'
    return None
def compute_metrics(true_labels, predicted_labels):
    """Compute binary classification metrics over the valid predictions.

    Only positions whose predicted label is exactly 'yes' or 'no' are scored;
    every other prediction is dropped together with its true label. 'yes' is
    treated as the positive class.

    Args:
        true_labels: Sequence of ground-truth labels.
        predicted_labels: Sequence of predicted labels, index-aligned with
            ``true_labels``.

    Returns:
        A dict with the keys "Accuracy", "Precision", "Recall", "F1 Score"
        and "Yes Ratio" (fraction of valid predictions that are 'yes'). When
        no prediction is valid, every value is 0.0.
    """
    # Keep only the positions where the model produced a usable binary answer.
    valid_indices = [
        i for i, label in enumerate(predicted_labels) if label in ('yes', 'no')
    ]

    # Nothing to score: return zeros instead of handing empty inputs to
    # scikit-learn. NOTE(review): depending on the scikit-learn version, empty
    # inputs appear to yield NaN (with warnings) or an error; NaN would later
    # be serialized into the results file.
    if not valid_indices:
        return {
            "Accuracy": 0.0,
            "Precision": 0.0,
            "Recall": 0.0,
            "F1 Score": 0.0,
            "Yes Ratio": 0.0,
        }

    filtered_true_labels = [true_labels[i] for i in valid_indices]
    filtered_predicted_labels = [predicted_labels[i] for i in valid_indices]

    accuracy = accuracy_score(filtered_true_labels, filtered_predicted_labels)
    precision, recall, f1_score, _ = precision_recall_fscore_support(
        filtered_true_labels,
        filtered_predicted_labels,
        average='binary',
        pos_label='yes',
    )
    # The list is known to be non-empty here, so the division is safe.
    yes_ratio = filtered_predicted_labels.count('yes') / len(filtered_predicted_labels)

    return {
        "Accuracy": accuracy,
        "Precision": precision,
        "Recall": recall,
        "F1 Score": f1_score,
        "Yes Ratio": yes_ratio,
    }
def aggregate_metrics(directory_path):
    """Aggregate yes/no metrics per question type from JSON result files.

    A file in ``directory_path`` is used when its name ends with ".json" and
    the part of the name before the first underscore is one of "random",
    "popular" or "adversarial". The entries are read from the value stored
    under the first key of the loaded JSON object; each entry must provide
    'predicted_answer' and 'ground_truth_answer'.

    Args:
        directory_path: Directory containing the JSON result files.

    Returns:
        A dict mapping each question type to the dict produced by
        ``compute_metrics``, extended with "Non-Binary Responses Count" and
        "Non-Binary Responses" (the raw answers that could not be parsed as
        yes/no).
    """
    metrics_data = {
        q_type: {"true": [], "pred": [], "invalid": []}
        for q_type in ("random", "popular", "adversarial")
    }

    # os.listdir returns names in arbitrary order; sorting makes the order of
    # the collected "Non-Binary Responses" reproducible across runs.
    for filename in sorted(os.listdir(directory_path)):
        if not filename.endswith(".json"):
            continue
        question_type = filename.split('_')[0]
        # Decide before opening so unrelated JSON files are never parsed.
        if question_type not in metrics_data:
            continue

        file_path = os.path.join(directory_path, filename)
        # JSON text is UTF-8; do not depend on the platform default encoding.
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        bucket = metrics_data[question_type]
        for entry in data[next(iter(data))]:
            predicted = entry['predicted_answer']
            first_word = parse_first_word(predicted)
            if first_word is None:
                bucket["invalid"].append(predicted)
            bucket["true"].append(entry['ground_truth_answer'].lower())
            # Unparseable answers are kept (lower-cased) so the two lists stay
            # index-aligned; compute_metrics filters them out again.
            bucket["pred"].append(first_word or predicted.lower())

    results = {}
    for q_type, labels in metrics_data.items():
        result = compute_metrics(labels["true"], labels["pred"])
        result["Non-Binary Responses Count"] = len(labels["invalid"])
        result["Non-Binary Responses"] = labels["invalid"]
        results[q_type] = result
    return results
def transform_format(data, model_name):
    """Convert per-category metrics into the flat results layout.

    Args:
        data: Mapping of category name to a metrics dict containing the keys
            "Accuracy", "Precision", "Recall", "F1 Score" and "Yes Ratio".
        model_name: Name stored under ``config.model_name``.

    Returns:
        A dict of the form ``{"config": {"model_name": ...}, "results": {...}}``
        where each results key is ``"<category>_<metric>"`` and maps to a
        one-item dict with that same key and the metric value. Float values
        are rounded to 4 decimal places; other values are passed through.
    """
    # Source metric name -> suffix used in the output key.
    suffix_by_metric = {
        "Accuracy": "accuracy",
        "Precision": "precision",
        "Recall": "recall",
        "F1 Score": "f1_score",
        "Yes Ratio": "yes_percentage",
    }

    def _rounded(value):
        # Only floats are rounded; anything else is kept untouched.
        if isinstance(value, float):
            return round(value, 4)
        return value

    flat_results = {
        f"{category}_{suffix}": {f"{category}_{suffix}": _rounded(values[metric])}
        for category, values in data.items()
        for metric, suffix in suffix_by_metric.items()
    }

    return {
        "config": {"model_name": model_name},
        "results": flat_results,
    }
def calculate_metrics(json_output_directory, model_name):
    """Compute the metrics for a model and write them to its results file.

    The aggregated metrics are converted with ``transform_format`` and written
    as JSON to ``<EVAL_RESULTS_PATH>/3d-pope/<model_name>/results.json``. The
    untransformed metrics are printed to stdout afterwards.

    Args:
        json_output_directory: Directory passed to ``aggregate_metrics``.
        model_name: Model name; used for the output sub-directory and stored
            in the written results.
    """
    final_metrics = aggregate_metrics(json_output_directory)
    transformed_metrics = transform_format(final_metrics, model_name)

    results_path = os.path.join(EVAL_RESULTS_PATH, '3d-pope', model_name)
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(results_path, exist_ok=True)
    with open(os.path.join(results_path, 'results.json'), 'w', encoding='utf-8') as f:
        json.dump(transformed_metrics, f, indent=4)

    print(json.dumps(final_metrics, indent=4))