import os import streamlit as st import pandas as pd from constants import BIGOS_INFO, PELCRA_INFO, ANALYSIS_INFO, ABOUT_INFO, INSPECTION_INFO, COMPARISON_INFO from utils import read_latest_results, basic_stats_per_dimension, retrieve_asr_systems_meta_from_the_catalog, box_plot_per_dimension,box_plot_per_dimension_subsets, box_plot_per_dimension_with_colors, get_total_audio_duration, check_impact_of_normalization, calculate_wer_per_meta_category, calculate_wer_per_audio_feature from app_utils import calculate_height_to_display, filter_dataframe import matplotlib.pyplot as plt import numpy as np import statsmodels.api as sm import seaborn as sns hf_token = os.getenv('HF_TOKEN') if hf_token is None: raise ValueError("HF_TOKEN environment variable is not set. Please check your secrets settings.") # Tabs # About (description of the benchmark) - methodology # Leaderboards # Interactive analysis # Acknowledgements # select the dataset to display results datasets_secret = [ "amu-cai/pl-asr-bigos-v2-secret", "pelcra/pl-asr-pelcra-for-bigos-secret"] datasets_public = [] #["amu-cai/pl-asr-bigos-synth-med"] #amu-cai/pl-asr-bigos-v2-diagnostic" st.set_page_config(layout="wide") about, lead_bigos, lead_pelcra, analysis, interactive_comparison = st.tabs(["About", "ASR Leaderboard - BIGOS corpora", "ASR Leaderboard - PELCRA corpora", "ASR evaluation scenarios", "Interactive comparison of ASR accuracy"]) # "Results inspection""Results inspection" # inspection # acknowledgements, changelog, faq, todos = st.columns(4) #lead_bigos_diagnostic, lead_bigos_synth cols_to_select_all = ["system", "subset", "ref_type", "norm_type", "SER", "MER", "WER", "CER"] def plot_performance(systems_to_plot, df_per_system_with_type): # Get unique subsets subsets = df_per_system_with_type['subset'].unique() # Create a color and label map color_label_map = { free_system_with_best_wer: ('blue', 'Best Free'), free_system_with_worst_wer: ('red', 'Worst Free'), commercial_system_with_best_wer: ('green', 
'Best Paid'), commercial_system_with_worst_wer: ('orange', 'Worst Paid') } # Plot the data fig, ax = plt.subplots(figsize=(14, 7)) bar_width = 0.3 index = np.arange(len(subsets)) for i, system in enumerate(systems_to_plot): subset_wer = df_per_system_with_type[df_per_system_with_type['system'] == system].set_index('subset')['WER'] color, label = color_label_map[system] ax.bar(index + i * bar_width, subset_wer.loc[subsets], bar_width, label=label + ' - ' + system, color=color) # Adding labels and title ax.set_xlabel('Subset') ax.set_ylabel('WER (%)') ax.set_title('Comparison of performance of ASR systems.') ax.set_xticks(index + bar_width * 1.5) ax.set_xticklabels(subsets, rotation=90, ha='right') ax.legend() st.pyplot(fig) def round_to_nearest(value, multiple): return multiple * round(value / multiple) def create_bar_chart(df, systems, metric, norm_type, ref_type='orig', orientation='vertical'): df = df[df['norm_type'] == norm_type] df = df[df['ref_type'] == ref_type] # Prepare the data for the bar chart subsets = df['subset'].unique() num_vars = len(subsets) bar_width = 0.2 # Width of the bars fig, ax = plt.subplots(figsize=(10, 10)) max_value_all_systems = 0 for i, system in enumerate(systems): system_data = df[df['system'] == system] max_value_for_system = max(system_data[metric]) if max_value_for_system > max_value_all_systems: max_value_all_systems = round_to_nearest(max_value_for_system + 2, 10) # Ensure the system data is in the same order as subsets values = [] for subset in subsets: subset_value = system_data[system_data['subset'] == subset][metric].values if len(subset_value) > 0: values.append(subset_value[0]) else: values.append(0) # Append 0 if the subset value is missing if orientation == 'vertical': # Plot each system's bars with an offset for vertical orientation x_pos = np.arange(len(subsets)) + i * bar_width ax.bar(x_pos, values, bar_width, label=system) # Add value labels for j, value in enumerate(values): ax.text(x_pos[j], value + max(values) * 
0.03, f'{value}', ha='center', va='bottom',fontsize=6) else: # Plot each system's bars with an offset for horizontal orientation y_pos = np.arange(len(subsets)) + i * bar_width ax.barh(y_pos, values, bar_width, label=system) # Add value labels for j, value in enumerate(values): ax.text(value + max(values) * 0.03, y_pos[j], f'{value}', ha='left', va='center', fontsize=6) if orientation == 'vertical': ax.set_xticks(np.arange(len(subsets)) + bar_width * (len(systems) - 1) / 2) ax.set_xticklabels(subsets, rotation=45, ha='right') ax.set_ylabel(metric) else: ax.set_yticks(np.arange(len(subsets)) + bar_width * (len(systems) - 1) / 2) ax.set_yticklabels(subsets) ax.set_xlabel(metric) # Add grid values for the vertical and horizontal bar plots if orientation == 'vertical': ax.set_yticks(np.linspace(0, max_value_all_systems, 5)) else: ax.set_xticks(np.linspace(0, max_value_all_systems, 5)) # Put legend on the right side outside of the plot plt.legend(loc='upper right', bbox_to_anchor=(1.2, 1), shadow=True, ncol=1) st.pyplot(fig) def create_radar_plot(df, enable_labels, systems, metric, norm_type, ref_type='orig'): df = df[df['norm_type'] == norm_type] df = df[df['ref_type'] == ref_type] # Prepare the data for the radar plot #systems = df['system'].unique() subsets = df['subset'].unique() num_vars = len(subsets) angles = np.linspace(0, 2 * np.pi, num_vars, endpoint=False).tolist() angles += angles[:1] # Complete the loop fig, ax = plt.subplots(figsize=(10, 10), subplot_kw=dict(polar=True)) max_value_all_systems = 0 for system in systems: system_data = df[df['system'] == system] max_value_for_system = max(system_data[metric]) if max_value_for_system > max_value_all_systems: max_value_all_systems = round_to_nearest(max_value_for_system + 2, 10) # Ensure the system data is in the same order as subsets values = [] for subset in subsets: subset_value = system_data[system_data['subset'] == subset][metric].values if len(subset_value) > 0: values.append(subset_value[0]) else: 
values.append(0) # Append 0 if the subset value is missing values += values[:1] # Complete the loop # Plot each system ax.plot(angles, values, label=system) ax.fill(angles, values, alpha=0.25) # Add value labels for angle, value in zip(angles, values): ax.text(angle, value + max(values) * 0.01, f'{value}', ha='center', va='center', fontsize=6) ax.set_xticklabels(subsets) ax.set_yticks(np.linspace(0, max_value_all_systems, 5)) # put legend at the bottom of the page plt.legend(loc='upper right', bbox_to_anchor=(1.2, 1), shadow=True, ncol=1) st.pyplot(fig) with about: st.title("AMU Polish ASR Leaderboard") st.markdown(ABOUT_INFO, unsafe_allow_html=True) # Table - evaluated systems # TODO - change to concatenated table dataset = "amu-cai/pl-asr-bigos-v2-secret" split = "test" df_per_sample, df_per_dataset = read_latest_results(dataset, split, codename_to_shortname_mapping=None) evaluated_systems_list = df_per_sample["system"].unique() #print("ASR systems available in the eval results for dataset {}: ".format(dataset), evaluated_systems_list ) df_evaluated_systems = retrieve_asr_systems_meta_from_the_catalog(evaluated_systems_list) # drop columns "Included in BIGOS benchmark" df_evaluated_systems = df_evaluated_systems.drop(columns=["Included in BIGOS benchmark"]) # drop empty rows df_evaluated_systems = df_evaluated_systems.dropna(how='all') # drop empty columns df_evaluated_systems = df_evaluated_systems.dropna(axis=1, how='all') codename_to_shortname_mapping = dict(zip(df_evaluated_systems["Codename"],df_evaluated_systems["Shortname"])) #print(codename_to_shortname_mapping) h_df_systems = calculate_height_to_display(df_evaluated_systems) df_evaluated_systems_types_and_count = df_evaluated_systems["Type"].value_counts().reset_index() df_evaluated_systems_types_and_count.columns = ["Type", "Count"] st.subheader("Evaluated systems:") st.dataframe(df_evaluated_systems_types_and_count, hide_index=True, use_container_width=False) #TODO - add info who created the system 
(company, institution, team, etc.) # Split into separate tables for free and commercial systems free_systems = df_evaluated_systems[df_evaluated_systems['Type'] == 'free'] commercial_systems = df_evaluated_systems[df_evaluated_systems['Type'] == 'commercial'] st.subheader("Free systems:") # drop empty columns free_systems = free_systems.dropna(axis=1, how='all') # drop empty rows free_systems = free_systems.dropna(how='all') # do not display index st.dataframe(free_systems, hide_index=True, height = h_df_systems, use_container_width=True) st.subheader("Commercial systems:") # drop empty columns commercial_systems = commercial_systems.dropna(axis=1, how='all') # do not display index # drop empty rows commercial_systems = commercial_systems.dropna(how='all') st.dataframe(commercial_systems, hide_index=True, height = h_df_systems, use_container_width=True) # Table - evaluation datasets # Table - evaluation metrics # Table - evaluation metadata # List - references # List - contact points # List - acknowledgements # List - changelog # List - FAQ # List - TODOs with lead_bigos: st.title("BIGOS Leaderboard") st.markdown(BIGOS_INFO, unsafe_allow_html=True) # configuration for tab dataset = "amu-cai/pl-asr-bigos-v2-secret" dataset_short_name = "BIGOS" dataset_version = "V2" eval_date = "March 2024" split = "test" norm_type = "all" ref_type = "orig" # common, reusable part for all tabs presenting leaderboards for specific datasets #### DATA LOADING AND AUGMENTATION #### df_per_sample_all, df_per_dataset_all = read_latest_results(dataset, split, codename_to_shortname_mapping) # filter only the ref_type and norm_type we want to analyze df_per_sample = df_per_sample_all[(df_per_sample_all["ref_type"] == ref_type) & (df_per_sample_all["norm_type"] == norm_type)] # filter only the ref_type and norm_type we want to analyze df_per_dataset = df_per_dataset_all[(df_per_dataset_all["ref_type"] == ref_type) & (df_per_dataset_all["norm_type"] == norm_type)] ##### PARAMETERS CALCULATION 
#### evaluated_systems_list = df_per_sample["system"].unique() no_of_evaluated_systems = len(evaluated_systems_list) no_of_eval_subsets = len(df_per_dataset["subset"].unique()) no_of_test_cases = len(df_per_sample) no_of_unique_recordings = len(df_per_sample["id"].unique()) total_audio_duration_hours = get_total_audio_duration(df_per_sample) no_of_unique_speakers = len(df_per_sample["speaker_id"].unique()) df_per_dataset_with_asr_systems_meta = pd.merge(df_per_dataset, df_evaluated_systems, how="left", left_on="system", right_on="Shortname") print(df_per_dataset_with_asr_systems_meta.sample(5)) # save sample to tsv df_per_dataset_with_asr_systems_meta.sample(5).to_csv("sample.tsv", sep="\t", index=False) ########### EVALUATION PARAMETERS PRESENTATION ################ st.title("ASR leaderboard for dataset: {} {}".format(dataset_short_name, dataset_version)) # MOST IMPORTANT RESULTS analysis_dim = "system" metric = "WER" st.subheader("Leaderboard - Median {} per ASR {} across all subsets of {} dataset".format(metric, analysis_dim, dataset_short_name)) fig = box_plot_per_dimension_with_colors(df_per_dataset_with_asr_systems_meta, metric, analysis_dim, "{} per {} for dataset {}".format(metric, analysis_dim, dataset_short_name), analysis_dim, metric + "[%]","System", "Type") st.pyplot(fig, clear_figure=True, use_container_width=True) st.header("Benchmark details") st.markdown("**Evaluation date:** {}".format(eval_date)) st.markdown("**Number of evaluated system-model variants:** {}".format(no_of_evaluated_systems)) st.markdown("**Number of evaluated subsets:** {}".format(no_of_eval_subsets)) st.markdown("**Number of evaluated system-model-subsets combinations**: {}".format(len(df_per_dataset))) st.markdown("**Number of unique speakers**: {}".format(no_of_unique_speakers)) st.markdown("**Number of unique recordings used for evaluation:** {}".format(no_of_unique_recordings)) st.markdown("**Total size of the dataset:** {:.2f} hours".format(total_audio_duration_hours)) 
st.markdown("**Total number of test cases (audio-hypothesis pairs):** {}".format(no_of_test_cases)) st.markdown("**Dataset:** {}".format(dataset)) st.markdown("**Dataset version:** {}".format(dataset_version)) st.markdown("**Split:** {}".format(split)) st.markdown("**Text reference type:** {}".format(ref_type)) st.markdown("**Normalization steps:** {}".format(norm_type)) ########### RESULTS ################ st.header("WER (Word Error Rate) analysis") st.subheader("Average WER for the whole dataset") df_wer_avg = basic_stats_per_dimension(df_per_dataset, "WER", "dataset") st.dataframe(df_wer_avg) st.subheader("Comparison of average WER for free and commercial systems") df_wer_avg_free_commercial = basic_stats_per_dimension(df_per_dataset_with_asr_systems_meta, "WER", "Type") st.dataframe(df_wer_avg_free_commercial) ##################### PER SYSTEM ANALYSIS ######################### analysis_dim = "system" metric = "WER" st.subheader("Table showing {} per {} sorted by median values".format(metric, analysis_dim)) df_wer_per_system_from_per_dataset = basic_stats_per_dimension(df_per_dataset, metric, analysis_dim) h_df_per_system_per_dataset = calculate_height_to_display(df_wer_per_system_from_per_dataset) st.dataframe(df_wer_per_system_from_per_dataset, height = h_df_per_system_per_dataset ) ##################### PER SUBSET ANALYSIS ######################### analysis_dim = "subset" metric = "WER" st.subheader("Table showing {} per {} sorted by median values".format(metric, analysis_dim)) df_wer_per_system_from_per_dataset = basic_stats_per_dimension(df_per_dataset, metric, analysis_dim) h_df_per_system_per_dataset = calculate_height_to_display(df_wer_per_system_from_per_dataset) st.dataframe(df_wer_per_system_from_per_dataset, height = h_df_per_system_per_dataset ) st.subheader("Boxplot showing {} per {} sorted by median values".format(metric, analysis_dim)) fig = box_plot_per_dimension_subsets(df_per_dataset, metric, analysis_dim, "{} per {} for dataset 
{}".format(metric, analysis_dim, dataset_short_name), analysis_dim +' of dataset ' + dataset_short_name , metric + " (%)", "system") st.pyplot(fig, clear_figure=True, use_container_width=True) ### IMPACT OF NORMALIZATION ON ERROR RATES ##### # Calculate the average impact of various norm_types for all datasets and systems df_per_dataset_selected_cols = df_per_dataset_all[cols_to_select_all] diff_in_metrics = check_impact_of_normalization(df_per_dataset_selected_cols) st.subheader("Impact of normalization of references and hypothesis on evaluation metrics") st.dataframe(diff_in_metrics, use_container_width=False) # Visualizing the differences in metrics graphically with data labels # Visualizing the differences in metrics graphically with data labels fig, axs = plt.subplots(3, 2, figsize=(12, 12)) fig.subplots_adjust(hspace=0.6, wspace=0.6) #remove the sixth subplot fig.delaxes(axs[2,1]) metrics = ['SER', 'WER', 'MER', 'CER', "Average"] colors = ['blue', 'orange', 'green', 'red', 'purple'] for ax, metric, color in zip(axs.flatten(), metrics, colors): bars = ax.bar(diff_in_metrics.index, diff_in_metrics[metric], color=color) ax.set_title(f'Normalization impact on {metric}') if metric == 'Average': ax.set_title('Average normalization impact on all metrics') ax.set_xlabel('Normalization Type') ax.set_ylabel(f'Difference in {metric}') ax.grid(True) ax.set_xticklabels(diff_in_metrics.index, rotation=45, ha='right') min_val = diff_in_metrics[metric].min() ax.set_ylim([min_val * 1.1, diff_in_metrics[metric].max() * 1.1]) for bar in bars: height = bar.get_height() ax.annotate(f'{height:.2f}', xy=(bar.get_x() + bar.get_width() / 2, height), xytext=(0, -12), # 3 points vertical offset textcoords="offset points", ha='center', va='bottom') # Display the plot in Streamlit st.pyplot(fig) ##################### APPENDIX ######################### st.header("Appendix - Full evaluation results per subset for all evaluated systems") # select only the columns we want to plot 
st.dataframe(df_per_dataset_selected_cols, hide_index=True, use_container_width=False) with lead_pelcra: st.title("PELCRA Leaderboard") st.markdown(PELCRA_INFO, unsafe_allow_html=True) # configuration for tab dataset = "pelcra/pl-asr-pelcra-for-bigos-secret" dataset_short_name = "PELCRA" dataset_version = "V1" eval_date = "March 2024" split = "test" norm_type = "all" ref_type = "orig" # common, reusable part for all tabs presenting leaderboards for specific datasets #### DATA LOADING AND AUGMENTATION #### df_per_sample_all, df_per_dataset_all = read_latest_results(dataset, split, codename_to_shortname_mapping) # filter only the ref_type and norm_type we want to analyze df_per_sample = df_per_sample_all[(df_per_sample_all["ref_type"] == ref_type) & (df_per_sample_all["norm_type"] == norm_type)] # filter only the ref_type and norm_type we want to analyze df_per_dataset = df_per_dataset_all[(df_per_dataset_all["ref_type"] == ref_type) & (df_per_dataset_all["norm_type"] == norm_type)] ##### PARAMETERS CALCULATION #### evaluated_systems_list = df_per_sample["system"].unique() no_of_evaluated_systems = len(evaluated_systems_list) no_of_eval_subsets = len(df_per_dataset["subset"].unique()) no_of_test_cases = len(df_per_sample) no_of_unique_recordings = len(df_per_sample["id"].unique()) total_audio_duration_hours = get_total_audio_duration(df_per_sample) no_of_unique_speakers = len(df_per_sample["speaker_id"].unique()) df_per_dataset_with_asr_systems_meta = pd.merge(df_per_dataset, df_evaluated_systems, how="left", left_on="system", right_on="Shortname") # MOST IMPORTANT RESULTS analysis_dim = "system" metric = "WER" st.subheader("Leaderboard - Median {} per ASR {} across all subsets of {} dataset".format(metric, analysis_dim, dataset_short_name)) fig = box_plot_per_dimension_with_colors(df_per_dataset_with_asr_systems_meta, metric, analysis_dim, "{} per {}".format(metric, analysis_dim), analysis_dim, metric + "[%]","System", "Type") st.pyplot(fig, clear_figure=True, 
use_container_width=True) st.header("Benchmark details") st.markdown("**Evaluation date:** {}".format(eval_date)) st.markdown("**Number of evaluated system-model variants:** {}".format(no_of_evaluated_systems)) st.markdown("**Number of evaluated subsets:** {}".format(no_of_eval_subsets)) st.markdown("**Number of evaluated system-model-subsets combinations**: {}".format(len(df_per_dataset))) st.markdown("**Number of unique speakers**: {}".format(no_of_unique_speakers)) st.markdown("**Number of unique recordings used for evaluation:** {}".format(no_of_unique_recordings)) st.markdown("**Total size of the dataset:** {:.2f} hours".format(total_audio_duration_hours)) st.markdown("**Total number of test cases (audio-hypothesis pairs):** {}".format(no_of_test_cases)) st.markdown("**Dataset:** {}".format(dataset)) st.markdown("**Dataset version:** {}".format(dataset_version)) st.markdown("**Split:** {}".format(split)) st.markdown("**Text reference type:** {}".format(ref_type)) st.markdown("**Normalization steps:** {}".format(norm_type)) ########### RESULTS ################ st.header("WER (Word Error Rate) analysis") st.subheader("Average WER for the whole dataset") df_wer_avg = basic_stats_per_dimension(df_per_dataset, "WER", "dataset") st.dataframe(df_wer_avg) st.subheader("Comparison of average WER for free and commercial systems") df_wer_avg_free_commercial = basic_stats_per_dimension(df_per_dataset_with_asr_systems_meta, "WER", "Type") st.dataframe(df_wer_avg_free_commercial) ##################### PER SYSTEM ANALYSIS ######################### analysis_dim = "system" metric = "WER" st.subheader("Table showing {} per {} sorted by median values".format(metric, analysis_dim)) df_wer_per_system_from_per_dataset = basic_stats_per_dimension(df_per_dataset, metric, analysis_dim) h_df_per_system_per_dataset = calculate_height_to_display(df_wer_per_system_from_per_dataset) st.dataframe(df_wer_per_system_from_per_dataset, height = h_df_per_system_per_dataset ) 
##################### PER SUBSET ANALYSIS ######################### analysis_dim = "subset" metric = "WER" st.subheader("Table showing {} per {} sorted by median values".format(metric, analysis_dim)) df_wer_per_system_from_per_dataset = basic_stats_per_dimension(df_per_dataset, metric, analysis_dim) h_df_per_system_per_dataset = calculate_height_to_display(df_wer_per_system_from_per_dataset) st.dataframe(df_wer_per_system_from_per_dataset, height = h_df_per_system_per_dataset ) st.subheader("Boxplot showing {} per {} sorted by median values".format(metric, analysis_dim)) fig = box_plot_per_dimension_subsets(df_per_dataset, metric, analysis_dim, "{} per {} for dataset {}".format(metric, analysis_dim, dataset_short_name), analysis_dim +' of dataset ' + dataset_short_name , metric + " (%)", "system") st.pyplot(fig, clear_figure=True, use_container_width=True) ### IMPACT OF NORMALIZATION ON ERROR RATES ##### # Calculate the average impact of various norm_types for all datasets and systems df_per_dataset_selected_cols = df_per_dataset_all[cols_to_select_all] diff_in_metrics = check_impact_of_normalization(df_per_dataset_selected_cols) st.subheader("Impact of normalization on WER") st.dataframe(diff_in_metrics, use_container_width=False) # Visualizing the differences in metrics graphically with data labels # Visualizing the differences in metrics graphically with data labels fig, axs = plt.subplots(3, 2, figsize=(12, 12)) fig.subplots_adjust(hspace=0.6, wspace=0.6) #remove the sixth subplot fig.delaxes(axs[2,1]) metrics = ['SER', 'WER', 'MER', 'CER', "Average"] colors = ['blue', 'orange', 'green', 'red', 'purple'] for ax, metric, color in zip(axs.flatten(), metrics, colors): bars = ax.bar(diff_in_metrics.index, diff_in_metrics[metric], color=color) ax.set_title(f'Normalization impact on {metric}') if metric == 'Average': ax.set_title('Average normalization impact on all metrics') ax.set_xlabel('Normalization Type') ax.set_ylabel(f'Difference in {metric}') ax.grid(True) 
ax.set_xticklabels(diff_in_metrics.index, rotation=45, ha='right') min_val = diff_in_metrics[metric].min() ax.set_ylim([min_val * 1.1, diff_in_metrics[metric].max() * 1.1]) for bar in bars: height = bar.get_height() ax.annotate(f'{height:.2f}', xy=(bar.get_x() + bar.get_width() / 2, height), xytext=(0, -12), # 3 points vertical offset textcoords="offset points", ha='center', va='bottom') # Display the plot in Streamlit st.pyplot(fig) ##################### APPENDIX ######################### st.header("Appendix - Full evaluation results per subset for all evaluated systems") # select only the columns we want to plot df_per_dataset_selected_cols = df_per_dataset_all[cols_to_select_all] st.dataframe(df_per_dataset_selected_cols, hide_index=True, use_container_width=False) with analysis: datasets = datasets_secret + datasets_public dataset = st.selectbox("Select Dataset", datasets, index=datasets.index('amu-cai/pl-asr-bigos-v2-secret'), key="select_dataset_scenarios") if dataset == "amu-cai/pl-asr-bigos-v2-secret": dataset_short_name = "BIGOS" elif dataset == "pelcra/pl-asr-pelcra-for-bigos-secret": dataset_short_name = "PELCRA" else: dataset_short_name = "UNKNOWN" # read the latest results for the selected dataset print("Reading the latest results for dataset: ", dataset) df_per_sample_all, df_per_dataset_all = read_latest_results(dataset, split, codename_to_shortname_mapping) # filter only the ref_type and norm_type we want to analyze df_per_sample = df_per_sample_all[(df_per_sample_all["ref_type"] == ref_type) & (df_per_sample_all["norm_type"] == norm_type)] # filter only the ref_type and norm_type we want to analyze df_per_dataset = df_per_dataset_all[(df_per_dataset_all["ref_type"] == ref_type) & (df_per_dataset_all["norm_type"] == norm_type)] evaluated_systems_list = df_per_sample["system"].unique() print(evaluated_systems_list) df_evaluated_systems = retrieve_asr_systems_meta_from_the_catalog(evaluated_systems_list) print(df_evaluated_systems) ##### ANALYSIS - 
COMMERCIAL VS FREE SYSTEMS ##### # Generate dataframe with columns as follows System Type Subset Avg_WER df_per_dataset_with_asr_systems_meta = pd.merge(df_per_dataset, df_evaluated_systems, how="left", left_on="system", right_on="Shortname") df_wer_avg_per_system_all_subsets_with_type = df_per_dataset_with_asr_systems_meta.groupby(['system', 'Type', 'subset'])['WER'].mean().reset_index() print(df_wer_avg_per_system_all_subsets_with_type) # Select the best and worse system for free and commercial systems free_systems = df_wer_avg_per_system_all_subsets_with_type[df_wer_avg_per_system_all_subsets_with_type['Type'] == 'free']['system'].unique() commercial_systems = df_wer_avg_per_system_all_subsets_with_type[df_wer_avg_per_system_all_subsets_with_type['Type'] == 'commercial']['system'].unique() free_system_with_best_wer = df_wer_avg_per_system_all_subsets_with_type[df_wer_avg_per_system_all_subsets_with_type['system'].isin(free_systems)].groupby('system')['WER'].mean().idxmin() free_system_with_worst_wer = df_wer_avg_per_system_all_subsets_with_type[df_wer_avg_per_system_all_subsets_with_type['system'].isin(free_systems)].groupby('system')['WER'].mean().idxmax() commercial_system_with_best_wer = df_wer_avg_per_system_all_subsets_with_type[df_wer_avg_per_system_all_subsets_with_type['system'].isin(commercial_systems)].groupby('system')['WER'].mean().idxmin() commercial_system_with_worst_wer = df_wer_avg_per_system_all_subsets_with_type[df_wer_avg_per_system_all_subsets_with_type['system'].isin(commercial_systems)].groupby('system')['WER'].mean().idxmax() #print(f"Best free system: {free_system_with_best_wer}") #print(f"Worst free system: {free_system_with_worst_wer}") #print(f"Best commercial system: {commercial_system_with_best_wer}") #print(f"Worst commercial system: {commercial_system_with_worst_wer}") st.subheader("Comparison of WER for free and commercial systems") # Best and worst system for free and commercial systems - print table header = ["Type", "Best 
System", "Worst System"] data = [ ["Free", free_system_with_best_wer, free_system_with_worst_wer], ["Commercial", commercial_system_with_best_wer, commercial_system_with_worst_wer] ] st.subheader("Best and worst systems for dataset {}".format(dataset)) df_best_worse_systems = pd.DataFrame(data, columns=header) # do not display index st.dataframe(df_best_worse_systems, hide_index=True) st.subheader("Comparison of average WER for best systems") df_per_dataset_best_systems = df_per_dataset_with_asr_systems_meta[df_per_dataset_with_asr_systems_meta['system'].isin([free_system_with_best_wer, commercial_system_with_best_wer])] df_wer_avg_best_free_commercial = basic_stats_per_dimension(df_per_dataset_best_systems, "WER", "Type") st.dataframe(df_wer_avg_best_free_commercial) # Create lookup table to get system type based on its name #system_type_lookup = dict(zip(df_wer_avg_per_system_all_subsets_with_type['system'], df_wer_avg_per_system_all_subsets_with_type['Type'])) systems_to_plot_best= [free_system_with_best_wer, commercial_system_with_best_wer] plot_performance(systems_to_plot_best, df_wer_avg_per_system_all_subsets_with_type) st.subheader("Comparison of average WER for the worst systems") df_per_dataset_worst_systems = df_per_dataset_with_asr_systems_meta[df_per_dataset_with_asr_systems_meta['system'].isin([free_system_with_worst_wer, commercial_system_with_worst_wer])] df_wer_avg_worst_free_commercial = basic_stats_per_dimension(df_per_dataset_worst_systems, "WER", "Type") st.dataframe(df_wer_avg_worst_free_commercial) systems_to_plot_worst=[free_system_with_worst_wer, commercial_system_with_worst_wer] plot_performance(systems_to_plot_worst, df_wer_avg_per_system_all_subsets_with_type) # WER in function of model size st.subheader("WER in function of model size for dataset {}".format(dataset)) # select only free systems for the analysis from df_wer_avg_per_system_all_subsets_with_type dataframe free_systems_wer_per_subset = 
df_per_dataset_with_asr_systems_meta.groupby(['system', 'Parameters [M]', 'subset'])['WER'].mean().reset_index() # sort by model size # change column type Parameters [M] to integer free_systems_wer_per_subset['Parameters [M]'] = free_systems_wer_per_subset['Parameters [M]'].astype(int) free_systems_wer_per_subset = free_systems_wer_per_subset.sort_values(by='Parameters [M]') free_systems_wer_average_across_all_subsets = free_systems_wer_per_subset.groupby(['system', 'Parameters [M]'])['WER'].mean().reset_index() # change column type Parameters [M] to integer free_systems_wer_average_across_all_subsets['Parameters [M]'] = free_systems_wer_average_across_all_subsets['Parameters [M]'].astype(int) # sort by model size free_systems_wer_average_across_all_subsets = free_systems_wer_average_across_all_subsets.sort_values(by='Parameters [M]') free_systems_wer = free_systems_wer_average_across_all_subsets # use system name as index free_systems_wer_to_show = free_systems_wer.set_index('system') # sort by WER and round WER by value to 2 decimal places free_systems_wer_to_show = free_systems_wer_to_show.sort_values(by='WER').round({'WER': 2}) # print dataframe in streamlit with average WER, system name and model size st.dataframe(free_systems_wer_to_show) # plot scatter plot with values of WER # X axis is the model size (parameters [M]) # Y is thw average WER # make each point a different color # provide legend with system names fig, ax = plt.subplots(figsize=(10, 7)) # Define larger jitter for close points jitter_x = 5 jitter_y = 0.2 # Alternate marker shapes to distinguish overlapping points marker_styles = ['o', 's', 'D', '^', 'v', '<', '>'] # Circle, square, diamond, and other shapes marker_dict = {system: marker_styles[i % len(marker_styles)] for i, system in enumerate(free_systems_wer['system'].unique())} for system in free_systems_wer['system'].unique(): subset = free_systems_wer[free_systems_wer['system'] == system] marker_style = marker_dict[system] # Scatter plot 
with distinct marker shapes for each system ax.scatter( subset['Parameters [M]'] + jitter_x * (np.random.rand(len(subset)) - 0.5), # Apply jitter to x for overlap subset['WER'] + jitter_y * (np.random.rand(len(subset)) - 0.5), # Apply jitter to y for overlap label=system, s=100, alpha=0.7, edgecolor='black', marker=marker_style ) # Add text annotations with dynamic positioning to avoid overlap with y-axis for i, point in subset.iterrows(): # Adjust position to avoid overlap with y-axis x_offset = 10 if point['Parameters [M]'] < 50 else -10 if i % 2 == 1 else 10 # Push right if close to y-axis y_offset = -0.5 if i % 2 == 0 else 0.5 # Alternate vertical offset ax.annotate( point['system'], (point['Parameters [M]'], point['WER']), textcoords="offset points", xytext=(x_offset, y_offset), ha='right' if x_offset < 0 else 'left', fontsize=10, bbox=dict(boxstyle="round,pad=0.3", edgecolor='white', facecolor='white', alpha=0.7) ) # Set axis labels and title ax.set_xlabel('Model Size [M Parameters]', fontsize=12) ax.set_ylabel('WER (%)', fontsize=12) ax.set_title(f'WER vs. 
Model Size for Dataset {dataset_short_name}', fontsize=14, pad=20) # Adjust legend settings to fit outside the main plot area ax.legend( title='System', bbox_to_anchor=(0.8, 1), loc='upper left', fontsize=8, title_fontsize=9, frameon=True, shadow=False, facecolor='white') #) # Add grid lines and minor ticks for better readability ax.grid(True, linestyle='--', alpha=0.5) ax.minorticks_on() ax.tick_params(which='both', direction='in', top=True, right=True) # increase granularity of y-axis to 20 points per whole range # Set y-axis limits: lower bound at 0, upper bound to next highest multiple of 5 y_min = 0 y_max = ax.get_ylim()[1] # Get the current maximum y value y_max_rounded = np.ceil(y_max / 5) * 5 # Round y_max up to the next highest multiple of 5 ax.set_ylim(y_min, y_max_rounded) # Improve layout spacing plt.tight_layout() # Display the plot st.pyplot(fig) ################################################################################################################################################## # WER per audio duration # calculate average WER per audio duration bucket for the best and worse commercial and free systems selected_systems = [free_system_with_best_wer, commercial_system_with_best_wer] # filter out results for selected systems df_per_sample_selected_systems = df_per_sample[df_per_sample['system'].isin(selected_systems)] # calculate average WER per audio duration for the best system # add column with audio duration in seconds rounded to nearest integer value. 
# WER per audio duration: assign every sample to the nearest duration bucket,
# average WER per (system, bucket), show the table and start the scatter plot.

# Representative clip lengths in seconds.
audio_duration_buckets = [1, 2, 3, 4, 5, 10, 15, 20, 30, 40, 50, 60]

# Map each audio duration to the closest bucket.
# .assign() rebinds the name to a new frame instead of writing a column into
# the boolean-filtered slice of df_per_sample (avoids SettingWithCopyWarning).
df_per_sample_selected_systems = df_per_sample_selected_systems.assign(
    audio_duration_buckets=df_per_sample_selected_systems['audio_duration'].apply(
        lambda duration: min(audio_duration_buckets, key=lambda bucket: abs(duration - bucket))
    )
)

# Average WER and number of samples per (system, bucket) from ONE groupby, so
# the counts cannot get out of step with the means they describe.
df_per_sample_wer_audio = (
    df_per_sample_selected_systems
    .groupby(['system', 'audio_duration_buckets'])['WER']
    .agg(WER='mean', number_of_samples='count')
    .reset_index()
    .sort_values(by='audio_duration_buckets')
)

# Round WER to 2 decimal places (the result has to be assigned back to take effect).
df_per_sample_wer_audio['WER'] = df_per_sample_wer_audio['WER'].round(2)

# Wide table: one row per bucket, one column per system.
df_per_sample_wer_audio_pivot = df_per_sample_wer_audio.pivot(
    index='audio_duration_buckets', columns='system', values='WER'
).round(2)

# Sample counts are taken from the best free system and aligned on the bucket
# index (not by position), so a bucket missing for that system yields NaN
# instead of a length-mismatch error.
samples_per_bucket = (
    df_per_sample_wer_audio[df_per_sample_wer_audio['system'] == free_system_with_best_wer]
    .groupby('audio_duration_buckets')['number_of_samples']
    .sum()
)
# number_of_samples goes first, right after the index.
df_per_sample_wer_audio_pivot.insert(0, 'number_of_samples', samples_per_bucket)

# Show the table in streamlit.
st.dataframe(df_per_sample_wer_audio_pivot)

# Scatter plot of WER as a function of audio duration; marker area scales with
# the number of samples in the bucket.
fig, ax = plt.subplots()
for system in selected_systems:
    subset = df_per_sample_wer_audio[df_per_sample_wer_audio['system'] == system]
    ax.scatter(
        subset['audio_duration_buckets'],
        subset['WER'],
        label=system,
        s=subset['number_of_samples'] * 0.5,
    )
# Finish and display the "WER vs audio duration" figure.
ax.set_xlabel('Audio Duration [s]')
ax.set_ylabel('WER (%)')
ax.set_title('WER in function of audio duration.')
# place legend outside the plot on the right
ax.legend(title='System', bbox_to_anchor=(1.05, 1), loc='upper left')
st.pyplot(fig)

##################################################################################################################################################
# WER per speech rate
audio_feature_to_analyze = 'speech_rate_words'
audio_feature_unit = ' [words/s]'
metric = 'WER'
metric_unit = ' (%)'
no_of_buckets = 10

# Average the metric per speech-rate bucket for the best free and the best
# commercial system.
selected_systems = [free_system_with_best_wer, commercial_system_with_best_wer]
df_per_sample_wer_feature_pivot, df_per_sample_wer_feature = calculate_wer_per_audio_feature(
    df_per_sample, selected_systems, audio_feature_to_analyze, metric, no_of_buckets
)

# Show the pivoted table in streamlit.
st.dataframe(df_per_sample_wer_feature_pivot)

# Drop outliers: keep only rows whose metric is at or below the 97th percentile.
threshold = df_per_sample_wer_feature[metric].quantile(0.97)
filtered_df = df_per_sample_wer_feature[df_per_sample_wer_feature[metric] <= threshold]

# Create figure and axis with larger size
fig, ax = plt.subplots(figsize=(10, 7))

# Looked up once; it does not change between systems.
lowess = sm.nonparametric.lowess

# One scatter series plus a LOWESS trend line per system.
for system in selected_systems:
    subset = filtered_df[filtered_df['system'] == system]
    ax.scatter(
        subset[audio_feature_to_analyze],
        subset[metric],
        label=system,
        s=subset['number_of_samples'] * 0.5,
        alpha=0.6,  # keep overlapping points visible
    )

    if subset.empty:
        # No points survived the outlier filter for this system, so there is
        # nothing to fit a trend line to.
        continue

    # frac controls the amount of smoothing.
    trend = lowess(subset[metric], subset[audio_feature_to_analyze], frac=0.3)
    ax.plot(trend[:, 0], trend[:, 1], label=f'{system} Trend', linestyle='-', linewidth=2)

# Set axis labels with improved formatting for readability
# Axis labels are "<name> <unit>". The unit strings may carry their own leading
# space, so strip it before joining to avoid a double space in the label.
ax.set_xlabel(audio_feature_to_analyze.replace('_', ' ').capitalize() + ' ' + audio_feature_unit.strip())
ax.set_ylabel(metric + ' ' + metric_unit.strip())

# Set an improved title that is more informative
ax.set_title('Word Error Rate (WER) vs Speech Rate\nBest Performing Free and Paid Systems', fontsize=14)

# Set y-axis limits: lower bound at 0, upper bound at the next multiple of 5
# above the current automatic limit.
y_min = 0
y_max = ax.get_ylim()[1]
y_max_rounded = np.ceil(y_max / 5) * 5
ax.set_ylim(y_min, y_max_rounded)

# Add a grid to improve readability and alignment
ax.grid(True, linestyle='--', alpha=0.7)

# Anchor the legend's upper-right corner just inside the top-right of the axes.
ax.legend(title='System', loc='upper right', bbox_to_anchor=(0.95, 1))

# Add tight layout to improve spacing between elements
fig.tight_layout()

# Display the plot
st.pyplot(fig)

################################################################################################################################################
# WER PER GENDER
# All evaluated systems are analysed (not only the best/worst free and commercial ones).
selected_systems = df_per_sample['system'].unique()
(
    df_per_sample_wer_gender_pivot,
    df_available_samples_per_category_per_system,
    no_samples_per_category,
) = calculate_wer_per_meta_category(df_per_sample, selected_systems, 'WER', 'speaker_gender')

# Per-system tables with the number of available samples in each category.
st.write("Number of samples per category")
for system in selected_systems:
    st.write(f"System: {system}")
    df_available_samples_per_category = df_available_samples_per_category_per_system[system]
    st.dataframe(df_available_samples_per_category)

st.write(f"Number of samples analyzed per category - {no_samples_per_category}")
# Table with WER per speaker gender for every system.
st.dataframe(df_per_sample_wer_gender_pivot)

# Scatter plot of the gender WER gap: x = 'Difference' value, y = system name,
# one colour per system.
fig, ax = plt.subplots()
# The last 3 rows of the pivot are excluded - presumably summary rows (median,
# average, std) rather than systems; confirm against calculate_wer_per_meta_category.
difference_values = df_per_sample_wer_gender_pivot['Difference'][:-3]
selected_systems = df_per_sample_wer_gender_pivot.index[:-3]
ax.scatter(difference_values, selected_systems, c=range(len(selected_systems)), cmap='viridis')
ax.set_ylabel('ASR System')
ax.set_xlabel('Difference in WER across speaker gender')
ax.set_title('ASR systems performance bias for genders.')

# Label every point with its difference value. Values and systems are paired
# with zip() rather than integer subscripts: the Series is indexed by system
# labels, so difference_values[i] would rely on pandas' deprecated positional
# fallback.
for value, system in zip(difference_values, selected_systems):
    ax.annotate(value, (value, system), fontsize=5, ha='right')
st.pyplot(fig)

#####################################################################################################################################################################################
# WER per age
(
    df_per_sample_wer_age_pivot,
    df_available_samples_per_category_per_system,
    no_samples_per_category,
) = calculate_wer_per_meta_category(df_per_sample, selected_systems, 'WER', 'speaker_age')

# Per-system tables with the number of available samples in each category.
st.write("Number of samples per category")
for system in selected_systems:
    st.write(f"System: {system}")
    df_available_samples_per_category = df_available_samples_per_category_per_system[system]
    st.dataframe(df_available_samples_per_category)

st.write("Number of samples analyzed per category - {}".format(no_samples_per_category))
st.write("WER per age")
st.dataframe(df_per_sample_wer_age_pivot)

# Next: scatter plot of the per-system spread of WER across age groups
# (the last 3 pivot rows, noted as median, average and std values, are skipped).
# Scatter plot of the WER spread across speaker age groups:
# x = 'Std Dev' value, y = system name, one colour per system.
fig, ax = plt.subplots()
# The last 3 rows of the pivot are excluded - presumably summary rows (median,
# average, std) rather than systems; confirm against calculate_wer_per_meta_category.
difference_values = df_per_sample_wer_age_pivot['Std Dev'][:-3]
selected_systems = df_per_sample_wer_age_pivot.index[:-3]
ax.scatter(difference_values, selected_systems, c=range(len(selected_systems)), cmap='viridis')
ax.set_ylabel('ASR System')
ax.set_xlabel('Standard Deviation in WER across speaker age')
ax.set_title('ASR systems performance bias for age groups')

# Label every point with its value. zip() pairs values with systems without
# integer subscripts on the label-indexed Series (deprecated positional fallback).
for value, system in zip(difference_values, selected_systems):
    ax.annotate(value, (value, system), fontsize=5, ha='right')
st.pyplot(fig)

# TODO: READ vs CONVERSATIONAL SPEECH AVERAGE WER
# TODO: Hallucinations rate per system


def _index_or_first(options, preferred):
    """Return the position of *preferred* in *options*, or 0 when it is absent.

    Used for select-box defaults so that a dataset lacking the preferred
    option falls back to its first option instead of raising ValueError.
    """
    try:
        return options.index(preferred)
    except ValueError:
        return 0


with interactive_comparison:
    st.title("Interactive comparison of ASR Systems performance")
    st.markdown(COMPARISON_INFO, unsafe_allow_html=True)

    st.title("Plots for analyzing ASR Systems performance")

    datasets = datasets_secret + datasets_public
    dataset = st.selectbox(
        "Select Dataset",
        datasets,
        index=_index_or_first(datasets, 'amu-cai/pl-asr-bigos-v2-secret'),
        key="select_dataset_interactive_comparison",
    )

    # read the latest results for the selected dataset
    print("Reading the latest results for dataset: ", dataset)
    df_per_sample_all, df_per_dataset_all = read_latest_results(dataset, split, codename_to_shortname_mapping)

    # NOTE(review): ref_type and norm_type are read here before the selectors
    # below assign them, so these two filters use whatever values earlier code
    # left behind rather than the user's choice in this tab - confirm intended.
    df_per_sample = df_per_sample_all[
        (df_per_sample_all["ref_type"] == ref_type) & (df_per_sample_all["norm_type"] == norm_type)
    ]
    df_per_dataset = df_per_dataset_all[
        (df_per_dataset_all["ref_type"] == ref_type) & (df_per_dataset_all["norm_type"] == norm_type)
    ]

    evaluated_systems_list = df_per_sample["system"].unique()
    print(evaluated_systems_list)
    df_evaluated_systems = retrieve_asr_systems_meta_from_the_catalog(evaluated_systems_list)
    print(df_evaluated_systems)

    # read available options to analyze for specific dataset
    splits = list(df_per_dataset_all['subset'].unique())
    norm_types = list(df_per_dataset_all['norm_type'].unique())
    ref_types = list(df_per_dataset_all['ref_type'].unique())
    systems = list(df_per_dataset_all['system'].unique())
    # Every column from position 7 onwards is offered as a metric.
    metrics = list(df_per_dataset_all.columns[7:])

    # Select the system to display. More than 1 system can be selected.
    systems_selected = st.multiselect("Select ASR Systems", systems)

    # Metric, normalization type and reference type; each defaults to the
    # usual value when available, otherwise to the first option.
    metric = st.selectbox("Select Metric", metrics, index=_index_or_first(metrics, 'WER'))
    norm_type = st.selectbox("Select Normalization Type", norm_types, index=_index_or_first(norm_types, 'all'))
    ref_type = st.selectbox("Select Reference Type", ref_types, index=_index_or_first(ref_types, 'orig'))

    enable_labels = st.checkbox("Enable labels on radar plot", value=True)
    enable_bar_chart = st.checkbox("Enable bar chart", value=True)
    enable_polar_plot = st.checkbox("Enable radar plot", value=True)

    orientation = st.selectbox("Select orientation", ["vertical", "horizontal"], index=0)

    # Plots are drawn only once a metric and at least one system are chosen.
    if enable_polar_plot and metric and systems_selected:
        create_radar_plot(df_per_dataset_all, enable_labels, systems_selected, metric, norm_type, ref_type)

    if enable_bar_chart and metric and systems_selected:
        create_bar_chart(df_per_dataset_all, systems_selected, metric, norm_type, ref_type, orientation)