# Copyright (c) 2023 Amphion. # # This source code is licensed under the MIT license found in the # LICENSE file in the root directory of this source tree. """ This code is modified from https://montreal-forced-aligner.readthedocs.io/en/latest/user_guide/performance.html""" import os import subprocess from multiprocessing import Pool from tqdm import tqdm import torchaudio from pathlib import Path def remove_empty_dirs(path): """remove empty directories in a given path""" # Check if the given path is a directory if not os.path.isdir(path): print(f"{path} is not a directory") return # Walk through all directories and subdirectories for root, dirs, _ in os.walk(path, topdown=False): for dir in dirs: dir_path = os.path.join(root, dir) # Check if the directory is empty if not os.listdir(dir_path): os.rmdir(dir_path) # "Removed empty directory def process_single_wav_file(task): """process a single wav file""" wav_file, output_dir = task speaker_id, book_name, filename = Path(wav_file).parts[-3:] output_book_dir = Path(output_dir, speaker_id) output_book_dir.mkdir(parents=True, exist_ok=True) new_filename = f"{speaker_id}_{book_name}_{filename}" new_wav_file = Path(output_book_dir, new_filename) command = [ "ffmpeg", "-nostdin", "-hide_banner", "-loglevel", "error", "-nostats", "-i", wav_file, "-acodec", "pcm_s16le", "-ar", "16000", new_wav_file, ] subprocess.check_call( command ) # Run the command to convert the file to 16kHz and 16-bit PCM os.remove(wav_file) def process_wav_files(wav_files, output_dir, n_process): """process wav files in parallel""" tasks = [(wav_file, output_dir) for wav_file in wav_files] print(f"Processing {len(tasks)} files") with Pool(processes=n_process) as pool: for _ in tqdm( pool.imap_unordered(process_single_wav_file, tasks), total=len(tasks) ): pass print("Removing empty directories...") remove_empty_dirs(output_dir) print("Done!") def get_wav_files(dataset_path): """get all wav files in the dataset""" wav_files = [] for speaker_id in os.listdir(dataset_path): speaker_dir = os.path.join(dataset_path, speaker_id) if not os.path.isdir(speaker_dir): continue for book_name in os.listdir(speaker_dir): book_dir = os.path.join(speaker_dir, book_name) if not os.path.isdir(book_dir): continue for file in os.listdir(book_dir): if file.endswith(".wav"): wav_files.append(os.path.join(book_dir, file)) print("Found {} wav files".format(len(wav_files))) return wav_files def filter_wav_files_by_length(wav_files, max_len_sec=15): """filter wav files by length""" print("original wav files: {}".format(len(wav_files))) filtered_wav_files = [] for audio_file in wav_files: metadata = torchaudio.info(str(audio_file)) audio_length = metadata.num_frames / metadata.sample_rate if audio_length <= max_len_sec: filtered_wav_files.append(audio_file) else: os.remove(audio_file) print("filtered wav files: {}".format(len(filtered_wav_files))) return filtered_wav_files if __name__ == "__main__": dataset_path = "/path/to/output/directory" n_process = 16 max_len_sec = 15 wav_files = get_wav_files(dataset_path) filtered_wav_files = filter_wav_files_by_length(wav_files, max_len_sec) process_wav_files(filtered_wav_files, dataset_path, n_process)