import os
import gradio as gr
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from apscheduler.schedulers.background import BackgroundScheduler
from huggingface_hub import snapshot_download
from src.about import (
BOTTOM_LOGO,
CITATION_BUTTON_LABEL,
CITATION_BUTTON_LABEL_JA,
CITATION_BUTTON_TEXT,
EVALUATION_QUEUE_TEXT,
EVALUATION_QUEUE_TEXT_JA,
INTRODUCTION_TEXT,
INTRODUCTION_TEXT_JA,
LLM_BENCHMARKS_TEXT,
LLM_BENCHMARKS_TEXT_JA,
TITLE,
TaskType,
)
from src.display.utils import (
BENCHMARK_COLS,
COLS,
EVAL_COLS,
EVAL_TYPES,
NUMERIC_INTERVALS,
TYPES,
AddSpecialTokens,
AutoEvalColumn,
ModelType,
NumFewShots,
Precision,
Version,
VllmVersion,
fields,
)
from src.envs import API, CONTENTS_REPO, EVAL_REQUESTS_PATH, QUEUE_REPO, REPO_ID
from src.i18n import (
CITATION_ACCORDION_LABEL,
CITATION_ACCORDION_LABEL_JA,
SELECT_ALL_BUTTON_LABEL,
SELECT_ALL_BUTTON_LABEL_JA,
SELECT_AVG_ONLY_BUTTON_LABEL,
SELECT_AVG_ONLY_BUTTON_LABEL_JA,
SELECT_NONE_BUTTON_LABEL,
SELECT_NONE_BUTTON_LABEL_JA,
)
from src.populate import get_evaluation_queue_df, get_leaderboard_df
from src.submission.submit import add_new_eval
def restart_space() -> None:
    """Ask the Hub API to restart the Space identified by ``REPO_ID``."""
    API.restart_space(repo_id=REPO_ID)
# Space initialization: download a snapshot of the evaluation-request queue
# dataset (QUEUE_REPO) into EVAL_REQUESTS_PATH before any dataframe is built.
try:
    print(EVAL_REQUESTS_PATH)
    snapshot_download(
        repo_id=QUEUE_REPO,
        local_dir=EVAL_REQUESTS_PATH,
        repo_type="dataset",
        tqdm_class=None,
        etag_timeout=30,
    )
except Exception as exc:
    # Top-level boundary: any failure here triggers a restart of the Space.
    # Report the cause first so the reason for the restart is visible in the logs.
    print(f"Failed to download {QUEUE_REPO} into {EVAL_REQUESTS_PATH}: {exc!r}")
    restart_space()
