"""Helpers for loading ASR evaluation results and deriving summary tables from them."""

import os
import posixpath
from io import StringIO

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import seaborn as sns
import streamlit as st
from datasets import Dataset  # kept: may be used by code outside this view
from huggingface_hub import hf_hub_download


def download_tsv_from_google_sheet(sheet_url, timeout=30):
    """Download a Google Sheet tab as TSV and parse it into a DataFrame.

    Args:
        sheet_url: Sheet URL containing ``/edit#gid=``; that fragment is
            rewritten to the TSV export endpoint. A URL without the fragment
            is requested unchanged.
        timeout: Seconds passed to ``requests.get`` so a stalled connection
            cannot block the caller forever.

    Returns:
        The parsed DataFrame, or ``None`` when the HTTP status is not 200.

    Raises:
        requests.exceptions.RequestException: On connection errors or timeout.
    """
    tsv_url = sheet_url.replace('/edit#gid=', '/export?format=tsv&gid=')
    response = requests.get(tsv_url, timeout=timeout)
    # Force UTF-8 so response.text is decoded independently of the headers.
    response.encoding = 'utf-8'
    if response.status_code != 200:
        print("Failed to download the TSV file.")
        return None
    return pd.read_csv(StringIO(response.text), sep='\t', encoding='utf-8')


def generate_path_to_latest_tsv(dataset_name, split, type_of_result):
    """Return the local path ``./data/<dataset>/<split>/eval_results-<type>-latest.tsv``."""
    fn = os.path.join("./data", dataset_name, split, "eval_results-{}-latest.tsv".format(type_of_result))
    return fn


@st.cache_data
def read_latest_results(dataset_name, split, codename_to_shortname_mapping):
    """Download and read the latest per-sample and per-dataset result TSVs.

    The repository id is read from the ``HF_SECRET_REPO_ID`` environment
    variable and the access token from ``HF_TOKEN``.

    Args:
        dataset_name: Dataset directory name under ``leaderboard_input``.
        split: Split directory name under the dataset directory.
        codename_to_shortname_mapping: Mapping applied with ``Series.replace``
            to the ``system`` column of both tables; skipped when falsy.

    Returns:
        Tuple ``(df_per_sample, df_per_dataset)``.
    """
    repo_id = os.getenv('HF_SECRET_REPO_ID')
    hf_token = os.getenv('HF_TOKEN')

    # Paths inside a Hub repository always use forward slashes, so they are
    # joined with posixpath rather than os.path (backslashes on Windows).
    dataset_path = posixpath.join("leaderboard_input", dataset_name, split)
    print(dataset_path)
    fp_results_per_dataset_repo = posixpath.join(dataset_path, 'eval_results-per_dataset-latest.tsv')
    print(fp_results_per_dataset_repo)
    fp_results_per_sample_repo = posixpath.join(dataset_path, 'eval_results-per_sample-latest.tsv')

    local_fp_per_dataset = hf_hub_download(
        repo_id=repo_id,
        repo_type='dataset',
        filename=fp_results_per_dataset_repo,
        use_auth_token=hf_token,
    )
    local_fp_per_sample = hf_hub_download(
        repo_id=repo_id,
        repo_type='dataset',
        filename=fp_results_per_sample_repo,
        use_auth_token=hf_token,
    )

    df_per_dataset = pd.read_csv(local_fp_per_dataset, delimiter='\t')
    df_per_sample = pd.read_csv(local_fp_per_sample, delimiter='\t')
    print(df_per_dataset)
    print(df_per_sample)

    # Replace system codenames with their short names.
    if codename_to_shortname_mapping:
        df_per_sample['system'] = df_per_sample['system'].replace(codename_to_shortname_mapping)
        df_per_dataset['system'] = df_per_dataset['system'].replace(codename_to_shortname_mapping)

    return df_per_sample, df_per_dataset


