# DGEB / leaderboard / app.py
# add paper link
import importlib.util
import json
import math
from pathlib import Path
from typing import List
import gradio as gr
import pandas as pd
from pydantic import ValidationError, parse_obj_as
# HACK: very hacky way to import from parent directory, while avoiding needing all the deps of the parent package.
# Each source file is loaded directly by path instead of importing the parent package.
# Relative paths are resolved against the current working directory.
modality_path = "../dgeb/modality.py"
spec = importlib.util.spec_from_file_location("modality", modality_path)
modality = importlib.util.module_from_spec(spec)
# module_from_spec only creates an empty module object; the file's code is not
# run (so Modality does not exist yet) until the loader executes it.
spec.loader.exec_module(modality)
Modality = modality.Modality

tasks_path = "../dgeb/tasks/tasks.py"
# Load the module
spec = importlib.util.spec_from_file_location("tasks", tasks_path)
tasks = importlib.util.module_from_spec(spec)
# Same as above: execute the module so TaskResult / DGEBModel are defined.
spec.loader.exec_module(tasks)
TaskResult = tasks.TaskResult
DGEBModel = tasks.DGEBModel
# Assuming the class definitions provided above are complete and imported here
def format_num_params(param: int) -> str:
    """Format a parameter count for display.

    Counts of one million or more are shown as whole millions with an "M"
    suffix (truncated, not rounded); billions are not given their own unit,
    so 3 billion is shown as "3000M". Smaller counts are shown in full with
    thousands separators.

    :param param: Number of model parameters.
    :return: Human-readable parameter count.
    """
    million = 1_000_000
    if param >= million:
        # Floor division keeps the arithmetic exact for arbitrarily large
        # ints; true division would round through a float first.
        return f"{int(param // million)}M"
    return f"{param:,}"
def load_json_files_from_directory(directory_path: Path) -> List[dict]:
    """
    Recursively load all JSON files within the specified directory path.

    Files that cannot be read or parsed are reported on stdout and skipped,
    so one bad file does not prevent the rest from loading.

    :param directory_path: Path to the directory to search for JSON files.
    :return: List of dictionaries loaded from JSON files.
    """
    json_files_content = []
    # Recursively find all JSON files; sorted so the result order does not
    # depend on the filesystem's listing order.
    for json_file in sorted(directory_path.rglob("*.json")):
        try:
            with open(json_file, "r", encoding="utf-8") as file:
                json_content = json.load(file)
        except Exception as e:
            # Deliberately best-effort: report the failure and keep going.
            print(f"Error loading {json_file}: {e}")
        else:
            json_files_content.append(json_content)
    return json_files_content
def load_results() -> List[TaskResult]:
    """
    Recursively load JSON files in ./submissions/** and return a list of TaskResult objects.

    :return: One TaskResult per JSON document that was loaded.
    :raises ValidationError: If a loaded document does not validate as a
        TaskResult; the error is printed and then re-raised.
    """
    submissions_path = Path("./submissions")
    json_contents = load_json_files_from_directory(submissions_path)
    task_results_objects = []
    for content in json_contents:
        try:
            # Using Pydantic's parse_obj_as for creating TaskResult objects
            task_result = parse_obj_as(TaskResult, content)
        except ValidationError as e:
            print(f"Error parsing TaskResult object: {e}")
            # Bare raise keeps the original traceback intact.
            raise
        task_results_objects.append(task_result)
    return task_results_objects