# Get dataframes
# The evaluation-queue helper returns four frames, unpacked here as
# finished / running / pending / failed.
(
    FINISHED_EVAL_QUEUE_DF,
    RUNNING_EVAL_QUEUE_DF,
    PENDING_EVAL_QUEUE_DF,
    FAILED_EVAL_QUEUE_DF,
) = get_evaluation_queue_df(EVAL_REQUESTS_PATH, EVAL_COLS)
# Unfiltered leaderboard data; every filter and plot below starts from this frame.
ORIGINAL_DF = get_leaderboard_df(CONTENTS_REPO, COLS, BENCHMARK_COLS)
# Largest value in the "#Params (B)" column; used to size the x-axis of the size-vs-score plot.
MAX_MODEL_SIZE = ORIGINAL_DF["#Params (B)"].max()
# Searching and filtering
def filter_models(
    df: pd.DataFrame,
    type_query: list,
    size_query: list,
    precision_query: list,
    add_special_tokens_query: list,
    num_few_shots_query: list,
    version_query: list,
    vllm_query: list,
) -> pd.DataFrame:
    """Return the rows of ``df`` that satisfy every selected filter.

    The filters are applied one after another, each on the result of the
    previous one, and the shape after each step is printed for debugging.

    Args:
        df: Leaderboard frame. Must contain the type column (``"T"`` or
            ``"Type_"``), ``"Precision"``, ``"#Params (B)"``,
            ``"Add Special Tokens"``, ``"Few-shot"``,
            ``"llm-jp-eval version"`` and ``"vllm version"``.
        type_query: Selected model-type entries; only the first
            whitespace-separated token of each entry is compared with the
            type column.
        size_query: Selected keys of ``NUMERIC_INTERVALS``. The special entry
            ``"Unknown"`` selects rows whose parameter count is NaN or 0.
        precision_query: Accepted values of the ``"Precision"`` column.
        add_special_tokens_query: Accepted values of ``"Add Special Tokens"``.
        num_few_shots_query: Accepted values of ``"Few-shot"``, compared as
            strings.
        version_query: Accepted values of ``"llm-jp-eval version"``.
        vllm_query: Accepted values of ``"vllm version"``.

    Returns:
        The filtered frame (possibly empty), with the same columns as ``df``.
    """
    print(f"Initial df shape: {df.shape}")
    print(f"Initial df content:\n{df}")

    # Model type filter
    type_column = "T" if "T" in df.columns else "Type_"
    type_emoji = [t.split()[0] for t in type_query]
    filtered_df = df[df[type_column].isin(type_emoji)]
    print(f"After type filter: {filtered_df.shape}")

    # Precision filter
    filtered_df = filtered_df[filtered_df["Precision"].isin(precision_query)]
    print(f"After precision filter: {filtered_df.shape}")

    # Model size filter
    # The interval lookup does not depend on the row, so do it once up front.
    size_intervals = [NUMERIC_INTERVALS[s] for s in size_query if s != "Unknown"]
    # On an empty Series, ``apply`` returns an empty Series with the column's
    # own dtype instead of bool. Indexing a frame with a non-boolean key is
    # treated as a column selection, which would drop every column and make
    # the next filter fail with KeyError. Force a boolean mask.
    size_mask = (
        filtered_df["#Params (B)"]
        .apply(lambda x: any(x in interval for interval in size_intervals))
        .astype(bool)
    )
    if "Unknown" in size_query:
        size_mask |= filtered_df["#Params (B)"].isna() | (filtered_df["#Params (B)"] == 0)
    filtered_df = filtered_df[size_mask]
    print(f"After size filter: {filtered_df.shape}")

    # Add Special Tokens filter
    filtered_df = filtered_df[filtered_df["Add Special Tokens"].isin(add_special_tokens_query)]
    print(f"After add_special_tokens filter: {filtered_df.shape}")

    # Num Few Shots filter (column values are compared as strings)
    filtered_df = filtered_df[filtered_df["Few-shot"].astype(str).isin(num_few_shots_query)]
    print(f"After num_few_shots filter: {filtered_df.shape}")

    # llm-jp-eval version filter
    filtered_df = filtered_df[filtered_df["llm-jp-eval version"].isin(version_query)]
    print(f"After version filter: {filtered_df.shape}")

    # vllm version filter
    filtered_df = filtered_df[filtered_df["vllm version"].isin(vllm_query)]
    print(f"After vllm version filter: {filtered_df.shape}")

    print("Filtered dataframe head:")
    print(filtered_df.head())
    return filtered_df
def search_model_by_name(df: pd.DataFrame, model_name: str) -> pd.DataFrame:
    """Return the rows whose query-name column contains ``model_name``.

    The match is a case-insensitive *literal* substring match on the column
    named by ``AutoEvalColumn.dummy.name``.

    Args:
        df: Frame to search.
        model_name: Text typed by the user. It is not interpreted as a regular
            expression, so characters such as ``(``, ``+`` or ``[`` are matched
            as-is instead of raising ``re.error``.

    Returns:
        The matching rows of ``df``.
    """
    # na=False: rows with a missing name simply do not match; without it the
    # mask would contain NaN and could not be used for boolean indexing.
    name_matches = df[AutoEvalColumn.dummy.name].str.contains(model_name, case=False, regex=False, na=False)
    return df[name_matches]
def search_models_by_multiple_names(df: pd.DataFrame, search_text: str) -> pd.DataFrame:
    """Filter ``df`` by one or more ``;``-separated model-name queries.

    Each non-blank query is searched with ``search_model_by_name``; the
    per-query results are concatenated and de-duplicated on the row-id column.

    Args:
        df: Frame to search.
        search_text: Raw search-box content, e.g. ``"llama; qwen"``.

    Returns:
        ``df`` unchanged when ``search_text`` holds no usable query (empty,
        whitespace only, or only separators); otherwise the union of the
        matches of every query.
    """
    if not search_text:
        return df

    model_names = [name.strip() for name in search_text.split(";")]
    model_names = [name for name in model_names if name]
    if not model_names:
        # Input such as ";" or " " has no query; pd.concat([]) would raise ValueError.
        return df

    dfs = [search_model_by_name(df, name) for name in model_names]
    return pd.concat(dfs).drop_duplicates(subset=AutoEvalColumn.row_id.name)