@st.cache_data
def retrieve_asr_systems_meta_from_the_catalog(asr_systems_list):
    """Return catalog rows whose ``Codename`` or ``Shortname`` is in the list.

    Args:
        asr_systems_list: System identifiers (codenames and/or short names).

    Returns:
        The filtered catalog DataFrame.

    Raises:
        RuntimeError: If the catalog sheet could not be downloaded.
    """
    asr_systems_cat_url = "https://docs.google.com/spreadsheets/d/1fVsE98Ulmt-EIEe4wx8sUdo7RLigDdAVjQxNpAJIrH8/edit#gid=681521237"
    catalog = download_tsv_from_google_sheet(asr_systems_cat_url)
    if catalog is None:
        # Fail with an explicit message instead of a TypeError on None below.
        raise RuntimeError(
            "Failed to download the ASR systems catalog from: {}".format(asr_systems_cat_url)
        )

    is_requested = catalog["Codename"].isin(asr_systems_list) | catalog["Shortname"].isin(asr_systems_list)
    return catalog[is_requested]


def basic_stats_per_dimension(df_input, metric, dimension):
    """Compute median, mean, std, min and max of ``metric`` per ``dimension``.

    Returns:
        DataFrame indexed by ``dimension`` with columns ``med_<metric>``,
        ``avg_<metric>``, ``std_<metric>``, ``min_<metric>``, ``max_<metric>``,
        rounded to 2 decimals and sorted ascending by the median.
    """
    df_stats = (
        df_input.groupby(dimension)[metric]
        .agg(['median', 'mean', 'std', 'min', 'max'])
        .round(2)
    )
    df_stats.columns = [
        "med_{}".format(metric),
        "avg_{}".format(metric),
        "std_{}".format(metric),
        "min_{}".format(metric),
        "max_{}".format(metric),
    ]
    return df_stats.sort_values(by="med_{}".format(metric))


def ser_from_per_sample_results(df_per_sample, dimension):
    """Compute the percentage of samples with a non-zero ``WER`` per ``dimension``.

    Returns:
        Series named ``SER``, rounded to 2 decimals and sorted ascending.
    """
    # A sample counts as erroneous when its WER differs from 0.
    df_ser = (
        df_per_sample.groupby(dimension)["WER"]
        .apply(lambda x: (x != 0).mean() * 100)
        .sort_values()
        .round(2)
    )
    df_ser.name = "SER"
    return df_ser


def get_total_audio_duration(df_per_sample):
    """Return the summed ``audio_duration`` of rows with a unique ``id``, divided by 3600."""
    # Each recording may appear once per system; count it only once.
    df_per_sample_unique_audio = df_per_sample.drop_duplicates(subset='id')
    total_duration_hours = df_per_sample_unique_audio['audio_duration'].sum() / 3600
    return total_duration_hours


def extend_meta_per_sample_words_chars(df_per_sample):
    """Add word/character count and rate columns for ``ref`` and ``hyp``.

    The input frame is modified in place and also returned. Added columns:
    ``ref_words``, ``hyp_words``, ``ref_wps``, ``hyp_wps``, ``ref_cps``,
    ``hyp_cps``, ``ref_cpw``, ``hyp_cpw``.

    Missing (NaN) or non-string ``ref``/``hyp`` values are treated as text;
    NaN counts as an empty string, so such rows get zero words and characters
    instead of raising.
    """
    ref_text = df_per_sample['ref'].fillna('').astype(str)
    hyp_text = df_per_sample['hyp'].fillna('').astype(str)
    ref_chars = ref_text.str.len()
    hyp_chars = hyp_text.str.len()

    # NOTE(review): the duration is rounded before dividing (the rates
    # themselves are not rounded). Kept as in the original; confirm intent.
    duration = df_per_sample['audio_duration'].round(2)

    # Number of whitespace-separated words.
    df_per_sample['ref_words'] = ref_text.str.split().str.len()
    df_per_sample['hyp_words'] = hyp_text.str.split().str.len()

    # Words per unit of (rounded) audio duration.
    df_per_sample['ref_wps'] = df_per_sample['ref_words'] / duration
    df_per_sample['hyp_wps'] = df_per_sample['hyp_words'] / duration

    # Characters per unit of (rounded) audio duration.
    df_per_sample['ref_cps'] = ref_chars / duration
    df_per_sample['hyp_cps'] = hyp_chars / duration

    # Characters per word.
    df_per_sample['ref_cpw'] = ref_chars / df_per_sample['ref_words']
    df_per_sample['hyp_cpw'] = hyp_chars / df_per_sample['hyp_words']

    return df_per_sample