def task_results_to_dgeb_score(
    model: DGEBModel, model_results: List[TaskResult]
) -> dict:
    """Build the aggregate "DGEB Score" leaderboard row for one model.

    For every task the best primary-metric value across all reported layers
    is taken; the DGEB score is the mean of those per-task best values.

    :param model: The model the results belong to.
    :param model_results: All task results for that model.
    :return: A row dict using the same column names as the per-task rows,
        with the aggregate stored under "Score".
    :raises AssertionError: If a result belongs to a different model, if no
        results are given, or if the results span more than one modality.
    :raises ValueError: If a task result has no value for its primary metric.
    """
    best_scores_per_task = []
    modalities_seen = set()
    for task_result in model_results:
        assert (
            task_result.model.hf_name == model.hf_name
        ), f"Model names do not match, {task_result.model.hf_name} != {model.hf_name}"
        modalities_seen.add(task_result.task.modality)
        primary_metric_id = task_result.task.primary_metric_id
        # Get the primary score for each layer.
        scores = [
            metric.value
            for result in task_result.results
            for metric in result.metrics
            if metric.id == primary_metric_id
        ]
        if not scores:
            # max() on an empty list raises an uninformative ValueError;
            # raise the same exception type with a message that says why.
            raise ValueError(
                f"No values found for primary metric {primary_metric_id} "
                f"for model {model.hf_name}"
            )
        best_scores_per_task.append(max(scores))
    # Check for "no tasks" first: with no results the modality set is empty,
    # which would otherwise be misreported as "multiple modalities".
    assert len(best_scores_per_task) > 0, f"No tasks found for model {model.hf_name}"
    assert (
        len(modalities_seen) == 1
    ), f"Multiple modalities found for model {model.hf_name}"
    # Calculate the average of the best scores for each task.
    dgeb_score = sum(best_scores_per_task) / len(best_scores_per_task)
    return {
        "Task Name": "DGEB Score",
        "Task Category": "DGEB",
        "Model": model.hf_name,
        "Modality": next(iter(modalities_seen)),
        "Num. Parameters (millions)": format_num_params(model.num_params),
        "Emb. Dimension": model.embed_dim,
        "Score": dgeb_score,
    }
def task_results_to_df(model_results: List[TaskResult]) -> pd.DataFrame:
    """Flatten task results into the leaderboard table.

    Produces one row per (task, model, layer) for layers displayed as "mid"
    or "last", with one column per metric, followed by one aggregate
    "DGEB Score" row per model.

    Note: ``layer_display_name`` is overwritten in place on the input layer
    objects when the layer number matches the mid or last layer.

    :param model_results: Task results for all models.
    :return: DataFrame with one row per record described above.
    """
    # Initialize an empty list to hold all rows of data
    data_rows = []
    all_models = {}
    # Group results per model while iterating, so the aggregate step below
    # does not have to rescan every result for every model.
    results_by_model = {}
    for res in model_results:
        task = res.task
        model = res.model
        all_models[model.hf_name] = model
        results_by_model.setdefault(model.hf_name, []).append(res)
        print(f"Processing {task.display_name} for {model.hf_name}")
        for layer in res.results:
            # calculate if the layer is mid or last
            # NOTE(review): presumably layer numbers are zero-based, making
            # num_layers - 1 the index of the last layer -- confirm.
            total_layers = model.num_layers - 1
            mid_layer = math.ceil(total_layers / 2)
            if mid_layer == layer.layer_number:
                layer.layer_display_name = "mid"
            elif total_layers == layer.layer_number:
                layer.layer_display_name = "last"
            if layer.layer_display_name not in ("mid", "last"):
                print(
                    f"Layer {layer.layer_number} is not mid or last out of {total_layers}. Skipping"
                )
                continue
            # For each Metric in the Layer
            # pivoting the data so that each metric is a column of the row
            primary_metric_label = f"{task.primary_metric_id} (primary metric)"
            labelled_metrics = [
                (
                    primary_metric_label
                    if metric.id == task.primary_metric_id
                    else metric.id,
                    metric.value,
                )
                for metric in layer.metrics
            ]
            # sort primary metric id first (stable sort keeps the rest in order)
            sorted_zip = sorted(
                labelled_metrics,
                key=lambda x: x[0] != primary_metric_label,
            )
            data_rows.append(
                {
                    "Task Name": task.display_name,
                    "Task Category": task.type,
                    "Model": model.hf_name,
                    "Num. Parameters (millions)": format_num_params(
                        model.num_params
                    ),
                    "Emb. Dimension": model.embed_dim,
                    "Modality": task.modality,
                    "Layer": layer.layer_display_name,
                    **dict(sorted_zip),
                }
            )
    for model_name, model in all_models.items():
        results_for_model = results_by_model[model_name]
        dgeb_score_record = task_results_to_dgeb_score(model, results_for_model)
        print(f'model {model.hf_name} dgeb score: {dgeb_score_record["Score"]}')
        data_rows.append(dgeb_score_record)
    print("Finished processing all results")
    df = pd.DataFrame(data_rows)
    return df