def select_columns(df: pd.DataFrame, columns: list) -> pd.DataFrame:
    """Project ``df`` onto the columns to display.

    The result always starts with the model-type symbol and model columns,
    followed by the requested columns in ``COLS`` order (only those present in
    ``df``), and ends with the row-id column. Duplicates are dropped while the
    first occurrence keeps its position.

    Args:
        df: Frame to project.
        columns: Names of the columns the user asked to see.

    Returns:
        ``df`` restricted to the resulting column list.
    """
    pinned = [
        AutoEvalColumn.model_type_symbol.name,  # 'T'
        AutoEvalColumn.model.name,  # 'Model'
    ]
    # Pinned columns are emitted first anyway; drop them from the request to avoid duplicates.
    requested = [name for name in columns if name not in pinned]
    ordered = [
        *pinned,
        *(name for name in COLS if name in df.columns and name in requested),
        AutoEvalColumn.row_id.name,
    ]
    # dict.fromkeys removes repeats while preserving first-seen order.
    return df[list(dict.fromkeys(ordered))]
def update_table(
    type_query: list,
    precision_query: list,
    size_query: list,
    add_special_tokens_query: list,
    num_few_shots_query: list,
    version_query: list,
    vllm_query: list,
    query: str,
    *columns,
) -> pd.DataFrame:
    """Rebuild the leaderboard table from the current filter, search and column state.

    Starting from ``ORIGINAL_DF``, the function applies ``filter_models``, then
    ``search_models_by_multiple_names``, then ``select_columns``. Intermediate
    shapes and heads are printed for debugging.

    Args:
        type_query: Selected model types.
        precision_query: Selected precisions.
        size_query: Selected model-size buckets.
        add_special_tokens_query: Selected "Add Special Tokens" values.
        num_few_shots_query: Selected few-shot counts.
        version_query: Selected llm-jp-eval versions.
        vllm_query: Selected vllm versions.
        query: Search-box text (``;``-separated model names).
        *columns: One list of selected column names per column checkbox group;
            the lists are flattened into a single list.

    Returns:
        The frame to display in the leaderboard table.
    """
    columns = [item for column in columns for item in column]
    print(
        f"Update table called with: type_query={type_query}, precision_query={precision_query}, size_query={size_query}"
    )
    # Keyword arguments on purpose: this function takes precision before size,
    # while filter_models takes size before precision.
    filtered_df = filter_models(
        df=ORIGINAL_DF,
        type_query=type_query,
        size_query=size_query,
        precision_query=precision_query,
        add_special_tokens_query=add_special_tokens_query,
        num_few_shots_query=num_few_shots_query,
        version_query=version_query,
        vllm_query=vllm_query,
    )
    print(f"filtered_df shape after filter_models: {filtered_df.shape}")

    filtered_df = search_models_by_multiple_names(filtered_df, query)
    print(f"filtered_df shape after search_models_by_multiple_names: {filtered_df.shape}")
    print(
        f"Filter applied: query={query}, columns={columns}, type_query={type_query}, precision_query={precision_query}"
    )
    print("Filtered dataframe head:")
    print(filtered_df.head())

    df = select_columns(filtered_df, columns)
    print(f"Final df shape: {df.shape}")
    print("Final dataframe head:")
    print(df.head())
    return df
