|
|
|
|
|
|
|
|
|
|
|
import math |
|
import random |
|
import os |
|
import json |
|
|
|
import numpy as np |
|
import parselmouth |
|
import torch |
|
import torchaudio |
|
from tqdm import tqdm |
|
|
|
from audiomentations import TimeStretch |
|
|
|
from pedalboard import ( |
|
Pedalboard, |
|
HighShelfFilter, |
|
LowShelfFilter, |
|
PeakFilter, |
|
PitchShift, |
|
) |
|
|
|
from utils.util import has_existed |
|
|
|
# Neutral arguments for Praat's "Change gender" command. They are the keyword
# defaults of the helpers in this module and the values the code falls back to
# when a requested pitch manipulation cannot be applied.

# New pitch median handed to Praat; 0.0 is what this module sends when the
# median should be left alone (no pitch shift requested, or fallback).
PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT = 0.0
# Formant shift ratio; 1.0 means no change.
PRAAT_CHANGEGENDER_FORMANTSHIFTRATIO_DEFAULT = 1.0
# Multiplier applied to the measured pitch median; 1.0 disables pitch shifting.
PRAAT_CHANGEGENDER_PITCHSHIFTRATIO_DEFAULT = 1.0
# Pitch range ratio; 1.0 means no change.
PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT = 1.0
# Duration factor; 1.0 means no change.
PRAAT_CHANGEGENDER_DURATIONFACTOR_DEFAULT = 1.0
|
|
|
|
|
def wav_to_Sound(wav, sr: int) -> parselmouth.Sound:
    """Convert a single-channel waveform to a parselmouth.Sound object

    Args:
        wav (np.ndarray/torch.Tensor): waveform of shape (1, n_samples)
        sr (int): sampling rate.

    Returns:
        parselmouth.Sound: a parselmouth.Sound object built from the only channel

    Raises:
        AssertionError: if wav is neither np.ndarray nor torch.Tensor, or if it
            is not of shape (1, n_samples)
    """
    # Check the type first: other objects may not even expose ``shape``.
    assert isinstance(
        wav, (np.ndarray, torch.Tensor)
    ), "wav must be either np.ndarray or torch.Tensor"
    # Test rank and channel count without indexing, so that 1-D or empty input
    # reaches the intended message instead of failing inside the check itself.
    assert (
        len(wav.shape) == 2 and wav.shape[0] == 1
    ), "wav must be of shape (1, n_samples)"

    samples = wav[0]
    if isinstance(samples, torch.Tensor):
        # Tensor.numpy() only works on CPU tensors that do not require grad.
        samples = samples.detach().cpu().numpy()
    return parselmouth.Sound(samples, sampling_frequency=sr)
|
|
|
|
|
def get_pitch_median(wav, sr: int):
    """Extract the pitch contour of a waveform together with its median

    Args:
        wav (np.ndarray/torch.Tensor/parselmouth.Sound): waveform of shape
            (1, n_samples), or an already constructed parselmouth.Sound
        sr (int): sampling rate, only used when wav has to be converted.

    Returns:
        parselmouth.Pitch, float: a parselmouth.Pitch object and the median pitch
    """
    sound = wav if isinstance(wav, parselmouth.Sound) else wav_to_Sound(wav, sr)

    # Praat "To Pitch" arguments: time step (s), pitch floor (Hz), pitch ceiling (Hz)
    pitch = parselmouth.praat.call(sound, "To Pitch", 0.8 / 75, 75, 600)

    # The 0.5 quantile, in Hertz, is the median. The 0.0, 0.0 time range is
    # presumably Praat's "whole signal" convention - confirm against the manual.
    median_hz = parselmouth.praat.call(pitch, "Get quantile", 0.0, 0.0, 0.5, "Hertz")

    return pitch, median_hz
