Spaces:
Running
on
Zero
Running
on
Zero
# Copyright (c) 2023 Amphion. | |
# | |
# This source code is licensed under the MIT license found in the | |
# LICENSE file in the root directory of this source tree. | |
import math | |
import random | |
import os | |
import json | |
import numpy as np | |
import parselmouth | |
import torch | |
import torchaudio | |
from tqdm import tqdm | |
from audiomentations import TimeStretch | |
from pedalboard import ( | |
Pedalboard, | |
HighShelfFilter, | |
LowShelfFilter, | |
PeakFilter, | |
PitchShift, | |
) | |
from utils.util import has_existed | |
PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT = 0.0 | |
PRAAT_CHANGEGENDER_FORMANTSHIFTRATIO_DEFAULT = 1.0 | |
PRAAT_CHANGEGENDER_PITCHSHIFTRATIO_DEFAULT = 1.0 | |
PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT = 1.0 | |
PRAAT_CHANGEGENDER_DURATIONFACTOR_DEFAULT = 1.0 | |
def wav_to_Sound(wav, sr: int) -> parselmouth.Sound: | |
"""Convert a waveform to a parselmouth.Sound object | |
Args: | |
wav (np.ndarray/torch.Tensor): waveform of shape (n_channels, n_samples) | |
sr (int, optional): sampling rate. | |
Returns: | |
parselmouth.Sound: a parselmouth.Sound object | |
""" | |
assert wav.shape == (1, len(wav[0])), "wav must be of shape (1, n_samples)" | |
sound = None | |
if isinstance(wav, np.ndarray): | |
sound = parselmouth.Sound(wav[0], sampling_frequency=sr) | |
elif isinstance(wav, torch.Tensor): | |
sound = parselmouth.Sound(wav[0].numpy(), sampling_frequency=sr) | |
assert sound is not None, "wav must be either np.ndarray or torch.Tensor" | |
return sound | |
def get_pitch_median(wav, sr: int): | |
"""Get the median pitch of a waveform | |
Args: | |
wav (np.ndarray/torch.Tensor): waveform of shape (n_channels, n_samples) | |
sr (int, optional): sampling rate. | |
Returns: | |
parselmouth.Pitch, float: a parselmouth.Pitch object and the median pitch | |
""" | |
if not isinstance(wav, parselmouth.Sound): | |
sound = wav_to_Sound(wav, sr) | |
else: | |
sound = wav | |
pitch_median = PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT | |
# To Pitch: Time step(s)(standard value: 0.0), Pitch floor (Hz)(standard value: 75), Pitch ceiling (Hz)(standard value: 600.0) | |
pitch = parselmouth.praat.call(sound, "To Pitch", 0.8 / 75, 75, 600) | |
# Get quantile: From time (s), To time (s), Quantile(0.5 is then the 50% quantile, i.e., the median), Units (Hertz or Bark) | |
pitch_median = parselmouth.praat.call(pitch, "Get quantile", 0.0, 0.0, 0.5, "Hertz") | |
return pitch, pitch_median | |
def change_gender( | |
sound, | |
pitch=None, | |
formant_shift_ratio: float = PRAAT_CHANGEGENDER_FORMANTSHIFTRATIO_DEFAULT, | |
new_pitch_median: float = PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT, | |
pitch_range_ratio: float = PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT, | |
duration_factor: float = PRAAT_CHANGEGENDER_DURATIONFACTOR_DEFAULT, | |
) -> parselmouth.Sound: | |
"""Invoke change gender function in praat | |
Args: | |
sound (parselmouth.Sound): a parselmouth.Sound object | |
pitch (parselmouth.Pitch, optional): a parselmouth.Pitch object. Defaults to None. | |
formant_shift_ratio (float, optional): formant shift ratio. A value of 1.0 means no change. Greater than 1.0 means higher pitch. Less than 1.0 means lower pitch. | |
new_pitch_median (float, optional): new pitch median. | |
pitch_range_ratio (float, optional): pitch range ratio. A value of 1.0 means no change. Greater than 1.0 means higher pitch range. Less than 1.0 means lower pitch range. | |
duration_factor (float, optional): duration factor. A value of 1.0 means no change. Greater than 1.0 means longer duration. Less than 1.0 means shorter duration. | |
Returns: | |
parselmouth.Sound: a parselmouth.Sound object | |
""" | |
if pitch is None: | |
new_sound = parselmouth.praat.call( | |
sound, | |
"Change gender", | |
75, | |
600, | |
formant_shift_ratio, | |
new_pitch_median, | |
pitch_range_ratio, | |
duration_factor, | |
) | |
else: | |
new_sound = parselmouth.praat.call( | |
(sound, pitch), | |
"Change gender", | |
formant_shift_ratio, | |
new_pitch_median, | |
pitch_range_ratio, | |
duration_factor, | |
) | |
return new_sound | |
def apply_formant_and_pitch_shift( | |
sound: parselmouth.Sound, | |
formant_shift_ratio: float = PRAAT_CHANGEGENDER_FORMANTSHIFTRATIO_DEFAULT, | |
pitch_shift_ratio: float = PRAAT_CHANGEGENDER_PITCHSHIFTRATIO_DEFAULT, | |
pitch_range_ratio: float = PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT, | |
duration_factor: float = PRAAT_CHANGEGENDER_DURATIONFACTOR_DEFAULT, | |
) -> parselmouth.Sound: | |
"""use Praat "Changer gender" command to manipulate pitch and formant | |
"Change gender": Praat -> Sound Object -> Convert -> Change gender | |
refer to Help of Praat for more details | |
# https://github.com/YannickJadoul/Parselmouth/issues/25#issuecomment-608632887 might help | |
""" | |
pitch = None | |
new_pitch_median = PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT | |
if pitch_shift_ratio != 1.0: | |
pitch, pitch_median = get_pitch_median(sound, sound.sampling_frequency) | |
new_pitch_median = pitch_median * pitch_shift_ratio | |
# refer to https://github.com/praat/praat/issues/1926#issuecomment-974909408 | |
pitch_minimum = parselmouth.praat.call( | |
pitch, "Get minimum", 0.0, 0.0, "Hertz", "Parabolic" | |
) | |
new_median = pitch_median * pitch_shift_ratio | |
scaled_minimum = pitch_minimum * pitch_shift_ratio | |
result_minimum = new_median + (scaled_minimum - new_median) * pitch_range_ratio | |
if result_minimum < 0: | |
new_pitch_median = PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT | |
pitch_range_ratio = PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT | |
if math.isnan(new_pitch_median): | |
new_pitch_median = PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT | |
pitch_range_ratio = PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT | |
new_sound = change_gender( | |
sound, | |
pitch, | |
formant_shift_ratio, | |
new_pitch_median, | |
pitch_range_ratio, | |
duration_factor, | |
) | |
return new_sound | |
# Function used in EQ | |
def pedalboard_equalizer(wav: np.ndarray, sr: int) -> np.ndarray: | |
"""Use pedalboard to do equalizer""" | |
board = Pedalboard() | |
cutoff_low_freq = 60 | |
cutoff_high_freq = 10000 | |
q_min = 2 | |
q_max = 5 | |
random_all_freq = True | |
num_filters = 10 | |
if random_all_freq: | |
key_freqs = [random.uniform(1, 12000) for _ in range(num_filters)] | |
else: | |
key_freqs = [ | |
power_ratio(float(z) / (num_filters - 1), cutoff_low_freq, cutoff_high_freq) | |
for z in range(num_filters) | |
] | |
q_values = [ | |
power_ratio(random.uniform(0, 1), q_min, q_max) for _ in range(num_filters) | |
] | |
gains = [random.uniform(-12, 12) for _ in range(num_filters)] | |
# low-shelving filter | |
board.append( | |
LowShelfFilter( | |
cutoff_frequency_hz=key_freqs[0], gain_db=gains[0], q=q_values[0] | |
) | |
) | |
# peaking filters | |
for i in range(1, 9): | |
board.append( | |
PeakFilter( | |
cutoff_frequency_hz=key_freqs[i], gain_db=gains[i], q=q_values[i] | |
) | |
) | |
# high-shelving filter | |
board.append( | |
HighShelfFilter( | |
cutoff_frequency_hz=key_freqs[9], gain_db=gains[9], q=q_values[9] | |
) | |
) | |
# Apply the pedalboard to the audio | |
processed_audio = board(wav, sr) | |
return processed_audio | |
def power_ratio(r: float, a: float, b: float): | |
return a * math.pow((b / a), r) | |
def audiomentations_time_stretch(wav: np.ndarray, sr: int) -> np.ndarray: | |
"""Use audiomentations to do time stretch""" | |
transform = TimeStretch( | |
min_rate=0.8, max_rate=1.25, leave_length_unchanged=False, p=1.0 | |
) | |
augmented_wav = transform(wav, sample_rate=sr) | |
return augmented_wav | |
def formant_and_pitch_shift( | |
sound: parselmouth.Sound, fs: bool, ps: bool | |
) -> parselmouth.Sound: | |
""" """ | |
formant_shift_ratio = PRAAT_CHANGEGENDER_FORMANTSHIFTRATIO_DEFAULT | |
pitch_shift_ratio = PRAAT_CHANGEGENDER_PITCHSHIFTRATIO_DEFAULT | |
pitch_range_ratio = PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT | |
assert fs != ps, "fs, ps are mutually exclusive" | |
if fs: | |
formant_shift_ratio = random.uniform(1.0, 1.4) | |
use_reciprocal = random.uniform(-1, 1) > 0 | |
if use_reciprocal: | |
formant_shift_ratio = 1.0 / formant_shift_ratio | |
# only use praat to change formant | |
new_sound = apply_formant_and_pitch_shift( | |
sound, | |
formant_shift_ratio=formant_shift_ratio, | |
) | |
return new_sound | |
if ps: | |
board = Pedalboard() | |
board.append(PitchShift(random.uniform(-12, 12))) | |
wav_numpy = sound.values | |
wav_numpy = board(wav_numpy, sound.sampling_frequency) | |
# use pedalboard to change pitch | |
new_sound = parselmouth.Sound( | |
wav_numpy, sampling_frequency=sound.sampling_frequency | |
) | |
return new_sound | |
def wav_manipulation( | |
wav: torch.Tensor, | |
sr: int, | |
aug_type: str = "None", | |
formant_shift: bool = False, | |
pitch_shift: bool = False, | |
time_stretch: bool = False, | |
equalizer: bool = False, | |
) -> torch.Tensor: | |
assert aug_type == "None" or aug_type in [ | |
"formant_shift", | |
"pitch_shift", | |
"time_stretch", | |
"equalizer", | |
], "aug_type must be one of formant_shift, pitch_shift, time_stretch, equalizer" | |
assert aug_type == "None" or ( | |
formant_shift == False | |
and pitch_shift == False | |
and time_stretch == False | |
and equalizer == False | |
), "if aug_type is specified, other argument must be False" | |
if aug_type != "None": | |
if aug_type == "formant_shift": | |
formant_shift = True | |
if aug_type == "pitch_shift": | |
pitch_shift = True | |
if aug_type == "equalizer": | |
equalizer = True | |
if aug_type == "time_stretch": | |
time_stretch = True | |
wav_numpy = wav.numpy() | |
if equalizer: | |
wav_numpy = pedalboard_equalizer(wav_numpy, sr) | |
if time_stretch: | |
wav_numpy = audiomentations_time_stretch(wav_numpy, sr) | |
sound = wav_to_Sound(wav_numpy, sr) | |
if formant_shift or pitch_shift: | |
sound = formant_and_pitch_shift(sound, formant_shift, pitch_shift) | |
wav = torch.from_numpy(sound.values).float() | |
# shape (1, n_samples) | |
return wav | |
def augment_dataset(cfg, dataset) -> list: | |
"""Augment dataset with formant_shift, pitch_shift, time_stretch, equalizer | |
Args: | |
cfg (dict): configuration | |
dataset (str): dataset name | |
Returns: | |
list: augmented dataset names | |
""" | |
# load metadata | |
dataset_path = os.path.join(cfg.preprocess.processed_dir, dataset) | |
split = ["train", "test"] if "eval" not in dataset else ["test"] | |
augment_datasets = [] | |
aug_types = [ | |
"formant_shift" if cfg.preprocess.use_formant_shift else None, | |
"pitch_shift" if cfg.preprocess.use_pitch_shift else None, | |
"time_stretch" if cfg.preprocess.use_time_stretch else None, | |
"equalizer" if cfg.preprocess.use_equalizer else None, | |
] | |
aug_types = filter(None, aug_types) | |
for aug_type in aug_types: | |
print("Augmenting {} with {}...".format(dataset, aug_type)) | |
new_dataset = dataset + "_" + aug_type | |
augment_datasets.append(new_dataset) | |
new_dataset_path = os.path.join(cfg.preprocess.processed_dir, new_dataset) | |
for dataset_type in split: | |
metadata_path = os.path.join(dataset_path, "{}.json".format(dataset_type)) | |
augmented_metadata = [] | |
new_metadata_path = os.path.join( | |
new_dataset_path, "{}.json".format(dataset_type) | |
) | |
os.makedirs(new_dataset_path, exist_ok=True) | |
new_dataset_wav_dir = os.path.join(new_dataset_path, "wav") | |
os.makedirs(new_dataset_wav_dir, exist_ok=True) | |
if has_existed(new_metadata_path): | |
continue | |
with open(metadata_path, "r") as f: | |
metadata = json.load(f) | |
for utt in tqdm(metadata): | |
original_wav_path = utt["Path"] | |
original_wav, sr = torchaudio.load(original_wav_path) | |
new_wav = wav_manipulation(original_wav, sr, aug_type=aug_type) | |
new_wav_path = os.path.join(new_dataset_wav_dir, utt["Uid"] + ".wav") | |
torchaudio.save(new_wav_path, new_wav, sr) | |
new_utt = { | |
"Dataset": utt["Dataset"] + "_" + aug_type, | |
"index": utt["index"], | |
"Singer": utt["Singer"], | |
"Uid": utt["Uid"], | |
"Path": new_wav_path, | |
"Duration": utt["Duration"], | |
} | |
augmented_metadata.append(new_utt) | |
new_metadata_path = os.path.join( | |
new_dataset_path, "{}.json".format(dataset_type) | |
) | |
with open(new_metadata_path, "w") as f: | |
json.dump(augmented_metadata, f, indent=4, ensure_ascii=False) | |
return augment_datasets | |