# Prepare the dataframes
# Build the frame shown on first load: run the regular filter pipeline with
# every option of every filter selected.
leaderboard_df = ORIGINAL_DF.copy()
leaderboard_df = filter_models(
    leaderboard_df,
    [t.to_str(" : ") for t in ModelType],
    list(NUMERIC_INTERVALS.keys()),
    [i.value.name for i in Precision],
    [i.value.name for i in AddSpecialTokens],
    [i.value.name for i in NumFewShots],
    [i.value.name for i in Version],
    [i.value.name for i in VllmVersion],
)
# Columns shown initially: "T" first, then every column that is either
# never hidden or displayed by default ("T" itself is excluded from the
# comprehension so it is not listed twice).
INITIAL_COLUMNS = ["T"] + [
    c.name for c in fields(AutoEvalColumn) if (c.never_hidden or c.displayed_by_default) and c.name != "T"
]
leaderboard_df = select_columns(leaderboard_df, INITIAL_COLUMNS)
# Leaderboard demo
def toggle_all_categories(action: str) -> list[gr.CheckboxGroup]:
    """Build one checkbox-group update per task type for a bulk selection action.

    The returned list has exactly one entry per member of ``TaskType``, in
    iteration order, so it can be used as the output for the per-task-type
    column checkbox groups.

    Args:
        action: ``"all"`` selects every selectable column of each task type,
            ``"none"`` clears the selection, and ``"avg_only"`` keeps only
            average columns (every column of the ``TaskType.AVG`` group, and
            the columns flagged ``average`` in the other groups).

    Returns:
        A list of ``gr.CheckboxGroup`` updates. The entry for
        ``TaskType.NotTask`` carries no value, leaving that group's current
        selection untouched.

    Raises:
        ValueError: If ``action`` is not one of the three supported values.
            Previously such a value produced a list with too few entries.
    """
    if action not in ("all", "none", "avg_only"):
        raise ValueError(f"Unknown action: {action!r}")

    results = []
    for task_type in TaskType:
        if task_type == TaskType.NotTask:
            # "Model details" group: keep the existing selection.
            results.append(gr.CheckboxGroup())
            continue

        if action == "none":
            selected = []
        else:
            candidates = [
                c
                for c in fields(AutoEvalColumn)
                if not c.hidden and not c.never_hidden and c.task_type == task_type
            ]
            if action == "all":
                selected = [c.name for c in candidates if not c.dummy]
            else:  # "avg_only"
                selected = [c.name for c in candidates if task_type == TaskType.AVG or c.average]
        results.append(gr.CheckboxGroup(value=selected))
    return results
# Maps the name of every column flagged `average` (excluding columns whose
# task type is TaskType.AVG) to the name of its task type. The plots below use
# the keys to pick score columns and the mapping to relabel them.
TASK_AVG_NAME_MAP = {
    c.name: c.task_type.name for c in fields(AutoEvalColumn) if c.average and c.task_type != TaskType.AVG
}
def plot_size_vs_score(df_filtered: pd.DataFrame) -> go.Figure:
    """Scatter-plot model size against average scores for the displayed models.

    Rows are looked up in ``ORIGINAL_DF`` by row id, so score columns that are
    not currently shown in the table are still available. Models whose
    parameter count is not positive are left out.

    Args:
        df_filtered: Frame currently shown in the leaderboard table; only its
            row-id column is read.

    Returns:
        A figure with one trace per score category. Only the ``"AVG"`` trace is
        visible initially; the others can be enabled from the legend. Two
        buttons switch the point labels on and off.
    """
    df = ORIGINAL_DF[ORIGINAL_DF[AutoEvalColumn.row_id.name].isin(df_filtered[AutoEvalColumn.row_id.name])]
    df = df[df["#Params (B)"] > 0]
    avg_columns = ["AVG"] + list(TASK_AVG_NAME_MAP.keys())
    # Work on an explicit copy: the column assignments below would otherwise
    # write into a slice of ORIGINAL_DF and trigger SettingWithCopyWarning.
    df = df[["model_name_for_query", "#Params (B)", "Few-shot"] + avg_columns].copy()
    df[avg_columns] = df[avg_columns].astype(float)
    df = df.rename(columns={"model_name_for_query": "Model", "Few-shot": "n-shot"})
    # Point label: model name without the organisation prefix, plus the shot count.
    df["model_name_without_org_name"] = df["Model"].str.split("/").str[-1] + " (" + df["n-shot"].astype(str) + "-shot)"
    # Long format: one row per (model, score category).
    df = pd.melt(
        df,
        id_vars=["Model", "model_name_without_org_name", "#Params (B)", "n-shot"],
        value_vars=avg_columns,
        var_name="Category",
        value_name="Score",
    )
    fig = px.scatter(
        df,
        x="#Params (B)",
        y="Score",
        text="model_name_without_org_name",
        color="Category",
        hover_data=["Model", "n-shot", "Category"],
    )
    fig.update_traces(
        # customdata follows the hover_data order: Model, n-shot, Category.
        # Plotly hover text uses <br> for line breaks.
        hovertemplate="%{customdata[0]}<br>#Params: %{x:.2f}B<br>n-shot: %{customdata[1]}<br>%{customdata[2]}: %{y:.4f}",
        textposition="top right",
    )
    for trace in fig.data:
        if trace.name != "AVG":
            trace.visible = "legendonly"
    fig.update_layout(xaxis_range=[0, MAX_MODEL_SIZE * 1.2], yaxis_range=[0, 1])
    fig.update_layout(
        updatemenus=[
            dict(
                type="buttons",
                direction="left",
                showactive=True,
                buttons=[
                    dict(label="Show Labels", method="update", args=[{"mode": ["markers+text"]}]),
                    dict(label="Hide Labels", method="update", args=[{"mode": ["markers"]}]),
                ],
                x=0.5,
                y=-0.2,
                xanchor="center",
                yanchor="top",
            )
        ]
    )
    return fig
