import os import gradio as gr import pandas as pd from apscheduler.schedulers.background import BackgroundScheduler from huggingface_hub import snapshot_download from src.about import ( BOTTOM_LOGO, CITATION_BUTTON_LABEL, CITATION_BUTTON_LABEL_JA, CITATION_BUTTON_TEXT, EVALUATION_QUEUE_TEXT, EVALUATION_QUEUE_TEXT_JA, INTRODUCTION_TEXT, INTRODUCTION_TEXT_JA, LLM_BENCHMARKS_TEXT, LLM_BENCHMARKS_TEXT_JA, TITLE, TaskType, ) from src.display.utils import ( BENCHMARK_COLS, COLS, EVAL_COLS, EVAL_TYPES, NUMERIC_INTERVALS, TYPES, AddSpecialTokens, AutoEvalColumn, ModelType, NumFewShots, Precision, Version, fields, ) from src.envs import API, EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH, QUEUE_REPO, REPO_ID, RESULTS_REPO from src.populate import get_evaluation_queue_df, get_leaderboard_df from src.submission.submit import add_new_eval def restart_space(): API.restart_space(repo_id=REPO_ID) # Space initialization try: print(EVAL_REQUESTS_PATH) snapshot_download( repo_id=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30, ) except Exception: restart_space() try: print(EVAL_RESULTS_PATH) snapshot_download( repo_id=RESULTS_REPO, local_dir=EVAL_RESULTS_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30, ) except Exception: restart_space() # Searching and filtering def filter_models( df: pd.DataFrame, type_query: list, size_query: list, precision_query: list, add_special_tokens_query: list, num_few_shots_query: list, version_query: list, # backend_query: list, ) -> pd.DataFrame: print(f"Initial df shape: {df.shape}") print(f"Initial df content:\n{df}") filtered_df = df # Model Type フィルタリング type_column = "T" if "T" in df.columns else "Type_" type_emoji = [t.split()[0] for t in type_query] filtered_df = df[df[type_column].isin(type_emoji)] print(f"After type filter: {filtered_df.shape}") # Precision フィルタリング filtered_df = filtered_df[filtered_df["Precision"].isin(precision_query)] print(f"After precision filter: {filtered_df.shape}") # Model Size フィルタリング size_mask = filtered_df["#Params (B)"].apply( lambda x: any(x in NUMERIC_INTERVALS[s] for s in size_query if s != "Unknown") ) if "Unknown" in size_query: size_mask |= filtered_df["#Params (B)"].isna() | (filtered_df["#Params (B)"] == 0) filtered_df = filtered_df[size_mask] print(f"After size filter: {filtered_df.shape}") # Add Special Tokens フィルタリング filtered_df = filtered_df[filtered_df["Add Special Tokens"].isin(add_special_tokens_query)] print(f"After add_special_tokens filter: {filtered_df.shape}") # Num Few Shots フィルタリング filtered_df = filtered_df[filtered_df["Few-shot"].astype(str).isin(num_few_shots_query)] print(f"After num_few_shots filter: {filtered_df.shape}") # Version フィルタリング filtered_df = filtered_df[filtered_df["llm-jp-eval version"].isin(version_query)] print(f"After version filter: {filtered_df.shape}") # Backend フィルタリング # filtered_df = filtered_df[filtered_df["Backend Library"].isin(backend_query)] # print(f"After backend filter: {filtered_df.shape}") print("Filtered dataframe head:") print(filtered_df.head()) return filtered_df def search_table(df: pd.DataFrame, query: str) -> pd.DataFrame: return df[df[AutoEvalColumn.dummy.name].str.contains(query, case=False)] def filter_queries(query: str, filtered_df: pd.DataFrame) -> pd.DataFrame: """Added by Abishek""" if not query: return filtered_df final_df = [] queries = [q.strip() for q in query.split(";")] for _q in queries: _q = _q.strip() if _q != "": temp_filtered_df = search_table(filtered_df, _q) if len(temp_filtered_df) > 0: final_df.append(temp_filtered_df) if len(final_df) > 0: filtered_df = pd.concat(final_df) filtered_df = filtered_df.drop_duplicates( subset=[ AutoEvalColumn.model.name, AutoEvalColumn.precision.name, AutoEvalColumn.revision.name, ] ) return filtered_df def select_columns(df: pd.DataFrame, columns: list) -> pd.DataFrame: always_here_cols = [ AutoEvalColumn.model_type_symbol.name, # 'T' AutoEvalColumn.model.name, # 'Model' ] # 'always_here_cols' を 'columns' から除外して重複を避ける columns = [c for c in columns if c not in always_here_cols] new_columns = always_here_cols + [c for c in COLS if c in df.columns and c in columns] # 重複を排除しつつ順序を維持 seen = set() unique_columns = [] for c in new_columns: if c not in seen: unique_columns.append(c) seen.add(c) # フィルタリングされたカラムでデータフレームを作成 filtered_df = df[unique_columns] return filtered_df def update_table( hidden_df: pd.DataFrame, type_query: list, precision_query: str, size_query: list, add_special_tokens_query: list, num_few_shots_query: list, version_query: list, # backend_query: list, query: str, *columns, ): columns = [item for column in columns for item in column] print( f"Update table called with: type_query={type_query}, precision_query={precision_query}, size_query={size_query}" ) print(f"hidden_df shape before filtering: {hidden_df.shape}") filtered_df = filter_models( hidden_df, type_query, size_query, precision_query, add_special_tokens_query, num_few_shots_query, version_query, # backend_query, ) print(f"filtered_df shape after filter_models: {filtered_df.shape}") filtered_df = filter_queries(query, filtered_df) print(f"filtered_df shape after filter_queries: {filtered_df.shape}") print( f"Filter applied: query={query}, columns={columns}, type_query={type_query}, precision_query={precision_query}" ) print("Filtered dataframe head:") print(filtered_df.head()) df = select_columns(filtered_df, columns) print(f"Final df shape: {df.shape}") print("Final dataframe head:") print(df.head()) return df def load_query(request: gr.Request): # triggered only once at startup => read query parameter if it exists query = request.query_params.get("query") or "" return ( query, query, ) # return one for the "search_bar", one for a hidden component that triggers a reload only if value has changed # Prepare the dataframes original_df = get_leaderboard_df(EVAL_RESULTS_PATH, EVAL_REQUESTS_PATH, COLS, BENCHMARK_COLS) leaderboard_df = original_df.copy() ( finished_eval_queue_df, running_eval_queue_df, pending_eval_queue_df, failed_eval_queue_df, ) = get_evaluation_queue_df(EVAL_REQUESTS_PATH, EVAL_COLS) leaderboard_df = filter_models( leaderboard_df, [t.to_str(" : ") for t in ModelType], list(NUMERIC_INTERVALS.keys()), [i.value.name for i in Precision], [i.value.name for i in AddSpecialTokens], [i.value.name for i in NumFewShots], [i.value.name for i in Version], # [i.value.name for i in Backend], ) leaderboard_df_filtered = filter_models( leaderboard_df, [t.to_str(" : ") for t in ModelType], list(NUMERIC_INTERVALS.keys()), [i.value.name for i in Precision], [i.value.name for i in AddSpecialTokens], [i.value.name for i in NumFewShots], [i.value.name for i in Version], # [i.value.name for i in Backend], ) # DataFrameの初期化部分のみを修正 initial_columns = ["T"] + [ c.name for c in fields(AutoEvalColumn) if (c.never_hidden or c.displayed_by_default) and c.name != "T" ] leaderboard_df_filtered = select_columns(leaderboard_df, initial_columns) # Leaderboard demo def toggle_all_categories(action: str) -> list[gr.CheckboxGroup]: """全カテゴリーのチェックボックスを一括制御する関数""" results = [] for task_type in TaskType: if task_type == TaskType.NotTask: # Model detailsの場合は既存の選択状態を維持 results.append(gr.CheckboxGroup()) else: if action == "all": # 全選択 results.append( gr.CheckboxGroup( value=[ c.name for c in fields(AutoEvalColumn) if not c.hidden and not c.never_hidden and not c.dummy and c.task_type == task_type ] ) ) elif action == "none": # 全解除 results.append(gr.CheckboxGroup(value=[])) elif action == "avg_only": # AVGのみ results.append( gr.CheckboxGroup( value=[ c.name for c in fields(AutoEvalColumn) if not c.hidden and not c.never_hidden and c.task_type == task_type and ((task_type == TaskType.AVG) or (task_type != TaskType.AVG and c.average)) ] ) ) return results SELECT_ALL_BUTTON_LABEL = "Select All" SELECT_ALL_BUTTON_LABEL_JA = "全選択" SELECT_NONE_BUTTON_LABEL = "Select None" SELECT_NONE_BUTTON_LABEL_JA = "全解除" SELECT_AVG_ONLY_BUTTON_LABEL = "AVG Only" SELECT_AVG_ONLY_BUTTON_LABEL_JA = "AVGのみ" shown_columns_dict: dict[str, gr.CheckboxGroup] = {} checkboxes: list[gr.CheckboxGroup] = [] with gr.Blocks() as demo_leaderboard: with gr.Row(): search_bar = gr.Textbox( placeholder=" 🔍 Search for your model (separate multiple queries with `;`) and press ENTER...", show_label=False, elem_id="search-bar", ) with gr.Row(): with gr.Row(): select_all_button = gr.Button(SELECT_ALL_BUTTON_LABEL_JA, size="sm") select_none_button = gr.Button(SELECT_NONE_BUTTON_LABEL_JA, size="sm") select_avg_only_button = gr.Button(SELECT_AVG_ONLY_BUTTON_LABEL_JA, size="sm") for task_type in TaskType: if task_type == TaskType.NotTask: label = "Model details" else: label = task_type.value with gr.Accordion(label, open=True, elem_classes="accordion"): with gr.Row(height=110): shown_column = gr.CheckboxGroup( show_label=False, choices=[ c.name for c in fields(AutoEvalColumn) if not c.hidden and not c.never_hidden and not c.dummy and c.task_type == task_type ], value=[ c.name for c in fields(AutoEvalColumn) if c.displayed_by_default and not c.hidden and not c.never_hidden and c.task_type == task_type ], elem_id="column-select", container=False, ) shown_columns_dict[task_type.name] = shown_column checkboxes.append(shown_column) # with gr.Row(height=110): # shown_column = gr.CheckboxGroup( # show_label=False, # choices=[ # c.name # for c in fields(AutoEvalColumn) # if not c.hidden and not c.never_hidden and not c.dummy and c.task_type == task_type # # and not c.average # # or (task_type == TaskType.AVG and c.average) # ], # value=[ # c.name # for c in fields(AutoEvalColumn) # if c.displayed_by_default # and not c.hidden # and not c.never_hidden # and c.task_type == task_type # # and not c.average # # or (task_type == TaskType.AVG and c.average) # ], # elem_id="column-select", # container=False, # ) # shown_columns_dict[task_type.name] = shown_column with gr.Row(): filter_columns_type = gr.CheckboxGroup( label="Model types", choices=[t.to_str() for t in ModelType], value=[t.to_str() for t in ModelType], elem_id="filter-columns-type", ) filter_columns_precision = gr.CheckboxGroup( label="Precision", choices=[i.value.name for i in Precision], value=[i.value.name for i in Precision], elem_id="filter-columns-precision", ) filter_columns_size = gr.CheckboxGroup( label="Model sizes (in billions of parameters)", choices=list(NUMERIC_INTERVALS.keys()), value=list(NUMERIC_INTERVALS.keys()), elem_id="filter-columns-size", ) filter_columns_add_special_tokens = gr.CheckboxGroup( label="Add Special Tokens", choices=[i.value.name for i in AddSpecialTokens], value=[i.value.name for i in AddSpecialTokens], elem_id="filter-columns-add-special-tokens", ) filter_columns_num_few_shots = gr.CheckboxGroup( label="Num Few Shots", choices=[i.value.name for i in NumFewShots], value=[i.value.name for i in NumFewShots], elem_id="filter-columns-num-few-shots", ) filter_columns_version = gr.CheckboxGroup( label="llm-jp-eval version", choices=[i.value.name for i in Version], value=[i.value.name for i in Version], elem_id="filter-columns-version", ) # filter_columns_backend = gr.CheckboxGroup( # label="Backend Library", # choices=[i.value.name for i in Backend], # value=[i.value.name for i in Backend], # elem_id="filter-columns-backend", # ) # DataFrameコンポーネントの初期化 leaderboard_table = gr.Dataframe( value=leaderboard_df_filtered, headers=initial_columns, datatype=TYPES, elem_id="leaderboard-table", interactive=False, visible=True, ) # Dummy leaderboard for handling the case when the user uses backspace key hidden_leaderboard_table_for_search = gr.Dataframe( value=original_df[COLS], headers=COLS, datatype=TYPES, visible=False, ) # Define a hidden component that will trigger a reload only if a query parameter has been set hidden_search_bar = gr.Textbox(value="", visible=False) select_all_button.click( fn=lambda: toggle_all_categories("all"), outputs=checkboxes, api_name=False, queue=False, ) select_none_button.click( fn=lambda: toggle_all_categories("none"), outputs=checkboxes, api_name=False, queue=False, ) select_avg_only_button.click( fn=lambda: toggle_all_categories("avg_only"), outputs=checkboxes, api_name=False, queue=False, ) gr.on( triggers=[ hidden_search_bar.change, filter_columns_type.change, filter_columns_precision.change, filter_columns_size.change, filter_columns_add_special_tokens.change, filter_columns_num_few_shots.change, filter_columns_version.change, # filter_columns_backend.change, search_bar.submit, ] + [shown_columns.change for shown_columns in shown_columns_dict.values()], fn=update_table, inputs=[ hidden_leaderboard_table_for_search, filter_columns_type, filter_columns_precision, filter_columns_size, filter_columns_add_special_tokens, filter_columns_num_few_shots, filter_columns_version, # filter_columns_backend, search_bar, ] + [shown_columns for shown_columns in shown_columns_dict.values()], outputs=leaderboard_table, ) # Check query parameter once at startup and update search bar + hidden component demo_leaderboard.load(fn=load_query, outputs=[search_bar, hidden_search_bar]) # Submission demo with gr.Blocks() as demo_submission: with gr.Column(): with gr.Row(): evaluation_queue_text = gr.Markdown(EVALUATION_QUEUE_TEXT_JA, elem_classes="markdown-text") with gr.Column(): with gr.Accordion( f"✅ Finished Evaluations ({len(finished_eval_queue_df)})", open=False, ): with gr.Row(): finished_eval_table = gr.Dataframe( value=finished_eval_queue_df, headers=EVAL_COLS, datatype=EVAL_TYPES, row_count=5, ) with gr.Accordion( f"🔄 Running Evaluation Queue ({len(running_eval_queue_df)})", open=False, ): with gr.Row(): running_eval_table = gr.Dataframe( value=running_eval_queue_df, headers=EVAL_COLS, datatype=EVAL_TYPES, row_count=5, ) with gr.Accordion( f"⏳ Pending Evaluation Queue ({len(pending_eval_queue_df)})", open=False, ): with gr.Row(): pending_eval_table = gr.Dataframe( value=pending_eval_queue_df, headers=EVAL_COLS, datatype=EVAL_TYPES, row_count=5, ) with gr.Accordion( f"❎ Failed Evaluation Queue ({len(failed_eval_queue_df)})", open=False, ): with gr.Row(): failed_eval_table = gr.Dataframe( value=failed_eval_queue_df, headers=EVAL_COLS, datatype=EVAL_TYPES, row_count=5, ) with gr.Row(): gr.Markdown("# ✉️✨ Submit your model here!", elem_classes="markdown-text") with gr.Row(): with gr.Column(): model_name_textbox = gr.Textbox(label="Model name") revision_name_textbox = gr.Textbox(label="Revision commit", placeholder="main") model_type = gr.Dropdown( label="Model type", choices=[t.to_str(" : ") for t in ModelType if t != ModelType.Unknown], multiselect=False, value=None, ) with gr.Column(): precision = gr.Dropdown( label="Precision", choices=[i.value.name for i in Precision if i != Precision.Unknown], multiselect=False, value="float16", ) add_special_tokens = gr.Dropdown( label="AddSpecialTokens", choices=[i.value.name for i in AddSpecialTokens if i != AddSpecialTokens.Unknown], multiselect=False, value="False", ) submit_button = gr.Button("Submit Eval") submission_result = gr.Markdown() submit_button.click( fn=add_new_eval, inputs=[ model_name_textbox, revision_name_textbox, precision, model_type, add_special_tokens, ], outputs=submission_result, ) # Main demo CITATION_ACCORDION_LABEL = "📙 Citation" CITATION_ACCORDION_LABEL_JA = "📙 引用" def set_default_language(request: gr.Request) -> gr.Radio: if request.headers["Accept-Language"].split(",")[0].lower().startswith("ja"): return gr.Radio(value="🇯🇵 JA") else: return gr.Radio(value="🇺🇸 EN") def update_language( language: str, ) -> tuple[ gr.Markdown, gr.Markdown, gr.Markdown, gr.Textbox, gr.Button, gr.Button, gr.Button, gr.Accordion, ]: if language == "🇯🇵 JA": return ( gr.Markdown(value=INTRODUCTION_TEXT_JA), gr.Markdown(value=LLM_BENCHMARKS_TEXT_JA), gr.Markdown(value=EVALUATION_QUEUE_TEXT_JA), gr.Textbox(label=CITATION_BUTTON_LABEL_JA), gr.Button(value=SELECT_ALL_BUTTON_LABEL_JA), gr.Button(value=SELECT_NONE_BUTTON_LABEL_JA), gr.Button(value=SELECT_AVG_ONLY_BUTTON_LABEL_JA), gr.Accordion(label=CITATION_ACCORDION_LABEL_JA), ) else: return ( gr.Markdown(value=INTRODUCTION_TEXT), gr.Markdown(value=LLM_BENCHMARKS_TEXT), gr.Markdown(value=EVALUATION_QUEUE_TEXT), gr.Textbox(label=CITATION_BUTTON_LABEL), gr.Button(value=SELECT_ALL_BUTTON_LABEL), gr.Button(value=SELECT_NONE_BUTTON_LABEL), gr.Button(value=SELECT_AVG_ONLY_BUTTON_LABEL), gr.Accordion(label=CITATION_ACCORDION_LABEL), ) with gr.Blocks(css_paths="style.css", theme=gr.themes.Glass()) as demo: gr.HTML(TITLE) introduction_text = gr.Markdown(INTRODUCTION_TEXT_JA, elem_classes="markdown-text") with gr.Tabs(elem_classes="tab-buttons") as tabs: with gr.TabItem("🏅 LLM Benchmark", elem_id="llm-benchmark-tab-table", id=0): demo_leaderboard.render() with gr.TabItem("📝 About", elem_id="llm-benchmark-tab-table", id=2): llm_benchmarks_text = gr.Markdown(LLM_BENCHMARKS_TEXT_JA, elem_classes="markdown-text") with gr.TabItem("🚀 Submit here! ", elem_id="llm-benchmark-tab-table", id=3): demo_submission.render() with gr.Row(): with gr.Accordion(CITATION_ACCORDION_LABEL_JA, open=False) as citation_accordion: citation_button = gr.Textbox( label=CITATION_BUTTON_LABEL_JA, value=CITATION_BUTTON_TEXT, lines=20, elem_id="citation-button", show_copy_button=True, ) gr.HTML(BOTTOM_LOGO) language = gr.Radio( choices=["🇯🇵 JA", "🇺🇸 EN"], value="🇯🇵 JA", elem_classes="language-selector", show_label=False, container=False, ) demo.load(fn=set_default_language, outputs=language) language.change( fn=update_language, inputs=language, outputs=[ introduction_text, llm_benchmarks_text, evaluation_queue_text, citation_button, select_all_button, select_none_button, select_avg_only_button, citation_accordion, ], api_name=False, ) if __name__ == "__main__": if os.getenv("SPACE_ID"): scheduler = BackgroundScheduler() scheduler.add_job(restart_space, "interval", seconds=1800) scheduler.start() demo.queue(default_concurrency_limit=40).launch()