|
|
|
|
|
def change_gender(
    sound,
    pitch=None,
    formant_shift_ratio: float = PRAAT_CHANGEGENDER_FORMANTSHIFTRATIO_DEFAULT,
    new_pitch_median: float = PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT,
    pitch_range_ratio: float = PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT,
    duration_factor: float = PRAAT_CHANGEGENDER_DURATIONFACTOR_DEFAULT,
) -> parselmouth.Sound:
    """Run Praat's "Change gender" command on a sound

    Args:
        sound (parselmouth.Sound): a parselmouth.Sound object
        pitch (parselmouth.Pitch, optional): a pitch object for sound. When it is
            given, the command is called on the (sound, pitch) pair; otherwise it
            is called on the sound alone with 75 and 600 as extra leading
            arguments. Defaults to None.
        formant_shift_ratio (float, optional): formant shift ratio. A value of 1.0
            means no change.
        new_pitch_median (float, optional): new pitch median.
        pitch_range_ratio (float, optional): pitch range ratio. A value of 1.0
            means no change.
        duration_factor (float, optional): duration factor. A value of 1.0 means
            no change. Greater than 1.0 means longer duration. Less than 1.0 means
            shorter duration.

    Returns:
        parselmouth.Sound: a parselmouth.Sound object
    """
    # These four arguments are identical for both call forms.
    manipulation_args = (
        formant_shift_ratio,
        new_pitch_median,
        pitch_range_ratio,
        duration_factor,
    )

    if pitch is not None:
        return parselmouth.praat.call(
            (sound, pitch), "Change gender", *manipulation_args
        )

    # Without a pitch object the command additionally takes 75 and 600, the same
    # values used as pitch floor and ceiling for "To Pitch" elsewhere in this file.
    return parselmouth.praat.call(
        sound, "Change gender", 75, 600, *manipulation_args
    )
|
|
|
|
|
def apply_formant_and_pitch_shift(
    sound: parselmouth.Sound,
    formant_shift_ratio: float = PRAAT_CHANGEGENDER_FORMANTSHIFTRATIO_DEFAULT,
    pitch_shift_ratio: float = PRAAT_CHANGEGENDER_PITCHSHIFTRATIO_DEFAULT,
    pitch_range_ratio: float = PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT,
    duration_factor: float = PRAAT_CHANGEGENDER_DURATIONFACTOR_DEFAULT,
) -> parselmouth.Sound:
    """Use the Praat "Change gender" command to manipulate pitch and formant

    "Change gender": Praat -> Sound Object -> Convert -> Change gender
    Refer to the Help of Praat for more details.
    # https://github.com/YannickJadoul/Parselmouth/issues/25#issuecomment-608632887 might help

    Args:
        sound (parselmouth.Sound): the sound to manipulate
        formant_shift_ratio (float, optional): formant shift ratio, 1.0 = no change.
        pitch_shift_ratio (float, optional): factor applied to the measured pitch
            median; with 1.0 no pitch analysis is performed.
        pitch_range_ratio (float, optional): pitch range ratio, 1.0 = no change.
        duration_factor (float, optional): duration factor, 1.0 = no change.

    Returns:
        parselmouth.Sound: the manipulated sound
    """
    pitch = None
    target_median = PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT

    if pitch_shift_ratio != 1.0:
        pitch, pitch_median = get_pitch_median(sound, sound.sampling_frequency)
        target_median = pitch_median * pitch_shift_ratio

        lowest_pitch = parselmouth.praat.call(
            pitch, "Get minimum", 0.0, 0.0, "Hertz", "Parabolic"
        )
        # Where the lowest pitch would land after shifting it and then scaling
        # its distance to the new median by the range ratio.
        shifted_lowest = lowest_pitch * pitch_shift_ratio
        predicted_lowest = (
            target_median + (shifted_lowest - target_median) * pitch_range_ratio
        )

        # Fall back to the neutral pitch settings when the manipulation would
        # produce a negative pitch or when no median could be measured (NaN).
        if predicted_lowest < 0 or math.isnan(target_median):
            target_median = PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT
            pitch_range_ratio = PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT

    return change_gender(
        sound,
        pitch,
        formant_shift_ratio,
        target_median,
        pitch_range_ratio,
        duration_factor,
    )
|
|
|
|
|
|
|
def pedalboard_equalizer(wav: np.ndarray, sr: int) -> np.ndarray:
    """Use pedalboard to apply a randomly parameterised equalizer

    The chain is one low-shelf filter, eight peak filters and one high-shelf
    filter. Every band gets a random gain in [-12, 12] dB and a Q between 2 and 5.

    Args:
        wav (np.ndarray): waveform to process
        sr (int): sampling rate.

    Returns:
        np.ndarray: the equalized waveform as returned by the pedalboard chain
    """
    num_filters = 10
    q_min, q_max = 2, 5
    # Bounds of the fixed frequency layout (only used when the switch is off).
    cutoff_low_freq, cutoff_high_freq = 60, 10000
    # Switch between random centre frequencies and the fixed layout.
    random_all_freq = True

    # NOTE: the draw order (frequencies, then Q values, then gains) is kept fixed
    # so that a seeded ``random`` reproduces the same equalizer.
    if random_all_freq:
        key_freqs = [random.uniform(1, 12000) for _ in range(num_filters)]
    else:
        key_freqs = [
            power_ratio(float(z) / (num_filters - 1), cutoff_low_freq, cutoff_high_freq)
            for z in range(num_filters)
        ]
    q_values = [
        power_ratio(random.uniform(0, 1), q_min, q_max) for _ in range(num_filters)
    ]
    gains = [random.uniform(-12, 12) for _ in range(num_filters)]

    bands = list(zip(key_freqs, gains, q_values))
    first_band, middle_bands, last_band = bands[0], bands[1:-1], bands[-1]

    board = Pedalboard()

    freq, gain, q_value = first_band
    board.append(LowShelfFilter(cutoff_frequency_hz=freq, gain_db=gain, q=q_value))

    for freq, gain, q_value in middle_bands:
        board.append(PeakFilter(cutoff_frequency_hz=freq, gain_db=gain, q=q_value))

    freq, gain, q_value = last_band
    board.append(HighShelfFilter(cutoff_frequency_hz=freq, gain_db=gain, q=q_value))

    return board(wav, sr)
