Spaces:
Sleeping
Sleeping
import json | |
import seaborn as sns | |
import matplotlib.pyplot as plt | |
import pandas as pd | |
import os | |
import numpy as np | |
# TODO: move to consts
# Known bucket labels for the 'speaker_age' and 'speaker_sex' metadata fields.
# NOTE(review): 'fourties' is kept exactly as spelled - presumably it matches
# the labels stored in the datasets; verify before "fixing" the typo.
buckets_age=['teens','twenties', 'thirties', 'fourties', 'fifties', 'sixties', 'seventies', 'eighties', 'nineties']
buckets_sex=["male", "female"]
def load_bigos_analyzer_report(fp:str)->dict:
    """Load an analyzer report stored as a JSON file.

    Args:
        fp: Path to the JSON report file.

    Returns:
        The parsed JSON content.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Decode explicitly as UTF-8 instead of relying on the platform default
    # encoding, which is not UTF-8 everywhere.
    with open(fp, 'r', encoding='utf-8') as f:
        return json.load(f)
def num_of_samples_per_split(dataset_hf):
    """Count the samples in every split of a dataset.

    Args:
        dataset_hf: Mapping of split name to an object exposing ``num_rows``
            (e.g. a HuggingFace dataset dictionary).

    Returns:
        Dictionary mapping each split name to its number of rows, plus an
        "all_splits" entry holding the sum over all splits.
    """
    metric = "samples"
    print("Calculating {}".format(metric))

    counts = {split: dataset_hf[split].num_rows for split in dataset_hf.keys()}
    # Grand total is computed before the key is inserted, so it only covers real splits.
    counts["all_splits"] = sum(counts.values())
    return counts
def audio_duration_per_split(dataset_hf):
    """Compute the total audio duration, in hours, of every split.

    Args:
        dataset_hf: Mapping of split name to a mapping that provides an
            "audio_duration_seconds" sequence of per-recording durations.

    Returns:
        Dictionary mapping each split name to its total duration in hours
        (rounded to 2 decimals), plus an "all_splits" entry with the total
        duration of all splits.
    """
    metric = "audio[h]"
    print("Calculating {}".format(metric))

    out_dict = {}
    total_seconds = 0
    for split in dataset_hf.keys():
        split_seconds = sum(dataset_hf[split]["audio_duration_seconds"])
        total_seconds += split_seconds
        out_dict[split] = round(split_seconds / 3600, 2)

    # Derive the grand total from the raw seconds rather than summing the
    # already-rounded per-split hours: this avoids accumulated rounding error
    # and float noise such as 0.1 + 0.2 == 0.30000000000000004.
    out_dict["all_splits"] = round(total_seconds / 3600, 2)
    return out_dict
def speakers_per_split(dataset_hf):
    """Count the distinct speakers in every split.

    The speaker identifier is taken as the fifth "-"-separated field of each
    entry in the split's "audioname" column.

    Args:
        dataset_hf: Mapping of split name to a mapping that provides an
            "audioname" sequence.

    Returns:
        Dictionary mapping each split name to its number of distinct speaker
        identifiers, plus an "all_splits" entry holding the sum of the
        per-split counts (identifiers are not de-duplicated across splits).
    """
    metric = "speakers"
    print("Calculating {}".format(metric))

    counts = {}
    for split in dataset_hf.keys():
        distinct_speakers = {
            str(audioname).split("-")[4]
            for audioname in dataset_hf[split]["audioname"]
        }
        counts[split] = len(distinct_speakers)

    counts["all_splits"] = sum(counts.values())
    return counts
def uniq_utts_per_split(dataset_hf, dataset_hf_secret):
    """Count the distinct reference utterances in every split.

    References are read from the "ref_orig" column. For the "test" split they
    are taken from ``dataset_hf_secret`` instead of ``dataset_hf``.

    Args:
        dataset_hf: Mapping of split name to a mapping with a "ref_orig" list.
        dataset_hf_secret: Mapping used as the source of the "test" split.

    Returns:
        Tuple ``(counts, utterances)`` where ``counts`` maps each split name to
        its number of distinct utterances and "all_splits" to the number of
        distinct utterances across all splits, and ``utterances`` is the
        concatenation of all utterances (duplicates kept) in split order.
    """
    metric = "utts_unique"
    print("Calculating {}".format(metric))

    counts = {}
    collected = []
    for split in dataset_hf.keys():
        source = dataset_hf_secret if split == "test" else dataset_hf
        split_utts = source[split]["ref_orig"]
        collected = collected + split_utts
        counts[split] = len(set(split_utts))

    # De-duplicate across splits for the overall figure.
    counts["all_splits"] = len(set(collected))
    return counts, collected
def words_per_split(dataset_hf, dataset_hf_secret):
    """Count the words of the reference transcriptions in every split.

    Words are obtained by splitting each "ref_orig" utterance on a single
    space character. For the "test" split the references are taken from
    ``dataset_hf_secret`` instead of ``dataset_hf``.

    Args:
        dataset_hf: Mapping of split name to a mapping with a "ref_orig" sequence.
        dataset_hf_secret: Mapping used as the source of the "test" split.

    Returns:
        Dictionary mapping each split name to its word count, plus an
        "all_splits" entry holding the sum over all splits.
    """
    metric = "words"
    print("Calculating {}".format(metric))

    counts = {}
    for split in dataset_hf.keys():
        source = dataset_hf_secret if split == "test" else dataset_hf
        counts[split] = sum(
            len(utterance.split(" ")) for utterance in source[split]["ref_orig"]
        )

    counts["all_splits"] = sum(counts.values())
    return counts
def uniq_words_per_split(dataset_hf, dataset_hf_secret):
    """Count the distinct words of the reference transcriptions in every split.

    Words are obtained by joining the "ref_orig" utterances of a split with a
    single space and splitting on a single space. For the "test" split the
    references are taken from ``dataset_hf_secret`` instead of ``dataset_hf``.

    Args:
        dataset_hf: Mapping of split name to a mapping with a "ref_orig" sequence.
        dataset_hf_secret: Mapping used as the source of the "test" split.

    Returns:
        Tuple ``(counts, vocabulary)`` where ``counts`` maps each split name to
        its number of distinct words and "all_splits" to the number of distinct
        words across all splits, and ``vocabulary`` is the list of distinct
        words across all splits in sorted order.
    """
    metric = "words_unique"
    print("Calculating {}".format(metric))

    out_dict = {}
    vocabulary = set()
    for split in dataset_hf.keys():
        source = dataset_hf_secret if split == "test" else dataset_hf
        split_words = set(" ".join(source[split]["ref_orig"]).split(" "))
        out_dict[split] = len(split_words)
        vocabulary |= split_words

    out_dict["all_splits"] = len(vocabulary)
    # Sort so the returned list is reproducible between runs; plain set
    # iteration order of strings changes with hash randomization.
    return out_dict, sorted(vocabulary)
def chars_per_split(dataset_hf, dataset_hf_secret):
    """Count the characters of the reference transcriptions in every split.

    The count is the length of all "ref_orig" utterances of a split joined by
    a single space, so spaces (inside and between utterances) are included.
    For the "test" split the references are taken from ``dataset_hf_secret``
    instead of ``dataset_hf``.

    Args:
        dataset_hf: Mapping of split name to a mapping with a "ref_orig" sequence.
        dataset_hf_secret: Mapping used as the source of the "test" split.

    Returns:
        Dictionary mapping each split name to its character count, plus an
        "all_splits" entry holding the sum over all splits.
    """
    metric = "chars"
    print("Calculating {}".format(metric))

    counts = {}
    for split in dataset_hf.keys():
        source = dataset_hf_secret if split == "test" else dataset_hf
        # Splitting the joined text on " " and re-joining with " " yields the
        # same text, so the length of the joined text is measured directly.
        counts[split] = len(" ".join(source[split]["ref_orig"]))

    counts["all_splits"] = sum(counts.values())
    return counts
def uniq_chars_per_split(dataset_hf, dataset_hf_secret):
    """Count the distinct characters of the reference transcriptions per split.

    The space character is not counted: it is the separator used to break the
    "ref_orig" utterances into words. For the "test" split the references are
    taken from ``dataset_hf_secret`` instead of ``dataset_hf``.

    Args:
        dataset_hf: Mapping of split name to a mapping with a "ref_orig" sequence.
        dataset_hf_secret: Mapping used as the source of the "test" split.

    Returns:
        Tuple ``(counts, alphabet)`` where ``counts`` maps each split name to
        its number of distinct characters and "all_splits" to the number of
        distinct characters across all splits, and ``alphabet`` is the list of
        distinct characters across all splits in sorted order.
    """
    metric = "chars_unique"
    print("Calculating {}".format(metric))

    out_dict = {}
    alphabet = set()
    for split in dataset_hf.keys():
        source = dataset_hf_secret if split == "test" else dataset_hf
        # Characters of all words == characters of the text minus the separator.
        split_chars = set("".join(source[split]["ref_orig"]))
        split_chars.discard(" ")
        out_dict[split] = len(split_chars)
        alphabet |= split_chars

    out_dict["all_splits"] = len(alphabet)
    # Sort so the returned list is reproducible between runs; plain set
    # iteration order of strings changes with hash randomization.
    return out_dict, sorted(alphabet)
def meta_cov_per_split(dataset_hf, meta_field):
    """Compute the share of samples with a known value for a metadata field.

    A value counts as missing when it equals the string "N/A".

    Args:
        dataset_hf: Mapping of split name to a mapping that provides the
            ``meta_field`` column as a sequence.
        meta_field: Name of the metadata column to inspect.

    Returns:
        Dictionary mapping each split name to its coverage (fraction of values
        different from "N/A", rounded to 2 decimals) or to "N/A" when the split
        has no known value at all, plus an "all_splits" entry with the coverage
        over all splits (or "N/A" when no split has any known value).
    """
    metric = "meta_cov_" + meta_field
    print("Calculating {}".format(metric))

    out_dict = {}
    meta_info_all = 0
    meta_info_not_null_all = 0
    for split in dataset_hf.keys():
        meta_info = dataset_hf[split][meta_field]
        meta_info_count = len(meta_info)
        # Splits without metadata still count towards the overall denominator.
        meta_info_all += meta_info_count

        meta_info_not_null_count = sum(1 for value in meta_info if value != "N/A")
        if meta_info_not_null_count == 0:
            out_dict[split] = "N/A"
            continue

        meta_info_not_null_all += meta_info_not_null_count
        out_dict[split] = round(meta_info_not_null_count / meta_info_count, 2)

    if meta_info_not_null_all == 0:
        out_dict["all_splits"] = "N/A"
    else:
        out_dict["all_splits"] = round(meta_info_not_null_all / meta_info_all, 2)
    return out_dict
def speech_rate_words_per_split(dataset_hf, dataset_hf_secret):
    """Compute the speech rate in words per second for every split.

    Words are obtained by joining the "ref_orig" utterances of a split with a
    single space and splitting on a single space; durations come from the
    "audio_duration_seconds" column of ``dataset_hf``. For the "test" split the
    references are taken from ``dataset_hf_secret`` instead of ``dataset_hf``.

    Args:
        dataset_hf: Mapping of split name to a mapping with "ref_orig" and
            "audio_duration_seconds" sequences.
        dataset_hf_secret: Mapping used as the source of the "test" references.

    Returns:
        Dictionary mapping each split name to its words-per-second rate
        (rounded to 2 decimals), plus an "all_splits" entry with the rate
        computed from the totals of all splits.

    Raises:
        ZeroDivisionError: If a split (or the whole dataset) has zero duration.
    """
    metric = "words_per_second"
    print("Calculating {}".format(metric))

    rates = {}
    total_words = 0
    total_seconds = 0
    for split in dataset_hf.keys():
        source = dataset_hf_secret if split == "test" else dataset_hf
        transcripts = source[split]["ref_orig"]
        n_words = len(" ".join(transcripts).split(" "))
        total_words += n_words

        n_seconds = sum(dataset_hf[split]["audio_duration_seconds"])
        total_seconds += n_seconds

        rates[split] = round(n_words / n_seconds, 2)

    rates["all_splits"] = round(total_words / total_seconds, 2)
    return rates
def speech_rate_chars_per_split(dataset_hf, dataset_hf_secret):
    """Compute the speech rate in characters per second for every split.

    Characters are counted over the "ref_orig" utterances with all space
    characters removed; durations come from the "audio_duration_seconds"
    column of ``dataset_hf``. For the "test" split the references are taken
    from ``dataset_hf_secret`` instead of ``dataset_hf``.

    Args:
        dataset_hf: Mapping of split name to a mapping with "ref_orig" and
            "audio_duration_seconds" sequences.
        dataset_hf_secret: Mapping used as the source of the "test" references.

    Returns:
        Dictionary mapping each split name to its characters-per-second rate
        (rounded to 2 decimals), plus an "all_splits" entry with the rate
        computed from the totals of all splits.

    Raises:
        ZeroDivisionError: If a split (or the whole dataset) has zero duration.
    """
    metric = "chars_per_second"
    print("Calculating {}".format(metric))

    rates = {}
    total_chars = 0
    total_seconds = 0
    for split in dataset_hf.keys():
        source = dataset_hf_secret if split == "test" else dataset_hf
        transcripts = source[split]["ref_orig"]
        # Dropping every space is the same as splitting on " " and gluing the
        # pieces back together without a separator.
        n_chars = len(" ".join(transcripts).replace(" ", ""))
        total_chars += n_chars

        n_seconds = sum(dataset_hf[split]["audio_duration_seconds"])
        total_seconds += n_seconds

        rates[split] = round(n_chars / n_seconds, 2)

    rates["all_splits"] = round(total_chars / total_seconds, 2)
    return rates
# distribution of a speaker metadata field (age or sex) over known buckets
def meta_distribution_text(dataset_hf, meta_field):
    """Compute the distribution of a speaker metadata field per split.

    Args:
        dataset_hf: Mapping of split name to a mapping that provides the
            ``meta_field`` column as a list.
        meta_field: Either 'speaker_age' or 'speaker_sex'; selects the bucket
            labels (``buckets_age`` / ``buckets_sex``) to report on.

    Returns:
        Dictionary mapping each split name to ``{bucket: share}`` where share
        is the bucket's count divided by the number of values different from
        "N/A" (rounded to 2 decimals), or to "N/A" when the split has no known
        value. The "all_splits" entry holds ``{bucket: share}`` computed from
        the bucket counts summed over all splits; it is "N/A" when any split
        lacks metadata or when no value falls into a known bucket.

    Raises:
        ValueError: If ``meta_field`` is not a supported metadata field.
    """
    if meta_field == 'speaker_age':
        buckets = buckets_age
    elif meta_field == 'speaker_sex':
        buckets = buckets_sex
    else:
        # Previously this fell through and failed later with an
        # UnboundLocalError on `buckets`.
        raise ValueError("Unsupported meta field: {}".format(meta_field))

    metric = "distribution_" + meta_field
    print("Calculating {}".format(metric))

    out_dict = {}
    no_meta = False
    values_count_total = {bucket: 0 for bucket in buckets}

    for split in dataset_hf.keys():
        meta_info = dataset_hf[split][meta_field]
        meta_info_not_null = [x for x in meta_info if x != "N/A"]
        if len(meta_info_not_null) == 0:
            out_dict[split] = "N/A"
            no_meta = True
            continue

        out_dict[split] = {}
        for bucket in buckets:
            values_count = meta_info_not_null.count(bucket)
            values_count_total[bucket] += values_count
            out_dict[split][bucket] = round(values_count / len(meta_info_not_null), 2)

    # A single split without metadata makes the overall distribution "N/A".
    if no_meta:
        out_dict["all_splits"] = "N/A"
        return out_dict

    # Loop-invariant: total number of values that landed in a known bucket.
    total_samples = sum(values_count_total.values())
    if total_samples == 0:
        # No value matched any bucket (or there are no splits): report "N/A"
        # instead of dividing by zero.
        out_dict["all_splits"] = "N/A"
        return out_dict

    out_dict["all_splits"] = {
        bucket: round(values_count_total[bucket] / total_samples, 2)
        for bucket in buckets
    }
    return out_dict
def _recording_count_summary(counts, average):
    """Build the statistics dictionary for a collection of per-speaker counts."""
    values = list(counts.values())
    return {
        "average": average,
        "std": round(np.std(values), 2),
        "median": np.median(values),
        "min": min(values),
        "max": max(values),
    }


def recordings_per_speaker(dataset_hf):
    """Compute statistics about the number of recordings per speaker.

    A speaker is identified by the first five "-"-separated fields of an
    entry in the split's "audioname" column.

    Args:
        dataset_hf: Mapping of split name to a mapping that provides an
            "audioname" sequence.

    Returns:
        Tuple ``(stats, contents)``. ``stats`` maps each split name and
        "all_splits" to a dictionary with "average", "std", "median", "min"
        and "max" of the recordings-per-speaker counts. ``contents`` maps each
        split name and "all_splits" to ``{speaker_prefix: recording_count}``.
        For "all_splits" the per-split dictionaries are merged, with later
        splits overriding equal prefixes.

    Raises:
        ZeroDivisionError: If a split contains no recordings.
    """
    metric = "recordings_per_speaker"
    print("Calculating {}".format(metric))

    out_dict_stats = {}
    out_dict_contents = {}
    counts_all = {}
    recordings_total = 0
    speakers_total = 0

    for split in dataset_hf.keys():
        audiopaths = dataset_hf[split]["audioname"]

        # Count recordings by exact speaker prefix in a single pass. The
        # previous substring test (`prefix in path`) also credited e.g.
        # speaker "...-1" with the recordings of speaker "...-10".
        counts_split = {}
        for audio_path in audiopaths:
            speaker_prefix = "-".join(str(audio_path).split("-")[0:5])
            counts_split[speaker_prefix] = counts_split.get(speaker_prefix, 0) + 1

        out_dict_contents[split] = counts_split

        speakers_split = len(counts_split)
        speakers_total += speakers_split
        recordings_split = len(audiopaths)
        recordings_total += recordings_split

        out_dict_stats[split] = _recording_count_summary(
            counts_split, round(recordings_split / speakers_split, 2)
        )
        counts_all = counts_all | counts_split

    out_dict_stats["all_splits"] = _recording_count_summary(
        counts_all, round(recordings_total / speakers_total, 2)
    )
    out_dict_contents["all_splits"] = counts_all
    return out_dict_stats, out_dict_contents
def meta_distribution_bar_plot(dataset_hf, output_dir, dimension = "speaker_sex"):
    """Placeholder for a bar plot of a metadata distribution.

    Not implemented: the function currently does nothing and returns None.
    """
    pass
def meta_distribution_violin_plot(dataset_hf, output_dir, metric = "audio_duration_seconds", dimension = "speaker_sex"):
    """Save one violin plot of ``metric`` grouped by ``dimension`` per split.

    Rows whose ``dimension`` value is "N/A", "other" or "unknown" are dropped.
    At most 5000 randomly sampled rows are plotted per split. Splits without
    any remaining row are skipped. Each plot is written to
    ``<output_dir>/<metric>-<dimension>-<split>.png``.

    Args:
        dataset_hf: Mapping of split name to data convertible to a
            ``pandas.DataFrame``. The frame must contain the ``metric`` and
            ``dimension`` columns as well as a 'dataset' column (used as x axis).
        output_dir: Directory for the generated PNG files; created if missing.
        metric: Name of the column plotted on the y axis.
        dimension: Name of the column used for the hue split.

    Returns:
        None.
    """
    print("Generating violin plat for metric {} for dimension {}".format(metric, dimension))
    excluded_values = ["N/A", "other", "unknown"]

    for split in dataset_hf.keys():
        df_dataset = pd.DataFrame(dataset_hf[split])
        # Keep only rows with a usable value in the dimension column.
        df_filtered = df_dataset[~df_dataset[dimension].isin(excluded_values)]
        if df_filtered.empty:
            print("No data for split {} and dimension {}".format(split, dimension))
            continue

        if len(df_filtered) >= 5000:
            sample_size = 5000
            print("Selecting sample of size {}".format(sample_size))
        else:
            sample_size = len(df_filtered)
            print("Selecting full split of size {}".format(sample_size))
        df = df_filtered.sample(sample_size)

        print("Generating plot")
        fig = plt.figure(figsize=(20, 15))
        try:
            plot = sns.violinplot(data = df, hue=dimension, x='dataset', y=metric, split=True, fill = False, inner = 'quart', legend='auto', common_norm=True)
            plot.set_xticklabels(plot.get_xticklabels(), rotation = 30, horizontalalignment = 'right')
            plt.title('Violin plot of {} by {} for split {}'.format(metric, dimension, split))
            plt.xlabel(dimension)
            plt.ylabel(metric)

            # save figure to file
            os.makedirs(output_dir, exist_ok=True)
            output_fn = os.path.join(output_dir, metric + "-" + dimension + "-" + split + ".png")
            plt.savefig(output_fn)
        finally:
            # Release the figure: pyplot keeps every figure alive until it is
            # closed, so one large figure per split would pile up in memory.
            plt.close(fig)
        print("Plot generation completed")
def read_reports(dataset_name):
    """Read the statistics and contents reports of a dataset.

    The files are looked up relative to the current working directory:
    ``reports/<dataset_name>/dataset_contents.json`` and
    ``reports/<dataset_name>/dataset_statistics.json``.

    Args:
        dataset_name: Name of the dataset sub-directory inside ``reports``.

    Returns:
        Tuple ``(stats_dict, contents_dict)`` with the parsed JSON contents.

    Raises:
        OSError: If one of the files cannot be opened.
        json.JSONDecodeError: If one of the files is not valid JSON.
    """
    json_contents = "./reports/{}/dataset_contents.json".format(dataset_name)
    json_stats = "reports/{}/dataset_statistics.json".format(dataset_name)

    # Decode explicitly as UTF-8 instead of relying on the platform default
    # encoding, which is not UTF-8 everywhere.
    with open(json_contents, 'r', encoding='utf-8') as file:
        contents_dict = json.load(file)
    with open(json_stats, 'r', encoding='utf-8') as file:
        stats_dict = json.load(file)

    return (stats_dict, contents_dict)
def add_test_split_stats_from_secret_dataset(stats_dict_public, stats_dict_secret):
    """Overwrite the "test" split statistics with those of the secret dataset.

    For every dataset of ``stats_dict_public`` and every metric of the matching
    entry in ``stats_dict_secret``, the value stored under the "test" split (if
    any) is copied into ``stats_dict_public``. The public dictionary is
    modified in place.

    Args:
        stats_dict_public: Nested ``{dataset: {metric: {split: value}}}`` dict.
        stats_dict_secret: Dictionary of the same shape holding the values to
            use for the "test" split.

    Returns:
        The updated ``stats_dict_public``.

    Raises:
        KeyError: If a public dataset is missing from ``stats_dict_secret`` or
            a secret metric with a "test" value is missing from the public one.
    """
    for dataset in stats_dict_public.keys():
        print(dataset)
        for metric, secret_splits in stats_dict_secret[dataset].items():
            if "test" in secret_splits:
                stats_dict_public[dataset][metric]["test"] = secret_splits["test"]
    return (stats_dict_public)
def dict_to_multindex_df(dict_in, all_splits=False):
    """Flatten a nested statistics dictionary into a MultiIndex DataFrame.

    Args:
        dict_in: Nested ``{dataset: {metric: {split: value}}}`` dictionary.
            The dataset entry named "all" is skipped.
        all_splits: When true, only the "all_splits" rows are kept; otherwise
            only the rows of the individual splits are kept.

    Returns:
        DataFrame with a single 'value' column indexed by
        ('dataset', 'metric', 'split').
    """
    keep_aggregate = bool(all_splits)
    records = []
    for dataset, metrics in dict_in.items():
        if dataset == "all":
            continue
        for metric, splits in metrics.items():
            for split, value in splits.items():
                # Keep a row exactly when its "is aggregate" status matches the request.
                if (split == "all_splits") == keep_aggregate:
                    records.append((dataset, metric, split, value))

    frame = pd.DataFrame(records, columns=['dataset', 'metric', 'split', 'value'])
    frame.set_index(['dataset', 'metric', 'split'], inplace=True)
    return (frame)
def dict_to_multindex_df_all_splits(dict_in):
    """Collect the "all_splits" entries of a nested statistics dictionary.

    Args:
        dict_in: Nested ``{dataset: {metric: {split: value}}}`` dictionary.
            The dataset entry named "all" is skipped.

    Returns:
        DataFrame with a single 'value' column indexed by
        ('dataset', 'metric', 'split'), containing only rows whose split is
        "all_splits".
    """
    records = [
        (dataset, metric, split, value)
        for dataset, metrics in dict_in.items()
        if dataset != "all"
        for metric, splits in metrics.items()
        for split, value in splits.items()
        if split == "all_splits"
    ]
    frame = pd.DataFrame(records, columns=['dataset', 'metric', 'split', 'value'])
    frame.set_index(['dataset', 'metric', 'split'], inplace=True)
    return (frame)
def extract_stats_to_agg(df_multindex_per_split, metrics):
    """Sum selected metrics over all splits, one row per dataset.

    Args:
        df_multindex_per_split: DataFrame with a 'value' column and a
            three-level index ('dataset', 'metric', 'split').
        metrics: List of metric names to aggregate.

    Returns:
        DataFrame indexed by 'dataset' with one column per metric holding the
        sum of the metric's values over all splits present in the input.
    """
    # select only relevant metrics
    selected = df_multindex_per_split.loc[(slice(None), metrics), :]
    # move the per-split rows into columns: index (dataset, metric), one column per split
    per_split = selected['value'].unstack(level='split')
    # Aggregate across whatever splits exist. Summing directly (instead of
    # adding a 'total' column and dropping hard-coded 'test'/'train'/
    # 'validation' columns) also works when a split is absent or extra splits
    # are present.
    totals = per_split.sum(axis=1)
    # move the rows of each metric into its own column
    return (totals.unstack(level='metric'))
def extract_stats_all_splits(df_multiindex_all_splits, metrics):
    """Pivot selected metrics into columns, one row per dataset.

    Args:
        df_multiindex_all_splits: DataFrame with a three-level index
            ('dataset', 'metric', 'split').
        metrics: List of metric names to keep.

    Returns:
        DataFrame indexed by 'dataset' with one column per selected metric.
    """
    return (
        df_multiindex_all_splits
        .loc[pd.IndexSlice[:, metrics], :]   # keep only the requested metrics
        .unstack(level='metric')             # metrics become columns
        .droplevel(0, axis=1)                # drop the outer column level
        .droplevel('split', axis=0)          # index by dataset only
    )
def extract_stats_for_dataset_card(df_multindex_per_split, subset, metrics, add_total=False):
    """Build the per-split metrics table of one dataset subset.

    Args:
        df_multindex_per_split: DataFrame with a three-level index
            ('dataset', 'metric', 'split'); must contain the 'train',
            'validation' and 'test' splits when ``add_total`` is true.
        subset: Name of the dataset whose rows are extracted.
        metrics: List of metric names to include.
        add_total: When true, append a 'total' column with the sum of the
            'train', 'validation' and 'test' columns.

    Returns:
        DataFrame indexed by 'metric' with the columns 'train', 'validation',
        'test' (in that order) and optionally 'total'.
    """
    print(df_multindex_per_split)

    # one row per (dataset, metric), one column per split
    wide = df_multindex_per_split.unstack(level='split')
    wide.columns = wide.columns.droplevel(0)
    wide = wide.loc[(slice(None), metrics), :]

    # Select the subset by comparing index values rather than formatting the
    # name into a query string, which broke on names containing a quote.
    in_subset = wide.index.get_level_values('dataset') == subset
    # reset_index() returns a fresh frame, so adding a column below does not
    # write into a filtered view of the input.
    card = wide[in_subset].reset_index()

    # change order of columns to train validation test
    new_columns = ['metric', 'train', 'validation', 'test']
    if add_total:
        card['total'] = card[['train', 'validation', 'test']].sum(axis=1)
        new_columns.append('total')

    card = card.reindex(columns=new_columns)
    return (card.set_index('metric'))