import os
import gradio as gr
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from apscheduler.schedulers.background import BackgroundScheduler
from huggingface_hub import snapshot_download
from src.about import (
BOTTOM_LOGO,
CITATION_BUTTON_LABEL,
CITATION_BUTTON_LABEL_JA,
CITATION_BUTTON_TEXT,
EVALUATION_QUEUE_TEXT,
EVALUATION_QUEUE_TEXT_JA,
INTRODUCTION_TEXT,
INTRODUCTION_TEXT_JA,
LLM_BENCHMARKS_TEXT,
LLM_BENCHMARKS_TEXT_JA,
TITLE,
TaskType,
)
from src.display.utils import (
BENCHMARK_COLS,
COLS,
EVAL_COLS,
EVAL_TYPES,
NUMERIC_INTERVALS,
TYPES,
AddSpecialTokens,
AutoEvalColumn,
ModelType,
NumFewShots,
Precision,
Version,
fields,
)
from src.envs import API, EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH, QUEUE_REPO, REPO_ID, RESULTS_REPO
from src.populate import get_evaluation_queue_df, get_leaderboard_df
from src.submission.submit import add_new_eval
def restart_space():
    """Ask the Hub API to restart the Space identified by ``REPO_ID``."""
    API.restart_space(repo_id=REPO_ID)
# Space initialization
# Download local snapshots of the evaluation-queue and results datasets.
# If a download fails, the failure is reported and the Space is asked to
# restart via restart_space().
try:
    print(EVAL_REQUESTS_PATH)
    snapshot_download(
        repo_id=QUEUE_REPO,
        local_dir=EVAL_REQUESTS_PATH,
        repo_type="dataset",
        tqdm_class=None,
        etag_timeout=30,
    )
except Exception as exc:
    # Report the cause before restarting so a restart loop can be diagnosed
    # from the logs instead of failing silently.
    print(f"Failed to download {QUEUE_REPO} into {EVAL_REQUESTS_PATH}: {exc!r}")
    restart_space()
try:
    print(EVAL_RESULTS_PATH)
    snapshot_download(
        repo_id=RESULTS_REPO,
        local_dir=EVAL_RESULTS_PATH,
        repo_type="dataset",
        tqdm_class=None,
        etag_timeout=30,
    )
except Exception as exc:
    print(f"Failed to download {RESULTS_REPO} into {EVAL_RESULTS_PATH}: {exc!r}")
    restart_space()
