import json import os import pandas as pd import numpy as np from src.display.formatting import has_no_nan_values, make_clickable_model from src.display.utils import AutoEvalColumn, EvalQueueColumn from src.leaderboard.read_evals import get_raw_eval_results, get_raw_model_results def get_model_leaderboard_df(results_path: str, requests_path: str="", cols: list=[], benchmark_cols: list=[], rank_col: list=[]) -> pd.DataFrame: """Creates a dataframe from all the individual experiment results""" raw_data = get_raw_model_results(results_path) all_data_json = [v.to_dict() for v in raw_data] # assert len(rank_col) <= 1, "Only one column can be selected for ranking" df = pd.DataFrame.from_records(all_data_json) df = df[benchmark_cols] # print(df.head()) # if there is one col in rank_col, this is an isolated dimension to rank by # sort by that selected column and remove NaN values if rank_col and rank_col[0] not in ["sort_by_score", "sort_by_rank"]: # df = df.dropna(subset=benchmark_cols) df = df.dropna(subset=rank_col) df = df.fillna(0.00) # print(df[rank_col[0]]) df = df.sort_values(by=[rank_col[0]], ascending=True) # print(rank_col, benchmark_cols) # print(df.head()) for col in benchmark_cols: if 'Std dev' in col or 'Score' in col: df[col] = (df[col]).map('{:.2f}'.format) df[col] = df[col].round(decimals=2) elif rank_col and rank_col[0] == "sort_by_score": # sorting by averaging all benchmark cols, except cols before offset_idx start_idx = rank_col[1] end_idx = rank_col[2] avg_scores = df.iloc[:, start_idx:end_idx].mean(axis=1) if len(rank_col) == 4: avg_col_name = f"Overall ({rank_col[3]})" else: # avg_col_name = "Average Score" avg_col_name = 'Overall' df.insert(1, avg_col_name, avg_scores) df[avg_col_name] = avg_scores.round(decimals=4) df = df.sort_values(by=[avg_col_name], ascending=False) df[avg_col_name] = df[avg_col_name].map('{:.2f}'.format) # df = df.drop(columns=benchmark_cols[offset_idx:]) # print(benchmark_cols) # print(df.head()) # insert a rank column rank = np.arange(1, len(df)+1) df.insert(0, 'Rank', rank) for col in benchmark_cols: if 'Std dev' in col or 'Score' in col: df[col] = (df[col]).map('{:.2f}'.format) df[col] = df[col].round(decimals=2) # df = df.fillna('--') df.replace("nan", '--', inplace=True) elif rank_col and rank_col[0] == "sort_by_rank": # else: # when rank_col, the first in benchmark_cols is empty, sort by averaging all the benchmarks, except the first one start_idx = rank_col[1] end_idx = rank_col[2] avg_rank = df.iloc[:, start_idx:end_idx].mean(axis=1) if len(rank_col) == 4: avg_col_name = f"Overall ({rank_col[3]})" else: # avg_col_name = "Average Rank" avg_col_name = 'Overall' df.insert(1, avg_col_name, avg_rank) df[avg_col_name] = avg_rank.round(decimals=4) df = df.sort_values(by=[avg_col_name], ascending=True) df[avg_col_name] = df[avg_col_name].map('{:.2f}'.format) # we'll skip NaN, instrad of deleting the whole row df = df.fillna('--') # insert a rank column rank = np.arange(1, len(df)+1) df.insert(0, 'Rank', rank) # print(benchmark_cols) # df.style.background_gradient(cmap='coolwarm', subset=benchmark_cols) # for col in benchmark_cols: # # print(col) # # if 'Std dev' in col or 'Score' in col: # if 'Std dev' in col or 'Score' in col: # # if set(['Chemistry', 'Reasoning']).intersection(set(col.split())): # # df[col] = (df[col]).map('{:.2f}'.format) # # else: # # df[col] = (df[col]*100).map('{:.2f}'.format) # # if "Chemistry" in col or "C++" in col: # if "Chemistry" in col or "C++" in col or "Overall" in col or "Probability" in col or "Logical" in col: # df[col] = (df[col]).map('{:.2f}'.format) # else: # df[col] = (df[col]*100).map('{:.2f}'.format) # df[col] = df[col].round(decimals=2) # df = df.sort_values(by=[AutoEvalColumn.score.name], ascending=True) # df[AutoEvalColumn.rank.name] = df[AutoEvalColumn.score.name].rank(ascending=True, method="min") # print(cols) # [] # print(df.columns) # ['eval_name', 'Model', 'Hub License', 'Organization', 'Knowledge cutoff', 'Overall'] # exit() # only keep the columns that are in the cols list # for col in cols: # if col not in df.columns: # df[col] = None # else: # df = df[cols].round(decimals=2) # filter out if any of the benchmarks have not been produced # df = df[has_no_nan_values(df, benchmark_cols)] return df def get_leaderboard_df(results_path: str, requests_path: str, cols: list, benchmark_cols: list) -> pd.DataFrame: """Creates a dataframe from all the individual experiment results""" raw_data = get_raw_eval_results(results_path, requests_path) # raw_data = get_raw_model_results(results_path) all_data_json = [v.to_dict() for v in raw_data] df = pd.DataFrame.from_records(all_data_json) df = df.sort_values(by=[AutoEvalColumn.average.name], ascending=False) for col in cols: if col not in df.columns: df[col] = None else: df[col] = df[col].round(decimals=2) # filter out if any of the benchmarks have not been produced df = df[has_no_nan_values(df, benchmark_cols)] return df def get_evaluation_queue_df(save_path: str, cols: list) -> list[pd.DataFrame]: """Creates the different dataframes for the evaluation queues requestes""" entries = [entry for entry in os.listdir(save_path) if not entry.startswith(".")] all_evals = [] for entry in entries: if ".json" in entry: file_path = os.path.join(save_path, entry) with open(file_path) as fp: data = json.load(fp) data[EvalQueueColumn.model.name] = make_clickable_model(data["model"]) data[EvalQueueColumn.revision.name] = data.get("revision", "main") all_evals.append(data) elif ".md" not in entry: # this is a folder sub_entries = [e for e in os.listdir(f"{save_path}/{entry}") if os.path.isfile(e) and not e.startswith(".")] for sub_entry in sub_entries: file_path = os.path.join(save_path, entry, sub_entry) with open(file_path) as fp: data = json.load(fp) data[EvalQueueColumn.model.name] = make_clickable_model(data["model"]) data[EvalQueueColumn.revision.name] = data.get("revision", "main") all_evals.append(data) pending_list = [e for e in all_evals if e["status"] in ["PENDING", "RERUN"]] running_list = [e for e in all_evals if e["status"] == "RUNNING"] finished_list = [e for e in all_evals if e["status"].startswith("FINISHED") or e["status"] == "PENDING_NEW_EVAL"] df_pending = pd.DataFrame.from_records(pending_list, columns=cols) df_running = pd.DataFrame.from_records(running_list, columns=cols) df_finished = pd.DataFrame.from_records(finished_list, columns=cols) return df_finished[cols], df_running[cols], df_pending[cols]