"""Gradio front-end for the 3D-POPE leaderboard Space.

On import this module downloads the request/result datasets, builds the
leaderboard dataframe, declares the Gradio UI, starts a background scheduler
that periodically restarts the Space, and launches the app.
"""
import json
import os
import shutil
import subprocess
import zipfile

import gradio as gr
import pandas as pd
from apscheduler.schedulers.background import BackgroundScheduler
from huggingface_hub import snapshot_download, Repository, HfFolder

from src.about import (
    CITATION_BUTTON_LABEL,
    CITATION_BUTTON_TEXT,
    EVALUATION_QUEUE_TEXT,
    INTRODUCTION_TEXT,
    LLM_BENCHMARKS_TEXT,
    TITLE,
)
from src.display.css_html_js import custom_css
from src.display.utils import (
    BENCHMARK_COLS,
    COLS,
    EVAL_COLS,
    EVAL_TYPES,
    NUMERIC_INTERVALS,
    TYPES,
    AutoEvalColumn,
    ModelType,
    fields,
    WeightType,
    Precision,
)
from src.envs import (
    API,
    EVAL_REQUESTS_PATH,
    EVAL_RESULTS_PATH,
    QUEUE_REPO,
    REPO_ID,
    RESULTS_REPO,
    TOKEN,
    EVAL_REQUESTS_PATH_BACKEND,
    EVAL_RESULTS_PATH_BACKEND,
)
from src.populate import get_evaluation_queue_df, get_leaderboard_df
from src.submission.submit import add_new_eval
from src.submission.evaluate import calculate_metrics


def _is_safe_model_name(name: str) -> bool:
    """Return True when *name* can be used as a single directory component."""
    if name in {".", ".."}:
        return False
    return not any(ch in name for ch in ("/", "\\", "\0"))


def handle_new_eval_submission(model_name, model_zip, model_link=None) -> str:
    """Validate, evaluate and publish one leaderboard submission.

    The uploaded archive is extracted under ``EVAL_RESULTS_PATH_BACKEND``,
    ``calculate_metrics`` is run on it, the resulting ``results.json`` is
    uploaded to ``RESULTS_REPO`` and the Space is restarted.

    Args:
        model_name: Display name of the submitted model; also used as a
            directory name, so it must be a single path component.
        model_zip: Uploaded archive as handed over by ``gr.File`` (anything
            ``zipfile`` accepts).
        model_link: Unused; kept for interface compatibility.

    Returns:
        A human-readable status message. Errors are reported through the
        returned string rather than raised, so the UI always has something
        to display.
    """
    try:
        # --- Input validation -------------------------------------------
        if not model_name:
            return "Please enter a model name."
        if not isinstance(model_name, str):
            return "Model name must be a string."
        # Surrounding whitespace would otherwise end up in directory names.
        model_name = model_name.strip()
        if not model_name:
            return "Please enter a model name."
        if len(model_name.split()) > 1:
            return "Model name should be a single word with hyphens."
        # The name is joined into filesystem and repo paths below; refuse
        # anything that could escape the intended directory.
        if not _is_safe_model_name(model_name):
            return "Model name must not contain path separators."

        # Check if the model name is already in the leaderboard
        if model_name in leaderboard_df[AutoEvalColumn.model.name].values:
            return "Model name already exists in the leaderboard. Please choose a different name."

        if model_zip is None:
            return "Please provide a zip file."
        if not zipfile.is_zipfile(model_zip):
            return "Please upload a valid zip file."

        # --- Extraction -------------------------------------------------
        extraction_path = os.path.join(EVAL_RESULTS_PATH_BACKEND, model_name)
        os.makedirs(extraction_path, exist_ok=True)
        try:
            with zipfile.ZipFile(model_zip, 'r') as zip_ref:
                zip_ref.extractall(extraction_path)
        except zipfile.BadZipFile:
            return "The uploaded file is not a valid zip file."
        except Exception as e:
            return f"An error occurred while extracting the zip file: {str(e)}"
        print("File unzipped successfully to:", extraction_path)

        # --- Evaluation -------------------------------------------------
        try:
            calculate_metrics(extraction_path, model_name)
        except Exception as e:
            return f"An error occurred while calculating metrics: {str(e)}"

        # --- Upload -----------------------------------------------------
        # NOTE(review): presumably calculate_metrics writes this file;
        # confirm against src.submission.evaluate.
        results_file_path = os.path.join(
            os.getcwd(), EVAL_RESULTS_PATH, '3d-pope', model_name, 'results.json'
        )
        if not os.path.exists(results_file_path):
            return f"Results file not found at {results_file_path}"

        try:
            with open(results_file_path, 'r') as f:
                json.load(f)  # Validate JSON structure
        except json.JSONDecodeError:
            return "The results file is not a valid JSON file."

        try:
            API.upload_file(
                path_or_fileobj=results_file_path,
                # Repo paths are always '/'-separated, whatever the host OS.
                path_in_repo="/".join(('3d-pope', model_name, 'results.json')),
                repo_id=RESULTS_REPO,
                repo_type="dataset",
            )
        except Exception as e:
            return f"An error occurred while uploading results: {str(e)}"

        # --- Restart so the new results get loaded ----------------------
        try:
            restart_space()
        except Exception as e:
            return f"An error occurred while restarting the space: {str(e)}"

        return "Submission received and results are being processed. Please check the leaderboard for updates."
    except Exception as e:
        # UI boundary: never let an exception escape to the Gradio callback.
        return f"An unexpected error occurred: {str(e)}"