# Searching and filtering
def filter_models(
    df: pd.DataFrame,
    type_query: list,
    size_query: list,
    precision_query: list,
    add_special_tokens_query: list,
    num_few_shots_query: list,
    version_query: list,
    # backend_query: list,
    *,
    size_intervals=None,
) -> pd.DataFrame:
    """Return the rows of ``df`` that match every selected filter value.

    Args:
        df: Leaderboard frame. Must contain the columns ``"T"`` (or
            ``"Type_"``), ``"Precision"``, ``"#Params (B)"``,
            ``"Add Special Tokens"``, ``"Few-shot"`` and
            ``"llm-jp-eval version"``.
        type_query: Selected model-type labels; only the first
            whitespace-separated token of each label is compared against the
            type column.
        size_query: Selected size-bucket labels. Every label except
            ``"Unknown"`` is looked up in ``size_intervals``; ``"Unknown"``
            additionally keeps rows whose parameter count is NaN or 0.
        precision_query: Selected precision names.
        add_special_tokens_query: Selected "Add Special Tokens" values.
        num_few_shots_query: Selected few-shot counts, as strings.
        version_query: Selected llm-jp-eval versions.
        size_intervals: Optional mapping from size label to an object that
            supports ``value in interval``. Defaults to ``NUMERIC_INTERVALS``.

    Returns:
        The filtered frame with the same columns as ``df``.
    """
    intervals = NUMERIC_INTERVALS if size_intervals is None else size_intervals

    print(f"Initial df shape: {df.shape}")
    print(f"Initial df content:\n{df}")

    # Model type filter: compare the leading symbol of each selected label.
    type_column = "T" if "T" in df.columns else "Type_"
    type_emoji = [t.split()[0] for t in type_query]
    filtered_df = df[df[type_column].isin(type_emoji)]
    print(f"After type filter: {filtered_df.shape}")

    # Precision filter
    filtered_df = filtered_df[filtered_df["Precision"].isin(precision_query)]
    print(f"After precision filter: {filtered_df.shape}")

    # Model size filter
    params = filtered_df["#Params (B)"]
    size_labels = [s for s in size_query if s != "Unknown"]
    # Series.apply on an empty Series returns an empty Series of the original
    # dtype rather than bool; indexing a DataFrame with that would be treated
    # as a (empty) column selection and drop every column. Force bool so the
    # result is always a row mask.
    size_mask = params.apply(lambda x: any(x in intervals[s] for s in size_labels)).astype(bool)
    if "Unknown" in size_query:
        size_mask |= params.isna() | (params == 0)
    filtered_df = filtered_df[size_mask]
    print(f"After size filter: {filtered_df.shape}")

    # Add Special Tokens filter
    filtered_df = filtered_df[filtered_df["Add Special Tokens"].isin(add_special_tokens_query)]
    print(f"After add_special_tokens filter: {filtered_df.shape}")

    # Num Few Shots filter (the query values are strings, so compare as str)
    filtered_df = filtered_df[filtered_df["Few-shot"].astype(str).isin(num_few_shots_query)]
    print(f"After num_few_shots filter: {filtered_df.shape}")

    # Version filter
    filtered_df = filtered_df[filtered_df["llm-jp-eval version"].isin(version_query)]
    print(f"After version filter: {filtered_df.shape}")

    # Backend filter
    # filtered_df = filtered_df[filtered_df["Backend Library"].isin(backend_query)]
    # print(f"After backend filter: {filtered_df.shape}")

    print("Filtered dataframe head:")
    print(filtered_df.head())
    return filtered_df
def search_table(df: pd.DataFrame, query: str) -> pd.DataFrame:
    """Return the rows whose search-name column contains ``query``.

    Matching is case-insensitive. ``query`` is first tried as a regular
    expression (the original behaviour); if it is not a valid pattern
    (e.g. ``"c++("``), it is matched as literal text instead of raising.
    Rows whose name is missing never match.
    """
    import re  # local import: only needed to recognise invalid patterns

    names = df[AutoEvalColumn.dummy.name]
    try:
        mask = names.str.contains(query, case=False, na=False)
    except re.error:
        mask = names.str.contains(query, case=False, na=False, regex=False)
    return df[mask]
def filter_queries(query: str, filtered_df: pd.DataFrame) -> pd.DataFrame:
    """Apply a ``;``-separated search string to ``filtered_df``.

    Each non-empty term is searched with ``search_table``; the non-empty
    results are concatenated and de-duplicated on (model, precision,
    revision). If ``query`` is empty, or no term yields any row, the input
    frame is returned unchanged.
    """
    if not query:
        return filtered_df

    terms = [term for term in (piece.strip() for piece in query.split(";")) if term != ""]
    hits = [match for match in (search_table(filtered_df, term) for term in terms) if len(match) > 0]
    if not hits:
        return filtered_df

    dedupe_keys = [
        AutoEvalColumn.model.name,
        AutoEvalColumn.precision.name,
        AutoEvalColumn.revision.name,
    ]
    return pd.concat(hits).drop_duplicates(subset=dedupe_keys)
def select_columns(df: pd.DataFrame, columns: list) -> pd.DataFrame:
    """Project ``df`` onto the always-shown columns plus the requested ones.

    The model-type symbol and model name columns always come first. The
    remaining requested columns follow in the order they appear in ``COLS``,
    restricted to those present in ``df``. Duplicates are dropped while
    keeping the first occurrence.
    """
    pinned = [
        AutoEvalColumn.model_type_symbol.name,  # 'T'
        AutoEvalColumn.model.name,  # 'Model'
    ]
    # Drop the pinned names from the request so they are not listed twice.
    requested = [name for name in columns if name not in pinned]
    ordered = pinned + [name for name in COLS if name in df.columns and name in requested]
    # dict.fromkeys keeps first-seen order while removing repeats.
    return df[list(dict.fromkeys(ordered))]
