|
|
|
|
|
|
|
|
|
|
|
""" This code is modified from https://montreal-forced-aligner.readthedocs.io/en/latest/user_guide/performance.html""" |
|
|
|
import os |
|
import subprocess |
|
from multiprocessing import Pool |
|
from tqdm import tqdm |
|
import torchaudio |
|
from pathlib import Path |
|
|
|
|
|
def remove_empty_dirs(path): |
|
"""remove empty directories in a given path""" |
|
|
|
if not os.path.isdir(path): |
|
print(f"{path} is not a directory") |
|
return |
|
|
|
|
|
for root, dirs, _ in os.walk(path, topdown=False): |
|
for dir in dirs: |
|
dir_path = os.path.join(root, dir) |
|
|
|
if not os.listdir(dir_path): |
|
os.rmdir(dir_path) |
|
|
|
|
|
def process_single_wav_file(task): |
|
"""process a single wav file""" |
|
wav_file, output_dir = task |
|
speaker_id, book_name, filename = Path(wav_file).parts[-3:] |
|
|
|
output_book_dir = Path(output_dir, speaker_id) |
|
output_book_dir.mkdir(parents=True, exist_ok=True) |
|
new_filename = f"{speaker_id}_{book_name}_{filename}" |
|
|
|
new_wav_file = Path(output_book_dir, new_filename) |
|
command = [ |
|
"ffmpeg", |
|
"-nostdin", |
|
"-hide_banner", |
|
"-loglevel", |
|
"error", |
|
"-nostats", |
|
"-i", |
|
wav_file, |
|
"-acodec", |
|
"pcm_s16le", |
|
"-ar", |
|
"16000", |
|
new_wav_file, |
|
] |
|
subprocess.check_call( |
|
command |
|
) |
|
os.remove(wav_file) |
|
|
|
|
|
def process_wav_files(wav_files, output_dir, n_process): |
|
"""process wav files in parallel""" |
|
tasks = [(wav_file, output_dir) for wav_file in wav_files] |
|
print(f"Processing {len(tasks)} files") |
|
with Pool(processes=n_process) as pool: |
|
for _ in tqdm( |
|
pool.imap_unordered(process_single_wav_file, tasks), total=len(tasks) |
|
): |
|
pass |
|
print("Removing empty directories...") |
|
remove_empty_dirs(output_dir) |
|
print("Done!") |
|
|
|
|
|
def get_wav_files(dataset_path): |
|
"""get all wav files in the dataset""" |
|
wav_files = [] |
|
for speaker_id in os.listdir(dataset_path): |
|
speaker_dir = os.path.join(dataset_path, speaker_id) |
|
if not os.path.isdir(speaker_dir): |
|
continue |
|
for book_name in os.listdir(speaker_dir): |
|
book_dir = os.path.join(speaker_dir, book_name) |
|
if not os.path.isdir(book_dir): |
|
continue |
|
for file in os.listdir(book_dir): |
|
if file.endswith(".wav"): |
|
wav_files.append(os.path.join(book_dir, file)) |
|
print("Found {} wav files".format(len(wav_files))) |
|
return wav_files |
|
|
|
|
|
def filter_wav_files_by_length(wav_files, max_len_sec=15): |
|
"""filter wav files by length""" |
|
print("original wav files: {}".format(len(wav_files))) |
|
filtered_wav_files = [] |
|
for audio_file in wav_files: |
|
metadata = torchaudio.info(str(audio_file)) |
|
audio_length = metadata.num_frames / metadata.sample_rate |
|
if audio_length <= max_len_sec: |
|
filtered_wav_files.append(audio_file) |
|
else: |
|
os.remove(audio_file) |
|
print("filtered wav files: {}".format(len(filtered_wav_files))) |
|
return filtered_wav_files |
|
|
|
|
|
if __name__ == "__main__": |
|
dataset_path = "/path/to/output/directory" |
|
n_process = 16 |
|
max_len_sec = 15 |
|
wav_files = get_wav_files(dataset_path) |
|
filtered_wav_files = filter_wav_files_by_length(wav_files, max_len_sec) |
|
process_wav_files(filtered_wav_files, dataset_path, n_process) |
|
|