def filter_top_outliers(df_input, metric, max_threshold):
    """Return rows whose ``metric`` is strictly below ``max_threshold``."""
    df_filtered = df_input[df_input[metric] < max_threshold]
    return df_filtered


def filter_bottom_outliers(df_input, metric, min_threshold):
    """Return rows whose ``metric`` is strictly above ``min_threshold``."""
    df_filtered = df_input[df_input[metric] > min_threshold]
    return df_filtered


def box_plot_per_dimension(df_input, metric, dimension, title, xlabel, ylabel):
    """Draw a box plot of ``metric`` per ``dimension``, ordered by median.

    Outliers are not drawn. Returns the ``matplotlib.pyplot`` module holding
    the newly created current figure.
    """
    plt.figure(figsize=(20, 10))
    median_order = df_input.groupby(dimension)[metric].median().sort_values().index
    sns.boxplot(x=dimension, y=metric, data=df_input, order=median_order, showfliers=False)
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.xticks(rotation=90)
    return plt


def check_impact_of_normalization(data_in, ref_type='orig'):
    """Compare mean metrics of each normalization type against ``'none'``.

    Args:
        data_in: Frame with ``ref_type``, ``norm_type``, ``system`` and
            ``subset`` columns plus numeric metric columns.
        ref_type: Only rows with this ``ref_type`` are used.

    Returns:
        DataFrame indexed by ``norm_type`` (without ``'none'``) holding the
        difference of each mean metric to the ``'none'`` baseline, rounded to
        2 decimals, with an ``Average`` column, sorted ascending by the
        absolute value of ``Average``.

    Raises:
        KeyError: If an expected column or the ``'none'`` baseline is missing.
    """
    data_ref_type = data_in[data_in['ref_type'] == ref_type]
    data = data_ref_type.drop(columns=['system', 'subset', 'ref_type'])

    # numeric_only avoids a TypeError on leftover text columns in pandas >= 2.
    average_impact = data.groupby('norm_type').mean(numeric_only=True)
    baseline_metrics = average_impact.loc['none']

    difference_metrics = average_impact.subtract(baseline_metrics).drop(index='none')
    difference_metrics_rounded = difference_metrics.round(2)
    difference_metrics_rounded['Average'] = difference_metrics_rounded.mean(axis=1).round(2)

    return difference_metrics_rounded.sort_values(by='Average', key=abs)


