"""Working analysis of size and text/audio derived basic features."""
import json
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import os
import numpy as np
# TODO: move to consts/config
# note: the 'fourties' spelling is kept as-is, assuming it matches the values stored in the dataset metadata
buckets_age = ['teens', 'twenties', 'thirties', 'fourties', 'fifties', 'sixties', 'seventies', 'eighties', 'nineties']
buckets_sex = ["male", "female"]
def load_bigos_analyzer_report(fp:str)->dict:
with open(fp, 'r') as f:
data = json.load(f)
return data
def num_of_samples_per_split(dataset_hf):
# input - huggingface dataset object
# output - dictionary with statistics about number of samples per split
out_dict = {}
# number of samples per subset and split
metric = "samples"
print("Calculating {}".format(metric))
for split in dataset_hf.keys():
samples = dataset_hf[split].num_rows
##print(split, samples)
out_dict[split] = samples
# add number of samples for all splits
out_dict["all_splits"] = sum(out_dict.values())
return out_dict
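# Illustrative note (not part of the original script): for a DatasetDict with
# "train"/"validation"/"test" splits, the returned dictionary has one entry per
# split plus an "all_splits" entry holding the sum of the per-split counts.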
def audio_duration_per_split(dataset_hf):
# input - huggingface dataset object
# output - dictionary with statistics about audio duration per split
out_dict = {}
metric = "audio[h]"
print("Calculating {}".format(metric))
for split in dataset_hf.keys():
#sampling_rate = dataset_hf[split]["sampling_rate"][0]
#audio_total_length_samples = 0
#audio_total_length_samples = sum(len(audio_file["array"]) for audio_file in dataset_hf["test"]["audio"])
audio_total_length_seconds = sum(dataset_hf[split]["audio_duration_seconds"])
audio_total_length_hours = round(audio_total_length_seconds / 3600,2)
out_dict[split] = audio_total_length_hours
#print(split, audio_total_length_hours)
    # total audio duration across all splits
out_dict["all_splits"] = sum(out_dict.values())
return out_dict
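# Illustrative note (assumption: every split exposes an "audio_duration_seconds"
# column, as the code above expects): durations are summed per split and reported
# in hours rounded to two decimals, with "all_splits" holding the sum of the
# per-split values.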
def speakers_per_split(dataset_hf):
    # input - huggingface dataset object
    # output - dictionary with the number of unique speakers per split
out_dict = {}
metric = "speakers"
print("Calculating {}".format(metric))
for split in dataset_hf.keys():
# extract speakers from file_id
speakers_ids_all = [str(fileid).split("-")[4] for fileid in dataset_hf[split]["audioname"]]
speakers_ids_uniq = list(set(speakers_ids_all))
speakers_count = len(speakers_ids_uniq)
#print(split, speakers_count)
out_dict[split] = speakers_count
    # total speaker count across all splits (sum of per-split counts)
out_dict["all_splits"] = sum(out_dict.values())
return out_dict
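# Assumed naming convention (inferred from the indexing above, not verified):
# "audioname" is hyphen-delimited and its fifth field (index 4) identifies the
# speaker. Note that summing per-split counts into "all_splits" counts a speaker
# once per split in which they appear.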
def uniq_utts_per_split(dataset_hf, dataset_hf_secret):
    # input - huggingface dataset objects (public and secret)
    # output - dictionary with the number of unique utterances (reference texts) per split
out_dict = {}
metric = "utts_unique"
print("Calculating {}".format(metric))
utts_all = []
for split in dataset_hf.keys():
        # test-split references are only available in the secret dataset
if (split == "test"):
utts_split = dataset_hf_secret[split]["ref_orig"]
else:
utts_split = dataset_hf[split]["ref_orig"]
utts_all = utts_all + utts_split
utts_uniq = list(set(utts_split))
utts_uniq_count = len(utts_uniq)
#print(split, utts_uniq_count)
out_dict[split] = utts_uniq_count
    # number of unique utterances across all splits
out_dict["all_splits"] = len(list(set(utts_all)))
return out_dict,utts_all
def words_per_split(dataset_hf, dataset_hf_secret):
    # input - huggingface dataset objects (public and secret)
    # output - dictionary with the total number of words per split
out_dict = {}
metric = "words"
print("Calculating {}".format(metric))
for split in dataset_hf.keys():
        # test-split references are only available in the secret dataset
if (split == "test"):
utts_all = dataset_hf_secret[split]["ref_orig"]
else:
utts_all = dataset_hf[split]["ref_orig"]
        utts_lengths = [len(utt.split(" ")) for utt in utts_all]
        words_all_count = sum(utts_lengths)
#print(split, words_all_count)
out_dict[split] = words_all_count
    # total word count across all splits
out_dict["all_splits"] = sum(out_dict.values())
return out_dict
def uniq_words_per_split(dataset_hf, dataset_hf_secret):
    # input - huggingface dataset objects (public and secret)
    # output - dictionary with the number of unique words per split
out_dict = {}
out_words_list = []
metric = "words_unique"
print("Calculating {}".format(metric))
for split in dataset_hf.keys():
        # test-split references are only available in the secret dataset
if (split == "test"):
utts_all = dataset_hf_secret[split]["ref_orig"]
else:
utts_all = dataset_hf[split]["ref_orig"]
words_all = " ".join(utts_all).split(" ")
words_uniq = list(set(words_all))
out_words_list = out_words_list + words_uniq
words_uniq_count = len(words_uniq)
#print(split, words_uniq_count)
out_dict[split] = words_uniq_count
    # number of unique words across all splits
out_words_uniq = list(set((out_words_list)))
out_words_uniq_count = len(out_words_uniq)
out_dict["all_splits"] = out_words_uniq_count
#print("all", out_words_uniq_count)
return out_dict, out_words_uniq
def chars_per_split(dataset_hf, dataset_hf_secret):
    # input - huggingface dataset objects (public and secret)
    # output - dictionary with the total number of characters per split
out_dict = {}
metric = "chars"
print("Calculating {}".format(metric))
for split in dataset_hf.keys():
        # test-split references are only available in the secret dataset
if (split=="test"):
utts_all = dataset_hf_secret[split]["ref_orig"]
else:
utts_all = dataset_hf[split]["ref_orig"]
        # count characters (including spaces between words) in all references
        chars_all_count = len(" ".join(utts_all))
#print(split, chars_all_count)
out_dict[split] = chars_all_count
    # total character count across all splits
out_dict["all_splits"] = sum(out_dict.values())
return out_dict
def uniq_chars_per_split(dataset_hf, dataset_hf_secret):
    # input - huggingface dataset objects (public and secret)
    # output - dictionary with the number of unique characters per split
out_dict = {}
out_chars_list = []
metric = "chars_unique"
print("Calculating {}".format(metric))
for split in dataset_hf.keys():
        # test-split references are only available in the secret dataset
if(split == "test"):
utts_all = dataset_hf_secret[split]["ref_orig"]
else:
utts_all = dataset_hf[split]["ref_orig"]
words_all = " ".join(utts_all).split(" ")
words_uniq = list(set(words_all))
chars_uniq = list(set("".join(words_uniq)))
chars_uniq_count = len(chars_uniq)
#print(split, chars_uniq_count)
out_dict[split] = chars_uniq_count
out_chars_list = out_chars_list + chars_uniq
    # number of unique characters across all splits
out_chars_uniq = list(set((out_chars_list)))
out_chars_uniq_count = len(out_chars_uniq)
out_dict["all_splits"] = out_chars_uniq_count
#print("all", out_chars_uniq_count)
return out_dict, out_chars_uniq
def meta_cov_per_split(dataset_hf, meta_field):
    # input - huggingface dataset object and metadata field name
    # output - dictionary with the coverage of the metadata field per split
no_meta=False
# TODO move to config
if meta_field == 'speaker_age':
buckets = buckets_age
if meta_field == 'speaker_sex':
buckets = buckets_sex
out_dict = {}
metric = "meta_cov_" + meta_field
print("Calculating {}".format(metric))
meta_info_all = 0
meta_info_not_null_all = 0
for split in dataset_hf.keys():
        # read metadata values for this split
meta_info = dataset_hf[split][meta_field]
meta_info_count = len(meta_info)
meta_info_all += meta_info_count
# calculate coverage
meta_info_not_null_count = len([x for x in meta_info if x != "N/A"])
if meta_info_not_null_count == 0:
out_dict[split] = "N/A"
continue
meta_info_not_null_all += meta_info_not_null_count
meta_info_coverage = round(meta_info_not_null_count / meta_info_count, 2)
#print(split, meta_info_coverage)
        # store coverage for this split
        out_dict[split] = meta_info_coverage
    # overall coverage across all splits
if (meta_info_not_null_all == 0):
out_dict["all_splits"] = "N/A"
else:
out_dict["all_splits"] = round(meta_info_not_null_all/meta_info_all,2 )
return out_dict
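# Illustrative note (not part of the original script): coverage is the share of
# rows whose metadata value differs from the literal string "N/A". For example,
# a split with 800 annotated rows out of 1000 yields a coverage of 0.8; a split
# with no annotations at all is reported as "N/A".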
def speech_rate_words_per_split(dataset_hf, dataset_hf_secret):
    # input - huggingface dataset objects (public and secret)
    # output - dictionary with the speech rate (words per second) per split
out_dict = {}
metric = "words_per_second"
print("Calculating {}".format(metric))
words_all_count = 0
audio_total_length_seconds = 0
for split in dataset_hf.keys():
        # test-split references are only available in the secret dataset
if (split == "test"):
utts_split = dataset_hf_secret[split]["ref_orig"]
else:
utts_split = dataset_hf[split]["ref_orig"]
words_split = " ".join(utts_split).split(" ")
words_split_count = len(words_split)
words_all_count += words_split_count
audio_split_length_seconds = sum(dataset_hf[split]["audio_duration_seconds"])
audio_total_length_seconds += audio_split_length_seconds
speech_rate = round(words_split_count / audio_split_length_seconds, 2)
#print(split, speech_rate)
out_dict[split] = speech_rate
    # overall speech rate across all splits
out_dict["all_splits"] = round(words_all_count / audio_total_length_seconds, 2)
return out_dict
def speech_rate_chars_per_split(dataset_hf, dataset_hf_secret):
    # input - huggingface dataset objects (public and secret)
    # output - dictionary with the speech rate (characters per second) per split
out_dict = {}
metric = "chars_per_second"
print("Calculating {}".format(metric))
chars_all_count = 0
audio_total_length_seconds = 0
for split in dataset_hf.keys():
        # test-split references are only available in the secret dataset
if (split == "test"):
utts_split = dataset_hf_secret[split]["ref_orig"]
else:
utts_split = dataset_hf[split]["ref_orig"]
words_split = " ".join(utts_split).split(" ")
chars_split_count = len("".join(words_split))
chars_all_count += chars_split_count
audio_split_length_seconds = sum(dataset_hf[split]["audio_duration_seconds"])
audio_total_length_seconds += audio_split_length_seconds
speech_rate = round(chars_split_count / audio_split_length_seconds, 2)
#print(split, speech_rate)
out_dict[split] = speech_rate
    # overall speech rate across all splits
out_dict["all_splits"] = round(chars_all_count / audio_total_length_seconds, 2)
return out_dict
# distribution of metadata values (e.g. speaker age or sex)
def meta_distribution_text(dataset_hf, meta_field):
no_meta=False
if meta_field == 'speaker_age':
buckets = buckets_age
if meta_field == 'speaker_sex':
buckets = buckets_sex
    # input - huggingface dataset object and metadata field name
    # output - dictionary with the distribution of metadata values per split
out_dict = {}
metric = "distribution_" + meta_field
print("Calculating {}".format(metric))
values_count_total = {}
for bucket in buckets:
values_count_total[bucket]=0
for split in dataset_hf.keys():
out_dict[split] = {}
        # read metadata values for this split
meta_info = dataset_hf[split][meta_field]
meta_info_not_null = [x for x in meta_info if x != "N/A"]
if len(meta_info_not_null) == 0:
out_dict[split]="N/A"
no_meta=True
continue
for bucket in buckets:
values_count = meta_info_not_null.count(bucket)
values_count_total[bucket] += values_count
out_dict[split][bucket] = round(values_count/len(meta_info_not_null),2)
#print(split, out_dict[split])
    # distribution across all splits
if (no_meta):
out_dict["all_splits"] = "N/A"
return out_dict
out_dict["all_splits"] = {}
    # total number of samples across all buckets
    total_samples = sum(values_count_total.values())
    for bucket in buckets:
        out_dict["all_splits"][bucket] = round(values_count_total[bucket] / total_samples, 2)
return out_dict
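# Illustrative shape of the result (values are placeholders): for
# meta_field="speaker_sex" each split maps to bucket frequencies such as
# {"male": 0.55, "female": 0.45}, and "all_splits" holds the frequencies pooled
# over every split (or "N/A" if any split lacks the metadata entirely).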
def recordings_per_speaker(dataset_hf):
    # input - huggingface dataset object
    # output - per-split statistics (average/std/median/min/max of recordings per speaker)
    #          and per-split recording counts keyed by speaker prefix
out_dict_stats = {}
out_dict_contents = {}
metric = "recordings_per_speaker"
print("Calculating {}".format(metric))
recordings_per_speaker_stats_dict_all = {}
recordings_total=0
speakers_total = 0
for split in dataset_hf.keys():
# extract speakers from file_id
audiopaths = dataset_hf[split]["audioname"]
speaker_prefixes = [str(fileid).split("-")[0:5] for fileid in audiopaths]
speakers_dict_split = {}
# create dictionary with list of audio paths matching speaker prefix
# Create initial dictionary keys from speaker prefixes
for speaker_prefix in speaker_prefixes:
speaker_prefix_str = "-".join(speaker_prefix)
speakers_dict_split[speaker_prefix_str] = []
# Populate the dictionary with matching audio paths
for audio_path in audiopaths:
for speaker_prefix_str in speakers_dict_split.keys():
if speaker_prefix_str in audio_path:
speakers_dict_split[speaker_prefix_str].append(audio_path)
        # iterate over speaker prefixes and count recordings per speaker
recordings_per_speaker_stats_dict_split = {}
for speaker_prefix_str in speakers_dict_split.keys():
recordings_per_speaker_stats_dict_split[speaker_prefix_str] = len(speakers_dict_split[speaker_prefix_str])
out_dict_contents[split] = {}
out_dict_contents[split] = recordings_per_speaker_stats_dict_split
# use recordings_per_speaker_stats to calculate statistics like min, max, avg, median, std
out_dict_stats[split] = {}
speakers_split = len(list(recordings_per_speaker_stats_dict_split.keys()))
speakers_total += speakers_split
recordings_split = len(audiopaths)
recordings_total += recordings_split
average_recordings_per_speaker = round( recordings_split / speakers_split,2)
out_dict_stats[split]["average"] = average_recordings_per_speaker
out_dict_stats[split]["std"] = round(np.std(list(recordings_per_speaker_stats_dict_split.values())),2)
out_dict_stats[split]["median"] = np.median(list(recordings_per_speaker_stats_dict_split.values()))
out_dict_stats[split]["min"] = min(recordings_per_speaker_stats_dict_split.values())
out_dict_stats[split]["max"] = max(recordings_per_speaker_stats_dict_split.values())
        # merge this split's per-speaker counts into the all-splits dictionary (dict union requires Python 3.9+)
        recordings_per_speaker_stats_dict_all = recordings_per_speaker_stats_dict_all | recordings_per_speaker_stats_dict_split
    # aggregate statistics across all splits
average_recordings_per_speaker_all = round( recordings_total / speakers_total , 2)
out_dict_stats["all_splits"] = {}
out_dict_stats["all_splits"]["average"] = average_recordings_per_speaker_all
out_dict_stats["all_splits"]["std"] = round(np.std(list(recordings_per_speaker_stats_dict_all.values())),2)
out_dict_stats["all_splits"]["median"] = np.median(list(recordings_per_speaker_stats_dict_all.values()))
out_dict_stats["all_splits"]["min"] = min(recordings_per_speaker_stats_dict_all.values())
out_dict_stats["all_splits"]["max"] = max(recordings_per_speaker_stats_dict_all.values())
out_dict_contents["all_splits"] = recordings_per_speaker_stats_dict_all
return out_dict_stats, out_dict_contents
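# Illustrative note (not part of the original script): the first returned
# dictionary holds per-split summary statistics
# ({"average": ..., "std": ..., "median": ..., "min": ..., "max": ...}),
# the second maps each speaker prefix to its recording count. The "|" merge
# above keeps only the count from the last processed split for any prefix that
# happens to occur in more than one split.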
def meta_distribution_bar_plot(dataset_hf, output_dir, dimension="speaker_sex"):
    # TODO: not implemented yet
    pass
def meta_distribution_violin_plot(dataset_hf, output_dir, metric = "audio_duration_seconds", dimension = "speaker_sex"):
    # input - huggingface dataset object
    # output - violin plots of the given metric per dimension, one figure per split, saved to output_dir
print("Generating violin plat for metric {} for dimension {}".format(metric, dimension))
# drop samples for which dimension column values are equal to "N/A"
for split in dataset_hf.keys():
df_dataset = pd.DataFrame(dataset_hf[split])
# remove values equal to "N/A" for column dimension
df_filtered = df_dataset[df_dataset[dimension] != "N/A"]
df_filtered = df_filtered[df_filtered[dimension] != "other"]
df_filtered = df_filtered[df_filtered[dimension] != "unknown"]
if df_filtered.empty:
print("No data for split {} and dimension {}".format(split, dimension))
continue
if (len(df_filtered)>=5000):
sample_size = 5000
print("Selecting sample of size {}".format(sample_size))
else:
sample_size = len(df_filtered)
print("Selecting full split of size {}".format(sample_size))
df = df_filtered.sample(sample_size)
# if df_filtered is empty, skip violin plot generation for this split and dimension
print("Generating plot")
plt.figure(figsize=(20, 15))
plot = sns.violinplot(data = df, hue=dimension, x='dataset', y=metric, split=True, fill = False,inner = 'quart', legend='auto', common_norm=True)
plot.set_xticklabels(plot.get_xticklabels(), rotation = 30, horizontalalignment = 'right')
plt.title('Violin plot of {} by {} for split {}'.format(metric, dimension, split))
plt.xlabel(dimension)
plt.ylabel(metric)
        #plt.show()
# save figure to file
os.makedirs(output_dir, exist_ok=True)
output_fn = os.path.join(output_dir, metric + "-" + dimension + "-" + split + ".png")
plt.savefig(output_fn)
print("Plot generation completed")
def read_reports(dataset_name):
    json_contents = "./reports/{}/dataset_contents.json".format(dataset_name)
    json_stats = "./reports/{}/dataset_statistics.json".format(dataset_name)
with open(json_contents, 'r') as file:
contents_dict = json.load(file)
with open(json_stats, 'r') as file:
stats_dict = json.load(file)
return(stats_dict, contents_dict)
def add_test_split_stats_from_secret_dataset(stats_dict_public, stats_dict_secret):
    # copy test-split values for every metric (utts, words, words_unique, chars, chars_unique, speech rate, ...) from the secret report into the public one
for dataset in stats_dict_public.keys():
print(dataset)
for metric in stats_dict_secret[dataset].keys():
for split in stats_dict_secret[dataset][metric].keys():
if split == "test":
stats_dict_public[dataset][metric][split] = stats_dict_secret[dataset][metric][split]
return(stats_dict_public)
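# Illustrative note (not part of the original script): both arguments are nested
# as {dataset: {metric: {split: value}}}; only the "test" entries of the secret
# report overwrite the public one, so public train/validation numbers are left
# untouched.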
def dict_to_multindex_df(dict_in, all_splits=False):
# Creating a MultiIndex DataFrame
rows = []
for dataset, metrics in dict_in.items():
if (dataset == "all"):
continue
for metric, splits in metrics.items():
for split, value in splits.items():
if (all_splits):
if (split == "all_splits"):
rows.append((dataset, metric, split, value))
else:
if (split == "all_splits"):
continue
rows.append((dataset, metric, split, value))
# Convert to DataFrame
df = pd.DataFrame(rows, columns=['dataset', 'metric', 'split', 'value'])
df.set_index(['dataset', 'metric', 'split'], inplace=True)
return(df)
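# Illustrative usage (hypothetical dataset name): given the nested statistics
# report loaded by read_reports(), this builds a long-format frame indexed by
# (dataset, metric, split); pass all_splits=True to keep only the aggregated
# "all_splits" rows instead of the per-split ones.
#
#   stats_dict, _ = read_reports("some_dataset")
#   df_per_split = dict_to_multindex_df(stats_dict)
#   df_totals = dict_to_multindex_df(stats_dict, all_splits=True)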
def dict_to_multindex_df_all_splits(dict_in):
# Creating a MultiIndex DataFrame
rows = []
for dataset, metrics in dict_in.items():
if (dataset == "all"):
continue
for metric, splits in metrics.items():
for split, value in splits.items():
if (split == "all_splits"):
rows.append((dataset, metric, split, value))
# Convert to DataFrame
df = pd.DataFrame(rows, columns=['dataset', 'metric', 'split', 'value'])
df.set_index(['dataset', 'metric', 'split'], inplace=True)
return(df)
def extract_stats_to_agg(df_multindex_per_split, metrics):
    # input - MultiIndex dataframe with three index levels: dataset, metric, split
    # select only the relevant metrics
df_agg_splits = df_multindex_per_split.loc[(slice(None), metrics), :]
# unstack - move rows per split to columns
df_agg_splits = df_agg_splits.unstack(level ='split')
# aggregate values for all splits
df_agg_splits['value', 'total'] = df_agg_splits['value'].sum(axis=1)
# drop columns with splits
df_agg_splits.columns = df_agg_splits.columns.droplevel(0)
columns_to_drop = ['test', 'train', 'validation']
df_agg_splits.drop(columns = columns_to_drop, inplace = True)
# move rows corresponding to specific metrics into specific columns
df_agg_splits = df_agg_splits.unstack(level ='metric')
df_agg_splits.columns = df_agg_splits.columns.droplevel(0)
return(df_agg_splits)
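# Illustrative note (not part of the original script): this assumes the input
# frame contains exactly the "train", "validation" and "test" splits; the three
# per-split columns are summed into a "total" column and then dropped, leaving
# one column per requested metric.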
def extract_stats_all_splits(df_multiindex_all_splits, metrics):
df_all_splits = df_multiindex_all_splits.loc[(slice(None), metrics), :]
df_all_splits = df_all_splits.unstack(level ='metric')
df_all_splits.columns = df_all_splits.columns.droplevel(0)
#print(df_all_splits)
df_all_splits = df_all_splits.droplevel('split', axis=0)
return(df_all_splits)
def extract_stats_for_dataset_card(df_multindex_per_split, subset, metrics, add_total=False):
print(df_multindex_per_split)
df_metrics_subset = df_multindex_per_split
df_metrics_subset = df_metrics_subset.unstack(level ='split')
df_metrics_subset.columns = df_metrics_subset.columns.droplevel(0)
df_metrics_subset = df_metrics_subset.loc[(slice(None), metrics), :]
df_metrics_subset = df_metrics_subset.query("dataset == '{}'".format(subset))
# change order of columns to train validation test
df_metrics_subset.reset_index(inplace=True)
if (add_total):
new_columns = ['metric', 'train', 'validation', 'test', 'total']
total = df_metrics_subset[['train', 'validation','test']].sum(axis=1)
df_metrics_subset['total'] = total
else:
new_columns = ['metric', 'train', 'validation', 'test']
df_metrics_subset = df_metrics_subset.reindex(columns=new_columns)
df_metrics_subset.set_index('metric', inplace=True)
return(df_metrics_subset)
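# Minimal end-to-end sketch (not part of the original script). The repo ids and
# config name below are placeholders - substitute the actual public and secret
# dataset repositories. It assumes the loaded DatasetDict exposes the columns
# used above ("audioname", "ref_orig", "audio_duration_seconds", "speaker_age",
# "speaker_sex").
if __name__ == "__main__":
    from datasets import load_dataset  # extra import needed only for this sketch

    dataset_hf = load_dataset("org/public-dataset-repo", "subset-name")          # placeholder
    dataset_hf_secret = load_dataset("org/secret-dataset-repo", "subset-name")   # placeholder

    stats = {
        "samples": num_of_samples_per_split(dataset_hf),
        "audio[h]": audio_duration_per_split(dataset_hf),
        "speakers": speakers_per_split(dataset_hf),
        "words": words_per_split(dataset_hf, dataset_hf_secret),
        "chars": chars_per_split(dataset_hf, dataset_hf_secret),
        "words_per_second": speech_rate_words_per_split(dataset_hf, dataset_hf_secret),
        "meta_cov_speaker_sex": meta_cov_per_split(dataset_hf, "speaker_sex"),
    }
    print(json.dumps(stats, indent=2))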