def plot_average_scores(df_filtered: pd.DataFrame) -> go.Figure:
    """Draw a radar chart of per-category average scores for the displayed models.

    Rows are looked up in ``ORIGINAL_DF`` by row id. Each model / shot-count
    pair becomes one polar trace whose axes are the task-type names from
    ``TASK_AVG_NAME_MAP``.

    Args:
        df_filtered: Frame currently shown in the leaderboard table; only its
            row-id column is read.

    Returns:
        A figure in which the first two traces are visible and the remaining
        ones are available from the legend. The radial axis spans 0 to 1.
    """
    row_id_column = AutoEvalColumn.row_id.name
    shown_rows = ORIGINAL_DF[ORIGINAL_DF[row_id_column].isin(df_filtered[row_id_column])]
    scores = (
        shown_rows[["model_name_for_query", "Few-shot"] + list(TASK_AVG_NAME_MAP.keys())]
        .rename(columns={"model_name_for_query": "Model", "Few-shot": "n-shot"})
        .rename(columns=TASK_AVG_NAME_MAP)
    )
    scores["n-shot"] = scores["n-shot"].astype(int)
    scores = scores.set_index(["Model", "n-shot"]).astype(float)

    fig = go.Figure()
    for position, ((name, n_shot), category_scores) in enumerate(scores.iterrows()):
        fig.add_trace(
            go.Scatterpolar(
                r=category_scores.values,
                theta=category_scores.index,
                fill="toself",
                name=f"{name} ({n_shot}-shot)",
                hovertemplate="%{theta}: %{r}",
                # Display only the first 2 models; the rest start hidden in the legend.
                visible=True if position < 2 else "legendonly",
            )
        )
    fig.update_layout(polar={"radialaxis": {"range": [0, 1]}}, showlegend=True)
    return fig