def update_table(
    hidden_df: pd.DataFrame,
    type_query: list,
    precision_query: list,
    size_query: list,
    add_special_tokens_query: list,
    num_few_shots_query: list,
    version_query: list,
    # backend_query: list,
    query: str,
    *columns,
) -> pd.DataFrame:
    """Rebuild the visible leaderboard from the hidden full table.

    Applies the model filters, then the search query, then the column
    selection.

    Args:
        hidden_df: Full, unfiltered leaderboard frame.
        type_query: Selected model-type labels.
        precision_query: Selected precision names.
        size_query: Selected size-bucket labels.
        add_special_tokens_query: Selected "Add Special Tokens" values.
        num_few_shots_query: Selected few-shot counts.
        version_query: Selected llm-jp-eval versions.
        query: Search-bar text (``;``-separated terms).
        *columns: One list of selected column names per column checkbox
            group; they are flattened into a single list.

    Returns:
        The filtered frame restricted to the selected columns.
    """
    # Each positional extra is a list of column names; flatten them.
    columns = [item for column in columns for item in column]
    print(
        f"Update table called with: type_query={type_query}, precision_query={precision_query}, size_query={size_query}"
    )
    print(f"hidden_df shape before filtering: {hidden_df.shape}")
    # NOTE: filter_models takes size_query before precision_query.
    filtered_df = filter_models(
        hidden_df,
        type_query,
        size_query,
        precision_query,
        add_special_tokens_query,
        num_few_shots_query,
        version_query,
        # backend_query,
    )
    print(f"filtered_df shape after filter_models: {filtered_df.shape}")
    filtered_df = filter_queries(query, filtered_df)
    print(f"filtered_df shape after filter_queries: {filtered_df.shape}")
    print(
        f"Filter applied: query={query}, columns={columns}, type_query={type_query}, precision_query={precision_query}"
    )
    print("Filtered dataframe head:")
    print(filtered_df.head())
    df = select_columns(filtered_df, columns)
    print(f"Final df shape: {df.shape}")
    print("Final dataframe head:")
    print(df.head())
    return df
def load_query(request: gr.Request):
    """Read the ``query`` URL parameter when the page loads.

    Returns the value twice (empty string when absent): once for the visible
    search bar and once for a hidden component that triggers a reload only
    if its value has changed.
    """
    initial_query = request.query_params.get("query") or ""
    return initial_query, initial_query
# Prepare the dataframes
original_df = get_leaderboard_df(EVAL_RESULTS_PATH, EVAL_REQUESTS_PATH, COLS, BENCHMARK_COLS)
leaderboard_df = original_df.copy()
(
    finished_eval_queue_df,
    running_eval_queue_df,
    pending_eval_queue_df,
    failed_eval_queue_df,
) = get_evaluation_queue_df(EVAL_REQUESTS_PATH, EVAL_COLS)

# Apply the filters with every option selected, i.e. the state the filter
# widgets start in.
leaderboard_df = filter_models(
    leaderboard_df,
    [t.to_str(" : ") for t in ModelType],
    list(NUMERIC_INTERVALS.keys()),
    [i.value.name for i in Precision],
    [i.value.name for i in AddSpecialTokens],
    [i.value.name for i in NumFewShots],
    [i.value.name for i in Version],
    # [i.value.name for i in Backend],
)