def restart_space():
    """Ask the Hub API to restart this Space."""
    API.restart_space(repo_id=REPO_ID)


def _download_dataset_snapshot(repo_id, local_dir):
    """Download a dataset repo into *local_dir*; restart the Space on failure."""
    try:
        print(local_dir)
        snapshot_download(
            repo_id=repo_id,
            local_dir=local_dir,
            repo_type="dataset",
            tqdm_class=None,
            etag_timeout=30,
            token=TOKEN,
        )
    except Exception as e:
        # Log the cause before restarting so the failure is visible in logs.
        print(f"Snapshot download of {repo_id} failed: {e}")
        restart_space()


_download_dataset_snapshot(QUEUE_REPO, EVAL_REQUESTS_PATH)
_download_dataset_snapshot(RESULTS_REPO, EVAL_RESULTS_PATH)


raw_data, original_df = get_leaderboard_df(EVAL_RESULTS_PATH, EVAL_REQUESTS_PATH, COLS, BENCHMARK_COLS)
leaderboard_df = original_df.copy()


def custom_format(x):
    """Render a numeric cell compactly for display.

    Whole numbers are shown without a decimal part; other numbers are rounded
    to two decimals with trailing zeros removed. NaN and values that cannot
    be converted to ``float`` are returned unchanged.
    """
    if pd.isna(x):
        return x  # Return as is if NaN
    try:
        float_x = float(x)
    except (TypeError, ValueError):
        return x  # Return as is if conversion to float fails
    if float_x.is_integer():
        return f"{int(float_x)}"
    return f"{float_x:.2f}".rstrip('0').rstrip('.')


numeric_cols = [col for col in leaderboard_df.columns if leaderboard_df[col].dtype in ['float64', 'float32']]
if numeric_cols:
    # Series.map per column avoids the deprecated DataFrame.applymap while
    # staying compatible with older pandas releases.
    leaderboard_df[numeric_cols] = leaderboard_df[numeric_cols].apply(
        lambda column: column.map(custom_format)
    )

(
    finished_eval_queue_df,
    running_eval_queue_df,
    pending_eval_queue_df,
) = get_evaluation_queue_df(EVAL_REQUESTS_PATH, EVAL_COLS)