# Column checkbox groups, keyed by task-type name and as a flat list in
# TaskType iteration order (the order toggle_all_categories returns updates in).
shown_columns_dict: dict[str, gr.CheckboxGroup] = {}
checkboxes: list[gr.CheckboxGroup] = []
# Leaderboard tab: search box, column filter, model filter, table and two plots.
# NOTE(review): component nesting determines the rendered layout — confirm it
# against the running UI when changing the indentation of this block.
with gr.Blocks() as demo_leaderboard:
    with gr.Row():
        search_bar = gr.Textbox(
            placeholder=" 🔍 Search for your model (separate multiple queries with `;`) and press ENTER...",
            show_label=False,
            elem_id="search-bar",
        )
    with gr.Accordion("Column Filter", open=True):
        with gr.Row():
            with gr.Row():
                # Bulk-selection buttons; their labels are swapped by update_language.
                select_all_button = gr.Button(SELECT_ALL_BUTTON_LABEL_JA, size="sm")
                select_none_button = gr.Button(SELECT_NONE_BUTTON_LABEL_JA, size="sm")
                select_avg_only_button = gr.Button(SELECT_AVG_ONLY_BUTTON_LABEL_JA, size="sm")
        # One accordion with one checkbox group per task type.
        for task_type in TaskType:
            if task_type == TaskType.NotTask:
                label = "Model details"
            else:
                label = task_type.value
            with gr.Accordion(label, open=True, elem_classes="accordion"):
                with gr.Row(height=110):
                    shown_column = gr.CheckboxGroup(
                        show_label=False,
                        # Selectable: columns of this task type that are not hidden,
                        # not always shown, and not the dummy column.
                        choices=[
                            c.name
                            for c in fields(AutoEvalColumn)
                            if not c.hidden and not c.never_hidden and not c.dummy and c.task_type == task_type
                        ],
                        # Pre-selected: the subset that is displayed by default.
                        value=[
                            c.name
                            for c in fields(AutoEvalColumn)
                            if c.displayed_by_default
                            and not c.hidden
                            and not c.never_hidden
                            and c.task_type == task_type
                        ],
                        elem_id="column-select",
                        container=False,
                    )
                shown_columns_dict[task_type.name] = shown_column
                checkboxes.append(shown_column)
    with gr.Accordion("Model Filter", open=True):
        with gr.Row():
            # Every filter starts with all of its choices selected.
            filter_columns_type = gr.CheckboxGroup(
                label="Model types",
                choices=[t.to_str() for t in ModelType],
                value=[t.to_str() for t in ModelType],
                elem_id="filter-columns-type",
            )
            filter_columns_precision = gr.CheckboxGroup(
                label="Precision",
                choices=[i.value.name for i in Precision],
                value=[i.value.name for i in Precision],
                elem_id="filter-columns-precision",
            )
            filter_columns_size = gr.CheckboxGroup(
                label="Model sizes (in billions of parameters)",
                choices=list(NUMERIC_INTERVALS.keys()),
                value=list(NUMERIC_INTERVALS.keys()),
                elem_id="filter-columns-size",
            )
            filter_columns_add_special_tokens = gr.CheckboxGroup(
                label="Add Special Tokens",
                choices=[i.value.name for i in AddSpecialTokens],
                value=[i.value.name for i in AddSpecialTokens],
                elem_id="filter-columns-add-special-tokens",
            )
            filter_columns_num_few_shots = gr.CheckboxGroup(
                label="Num Few Shots",
                choices=[i.value.name for i in NumFewShots],
                value=[i.value.name for i in NumFewShots],
                elem_id="filter-columns-num-few-shots",
            )
            filter_columns_version = gr.CheckboxGroup(
                label="llm-jp-eval version",
                choices=[i.value.name for i in Version],
                value=[i.value.name for i in Version],
                elem_id="filter-columns-version",
            )
            filter_columns_vllm = gr.CheckboxGroup(
                label="vllm version",
                choices=[i.value.name for i in VllmVersion],
                value=[i.value.name for i in VllmVersion],
                elem_id="filter-columns-vllm",
            )
    leaderboard_table = gr.Dataframe(
        value=leaderboard_df,
        headers=INITIAL_COLUMNS,
        datatype=TYPES,
        elem_id="leaderboard-table",
        interactive=False,
        visible=True,
    )
    graph_size_vs_score = gr.Plot(label="Size vs. Score")
    graph_average_scores = gr.Plot(label="Performance across Task Categories")
    # Bulk-selection buttons write to every column checkbox group at once.
    select_all_button.click(
        fn=lambda: toggle_all_categories("all"),
        outputs=checkboxes,
        api_name=False,
        queue=False,
    )
    select_none_button.click(
        fn=lambda: toggle_all_categories("none"),
        outputs=checkboxes,
        api_name=False,
        queue=False,
    )
    select_avg_only_button.click(
        fn=lambda: toggle_all_categories("avg_only"),
        outputs=checkboxes,
        api_name=False,
        queue=False,
    )
    # Any filter change, search submit, or column-selection change rebuilds the
    # table. The inputs list must follow update_table's positional parameter
    # order; the column groups are collected by its *columns parameter.
    gr.on(
        triggers=[
            filter_columns_type.change,
            filter_columns_precision.change,
            filter_columns_size.change,
            filter_columns_add_special_tokens.change,
            filter_columns_num_few_shots.change,
            filter_columns_version.change,
            filter_columns_vllm.change,
            search_bar.submit,
        ]
        + [shown_columns.change for shown_columns in shown_columns_dict.values()],
        fn=update_table,
        inputs=[
            filter_columns_type,
            filter_columns_precision,
            filter_columns_size,
            filter_columns_add_special_tokens,
            filter_columns_num_few_shots,
            filter_columns_version,
            filter_columns_vllm,
            search_bar,
        ]
        + [shown_columns for shown_columns in shown_columns_dict.values()],
        outputs=leaderboard_table,
    )
    # Both plots are redrawn whenever the table content changes.
    leaderboard_table.change(
        fn=plot_size_vs_score,
        inputs=leaderboard_table,
        outputs=graph_size_vs_score,
        api_name=False,
        queue=False,
    )
    leaderboard_table.change(
        fn=plot_average_scores,
        inputs=leaderboard_table,
        outputs=graph_average_scores,
        api_name=False,
        queue=False,
    )