# Columns shown initially: the type symbol first, then every column that is
# never hidden or displayed by default.
initial_columns = ["T"] + [
    c.name for c in fields(AutoEvalColumn) if (c.never_hidden or c.displayed_by_default) and c.name != "T"
]
# The same filter_models call used to be repeated here and its result was
# immediately overwritten by this assignment, so the repeat was removed.
leaderboard_df_filtered = select_columns(leaderboard_df, initial_columns)
# Leaderboard demo
def toggle_all_categories(action: str) -> list[gr.CheckboxGroup]:
    """Build one checkbox-group update per task type for a bulk action.

    ``action`` is ``"all"`` (select every selectable column), ``"none"``
    (clear the selection) or ``"avg_only"`` (keep only average columns).
    The ``TaskType.NotTask`` group always gets an empty update so its current
    selection is kept. Any other ``action`` value adds no update for the
    remaining groups.
    """
    updates = []
    for task_type in TaskType:
        if task_type == TaskType.NotTask:
            # "Model details": leave the existing selection untouched.
            updates.append(gr.CheckboxGroup())
            continue

        if action == "all":
            selected = [
                col.name
                for col in fields(AutoEvalColumn)
                if not (col.hidden or col.never_hidden or col.dummy) and col.task_type == task_type
            ]
        elif action == "none":
            selected = []
        elif action == "avg_only":
            # In the AVG group every column qualifies; elsewhere only the
            # columns flagged as averages do.
            selected = [
                col.name
                for col in fields(AutoEvalColumn)
                if not (col.hidden or col.never_hidden)
                and col.task_type == task_type
                and (task_type == TaskType.AVG or col.average)
            ]
        else:
            continue
        updates.append(gr.CheckboxGroup(value=selected))
    return updates
def plot_size_vs_score(df: pd.DataFrame, hidden_df: pd.DataFrame) -> go.Figure:
    """Scatter-plot parameter count against the AVG score.

    Args:
        df: The currently displayed leaderboard table; only its index is used.
        hidden_df: The full table, from which the plotted columns are read.

    Returns:
        A Plotly figure with one labelled point per model whose
        ``"#Params (B)"`` is greater than 0, with the y axis fixed to [0, 1].
    """
    # NOTE(review): rows are taken from hidden_df by position using df's index
    # values; confirm the displayed table's index lines up with hidden_df.
    df2 = hidden_df.iloc[df.index]
    df2 = df2[df2["#Params (B)"] > 0]
    # Copy so the column assignments below do not write into a slice.
    df2 = df2[["model_name_for_query", "#Params (B)", "AVG", "Few-shot"]].copy()
    df2["AVG"] = df2["AVG"].astype(float)
    df2 = df2.rename(columns={"model_name_for_query": "Model", "Few-shot": "n-shot"})
    # Cast n-shot to str so the label can be built even when the column is numeric.
    df2["model_name_without_org_name"] = (
        df2["Model"].str.split("/").str[-1] + " (" + df2["n-shot"].astype(str) + "-shot)"
    )
    fig = px.scatter(
        df2,
        x="#Params (B)",
        y="AVG",
        text="model_name_without_org_name",
        hover_data=["Model", "n-shot"],
    )
    fig.update_traces(
        # Plotly hover templates use <br> for line breaks.
        hovertemplate="%{customdata[0]}<br>#Params: %{x:.2f}B<br>n-shot: %{customdata[1]}<br>AVG: %{y:.4f}",
        textposition="top right",
    )
    fig.update_layout(yaxis_range=[0, 1])
    return fig
# Maps each per-task average column name to the name of its task type.
# Columns belonging to TaskType.AVG itself are excluded.
TASK_AVG_NAME_MAP = {
    column.name: column.task_type.name
    for column in fields(AutoEvalColumn)
    if column.average and column.task_type != TaskType.AVG
}
def plot_average_scores(df: pd.DataFrame, hidden_df: pd.DataFrame) -> go.Figure:
    """Draw a radar chart of the per-task average scores of each model.

    Rows are read from ``hidden_df`` at the positions given by ``df.index``.
    One polar trace is added per (model, n-shot) pair; only the first three
    traces are visible initially, the rest are toggled from the legend.
    """
    score_columns = list(TASK_AVG_NAME_MAP.keys())
    selected = hidden_df.iloc[df.index]
    selected = selected[["model_name_for_query", "Few-shot"] + score_columns]
    selected = selected.rename(columns={"model_name_for_query": "Model", "Few-shot": "n-shot"})
    selected = selected.rename(columns=TASK_AVG_NAME_MAP)
    selected["n-shot"] = selected["n-shot"].astype(int)
    scores = selected.set_index(["Model", "n-shot"]).astype(float)

    radar = go.Figure()
    for position, (key, values) in enumerate(scores.iterrows()):
        model_name, shots = key
        # Display only the first 3 models; the others start as legend-only.
        if position < 3:
            visibility = True
        else:
            visibility = "legendonly"
        trace = go.Scatterpolar(
            r=values.values,
            theta=values.index,
            fill="toself",
            name=f"{model_name} ({shots}-shot)",
            hovertemplate="%{theta}: %{r}",
            visible=visibility,
        )
        radar.add_trace(trace)

    radar.update_layout(
        polar={"radialaxis": {"range": [0, 1]}},
        showlegend=True,
    )
    return radar