# Number of decimal places shown for metric values in the leaderboard tables.
SIG_FIGS = 4

df = task_results_to_df(load_results())
image_path = "./DGEB_Figure.png"
# Build the leaderboard UI: one top-level tab per task category and, inside
# each, one tab per task holding a ranked table of models.
with gr.Blocks() as demo:
    gr.Label("Diverse Genomic Embedding Benchmark", show_label=False, scale=2)
    gr.HTML(
        f"<img src='file/{image_path}' alt='DGEB Figure' style='border-radius: 0.8rem; width: 50%; margin-left: auto; margin-right: auto; margin-top:12px;'>"
    )
    gr.HTML(
        """
        <div style='width: 50%; margin-left: auto; margin-right: auto; padding-bottom: 8px;text-align: center;'>
        DGEB Leaderboard. To submit, refer to the <a href="https://github.com/TattaBio/DGEB/blob/leaderboard/README.md" target="_blank" style="text-decoration: underline">DGEB GitHub repository</a> Refer to the <a href="https://www.tatta.bio/dgeb" target="_blank" style="text-decoration: underline">DGEB paper</a> for details on metrics, tasks, and models.
        </div>
        """
    )
    unique_categories = df["Task Category"].unique()
    # sort "DGEB" to the start
    unique_categories = sorted(unique_categories, key=lambda x: x != "DGEB")
    for category in unique_categories:
        with gr.Tab(label=category):
            unique_tasks_in_category = df[df["Task Category"] == category][
                "Task Name"
            ].unique()
            # sort "Overall" to the start
            unique_tasks_in_category = sorted(
                unique_tasks_in_category, key=lambda x: x != "Overall"
            )
            for task in unique_tasks_in_category:
                with gr.Tab(label=task):
                    # The tab labels already convey these two columns.
                    columns_to_hide = ["Task Name", "Task Category"]
                    # get rows where Task Name == task and Task Category == category
                    filtered_df = (
                        df[
                            (df["Task Name"] == task)
                            & (df["Task Category"] == category)
                        ].drop(columns=columns_to_hide)
                    ).dropna(axis=1, how="all")  # drop all NaN columns for Overall tab
                    # round all values to 4 decimal places
                    rounded_df = filtered_df.round(SIG_FIGS)
                    # calculate ranking column
                    # NOTE(review): descending rank (higher value = better) is
                    # assumed for both branches -- confirm.
                    # if in Overview tab, rank by average metric value
                    if task == "Overall":
                        # rank by average col
                        rounded_df["Rank"] = filtered_df["Average"].rank(
                            ascending=False
                        )
                    else:
                        # Numeric columns describing the model rather than its
                        # performance must not contribute to the ranking.
                        avoid_cols = [
                            "Emb. Dimension",
                            "Num. Parameters (millions)",
                        ]
                        # Rank by the sum of the remaining numeric (metric)
                        # columns; select_dtypes leaves out text columns
                        # such as the model name.
                        rounded_df["Rank"] = (
                            rounded_df.drop(columns=avoid_cols, errors="ignore")
                            .select_dtypes(include="number")
                            .sum(axis=1)
                            .rank(ascending=False)
                        )
                    # make Rank first column
                    cols = list(rounded_df.columns)
                    cols.insert(0, cols.pop(cols.index("Rank")))
                    rounded_df = rounded_df[cols]
                    # sort by rank
                    rounded_df = rounded_df.sort_values("Rank")
                    data_frame = gr.DataFrame(rounded_df)