|
|
|
|
|
def power_ratio(r: float, a: float, b: float):
    """Interpolate geometrically between a and b

    Args:
        r (float): position; 0 yields a and 1 yields b (up to rounding)
        a (float): start value, must be non-zero
        b (float): end value

    Returns:
        float: a * (b / a) ** r
    """
    span = b / a
    return a * math.pow(span, r)
|
|
|
|
|
def audiomentations_time_stretch(wav: np.ndarray, sr: int) -> np.ndarray:
    """Use audiomentations to time-stretch a waveform

    The transform is configured with rates between 0.8 and 1.25, is applied on
    every call (p=1.0) and does not restore the original length
    (leave_length_unchanged=False).

    Args:
        wav (np.ndarray): waveform to stretch
        sr (int): sampling rate.

    Returns:
        np.ndarray: the stretched waveform
    """
    stretcher = TimeStretch(
        p=1.0, min_rate=0.8, max_rate=1.25, leave_length_unchanged=False
    )
    return stretcher(wav, sample_rate=sr)
|
|
|
|
|
def formant_and_pitch_shift(
    sound: parselmouth.Sound, fs: bool, ps: bool
) -> parselmouth.Sound:
    """Randomly shift either the formants or the pitch of a sound

    Args:
        sound (parselmouth.Sound): the sound to augment
        fs (bool): apply a random formant shift (ratio drawn from [1.0, 1.4],
            inverted half of the time)
        ps (bool): apply a random pitch shift with pedalboard (PitchShift
            argument drawn from [-12, 12])

    Returns:
        parselmouth.Sound: the augmented sound

    Raises:
        AssertionError: if fs and ps are equal; exactly one must be set
    """
    assert fs != ps, "fs, ps are mutually exclusive"

    if fs:
        ratio = random.uniform(1.0, 1.4)
        # Coin flip: shift the formants down instead of up.
        if random.uniform(-1, 1) > 0:
            ratio = 1.0 / ratio
        return apply_formant_and_pitch_shift(sound, formant_shift_ratio=ratio)

    if ps:
        shifter = Pedalboard()
        shifter.append(PitchShift(random.uniform(-12, 12)))
        shifted = shifter(sound.values, sound.sampling_frequency)
        return parselmouth.Sound(
            shifted, sampling_frequency=sound.sampling_frequency
        )
|
|
|
|
|
def wav_manipulation(
    wav: torch.Tensor,
    sr: int,
    aug_type: str = "None",
    formant_shift: bool = False,
    pitch_shift: bool = False,
    time_stretch: bool = False,
    equalizer: bool = False,
) -> torch.Tensor:
    """Augment a waveform with the selected manipulations

    The augmentation is chosen either with aug_type (exactly one manipulation)
    or with the boolean switches, never both. Manipulations are applied in the
    order equalizer, time stretch, formant/pitch shift.

    Args:
        wav (torch.Tensor): waveform of shape (1, n_samples)
        sr (int): sampling rate.
        aug_type (str, optional): "None" or one of "formant_shift", "pitch_shift",
            "time_stretch", "equalizer". Defaults to "None".
        formant_shift (bool, optional): apply a random formant shift.
        pitch_shift (bool, optional): apply a random pitch shift. Must not be
            combined with formant_shift.
        time_stretch (bool, optional): apply a random time stretch.
        equalizer (bool, optional): apply a random equalizer.

    Returns:
        torch.Tensor: the augmented waveform as a float32 CPU tensor

    Raises:
        AssertionError: if aug_type is unknown, or if aug_type is given together
            with any of the boolean switches
    """
    supported = ("formant_shift", "pitch_shift", "time_stretch", "equalizer")
    assert (
        aug_type == "None" or aug_type in supported
    ), "aug_type must be one of formant_shift, pitch_shift, time_stretch, equalizer"
    assert aug_type == "None" or not (
        formant_shift or pitch_shift or time_stretch or equalizer
    ), "if aug_type is specified, other argument must be False"

    # aug_type is a shorthand for switching on exactly one of the flags.
    formant_shift = formant_shift or aug_type == "formant_shift"
    pitch_shift = pitch_shift or aug_type == "pitch_shift"
    time_stretch = time_stretch or aug_type == "time_stretch"
    equalizer = equalizer or aug_type == "equalizer"

    # Tensor.numpy() only works on CPU tensors that do not require grad.
    wav_numpy = wav.detach().cpu().numpy()

    if equalizer:
        wav_numpy = pedalboard_equalizer(wav_numpy, sr)

    if time_stretch:
        wav_numpy = audiomentations_time_stretch(wav_numpy, sr)

    # Always round-trip through a Sound so the returned samples come from
    # ``sound.values`` regardless of which manipulations ran.
    sound = wav_to_Sound(wav_numpy, sr)

    if formant_shift or pitch_shift:
        sound = formant_and_pitch_shift(sound, formant_shift, pitch_shift)

    return torch.from_numpy(sound.values).float()