# Labels for the bulk column-selection buttons, in English and Japanese (_JA).
SELECT_ALL_BUTTON_LABEL = "Select All"
SELECT_ALL_BUTTON_LABEL_JA = "全選択"
SELECT_NONE_BUTTON_LABEL = "Select None"
SELECT_NONE_BUTTON_LABEL_JA = "全解除"
SELECT_AVG_ONLY_BUTTON_LABEL = "AVG Only"
SELECT_AVG_ONLY_BUTTON_LABEL_JA = "AVGのみ"
# Column checkbox groups keyed by task-type name, filled while the
# leaderboard UI is built.
shown_columns_dict: dict[str, gr.CheckboxGroup] = {}
# The same checkbox groups, in creation order.
checkboxes: list[gr.CheckboxGroup] = []
# Leaderboard tab: search bar, column and model filters, the results table
# and two plots derived from the table.
with gr.Blocks() as demo_leaderboard:
    with gr.Row():
        search_bar = gr.Textbox(
            placeholder=" 🔍 Search for your model (separate multiple queries with `;`) and press ENTER...",
            show_label=False,
            elem_id="search-bar",
        )
    with gr.Accordion("Column Filter", open=True):
        with gr.Row():
            with gr.Row():
                select_all_button = gr.Button(SELECT_ALL_BUTTON_LABEL_JA, size="sm")
                select_none_button = gr.Button(SELECT_NONE_BUTTON_LABEL_JA, size="sm")
                select_avg_only_button = gr.Button(SELECT_AVG_ONLY_BUTTON_LABEL_JA, size="sm")
        # One accordion with a checkbox group per task type.
        for task_type in TaskType:
            if task_type == TaskType.NotTask:
                label = "Model details"
            else:
                label = task_type.value
            with gr.Accordion(label, open=True, elem_classes="accordion"):
                with gr.Row(height=110):
                    shown_column = gr.CheckboxGroup(
                        show_label=False,
                        choices=[
                            c.name
                            for c in fields(AutoEvalColumn)
                            if not c.hidden and not c.never_hidden and not c.dummy and c.task_type == task_type
                        ],
                        value=[
                            c.name
                            for c in fields(AutoEvalColumn)
                            if c.displayed_by_default
                            and not c.hidden
                            and not c.never_hidden
                            and c.task_type == task_type
                        ],
                        elem_id="column-select",
                        container=False,
                    )
                    # Register the group so the event handlers below can
                    # reference it.
                    shown_columns_dict[task_type.name] = shown_column
                    checkboxes.append(shown_column)
    with gr.Accordion("Model Filter", open=True):
        with gr.Row():
            filter_columns_type = gr.CheckboxGroup(
                label="Model types",
                choices=[t.to_str() for t in ModelType],
                value=[t.to_str() for t in ModelType],
                elem_id="filter-columns-type",
            )
            filter_columns_precision = gr.CheckboxGroup(
                label="Precision",
                choices=[i.value.name for i in Precision],
                value=[i.value.name for i in Precision],
                elem_id="filter-columns-precision",
            )
            filter_columns_size = gr.CheckboxGroup(
                label="Model sizes (in billions of parameters)",
                choices=list(NUMERIC_INTERVALS.keys()),
                value=list(NUMERIC_INTERVALS.keys()),
                elem_id="filter-columns-size",
            )
            filter_columns_add_special_tokens = gr.CheckboxGroup(
                label="Add Special Tokens",
                choices=[i.value.name for i in AddSpecialTokens],
                value=[i.value.name for i in AddSpecialTokens],
                elem_id="filter-columns-add-special-tokens",
            )
            filter_columns_num_few_shots = gr.CheckboxGroup(
                label="Num Few Shots",
                choices=[i.value.name for i in NumFewShots],
                value=[i.value.name for i in NumFewShots],
                elem_id="filter-columns-num-few-shots",
            )
            filter_columns_version = gr.CheckboxGroup(
                label="llm-jp-eval version",
                choices=[i.value.name for i in Version],
                value=[i.value.name for i in Version],
                elem_id="filter-columns-version",
            )
            # filter_columns_backend = gr.CheckboxGroup(
            #     label="Backend Library",
            #     choices=[i.value.name for i in Backend],
            #     value=[i.value.name for i in Backend],
            #     elem_id="filter-columns-backend",
            # )
    # Initialise the DataFrame component
    leaderboard_table = gr.Dataframe(
        value=leaderboard_df_filtered,
        headers=initial_columns,
        datatype=TYPES,
        elem_id="leaderboard-table",
        interactive=False,
        visible=True,
    )
    graph_size_vs_score = gr.Plot(label="Model size vs. Average score")
    graph_average_scores = gr.Plot(label="Model Performance across Task Categories")
    # Dummy leaderboard for handling the case when the user uses backspace key
    hidden_leaderboard_table_for_search = gr.Dataframe(
        value=original_df[COLS],
        headers=COLS,
        datatype=TYPES,
        visible=False,
    )
    # Define a hidden component that will trigger a reload only if a query parameter has been set
    hidden_search_bar = gr.Textbox(value="", visible=False)
    select_all_button.click(
        fn=lambda: toggle_all_categories("all"),
        outputs=checkboxes,
        api_name=False,
        queue=False,
    )
    select_none_button.click(
        fn=lambda: toggle_all_categories("none"),
        outputs=checkboxes,
        api_name=False,
        queue=False,
    )
    select_avg_only_button.click(
        fn=lambda: toggle_all_categories("avg_only"),
        outputs=checkboxes,
        api_name=False,
        queue=False,
    )
    # Recompute the table whenever a filter, the search text or a column
    # selection changes. The input order must match update_table's
    # parameter order.
    gr.on(
        triggers=[
            hidden_search_bar.change,
            filter_columns_type.change,
            filter_columns_precision.change,
            filter_columns_size.change,
            filter_columns_add_special_tokens.change,
            filter_columns_num_few_shots.change,
            filter_columns_version.change,
            # filter_columns_backend.change,
            search_bar.submit,
        ]
        + [shown_columns.change for shown_columns in shown_columns_dict.values()],
        fn=update_table,
        inputs=[
            hidden_leaderboard_table_for_search,
            filter_columns_type,
            filter_columns_precision,
            filter_columns_size,
            filter_columns_add_special_tokens,
            filter_columns_num_few_shots,
            filter_columns_version,
            # filter_columns_backend,
            search_bar,
        ]
        + list(shown_columns_dict.values()),
        outputs=leaderboard_table,
    )
    # Redraw both plots whenever the table content changes.
    leaderboard_table.change(
        fn=plot_size_vs_score,
        inputs=[leaderboard_table, hidden_leaderboard_table_for_search],
        outputs=graph_size_vs_score,
        api_name=False,
        queue=False,
    )
    leaderboard_table.change(
        fn=plot_average_scores,
        inputs=[leaderboard_table, hidden_leaderboard_table_for_search],
        outputs=graph_average_scores,
        api_name=False,
        queue=False,
    )
    # Check query parameter once at startup and update search bar + hidden component
    demo_leaderboard.load(fn=load_query, outputs=[search_bar, hidden_search_bar])