# Searching and filtering
def update_table(
    hidden_df: pd.DataFrame,
    columns: list,
    # type_query: list,
    # precision_query: str,
    # size_query: list,
    # show_deleted: bool,
    query: str,
):
    """Return *hidden_df* filtered by *query* and restricted to *columns*."""
    # filtered_df = filter_models(hidden_df, type_query, size_query, precision_query, show_deleted)
    filtered_df = filter_queries(query, hidden_df)
    return select_columns(filtered_df, columns)


def search_table(df: pd.DataFrame, query: str) -> pd.DataFrame:
    """Return the rows whose model name contains *query* (case-insensitive).

    The query is matched literally, so characters such as ``(`` or ``+`` in
    user input cannot raise a regex error. Rows with a missing model name
    never match.
    """
    model_names = df[AutoEvalColumn.model.name]
    mask = model_names.str.contains(query, case=False, regex=False, na=False)
    return df[mask]


def select_columns(df: pd.DataFrame, columns: list) -> pd.DataFrame:
    """Return *df* limited to the always-visible columns plus *columns*.

    Column order follows ``COLS`` so the table layout stays stable regardless
    of the order in which the user ticked the checkboxes.
    """
    always_here_cols = [
        # AutoEvalColumn.model_type_symbol.name,
        AutoEvalColumn.model.name,
    ]
    # We use COLS to maintain sorting; skip the pinned columns so none of
    # them is selected twice.
    optional_cols = [
        c for c in COLS
        if c in df.columns and c in columns and c not in always_here_cols
    ]
    return df[always_here_cols + optional_cols]


def filter_queries(query: str, filtered_df: pd.DataFrame) -> pd.DataFrame:
    """Apply a ``;``-separated list of search terms to *filtered_df*.

    Rows matching any term are kept and de-duplicated. If the query is empty
    or no term matches anything, the input frame is returned unchanged.
    """
    if query == "":
        return filtered_df

    matches = []
    for term in (part.strip() for part in query.split(";")):
        if term == "":
            continue
        hits = search_table(filtered_df, term)
        if len(hits) > 0:
            matches.append(hits)

    if len(matches) > 0:
        filtered_df = pd.concat(matches)
        # Only de-duplicate on the identifying columns actually present.
        existing_columns = [
            col
            for col in [
                AutoEvalColumn.model.name,
                AutoEvalColumn.precision.name,
                AutoEvalColumn.revision.name,
            ]
            if col in filtered_df.columns
        ]
        filtered_df = filtered_df.drop_duplicates(subset=existing_columns)

    return filtered_df


def filter_models(
    df: pd.DataFrame, type_query: list, size_query: list, precision_query: list, show_deleted: bool
) -> pd.DataFrame:
    """Keep the rows whose parameter count falls in one of the chosen size buckets.

    ``type_query``, ``precision_query`` and ``show_deleted`` are accepted for
    interface compatibility; the corresponding filters are currently disabled.
    """
    # Show all models
    # if show_deleted:
    #     filtered_df = df
    # else:  # Show only still on the hub models
    #     filtered_df = df[df[AutoEvalColumn.still_on_hub.name] == True]
    filtered_df = df

    # filtered_df = filtered_df.loc[df[AutoEvalColumn.model_type_symbol.name].isin(type_emoji)]
    # filtered_df = filtered_df.loc[df[AutoEvalColumn.precision.name].isin(precision_query + ["None"])]

    numeric_interval = pd.IntervalIndex(sorted([NUMERIC_INTERVALS[s] for s in size_query]))
    params_column = pd.to_numeric(df[AutoEvalColumn.params.name], errors="coerce")
    mask = params_column.apply(lambda x: any(numeric_interval.contains(x)))
    filtered_df = filtered_df.loc[mask]

    return filtered_df


