import warnings warnings.filterwarnings("ignore") import parselmouth import os import torch from skimage.transform import resize from utils.text_encoder import TokenTextEncoder from utils.pitch_utils import f0_to_coarse import struct import webrtcvad from scipy.ndimage.morphology import binary_dilation import librosa import numpy as np from utils import audio import pyloudnorm as pyln import re import json from collections import OrderedDict PUNCS = '!,.?;:' int16_max = (2 ** 15) - 1 def trim_long_silences(path, sr=None, return_raw_wav=False, norm=True, vad_max_silence_length=12): """ Ensures that segments without voice in the waveform remain no longer than a threshold determined by the VAD parameters in params.py. :param wav: the raw waveform as a numpy array of floats :param vad_max_silence_length: Maximum number of consecutive silent frames a segment can have. :return: the same waveform with silences trimmed away (length <= original wav length) """ ## Voice Activation Detection # Window size of the VAD. Must be either 10, 20 or 30 milliseconds. # This sets the granularity of the VAD. Should not need to be changed. sampling_rate = 16000 wav_raw, sr = librosa.core.load(path, sr=sr) if norm: meter = pyln.Meter(sr) # create BS.1770 meter loudness = meter.integrated_loudness(wav_raw) wav_raw = pyln.normalize.loudness(wav_raw, loudness, -20.0) if np.abs(wav_raw).max() > 1.0: wav_raw = wav_raw / np.abs(wav_raw).max() wav = librosa.resample(wav_raw, sr, sampling_rate, res_type='kaiser_best') vad_window_length = 30 # In milliseconds # Number of frames to average together when performing the moving average smoothing. # The larger this value, the larger the VAD variations must be to not get smoothed out. vad_moving_average_width = 8 # Compute the voice detection window size samples_per_window = (vad_window_length * sampling_rate) // 1000 # Trim the end of the audio to have a multiple of the window size wav = wav[:len(wav) - (len(wav) % samples_per_window)] # Convert the float waveform to 16-bit mono PCM pcm_wave = struct.pack("%dh" % len(wav), *(np.round(wav * int16_max)).astype(np.int16)) # Perform voice activation detection voice_flags = [] vad = webrtcvad.Vad(mode=3) for window_start in range(0, len(wav), samples_per_window): window_end = window_start + samples_per_window voice_flags.append(vad.is_speech(pcm_wave[window_start * 2:window_end * 2], sample_rate=sampling_rate)) voice_flags = np.array(voice_flags) # Smooth the voice detection with a moving average def moving_average(array, width): array_padded = np.concatenate((np.zeros((width - 1) // 2), array, np.zeros(width // 2))) ret = np.cumsum(array_padded, dtype=float) ret[width:] = ret[width:] - ret[:-width] return ret[width - 1:] / width audio_mask = moving_average(voice_flags, vad_moving_average_width) audio_mask = np.round(audio_mask).astype(np.bool) # Dilate the voiced regions audio_mask = binary_dilation(audio_mask, np.ones(vad_max_silence_length + 1)) audio_mask = np.repeat(audio_mask, samples_per_window) audio_mask = resize(audio_mask, (len(wav_raw),)) > 0 if return_raw_wav: return wav_raw, audio_mask, sr return wav_raw[audio_mask], audio_mask, sr def process_utterance(wav_path, fft_size=1024, hop_size=256, win_length=1024, window="hann", num_mels=80, fmin=80, fmax=7600, eps=1e-6, sample_rate=22050, loud_norm=False, min_level_db=-100, return_linear=False, trim_long_sil=False, vocoder='pwg'): if isinstance(wav_path, str): if trim_long_sil: wav, _, _ = trim_long_silences(wav_path, sample_rate) else: wav, _ = librosa.core.load(wav_path, sr=sample_rate) else: wav = wav_path if loud_norm: meter = pyln.Meter(sample_rate) # create BS.1770 meter loudness = meter.integrated_loudness(wav) wav = pyln.normalize.loudness(wav, loudness, -22.0) if np.abs(wav).max() > 1: wav = wav / np.abs(wav).max() # get amplitude spectrogram x_stft = librosa.stft(wav, n_fft=fft_size, hop_length=hop_size, win_length=win_length, window=window, pad_mode="constant") spc = np.abs(x_stft) # (n_bins, T) # get mel basis fmin = 0 if fmin == -1 else fmin fmax = sample_rate / 2 if fmax == -1 else fmax mel_basis = librosa.filters.mel(sample_rate, fft_size, num_mels, fmin, fmax) mel = mel_basis @ spc if vocoder == 'pwg': mel = np.log10(np.maximum(eps, mel)) # (n_mel_bins, T) else: assert False, f'"{vocoder}" is not in ["pwg"].' l_pad, r_pad = audio.librosa_pad_lr(wav, fft_size, hop_size, 1) wav = np.pad(wav, (l_pad, r_pad), mode='constant', constant_values=0.0) wav = wav[:mel.shape[1] * hop_size] if not return_linear: return wav, mel else: spc = audio.amp_to_db(spc) spc = audio.normalize(spc, {'min_level_db': min_level_db}) return wav, mel, spc def get_pitch(wav_data, mel, hparams): """ :param wav_data: [T] :param mel: [T, 80] :param hparams: :return: """ time_step = hparams['hop_size'] / hparams['audio_sample_rate'] * 1000 f0_min = 80 f0_max = 750 if hparams['hop_size'] == 128: pad_size = 4 elif hparams['hop_size'] == 256: pad_size = 2 else: assert False f0 = parselmouth.Sound(wav_data, hparams['audio_sample_rate']).to_pitch_ac( time_step=time_step / 1000, voicing_threshold=0.6, pitch_floor=f0_min, pitch_ceiling=f0_max).selected_array['frequency'] lpad = pad_size * 2 rpad = len(mel) - len(f0) - lpad f0 = np.pad(f0, [[lpad, rpad]], mode='constant') # mel and f0 are extracted by 2 different libraries. we should force them to have the same length. # Attention: we find that new version of some libraries could cause ``rpad'' to be a negetive value... # Just to be sure, we recommend users to set up the same environments as them in requirements_auto.txt (by Anaconda) delta_l = len(mel) - len(f0) assert np.abs(delta_l) <= 8 if delta_l > 0: f0 = np.concatenate([f0, [f0[-1]] * delta_l], 0) f0 = f0[:len(mel)] pitch_coarse = f0_to_coarse(f0) return f0, pitch_coarse def remove_empty_lines(text): """remove empty lines""" assert (len(text) > 0) assert (isinstance(text, list)) text = [t.strip() for t in text] if "" in text: text.remove("") return text class TextGrid(object): def __init__(self, text): text = remove_empty_lines(text) self.text = text self.line_count = 0 self._get_type() self._get_time_intval() self._get_size() self.tier_list = [] self._get_item_list() def _extract_pattern(self, pattern, inc): """ Parameters ---------- pattern : regex to extract pattern inc : increment of line count after extraction Returns ------- group : extracted info """ try: group = re.match(pattern, self.text[self.line_count]).group(1) self.line_count += inc except AttributeError: raise ValueError("File format error at line %d:%s" % (self.line_count, self.text[self.line_count])) return group def _get_type(self): self.file_type = self._extract_pattern(r"File type = \"(.*)\"", 2) def _get_time_intval(self): self.xmin = self._extract_pattern(r"xmin = (.*)", 1) self.xmax = self._extract_pattern(r"xmax = (.*)", 2) def _get_size(self): self.size = int(self._extract_pattern(r"size = (.*)", 2)) def _get_item_list(self): """Only supports IntervalTier currently""" for itemIdx in range(1, self.size + 1): tier = OrderedDict() item_list = [] tier_idx = self._extract_pattern(r"item \[(.*)\]:", 1) tier_class = self._extract_pattern(r"class = \"(.*)\"", 1) if tier_class != "IntervalTier": raise NotImplementedError("Only IntervalTier class is supported currently") tier_name = self._extract_pattern(r"name = \"(.*)\"", 1) tier_xmin = self._extract_pattern(r"xmin = (.*)", 1) tier_xmax = self._extract_pattern(r"xmax = (.*)", 1) tier_size = self._extract_pattern(r"intervals: size = (.*)", 1) for i in range(int(tier_size)): item = OrderedDict() item["idx"] = self._extract_pattern(r"intervals \[(.*)\]", 1) item["xmin"] = self._extract_pattern(r"xmin = (.*)", 1) item["xmax"] = self._extract_pattern(r"xmax = (.*)", 1) item["text"] = self._extract_pattern(r"text = \"(.*)\"", 1) item_list.append(item) tier["idx"] = tier_idx tier["class"] = tier_class tier["name"] = tier_name tier["xmin"] = tier_xmin tier["xmax"] = tier_xmax tier["size"] = tier_size tier["items"] = item_list self.tier_list.append(tier) def toJson(self): _json = OrderedDict() _json["file_type"] = self.file_type _json["xmin"] = self.xmin _json["xmax"] = self.xmax _json["size"] = self.size _json["tiers"] = self.tier_list return json.dumps(_json, ensure_ascii=False, indent=2) def get_mel2ph(tg_fn, ph, mel, hparams): ph_list = ph.split(" ") with open(tg_fn, "r") as f: tg = f.readlines() tg = remove_empty_lines(tg) tg = TextGrid(tg) tg = json.loads(tg.toJson()) split = np.ones(len(ph_list) + 1, np.float) * -1 tg_idx = 0 ph_idx = 0 tg_align = [x for x in tg['tiers'][-1]['items']] tg_align_ = [] for x in tg_align: x['xmin'] = float(x['xmin']) x['xmax'] = float(x['xmax']) if x['text'] in ['sil', 'sp', '', 'SIL', 'PUNC']: x['text'] = '' if len(tg_align_) > 0 and tg_align_[-1]['text'] == '': tg_align_[-1]['xmax'] = x['xmax'] continue tg_align_.append(x) tg_align = tg_align_ tg_len = len([x for x in tg_align if x['text'] != '']) ph_len = len([x for x in ph_list if not is_sil_phoneme(x)]) assert tg_len == ph_len, (tg_len, ph_len, tg_align, ph_list, tg_fn) while tg_idx < len(tg_align) or ph_idx < len(ph_list): if tg_idx == len(tg_align) and is_sil_phoneme(ph_list[ph_idx]): split[ph_idx] = 1e8 ph_idx += 1 continue x = tg_align[tg_idx] if x['text'] == '' and ph_idx == len(ph_list): tg_idx += 1 continue assert ph_idx < len(ph_list), (tg_len, ph_len, tg_align, ph_list, tg_fn) ph = ph_list[ph_idx] if x['text'] == '' and not is_sil_phoneme(ph): assert False, (ph_list, tg_align) if x['text'] != '' and is_sil_phoneme(ph): ph_idx += 1 else: assert (x['text'] == '' and is_sil_phoneme(ph)) \ or x['text'].lower() == ph.lower() \ or x['text'].lower() == 'sil', (x['text'], ph) split[ph_idx] = x['xmin'] if ph_idx > 0 and split[ph_idx - 1] == -1 and is_sil_phoneme(ph_list[ph_idx - 1]): split[ph_idx - 1] = split[ph_idx] ph_idx += 1 tg_idx += 1 assert tg_idx == len(tg_align), (tg_idx, [x['text'] for x in tg_align]) assert ph_idx >= len(ph_list) - 1, (ph_idx, ph_list, len(ph_list), [x['text'] for x in tg_align], tg_fn) mel2ph = np.zeros([mel.shape[0]], np.int) split[0] = 0 split[-1] = 1e8 for i in range(len(split) - 1): assert split[i] != -1 and split[i] <= split[i + 1], (split[:-1],) split = [int(s * hparams['audio_sample_rate'] / hparams['hop_size'] + 0.5) for s in split] for ph_idx in range(len(ph_list)): mel2ph[split[ph_idx]:split[ph_idx + 1]] = ph_idx + 1 mel2ph_torch = torch.from_numpy(mel2ph) T_t = len(ph_list) dur = mel2ph_torch.new_zeros([T_t + 1]).scatter_add(0, mel2ph_torch, torch.ones_like(mel2ph_torch)) dur = dur[1:].numpy() return mel2ph, dur def build_phone_encoder(data_dir): phone_list_file = os.path.join(data_dir, 'phone_set.json') phone_list = json.load(open(phone_list_file)) return TokenTextEncoder(None, vocab_list=phone_list, replace_oov=',') def build_word_encoder(data_dir): word_list_file = os.path.join(data_dir, 'word_set.json') word_list = json.load(open(word_list_file)) return TokenTextEncoder(None, vocab_list=word_list, replace_oov=',') def is_sil_phoneme(p): return not p[0].isalpha() def build_token_encoder(token_list_file): token_list = json.load(open(token_list_file)) return TokenTextEncoder(None, vocab_list=token_list, replace_oov='')