# Submission demo
# Submission tab: evaluation-queue overview tables followed by the model
# submission form.
with gr.Blocks() as demo_submission:
    with gr.Column():
        with gr.Row():
            evaluation_queue_text = gr.Markdown(EVALUATION_QUEUE_TEXT_JA, elem_classes="markdown-text")
        with gr.Column():
            # One collapsed accordion per queue state; the title shows the
            # number of rows in that queue.
            with gr.Accordion(
                f"✅ Finished Evaluations ({len(finished_eval_queue_df)})",
                open=False,
            ):
                with gr.Row():
                    finished_eval_table = gr.Dataframe(
                        value=finished_eval_queue_df,
                        headers=EVAL_COLS,
                        datatype=EVAL_TYPES,
                        row_count=5,
                    )
            with gr.Accordion(
                f"🔄 Running Evaluation Queue ({len(running_eval_queue_df)})",
                open=False,
            ):
                with gr.Row():
                    running_eval_table = gr.Dataframe(
                        value=running_eval_queue_df,
                        headers=EVAL_COLS,
                        datatype=EVAL_TYPES,
                        row_count=5,
                    )
            with gr.Accordion(
                f"⏳ Pending Evaluation Queue ({len(pending_eval_queue_df)})",
                open=False,
            ):
                with gr.Row():
                    pending_eval_table = gr.Dataframe(
                        value=pending_eval_queue_df,
                        headers=EVAL_COLS,
                        datatype=EVAL_TYPES,
                        row_count=5,
                    )
            with gr.Accordion(
                f"❎ Failed Evaluation Queue ({len(failed_eval_queue_df)})",
                open=False,
            ):
                with gr.Row():
                    failed_eval_table = gr.Dataframe(
                        value=failed_eval_queue_df,
                        headers=EVAL_COLS,
                        datatype=EVAL_TYPES,
                        row_count=5,
                    )
    with gr.Row():
        gr.Markdown("# ✉️✨ Submit your model here!", elem_classes="markdown-text")
    # Submission form inputs.
    with gr.Row():
        with gr.Column():
            model_name_textbox = gr.Textbox(label="Model name")
            revision_name_textbox = gr.Textbox(label="Revision commit", placeholder="main")
            model_type = gr.Dropdown(
                label="Model type",
                choices=[t.to_str(" : ") for t in ModelType if t != ModelType.Unknown],
                multiselect=False,
                value=None,
            )
        with gr.Column():
            precision = gr.Dropdown(
                label="Precision",
                choices=[i.value.name for i in Precision if i != Precision.Unknown],
                multiselect=False,
                value="float16",
            )
            add_special_tokens = gr.Dropdown(
                label="AddSpecialTokens",
                choices=[i.value.name for i in AddSpecialTokens if i != AddSpecialTokens.Unknown],
                multiselect=False,
                value="False",
            )
    submit_button = gr.Button("Submit Eval")
    submission_result = gr.Markdown()
    # add_new_eval receives the inputs in this order; its result is rendered
    # in submission_result.
    submit_button.click(
        fn=add_new_eval,
        inputs=[
            model_name_textbox,
            revision_name_textbox,
            precision,
            model_type,
            add_special_tokens,
        ],
        outputs=submission_result,
    )