# Submission demo
# Submission tab: evaluation-queue overview followed by the submission form.
# NOTE(review): component nesting determines the rendered layout — confirm it
# against the running UI when changing the indentation of this block.
with gr.Blocks() as demo_submission:
    with gr.Column():
        with gr.Row():
            # Text is swapped between languages by update_language.
            evaluation_queue_text = gr.Markdown(EVALUATION_QUEUE_TEXT_JA, elem_classes="markdown-text")
        with gr.Column():
            # One collapsed accordion per queue state; the title shows the row
            # count of the frame as loaded at start-up.
            with gr.Accordion(
                f"✅ Finished Evaluations ({len(FINISHED_EVAL_QUEUE_DF)})",
                open=False,
            ):
                with gr.Row():
                    finished_eval_table = gr.Dataframe(
                        value=FINISHED_EVAL_QUEUE_DF,
                        headers=EVAL_COLS,
                        datatype=EVAL_TYPES,
                        row_count=5,
                    )
            with gr.Accordion(
                f"🔄 Running Evaluation Queue ({len(RUNNING_EVAL_QUEUE_DF)})",
                open=False,
            ):
                with gr.Row():
                    running_eval_table = gr.Dataframe(
                        value=RUNNING_EVAL_QUEUE_DF,
                        headers=EVAL_COLS,
                        datatype=EVAL_TYPES,
                        row_count=5,
                    )
            with gr.Accordion(
                f"⏳ Pending Evaluation Queue ({len(PENDING_EVAL_QUEUE_DF)})",
                open=False,
            ):
                with gr.Row():
                    pending_eval_table = gr.Dataframe(
                        value=PENDING_EVAL_QUEUE_DF,
                        headers=EVAL_COLS,
                        datatype=EVAL_TYPES,
                        row_count=5,
                    )
            with gr.Accordion(
                f"❎ Failed Evaluation Queue ({len(FAILED_EVAL_QUEUE_DF)})",
                open=False,
            ):
                with gr.Row():
                    failed_eval_table = gr.Dataframe(
                        value=FAILED_EVAL_QUEUE_DF,
                        headers=EVAL_COLS,
                        datatype=EVAL_TYPES,
                        row_count=5,
                    )
    with gr.Row():
        gr.Markdown("# ✉️✨ Submit your model here!", elem_classes="markdown-text")
    with gr.Row():
        with gr.Column():
            model_name_textbox = gr.Textbox(label="Model name")
            revision_name_textbox = gr.Textbox(label="Revision commit", placeholder="main")
            # The Unknown member is not offered as a choice in any dropdown below.
            model_type = gr.Dropdown(
                label="Model type",
                choices=[t.to_str(" : ") for t in ModelType if t != ModelType.Unknown],
                multiselect=False,
                value=None,
            )
        with gr.Column():
            precision = gr.Dropdown(
                label="Precision",
                choices=[i.value.name for i in Precision if i != Precision.Unknown],
                multiselect=False,
                value="float16",
            )
            add_special_tokens = gr.Dropdown(
                label="AddSpecialTokens",
                choices=[i.value.name for i in AddSpecialTokens if i != AddSpecialTokens.Unknown],
                multiselect=False,
                value="False",
            )
    submit_button = gr.Button("Submit Eval")
    submission_result = gr.Markdown()
    # The form values are passed to add_new_eval positionally in the order
    # listed; its return value is rendered as Markdown below the button.
    submit_button.click(
        fn=add_new_eval,
        inputs=[
            model_name_textbox,
            revision_name_textbox,
            precision,
            model_type,
            add_special_tokens,
        ],
        outputs=submission_result,
    )
# Main demo
def set_default_language(request: gr.Request) -> gr.Radio:
    """Pick the initial UI language from the request's ``Accept-Language`` header.

    Args:
        request: The incoming request, injected by Gradio.

    Returns:
        A ``gr.Radio`` update selecting ``"🇯🇵 JA"`` when the first language tag
        of the header starts with ``ja`` (case-insensitive), and ``"🇺🇸 EN"``
        otherwise — including when the header is missing or empty.
    """
    # Not every client sends Accept-Language; fall back to an empty value
    # instead of raising KeyError.
    accept_language = request.headers.get("Accept-Language") or ""
    primary_tag = accept_language.split(",")[0].strip().lower()
    selected = "🇯🇵 JA" if primary_tag.startswith("ja") else "🇺🇸 EN"
    return gr.Radio(value=selected)