def calculate_wer_per_meta_category(df_per_sample, selected_systems, metric,
                                    analysis_dimension='speaker_gender', random_state=None):
    """Compare the mean ``metric`` of systems across metadata categories.

    For every selected system and category the same number of rows is sampled
    (the smallest non-null ``metric`` count found in any system/category
    pair), the metric is averaged, and gap statistics between categories are
    appended.

    Args:
        df_per_sample: Per-sample results with ``system``, ``metric`` and
            ``analysis_dimension`` columns.
        selected_systems: Systems to compare.
        metric: Name of the metric column.
        analysis_dimension: Metadata column defining the categories.
        random_state: Optional seed forwarded to the sampling step to make
            the result reproducible. ``None`` keeps the sampling random.

    Returns:
        Tuple of
        * DataFrame with one row per system plus ``median``, ``average`` and
          ``std`` rows; columns are the categories followed by the gap
          metrics (``Difference`` for two categories; ``Std Dev``, ``MAD``,
          ``Range`` for more; none for fewer than two),
        * dict mapping each system to a DataFrame of ``available_samples``
          per category,
        * the number of samples analyzed per system and category.

    Raises:
        ValueError: If no samples can be drawn for the selected systems.
    """
    # Only rows with a known category value can be analyzed.
    df_per_sample_dimension = df_per_sample[df_per_sample[analysis_dimension].notnull()]
    meta_values = df_per_sample_dimension[analysis_dimension].unique()

    if analysis_dimension == 'speaker_age':
        # Keep age groups in chronological rather than first-seen order.
        sorted_values = sort_age_categories(meta_values)
        print("meta values sorted:", sorted_values)
        meta_values = sorted_values

    # Count available (non-null metric) samples per category for each system.
    df_available_samples_per_category_per_system = {}
    smallest_count_per_system = []
    for system in selected_systems:
        system_rows = df_per_sample_dimension[df_per_sample_dimension['system'] == system]
        available = system_rows.groupby(analysis_dimension)[metric].count().to_frame('available_samples')
        df_available_samples_per_category_per_system[system] = available
        # Systems without any rows do not constrain the sample size.
        if not available.empty:
            smallest_count_per_system.append(int(available['available_samples'].min()))

    min_samples = min(smallest_count_per_system, default=0)
    if min_samples < 1:
        raise ValueError(
            "No samples with a non-null '{}' value are available for every '{}' category "
            "of the selected systems.".format(metric, analysis_dimension)
        )

    # Draw the same number of rows for every system/category pair.
    selected_rows = df_per_sample_dimension[df_per_sample_dimension['system'].isin(selected_systems)]
    balanced_rows = (
        selected_rows.groupby(['system', analysis_dimension])
        .sample(n=min_samples, random_state=random_state)
        .reset_index(drop=True)
    )

    mean_per_group = (
        balanced_rows.groupby(['system', analysis_dimension])[metric].mean().round(2).reset_index()
    )
    pivot = mean_per_group.pivot(index=analysis_dimension, columns='system', values=metric).round(2)

    # Categories seen only in non-selected systems have no row in the pivot;
    # drop them so the lookups below cannot raise KeyError.
    meta_values = [value for value in meta_values if value in pivot.index]

    if len(meta_values) == 2:
        # Two categories: signed difference first-minus-second.
        gap_metrics = ['Difference']
        pivot.loc[gap_metrics[0]] = pivot.loc[meta_values[0]] - pivot.loc[meta_values[1]]
    elif len(meta_values) > 2:
        # More categories: spread statistics across categories per system.
        gap_metrics = ['Std Dev', 'MAD', 'Range']
        print(pivot)
        metrics = pd.DataFrame({
            gap_metrics[0]: pivot.std(),
            # Mean absolute deviation from the per-system mean.
            gap_metrics[1]: (pivot - pivot.mean()).abs().mean(),
            gap_metrics[2]: pivot.max() - pivot.min(),
        })
        metrics_t = metrics.round(2).transpose()
        print(metrics_t)
        pivot = pd.concat([pivot, metrics_t], axis=0)
        print(pivot)
    else:
        # Fewer than two categories: there is no gap to compute.
        gap_metrics = []

    # Systems become rows; order them by the first gap metric when present.
    pivot = pivot.transpose()
    if gap_metrics:
        pivot = pivot.sort_values(by=gap_metrics[0], axis=0)

    # Summary rows are computed over systems before any of them is appended.
    median_difference = pivot.median().round(2)
    avg_difference = pivot.mean().round(2)
    std_difference = pivot.std().round(2)
    pivot.loc['median'] = median_difference
    pivot.loc['average'] = avg_difference
    pivot.loc['std'] = std_difference

    analyzed_samples_per_category = min_samples
    pivot = pivot.round(2)

    # Categories first (in meta_values order), gap metrics last.
    columns = list(meta_values) + gap_metrics
    print(columns)
    pivot = pivot[columns]

    return pivot, df_available_samples_per_category_per_system, analyzed_samples_per_category