demo = gr.Blocks(css=custom_css)
with demo:
    gr.HTML(TITLE)
    gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text")

    with gr.Tabs(elem_classes="tab-buttons") as tabs:
        with gr.TabItem("🏅 3D-POPE Benchmark", elem_id="llm-benchmark-tab-table", id=0):
            with gr.Row():
                with gr.Column():
                    with gr.Row():
                        search_bar = gr.Textbox(
                            placeholder=" 🔍 Search for your model (separate multiple queries with `;`) and press ENTER...",
                            show_label=False,
                            elem_id="search-bar",
                        )
                    with gr.Row():
                        shown_columns = gr.CheckboxGroup(
                            choices=[
                                c.name
                                for c in fields(AutoEvalColumn)
                                if not c.hidden and not c.never_hidden
                            ],
                            value=[
                                c.name
                                for c in fields(AutoEvalColumn)
                                if c.displayed_by_default and not c.hidden and not c.never_hidden
                            ],
                            label="Select columns to show",
                            elem_id="column-select",
                            interactive=True,
                        )
                    # with gr.Row():
                    #     deleted_models_visibility = gr.Checkbox(
                    #         value=False, label="Show gated/private/deleted models", interactive=True
                    #     )
                # with gr.Column(min_width=320):
                #     with gr.Box(elem_id="box-filter"):

            # Pinned columns first, then whatever is ticked by default.
            initial_columns = [
                c.name for c in fields(AutoEvalColumn) if c.never_hidden
            ] + shown_columns.value

            leaderboard_table = gr.components.Dataframe(
                value=leaderboard_df[initial_columns],
                headers=initial_columns,
                datatype=TYPES,
                elem_id="leaderboard-table",
                interactive=False,
                visible=True,
            )

            # Dummy leaderboard for handling the case when the user uses backspace key.
            # Built from the formatted frame so search results are rendered
            # the same way as the initially displayed table.
            hidden_leaderboard_table_for_search = gr.components.Dataframe(
                value=leaderboard_df[COLS],
                headers=COLS,
                datatype=TYPES,
                visible=False,
            )
            search_bar.submit(
                update_table,
                [
                    hidden_leaderboard_table_for_search,
                    shown_columns,
                    # deleted_models_visibility,
                    search_bar,
                ],
                leaderboard_table,
            )
            for selector in [shown_columns]:
                selector.change(
                    update_table,
                    [
                        hidden_leaderboard_table_for_search,
                        shown_columns,
                        # deleted_models_visibility,
                        search_bar,
                    ],
                    leaderboard_table,
                    queue=True,
                )

        with gr.TabItem("📝 About", elem_id="llm-benchmark-tab-table", id=2):
            gr.Markdown(LLM_BENCHMARKS_TEXT, elem_classes="markdown-text")

        with gr.TabItem("🚀 Submit here! ", elem_id="llm-benchmark-tab-table", id=3):
            with gr.Column():
                with gr.Row():
                    gr.Markdown(EVALUATION_QUEUE_TEXT, elem_classes="markdown-text")

            with gr.Row():
                gr.Markdown("# 📋 Submit your results here!", elem_classes="markdown-text")

            with gr.Row():
                model_name_textbox = gr.Textbox(label="Model name")
                model_zip_file = gr.File(label="Upload model prediction result ZIP file")
                # model_link_textbox = gr.Textbox(label="Link to model page")

            with gr.Row():
                gr.Column()  # left spacer
                with gr.Column(scale=2):
                    submit_button = gr.Button("Submit Model")
                    submission_result = gr.Markdown()
                    submit_button.click(
                        handle_new_eval_submission,
                        [model_name_textbox, model_zip_file],
                        submission_result,
                    )
                gr.Column()  # right spacer

    with gr.Row():
        with gr.Accordion("📙 Citation", open=False):
            citation_button = gr.Textbox(
                value=CITATION_BUTTON_TEXT,
                label=CITATION_BUTTON_LABEL,
                lines=20,
                elem_id="citation-button",
                show_copy_button=True,
            )

# Restart the Space every 30 minutes so freshly uploaded results get loaded.
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=1800)
scheduler.start()
demo.queue(default_concurrency_limit=40).launch()