|
|
|
|
|
def augment_dataset(cfg, dataset) -> list:
    """Augment dataset with formant_shift, pitch_shift, time_stretch, equalizer

    For every enabled augmentation a sibling dataset "<dataset>_<aug_type>" is
    created under cfg.preprocess.processed_dir. It contains the augmented audio
    in a "wav" sub-directory and one metadata JSON per split. A split whose
    metadata file is reported by has_existed() is skipped.

    Args:
        cfg: configuration object; cfg.preprocess.processed_dir and the
            cfg.preprocess.use_formant_shift / use_pitch_shift /
            use_time_stretch / use_equalizer switches are read
        dataset (str): dataset name. Datasets with "eval" in their name only
            have a "test" split, all others have "train" and "test".

    Returns:
        list: augmented dataset names
    """
    processed_dir = cfg.preprocess.processed_dir
    dataset_path = os.path.join(processed_dir, dataset)
    split = ["train", "test"] if "eval" not in dataset else ["test"]

    aug_switches = (
        ("formant_shift", cfg.preprocess.use_formant_shift),
        ("pitch_shift", cfg.preprocess.use_pitch_shift),
        ("time_stretch", cfg.preprocess.use_time_stretch),
        ("equalizer", cfg.preprocess.use_equalizer),
    )
    aug_types = [name for name, enabled in aug_switches if enabled]

    augment_datasets = []
    for aug_type in aug_types:
        print("Augmenting {} with {}...".format(dataset, aug_type))
        new_dataset = dataset + "_" + aug_type
        augment_datasets.append(new_dataset)
        new_dataset_path = os.path.join(processed_dir, new_dataset)
        new_dataset_wav_dir = os.path.join(new_dataset_path, "wav")
        # Creating the wav directory also creates the dataset directory itself.
        os.makedirs(new_dataset_wav_dir, exist_ok=True)

        for dataset_type in split:
            metadata_name = "{}.json".format(dataset_type)
            metadata_path = os.path.join(dataset_path, metadata_name)
            new_metadata_path = os.path.join(new_dataset_path, metadata_name)

            if has_existed(new_metadata_path):
                continue

            with open(metadata_path, "r") as f:
                metadata = json.load(f)

            augmented_metadata = []
            for utt in tqdm(metadata):
                original_wav, sr = torchaudio.load(utt["Path"])
                new_wav = wav_manipulation(original_wav, sr, aug_type=aug_type)
                new_wav_path = os.path.join(new_dataset_wav_dir, utt["Uid"] + ".wav")
                torchaudio.save(new_wav_path, new_wav, sr)

                # Augmentations such as time stretch change the number of
                # samples, so the copied duration would no longer describe the
                # saved file. Scaling by the sample-count ratio keeps whatever
                # unit the source metadata uses.
                # NOTE(review): assumes "Duration" is numeric - non-numeric
                # values are copied unchanged.
                duration = utt["Duration"]
                old_len = original_wav.shape[-1]
                new_len = new_wav.shape[-1]
                if (
                    new_len != old_len
                    and old_len > 0
                    and isinstance(duration, (int, float))
                ):
                    duration = duration * new_len / old_len

                augmented_metadata.append(
                    {
                        "Dataset": utt["Dataset"] + "_" + aug_type,
                        "index": utt["index"],
                        "Singer": utt["Singer"],
                        "Uid": utt["Uid"],
                        "Path": new_wav_path,
                        "Duration": duration,
                    }
                )

            with open(new_metadata_path, "w") as f:
                json.dump(augmented_metadata, f, indent=4, ensure_ascii=False)

    return augment_datasets
|
|