def sort_age_categories(meta_values):
    """Sort age-group labels from ``teens`` to ``nineties``; unknown labels go last."""
    order = ["teens", "twenties", "thirties", "fourties", "fifties", "sixties", "seventies", "eighties", "nineties"]
    order_dict = {age: index for index, age in enumerate(order)}
    sorted_values = sorted(meta_values, key=lambda x: order_dict.get(x, float('inf')))
    return sorted_values


def calculate_wer_per_audio_feature(df_per_sample, selected_systems, audio_feature_to_analyze, metric, no_of_buckets):
    """Average ``metric`` per system and audio-feature value.

    Side effect: a ``<audio_feature_to_analyze>_bucket`` column is written to
    the passed ``df_per_sample``.

    NOTE(review): the bucket column is not used for the grouping below, which
    groups by the raw feature values, so ``no_of_buckets`` only affects that
    added column. Kept as in the original; confirm the intended behavior.

    Args:
        df_per_sample: Per-sample results with ``system``, ``metric`` and the
            feature column.
        selected_systems: Systems to include; the first one provides the
            ``number_of_samples`` column of the pivot.
        audio_feature_to_analyze: Name of the feature column.
        metric: Name of the metric column.
        no_of_buckets: Number of bucket values to generate.

    Returns:
        Tuple of
        * pivot indexed by feature value with ``number_of_samples`` first and
          one column per system (metric rounded to 2 decimals; the sample
          count is NaN where the first system has no rows),
        * long-format DataFrame with ``system``, the feature, the rounded
          ``metric`` and ``number_of_samples``, sorted by the feature.
    """
    print(df_per_sample)
    feature_values = df_per_sample[audio_feature_to_analyze]
    # Taken before the bucket column is added, so it does not contain it.
    df_per_sample_selected_systems = df_per_sample[df_per_sample['system'].isin(selected_systems)]

    # Series.min/max skip NaN, unlike the Python built-ins.
    min_feature_value = round(feature_values.min(), 1)
    max_feature_value = round(feature_values.max(), 1)
    step = max_feature_value / no_of_buckets
    audio_feature_buckets = [min_feature_value + i * step for i in range(no_of_buckets)]

    # Map every feature value to the closest bucket value.
    df_per_sample[audio_feature_to_analyze + '_bucket'] = feature_values.apply(
        lambda x: min(audio_feature_buckets, key=lambda y: abs(x - y)))

    # Mean metric and sample count per system and feature value.
    grouped_metric = df_per_sample_selected_systems.groupby(['system', audio_feature_to_analyze])[metric]
    df_per_sample_wer_feature = grouped_metric.mean().reset_index()
    df_per_sample_wer_feature['number_of_samples'] = grouped_metric.count().to_numpy()
    df_per_sample_wer_feature = df_per_sample_wer_feature.sort_values(by=audio_feature_to_analyze)
    # Assign the result: Series.round returns a new object.
    df_per_sample_wer_feature[metric] = df_per_sample_wer_feature[metric].round(2)

    # Systems as columns, feature values as index.
    df_per_sample_wer_feature_pivot = df_per_sample_wer_feature.pivot(
        index=audio_feature_to_analyze, columns='system', values=metric).round(2)

    # Sample counts of the first selected system, aligned on the feature
    # index so a system lacking some feature values yields NaN, not an error.
    first_system_rows = df_per_sample_wer_feature[df_per_sample_wer_feature['system'] == selected_systems[0]]
    df_per_sample_wer_feature_pivot['number_of_samples'] = (
        first_system_rows.groupby(audio_feature_to_analyze)['number_of_samples'].sum()
    )

    # Put number_of_samples first.
    system_columns = [col for col in df_per_sample_wer_feature_pivot.columns if col != 'number_of_samples']
    df_per_sample_wer_feature_pivot = df_per_sample_wer_feature_pivot[['number_of_samples'] + system_columns]

    return df_per_sample_wer_feature_pivot, df_per_sample_wer_feature