# Main demo
# Title of the citation accordion, in English and Japanese (_JA).
CITATION_ACCORDION_LABEL = "📙 Citation"
CITATION_ACCORDION_LABEL_JA = "📙 引用"
def set_default_language(request: gr.Request) -> gr.Radio:
    """Pick the initial UI language from the request's Accept-Language header.

    Returns a radio update selecting Japanese when the first listed language
    starts with ``ja`` and English otherwise, including when the header is
    absent.
    """
    # Not every client sends Accept-Language; treat a missing header as empty
    # instead of raising KeyError.
    accept_language = request.headers.get("Accept-Language", "")
    if accept_language.split(",")[0].lower().startswith("ja"):
        return gr.Radio(value="🇯🇵 JA")
    else:
        return gr.Radio(value="🇺🇸 EN")
def update_language(
    language: str,
) -> tuple[
    gr.Markdown,
    gr.Markdown,
    gr.Markdown,
    gr.Textbox,
    gr.Button,
    gr.Button,
    gr.Button,
    gr.Accordion,
]:
    """Return component updates that switch the UI text to ``language``.

    ``"🇯🇵 JA"`` selects the Japanese texts; any other value selects the
    English ones. The tuple order is: introduction, benchmarks text,
    evaluation-queue text, citation textbox label, the three column-selection
    button labels, and the citation accordion label.
    """
    japanese = language == "🇯🇵 JA"

    introduction = INTRODUCTION_TEXT_JA if japanese else INTRODUCTION_TEXT
    benchmarks = LLM_BENCHMARKS_TEXT_JA if japanese else LLM_BENCHMARKS_TEXT
    queue_text = EVALUATION_QUEUE_TEXT_JA if japanese else EVALUATION_QUEUE_TEXT
    citation_label = CITATION_BUTTON_LABEL_JA if japanese else CITATION_BUTTON_LABEL
    select_all = SELECT_ALL_BUTTON_LABEL_JA if japanese else SELECT_ALL_BUTTON_LABEL
    select_none = SELECT_NONE_BUTTON_LABEL_JA if japanese else SELECT_NONE_BUTTON_LABEL
    select_avg = SELECT_AVG_ONLY_BUTTON_LABEL_JA if japanese else SELECT_AVG_ONLY_BUTTON_LABEL
    accordion_label = CITATION_ACCORDION_LABEL_JA if japanese else CITATION_ACCORDION_LABEL

    return (
        gr.Markdown(value=introduction),
        gr.Markdown(value=benchmarks),
        gr.Markdown(value=queue_text),
        gr.Textbox(label=citation_label),
        gr.Button(value=select_all),
        gr.Button(value=select_none),
        gr.Button(value=select_avg),
        gr.Accordion(label=accordion_label),
    )