def update_language(
    language: str,
) -> tuple[
    gr.Markdown,  # introduction_text
    gr.Markdown,  # llm_benchmarks_text
    gr.Markdown,  # evaluation_queue_text
    gr.Textbox,  # citation_button
    gr.Button,  # select_all_button
    gr.Button,  # select_none_button
    gr.Button,  # select_avg_only_button
    gr.Accordion,  # citation_accordion
]:
    """Return component updates that switch the UI texts to ``language``.

    Args:
        language: The language radio value. ``"🇯🇵 JA"`` selects the Japanese
            texts; any other value selects the English ones.

    Returns:
        Eight updates, in the order given by the return annotation: three
        Markdown values, the citation textbox label, three button labels and
        the citation accordion label.
    """
    use_japanese = language == "🇯🇵 JA"

    introduction = INTRODUCTION_TEXT_JA if use_japanese else INTRODUCTION_TEXT
    benchmarks = LLM_BENCHMARKS_TEXT_JA if use_japanese else LLM_BENCHMARKS_TEXT
    evaluation_queue = EVALUATION_QUEUE_TEXT_JA if use_japanese else EVALUATION_QUEUE_TEXT
    citation_label = CITATION_BUTTON_LABEL_JA if use_japanese else CITATION_BUTTON_LABEL
    select_all_label = SELECT_ALL_BUTTON_LABEL_JA if use_japanese else SELECT_ALL_BUTTON_LABEL
    select_none_label = SELECT_NONE_BUTTON_LABEL_JA if use_japanese else SELECT_NONE_BUTTON_LABEL
    select_avg_only_label = SELECT_AVG_ONLY_BUTTON_LABEL_JA if use_japanese else SELECT_AVG_ONLY_BUTTON_LABEL
    accordion_label = CITATION_ACCORDION_LABEL_JA if use_japanese else CITATION_ACCORDION_LABEL

    return (
        gr.Markdown(value=introduction),
        gr.Markdown(value=benchmarks),
        gr.Markdown(value=evaluation_queue),
        gr.Textbox(label=citation_label),
        gr.Button(value=select_all_label),
        gr.Button(value=select_none_label),
        gr.Button(value=select_avg_only_label),
        gr.Accordion(label=accordion_label),
    )
# Top-level app: title, introduction, three tabs, citation box, logo and the
# language selector. Japanese texts are the initial values.
# NOTE(review): component nesting determines the rendered layout — confirm it
# against the running UI when changing the indentation of this block.
with gr.Blocks(css_paths="style.css", theme=gr.themes.Glass()) as demo:
    gr.HTML(TITLE)
    introduction_text = gr.Markdown(INTRODUCTION_TEXT_JA, elem_classes="markdown-text")
    with gr.Tabs() as tabs:
        with gr.Tab("🏅 LLM Benchmark", elem_id="llm-benchmark-tab-table"):
            # Render the separately built leaderboard Blocks inside this tab.
            demo_leaderboard.render()
        with gr.Tab("📝 About", elem_id="llm-benchmark-tab-about"):
            llm_benchmarks_text = gr.Markdown(LLM_BENCHMARKS_TEXT_JA, elem_classes="markdown-text")
        with gr.Tab("🚀 Submit here! ", elem_id="llm-benchmark-tab-submit"):
            # Render the separately built submission Blocks inside this tab.
            demo_submission.render()
    with gr.Row():
        with gr.Accordion(CITATION_ACCORDION_LABEL_JA, open=False) as citation_accordion:
            citation_button = gr.Textbox(
                label=CITATION_BUTTON_LABEL_JA,
                value=CITATION_BUTTON_TEXT,
                lines=20,
                elem_id="citation-button",
                show_copy_button=True,
            )
    gr.HTML(BOTTOM_LOGO)
    language = gr.Radio(
        choices=["🇯🇵 JA", "🇺🇸 EN"],
        value="🇯🇵 JA",
        elem_classes="language-selector",
        show_label=False,
        container=False,
    )
    # On page load, set the radio from the request via set_default_language.
    demo.load(fn=set_default_language, outputs=language)
    # When the radio changes, swap every language-dependent text. The outputs
    # list must follow the order of the tuple returned by update_language.
    language.change(
        fn=update_language,
        inputs=language,
        outputs=[
            introduction_text,
            llm_benchmarks_text,
            evaluation_queue_text,
            citation_button,
            select_all_button,
            select_none_button,
            select_avg_only_button,
            citation_accordion,
        ],
        api_name=False,
    )
if __name__ == "__main__":
    # When SPACE_ID is set in the environment, schedule restart_space to run
    # every 1800 seconds in a background scheduler.
    running_in_space = os.getenv("SPACE_ID")
    if running_in_space:
        scheduler = BackgroundScheduler()
        scheduler.add_job(restart_space, trigger="interval", seconds=1800)
        scheduler.start()

    queued_demo = demo.queue(default_concurrency_limit=40)
    queued_demo.launch()