Spaces:
Running
Running
# Copyright (c) 2023 Amphion. | |
# | |
# This source code is licensed under the MIT license found in the | |
# LICENSE file in the root directory of this source tree. | |
import json | |
from tqdm import tqdm | |
import os | |
import torchaudio | |
import torch | |
from utils.mfa_prepare import ( | |
process_wav_files, | |
get_wav_files, | |
filter_wav_files_by_length, | |
) | |
from utils.cut_by_vad import cut_segments | |
from utils.whisper_transcription import asr_main | |
from utils.util import has_existed | |
import subprocess | |
import random | |
from collections import defaultdict | |
from glob import glob | |
import shutil | |
def librilight_statistics(data_dir): | |
"""Get statistics for librilight dataset""" | |
distribution2speakers2utts = defaultdict(lambda: defaultdict(list)) | |
distribution_infos = glob(data_dir + "/*") | |
for distribution_info in distribution_infos: | |
distribution = distribution_info.split("/")[-1] | |
print(distribution) | |
speaker_infos = glob(distribution_info + "/*") | |
if len(speaker_infos) == 0: | |
continue | |
for speaker_info in speaker_infos: | |
speaker = speaker_info.split("/")[-1] | |
utts = glob(speaker_info + "/*.wav") | |
for utt in utts: | |
uid = utt.split("/")[-1].split(".")[0] | |
distribution2speakers2utts[distribution][speaker].append(uid) | |
return distribution2speakers2utts | |
def get_speakers_from_directory(directory): | |
return [ | |
d for d in os.listdir(directory) if os.path.isdir(os.path.join(directory, d)) | |
] | |
def split_dataset_by_speaker(base_dir, train_ratio=0.8, dev_ratio=0.1): | |
train_dir = os.path.join(base_dir, "train") | |
dev_dir = os.path.join(base_dir, "dev") | |
eval_dir = os.path.join(base_dir, "eval") | |
# Check if dataset is already split | |
if has_existed(train_dir) or has_existed(dev_dir) or has_existed(eval_dir): | |
print("Dataset already split. Calculating speakers...") | |
train_speakers = get_speakers_from_directory(train_dir) | |
dev_speakers = get_speakers_from_directory(dev_dir) | |
eval_speakers = get_speakers_from_directory(eval_dir) | |
all_speakers = train_speakers + dev_speakers + eval_speakers | |
unique_speakers = list(set(all_speakers)) | |
unique_speakers.sort() | |
return unique_speakers | |
# List all directories in the base directory | |
all_speakers = [ | |
d for d in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, d)) | |
] | |
random.shuffle(all_speakers) | |
# Calculate split sizes | |
total_speakers = len(all_speakers) | |
train_size = int(total_speakers * train_ratio) | |
dev_size = int(total_speakers * dev_ratio) | |
eval_size = total_speakers - train_size - dev_size | |
print("Total speakers:", total_speakers) | |
print("Train speakers:", train_size) | |
print("Dev speakers:", dev_size) | |
print("Eval speakers:", eval_size) | |
# Split directories | |
train_speakers = all_speakers[:train_size] | |
dev_speakers = all_speakers[train_size : train_size + dev_size] | |
eval_speakers = all_speakers[train_size + dev_size :] | |
# Function to move directories | |
def move_speakers(speakers, target_dir): | |
for speaker in speakers: | |
shutil.move( | |
os.path.join(base_dir, speaker), os.path.join(target_dir, speaker) | |
) | |
# Move directories | |
print("Moving directories...") | |
print("Moving Train speakers...") | |
move_speakers(train_speakers, train_dir) | |
print("Moving Dev speakers...") | |
move_speakers(dev_speakers, dev_dir) | |
print("Moving Eval speakers...") | |
move_speakers(eval_speakers, eval_dir) | |
unique_speakers = list(set(all_speakers)) | |
unique_speakers.sort() | |
return unique_speakers | |
def save_meta_data(save_dir, processed_dir, distribution2speakers2utts, speakers): | |
"""Save metadata for librilight dataset""" | |
os.makedirs(save_dir, exist_ok=True) | |
train_output_file = os.path.join(save_dir, "train.json") | |
valid_output_file = os.path.join(save_dir, "dev.json") | |
test_output_file = os.path.join(save_dir, "eval.json") | |
singer_dict_file = os.path.join(save_dir, "singers.json") | |
utt2singer_file = os.path.join(save_dir, "utt2singer") | |
utt2singer = open(utt2singer_file, "w") | |
if has_existed(train_output_file): | |
print("Metadata already exists. Skipping...") | |
return | |
train = [] | |
test = [] | |
valid = [] | |
train_index_count = 0 | |
test_index_count = 0 | |
valid_index_count = 0 | |
train_total_duration = 0 | |
test_total_duration = 0 | |
valid_total_duration = 0 | |
# Save metadata | |
for distribution, speakers2utts in tqdm(distribution2speakers2utts.items()): | |
for speaker, utts in tqdm(speakers2utts.items()): | |
for chosen_uid in utts: | |
res = { | |
"Dataset": "librilight", | |
"Singer": speaker, | |
"Uid": "{}#{}#{}".format(distribution, speaker, chosen_uid), | |
} | |
res["Path"] = "{}/{}/{}.wav".format(distribution, speaker, chosen_uid) | |
res["Path"] = os.path.join(processed_dir, res["Path"]) | |
assert os.path.exists(res["Path"]) | |
text_file_path = os.path.join( | |
processed_dir, | |
distribution, | |
speaker, | |
chosen_uid + ".txt", | |
) | |
with open(text_file_path, "r") as f: | |
lines = f.readlines() | |
assert len(lines) == 1 | |
text = lines[0].strip() | |
res["Text"] = text | |
waveform, sample_rate = torchaudio.load(res["Path"]) | |
duration = waveform.size(-1) / sample_rate | |
res["Duration"] = duration | |
if "train" in distribution: | |
res["index"] = train_index_count | |
train_total_duration += duration | |
train.append(res) | |
train_index_count += 1 | |
elif "dev" in distribution: | |
res["index"] = valid_index_count | |
valid_total_duration += duration | |
valid.append(res) | |
valid_index_count += 1 | |
elif "eval" in distribution: | |
res["index"] = test_index_count | |
test_total_duration += duration | |
test.append(res) | |
test_index_count += 1 | |
utt2singer.write("{}\t{}\n".format(res["Uid"], res["Singer"])) | |
print("Done!") | |
print( | |
"Utterance count: train = {}, dev = {}, eval = {}".format( | |
len(train), len(valid), len(test) | |
) | |
) | |
print( | |
"#Train duration= {}, #Dev duration= {}, #Eval duration= {}".format( | |
train_total_duration / 3600, | |
valid_total_duration / 3600, | |
test_total_duration / 3600, | |
) | |
) | |
with open(train_output_file, "w") as f: | |
json.dump(train, f, indent=4, ensure_ascii=False) | |
with open(test_output_file, "w") as f: | |
json.dump(test, f, indent=4, ensure_ascii=False) | |
with open(valid_output_file, "w") as f: | |
json.dump(valid, f, indent=4, ensure_ascii=False) | |
utt2singer.close() | |
singer_lut = {name: i for i, name in enumerate(speakers)} | |
with open(singer_dict_file, "w") as f: | |
json.dump(singer_lut, f, indent=4, ensure_ascii=False) | |
print("Metadata saved to", save_dir) | |
def main(output_path, dataset_path, cfg): | |
"""Preprocess librilight dataset""" | |
n_cpus = cfg.n_cpus # number of cpus to use for preprocessing | |
n_gpus = cfg.n_gpus # number of gpus to use for transcription | |
cut_length = cfg.cut_length # target length of utterance in seconds | |
max_length = cfg.max_length # max length of utterance in seconds | |
# MFA files | |
mfa_config_path = cfg.mfa_config_path # path to mfa config file | |
mfa_dict_path = cfg.mfa_dict_path # path to mfa dict file | |
mfa_model_path = cfg.mfa_model_path # path to mfa model file | |
# check if mfa files exist | |
if ( | |
not os.path.exists(mfa_dict_path) | |
or not os.path.exists(mfa_model_path) | |
or not os.path.exists(mfa_config_path) | |
): | |
raise Exception("MFA files not found.") | |
# Whisper model id | |
model_id = cfg.whisper_model_id # id of whisper model to use for transcription | |
subsets = [ | |
d | |
for d in os.listdir(dataset_path) | |
if ( | |
os.path.isdir(os.path.join(dataset_path, d)) | |
and d in ["tiny", "small", "medium", "large"] | |
) | |
] | |
print("Found subsets:", subsets) | |
if len(subsets) == 0: | |
print("No subsets found. Exiting...") | |
return | |
# Preprocess each subset | |
for subset in subsets: | |
# Construct paths based on the base path | |
print("Pre-proccessing Libri-light subset:", subset) | |
raw_dir = f"{dataset_path}/{subset}" | |
save_dir = f"{output_path}/{subset}" | |
processed_dir = f"{dataset_path}/processed/{subset}" | |
os.makedirs(processed_dir, exist_ok=True) | |
os.makedirs(save_dir, exist_ok=True) | |
# Step 1: Segmentation | |
print("-" * 10) | |
print("Step 1: Segmentation") | |
print("Cutting audio files...") | |
cut_segments(raw_dir, processed_dir, cut_length, n_cpus) | |
# Steps 2 & 3: Filter and Preprocess | |
print("-" * 10) | |
print("Step 2 & 3: Filter and Preprocess") | |
print("Filtering and preprocessing audio files...") | |
wav_files = get_wav_files(processed_dir) | |
filtered_wav_files = filter_wav_files_by_length(wav_files, max_length) | |
process_wav_files(filtered_wav_files, processed_dir, n_cpus) | |
# Step 4 & 5: Transcription & Text-preprocess | |
print("-" * 10) | |
print("Step 4 & 5: Transcription & Text-preprocess") | |
print("Transcribing audio files...") | |
n_gpus = min(n_gpus, torch.cuda.device_count()) | |
asr_main(processed_dir, n_gpus, model_id) | |
# Step 6: MFA Align | |
print("-" * 10) | |
print("Step 6: MFA Align") | |
print("Aligning audio files...") | |
command = [ | |
"mfa", | |
"align", | |
"-v", | |
"-j", | |
str(n_cpus), | |
"-c", | |
mfa_config_path, | |
processed_dir, | |
mfa_dict_path, | |
mfa_model_path, | |
processed_dir, | |
"--output_format", | |
"long_textgrid", | |
"--clean", | |
"--overwrite", | |
] | |
subprocess.run(command, text=True) | |
# Step 7: train/dev/eval split | |
print("-" * 10) | |
print("Step 7: train/dev/eval split") | |
print("Splitting dataset by speaker...") | |
speakers = split_dataset_by_speaker(processed_dir) | |
# Step 8: Statistics | |
print("-" * 10) | |
print("Step 8: Statistics") | |
print("Calculating statistics...") | |
distribution2speakers2utts = librilight_statistics(processed_dir) | |
# Step 9: Save metadata | |
print("-" * 10) | |
print("Step 9: Save metadata") | |
print("Preparing Metadata for Librilight...") | |
save_meta_data(save_dir, processed_dir, distribution2speakers2utts, speakers) | |
print("Preprocessing subset", subset, "done!") | |
print("-" * 10) | |
if __name__ == "__main__": | |
dataset_path = "/path/to/dataset/librilight" | |
output_path = "/path/to/output" | |
main(output_path, dataset_path) | |