# Top-level app: title, introduction, the three tabs, the citation box and
# the language selector.
with gr.Blocks(css_paths="style.css", theme=gr.themes.Glass()) as demo:
    gr.HTML(TITLE)
    introduction_text = gr.Markdown(INTRODUCTION_TEXT_JA, elem_classes="markdown-text")
    with gr.Tabs() as tabs:
        with gr.Tab("🏅 LLM Benchmark", elem_id="llm-benchmark-tab-table"):
            demo_leaderboard.render()
        with gr.Tab("📝 About", elem_id="llm-benchmark-tab-about"):
            llm_benchmarks_text = gr.Markdown(LLM_BENCHMARKS_TEXT_JA, elem_classes="markdown-text")
        with gr.Tab("🚀 Submit here! ", elem_id="llm-benchmark-tab-submit"):
            demo_submission.render()
    with gr.Row():
        with gr.Accordion(CITATION_ACCORDION_LABEL_JA, open=False) as citation_accordion:
            citation_button = gr.Textbox(
                label=CITATION_BUTTON_LABEL_JA,
                value=CITATION_BUTTON_TEXT,
                lines=20,
                elem_id="citation-button",
                show_copy_button=True,
            )
    gr.HTML(BOTTOM_LOGO)
    # The UI starts in Japanese; set_default_language may switch it on load.
    language = gr.Radio(
        choices=["🇯🇵 JA", "🇺🇸 EN"],
        value="🇯🇵 JA",
        elem_classes="language-selector",
        show_label=False,
        container=False,
    )
    demo.load(fn=set_default_language, outputs=language)
    # The outputs order must match the tuple returned by update_language.
    language.change(
        fn=update_language,
        inputs=language,
        outputs=[
            introduction_text,
            llm_benchmarks_text,
            evaluation_queue_text,
            citation_button,
            select_all_button,
            select_none_button,
            select_avg_only_button,
            citation_accordion,
        ],
        api_name=False,
    )
if __name__ == "__main__":
    # When SPACE_ID is set, schedule restart_space to run every 1800 seconds
    # in a background scheduler.
    if os.getenv("SPACE_ID"):
        scheduler = BackgroundScheduler()
        scheduler.add_job(restart_space, "interval", seconds=1800)
        scheduler.start()
    demo.queue(default_concurrency_limit=40).launch()