# Copyright (c) 2023 Amphion.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import random
import torch
from torch.nn.utils.rnn import pad_sequence
import json
import os
import numpy as np
import librosa

from utils.data_utils import *
from processors.acoustic_extractor import cal_normalized_mel, load_mel_extrema
from processors.content_extractor import (
    ContentvecExtractor,
    WhisperExtractor,
    WenetExtractor,
)
from models.base.base_dataset import (
    BaseOfflineDataset,
    BaseOfflineCollator,
    BaseOnlineDataset,
    BaseOnlineCollator,
)
from models.base.new_dataset import BaseTestDataset

EPS = 1.0e-12


class SVCOfflineDataset(BaseOfflineDataset):
    def __init__(self, cfg, dataset, is_valid=False):
        BaseOfflineDataset.__init__(self, cfg, dataset, is_valid=is_valid)
        cfg = self.cfg

        if cfg.model.condition_encoder.use_whisper:
            self.whisper_aligner = WhisperExtractor(self.cfg)
            self.utt2whisper_path = load_content_feature_path(
                self.metadata, cfg.preprocess.processed_dir, cfg.preprocess.whisper_dir
            )

        if cfg.model.condition_encoder.use_contentvec:
            self.contentvec_aligner = ContentvecExtractor(self.cfg)
            self.utt2contentVec_path = load_content_feature_path(
                self.metadata,
                cfg.preprocess.processed_dir,
                cfg.preprocess.contentvec_dir,
            )

        if cfg.model.condition_encoder.use_mert:
            self.utt2mert_path = load_content_feature_path(
                self.metadata, cfg.preprocess.processed_dir, cfg.preprocess.mert_dir
            )

        if cfg.model.condition_encoder.use_wenet:
            self.wenet_aligner = WenetExtractor(self.cfg)
            self.utt2wenet_path = load_content_feature_path(
                self.metadata, cfg.preprocess.processed_dir, cfg.preprocess.wenet_dir
            )

    def __getitem__(self, index):
        single_feature = BaseOfflineDataset.__getitem__(self, index)

        utt_info = self.metadata[index]
        dataset = utt_info["Dataset"]
        uid = utt_info["Uid"]
        utt = "{}_{}".format(dataset, uid)

        if self.cfg.model.condition_encoder.use_whisper:
            assert "target_len" in single_feature.keys()
            aligned_whisper_feat = (
                self.whisper_aligner.offline_resolution_transformation(
                    np.load(self.utt2whisper_path[utt]), single_feature["target_len"]
                )
            )
            single_feature["whisper_feat"] = aligned_whisper_feat

        if self.cfg.model.condition_encoder.use_contentvec:
            assert "target_len" in single_feature.keys()
            aligned_contentvec = (
                self.contentvec_aligner.offline_resolution_transformation(
                    np.load(self.utt2contentVec_path[utt]),
                    single_feature["target_len"],
                )
            )
            single_feature["contentvec_feat"] = aligned_contentvec

        if self.cfg.model.condition_encoder.use_mert:
            assert "target_len" in single_feature.keys()
            aligned_mert_feat = align_content_feature_length(
                np.load(self.utt2mert_path[utt]),
                single_feature["target_len"],
                source_hop=self.cfg.preprocess.mert_hop_size,
            )
            single_feature["mert_feat"] = aligned_mert_feat

        if self.cfg.model.condition_encoder.use_wenet:
            assert "target_len" in single_feature.keys()
            aligned_wenet_feat = self.wenet_aligner.offline_resolution_transformation(
                np.load(self.utt2wenet_path[utt]), single_feature["target_len"]
            )
            single_feature["wenet_feat"] = aligned_wenet_feat

        # print(single_feature.keys())
        # for k, v in single_feature.items():
        #     if type(v) in [torch.Tensor, np.ndarray]:
        #         print(k, v.shape)
        #     else:
        #         print(k, v)
        # exit()

        return self.clip_if_too_long(single_feature)

    def __len__(self):
        return len(self.metadata)

    def random_select(self, feature_seq_len, max_seq_len, ending_ts=2812):
        """
        ending_ts: to avoid invalid whisper features for over 30s audios
            2812 = 30 * 24000 // 256
        """
        ts = max(feature_seq_len - max_seq_len, 0)
        ts = min(ts, ending_ts - max_seq_len)

        start = random.randint(0, ts)
        end = start + max_seq_len
        return start, end

    def clip_if_too_long(self, sample, max_seq_len=512):
        """
        sample :
            {
                'spk_id': (1,),
                'target_len': int
                'mel': (seq_len, dim),
                'frame_pitch': (seq_len,)
                'frame_energy': (seq_len,)
                'content_vector_feat': (seq_len, dim)
            }
        """
        if sample["target_len"] <= max_seq_len:
            return sample

        start, end = self.random_select(sample["target_len"], max_seq_len)
        sample["target_len"] = end - start

        for k in sample.keys():
            if k == "audio":
                # audio should be clipped in hop_size scale
                sample[k] = sample[k][
                    start * self.cfg.preprocess.hop_size : end
                    * self.cfg.preprocess.hop_size
                ]
            elif k == "audio_len":
                sample[k] = (end - start) * self.cfg.preprocess.hop_size
            elif k not in ["spk_id", "target_len"]:
                sample[k] = sample[k][start:end]

        return sample


class SVCOnlineDataset(BaseOnlineDataset):
    def __init__(self, cfg, dataset, is_valid=False):
        super().__init__(cfg, dataset, is_valid=is_valid)

        # Audio pretrained models' sample rates
        self.all_sample_rates = {self.sample_rate}
        if self.cfg.model.condition_encoder.use_whisper:
            self.all_sample_rates.add(self.cfg.preprocess.whisper_sample_rate)
        if self.cfg.model.condition_encoder.use_contentvec:
            self.all_sample_rates.add(self.cfg.preprocess.contentvec_sample_rate)
        if self.cfg.model.condition_encoder.use_wenet:
            self.all_sample_rates.add(self.cfg.preprocess.wenet_sample_rate)

        self.highest_sample_rate = max(list(self.all_sample_rates))

        # The maximum duration (seconds) for one training sample
        self.max_duration = 6.0
        self.max_n_frames = int(self.max_duration * self.highest_sample_rate)

    def random_select(self, wav, duration, wav_path):
        """
        wav: (T,)
        """
        if duration <= self.max_duration:
            return wav

        ts_frame = int((duration - self.max_duration) * self.highest_sample_rate)
        start = random.randint(0, ts_frame)
        end = start + self.max_n_frames

        if (wav[start:end] == 0).all():
            print("*" * 20)
            print("Warning! The wav file {} has a lot of silence.".format(wav_path))

            # There should be at least some frames that are not silence. Then we select them.
            assert (wav != 0).any()
            start = np.where(wav != 0)[0][0]
            end = start + self.max_n_frames

        return wav[start:end]

    def __getitem__(self, index):
        """
        single_feature: dict,
            wav: (T,)
            wav_len: int
            target_len: int
            mask: (n_frames, 1)
            spk_id

            wav_{sr}: (T,)
            wav_{sr}_len: int
        """
        single_feature = dict()

        utt_item = self.metadata[index]
        wav_path = utt_item["Path"]

        ### Use the highest sampling rate to load and randomly select ###
        highest_sr_wav, _ = librosa.load(wav_path, sr=self.highest_sample_rate)
        highest_sr_wav = self.random_select(
            highest_sr_wav, utt_item["Duration"], wav_path
        )

        ### Waveforms under all the sample rates ###
        for sr in self.all_sample_rates:
            # Resample to the required sample rate
            if sr != self.highest_sample_rate:
                wav_sr = librosa.resample(
                    highest_sr_wav, orig_sr=self.highest_sample_rate, target_sr=sr
                )
            else:
                wav_sr = highest_sr_wav

            wav_sr = torch.as_tensor(wav_sr, dtype=torch.float32)
            single_feature["wav_{}".format(sr)] = wav_sr
            single_feature["wav_{}_len".format(sr)] = len(wav_sr)

            # For target sample rate
            if sr == self.sample_rate:
                wav_len = len(wav_sr)
                frame_len = wav_len // self.hop_size

                single_feature["wav"] = wav_sr
                single_feature["wav_len"] = wav_len
                single_feature["target_len"] = frame_len
                single_feature["mask"] = torch.ones(frame_len, 1, dtype=torch.long)

        ### Speaker ID ###
        if self.cfg.preprocess.use_spkid:
            utt = "{}_{}".format(utt_item["Dataset"], utt_item["Uid"])
            single_feature["spk_id"] = torch.tensor(
                [self.spk2id[self.utt2spk[utt]]], dtype=torch.int32
            )

        return single_feature

    def __len__(self):
        return len(self.metadata)


class SVCOfflineCollator(BaseOfflineCollator):
    def __init__(self, cfg):
        super().__init__(cfg)

    def __call__(self, batch):
        parsed_batch_features = super().__call__(batch)
        return parsed_batch_features


class SVCOnlineCollator(BaseOnlineCollator):
    def __init__(self, cfg):
        super().__init__(cfg)

    def __call__(self, batch):
        """
        SVCOnlineDataset.__getitem__:
            wav: (T,)
            wav_len: int
            target_len: int
            mask: (n_frames, 1)
            spk_id: (1)
            wav_{sr}: (T,)
            wav_{sr}_len: int

        Returns:
            wav: (B, T), torch.float32
            wav_len: (B), torch.long
            target_len: (B), torch.long
            mask: (B, n_frames, 1), torch.long
            spk_id: (B, 1), torch.int32
            wav_{sr}: (B, T)
            wav_{sr}_len: (B), torch.long
        """
        packed_batch_features = dict()

        for key in batch[0].keys():
            if "_len" in key:
                packed_batch_features[key] = torch.LongTensor([b[key] for b in batch])
            else:
                packed_batch_features[key] = pad_sequence(
                    [b[key] for b in batch], batch_first=True, padding_value=0
                )

        return packed_batch_features


class SVCTestDataset(BaseTestDataset):
    def __init__(self, args, cfg, infer_type):
        BaseTestDataset.__init__(self, args, cfg, infer_type)
        self.metadata = self.get_metadata()

        target_singer = args.target_singer
        self.cfg = cfg
        self.trans_key = args.trans_key
        assert type(target_singer) == str

        self.target_singer = target_singer.split("_")[-1]
        self.target_dataset = target_singer.replace(
            "_{}".format(self.target_singer), ""
        )

        if cfg.preprocess.mel_min_max_norm:
            if self.cfg.preprocess.features_extraction_mode == "online":
                # TODO: Change the hard code
                # Using an empirical mel extrema to normalize
                self.target_mel_extrema = load_mel_extrema(cfg.preprocess, "vctk")
            else:
                self.target_mel_extrema = load_mel_extrema(
                    cfg.preprocess, self.target_dataset
                )
            self.target_mel_extrema = (
                torch.as_tensor(self.target_mel_extrema[0]),
                torch.as_tensor(self.target_mel_extrema[1]),
            )

        ######### Load source acoustic features #########
        if cfg.preprocess.use_spkid:
            spk2id_path = os.path.join(args.acoustics_dir, cfg.preprocess.spk2id)
            # utt2sp_path = os.path.join(self.data_root, cfg.preprocess.utt2spk)
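            # spk2id maps "{Dataset}_{SingerName}" keys to integer speaker ids;
            # __getitem__ below looks up the *target* singer's id from it.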
            with open(spk2id_path, "r", encoding="utf-8") as f:
                self.spk2id = json.load(f)
            # print("self.spk2id", self.spk2id)

        if cfg.preprocess.use_uv:
            self.utt2uv_path = {
                f'{utt_info["Dataset"]}_{utt_info["Uid"]}': os.path.join(
                    cfg.preprocess.processed_dir,
                    utt_info["Dataset"],
                    cfg.preprocess.uv_dir,
                    utt_info["Uid"] + ".npy",
                )
                for utt_info in self.metadata
            }

        if cfg.preprocess.use_frame_pitch:
            self.utt2frame_pitch_path = {
                f'{utt_info["Dataset"]}_{utt_info["Uid"]}': os.path.join(
                    cfg.preprocess.processed_dir,
                    utt_info["Dataset"],
                    cfg.preprocess.pitch_dir,
                    utt_info["Uid"] + ".npy",
                )
                for utt_info in self.metadata
            }

            # Target F0 median
            target_f0_statistics_path = os.path.join(
                cfg.preprocess.processed_dir,
                self.target_dataset,
                cfg.preprocess.pitch_dir,
                "statistics.json",
            )
            self.target_pitch_median = json.load(
                open(target_f0_statistics_path, "r", encoding="utf-8")
            )[f"{self.target_dataset}_{self.target_singer}"]["voiced_positions"][
                "median"
            ]

            # Source F0 median (if infer from file)
            if infer_type == "from_file":
                source_audio_name = cfg.inference.source_audio_name
                source_f0_statistics_path = os.path.join(
                    cfg.preprocess.processed_dir,
                    source_audio_name,
                    cfg.preprocess.pitch_dir,
                    "statistics.json",
                )
                self.source_pitch_median = json.load(
                    open(source_f0_statistics_path, "r", encoding="utf-8")
                )[f"{source_audio_name}_{source_audio_name}"]["voiced_positions"][
                    "median"
                ]
            else:
                self.source_pitch_median = None

        if cfg.preprocess.use_frame_energy:
            self.utt2frame_energy_path = {
                f'{utt_info["Dataset"]}_{utt_info["Uid"]}': os.path.join(
                    cfg.preprocess.processed_dir,
                    utt_info["Dataset"],
                    cfg.preprocess.energy_dir,
                    utt_info["Uid"] + ".npy",
                )
                for utt_info in self.metadata
            }

        if cfg.preprocess.use_mel:
            self.utt2mel_path = {
                f'{utt_info["Dataset"]}_{utt_info["Uid"]}': os.path.join(
                    cfg.preprocess.processed_dir,
                    utt_info["Dataset"],
                    cfg.preprocess.mel_dir,
                    utt_info["Uid"] + ".npy",
                )
                for utt_info in self.metadata
            }

        ######### Load source content features' path #########
        if cfg.model.condition_encoder.use_whisper:
            self.whisper_aligner = WhisperExtractor(cfg)
            self.utt2whisper_path = load_content_feature_path(
                self.metadata, cfg.preprocess.processed_dir, cfg.preprocess.whisper_dir
            )

        if cfg.model.condition_encoder.use_contentvec:
            self.contentvec_aligner = ContentvecExtractor(cfg)
            self.utt2contentVec_path = load_content_feature_path(
                self.metadata,
                cfg.preprocess.processed_dir,
                cfg.preprocess.contentvec_dir,
            )

        if cfg.model.condition_encoder.use_mert:
            self.utt2mert_path = load_content_feature_path(
                self.metadata, cfg.preprocess.processed_dir, cfg.preprocess.mert_dir
            )

        if cfg.model.condition_encoder.use_wenet:
            self.wenet_aligner = WenetExtractor(cfg)
            self.utt2wenet_path = load_content_feature_path(
                self.metadata, cfg.preprocess.processed_dir, cfg.preprocess.wenet_dir
            )

    def __getitem__(self, index):
        single_feature = {}

        utt_info = self.metadata[index]
        dataset = utt_info["Dataset"]
        uid = utt_info["Uid"]
        utt = "{}_{}".format(dataset, uid)

        source_dataset = self.metadata[index]["Dataset"]

        if self.cfg.preprocess.use_spkid:
            single_feature["spk_id"] = np.array(
                [self.spk2id[f"{self.target_dataset}_{self.target_singer}"]],
                dtype=np.int32,
            )

        ######### Get Acoustic Features Item #########
        if self.cfg.preprocess.use_mel:
            mel = np.load(self.utt2mel_path[utt])
            assert mel.shape[0] == self.cfg.preprocess.n_mel  # [n_mels, T]
            if self.cfg.preprocess.use_min_max_norm_mel:
                # mel norm
                mel = cal_normalized_mel(mel, source_dataset, self.cfg.preprocess)

            if "target_len" not in single_feature.keys():
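                # The mel frame count serves as the reference length to which all
                # frame-level features below are aligned.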
                single_feature["target_len"] = mel.shape[1]
            single_feature["mel"] = mel.T  # [T, n_mels]

        if self.cfg.preprocess.use_frame_pitch:
            frame_pitch_path = self.utt2frame_pitch_path[utt]
            frame_pitch = np.load(frame_pitch_path)

            if self.trans_key:
                try:
                    self.trans_key = int(self.trans_key)
                except:
                    pass
                if type(self.trans_key) == int:
                    frame_pitch = transpose_key(frame_pitch, self.trans_key)
                elif self.trans_key:
                    assert self.target_singer
                    frame_pitch = pitch_shift_to_target(
                        frame_pitch, self.target_pitch_median, self.source_pitch_median
                    )

            if "target_len" not in single_feature.keys():
                single_feature["target_len"] = len(frame_pitch)
            aligned_frame_pitch = align_length(
                frame_pitch, single_feature["target_len"]
            )
            single_feature["frame_pitch"] = aligned_frame_pitch

            if self.cfg.preprocess.use_uv:
                frame_uv_path = self.utt2uv_path[utt]
                frame_uv = np.load(frame_uv_path)
                aligned_frame_uv = align_length(frame_uv, single_feature["target_len"])
                aligned_frame_uv = [
                    0 if frame_uv else 1 for frame_uv in aligned_frame_uv
                ]
                aligned_frame_uv = np.array(aligned_frame_uv)
                single_feature["frame_uv"] = aligned_frame_uv

        if self.cfg.preprocess.use_frame_energy:
            frame_energy_path = self.utt2frame_energy_path[utt]
            frame_energy = np.load(frame_energy_path)
            if "target_len" not in single_feature.keys():
                single_feature["target_len"] = len(frame_energy)
            aligned_frame_energy = align_length(
                frame_energy, single_feature["target_len"]
            )
            single_feature["frame_energy"] = aligned_frame_energy

        ######### Get Content Features Item #########
        if self.cfg.model.condition_encoder.use_whisper:
            assert "target_len" in single_feature.keys()
            aligned_whisper_feat = (
                self.whisper_aligner.offline_resolution_transformation(
                    np.load(self.utt2whisper_path[utt]), single_feature["target_len"]
                )
            )
            single_feature["whisper_feat"] = aligned_whisper_feat

        if self.cfg.model.condition_encoder.use_contentvec:
            assert "target_len" in single_feature.keys()
            aligned_contentvec = (
                self.contentvec_aligner.offline_resolution_transformation(
                    np.load(self.utt2contentVec_path[utt]),
                    single_feature["target_len"],
                )
            )
            single_feature["contentvec_feat"] = aligned_contentvec

        if self.cfg.model.condition_encoder.use_mert:
            assert "target_len" in single_feature.keys()
            aligned_mert_feat = align_content_feature_length(
                np.load(self.utt2mert_path[utt]),
                single_feature["target_len"],
                source_hop=self.cfg.preprocess.mert_hop_size,
            )
            single_feature["mert_feat"] = aligned_mert_feat

        if self.cfg.model.condition_encoder.use_wenet:
            assert "target_len" in single_feature.keys()
            aligned_wenet_feat = self.wenet_aligner.offline_resolution_transformation(
                np.load(self.utt2wenet_path[utt]), single_feature["target_len"]
            )
            single_feature["wenet_feat"] = aligned_wenet_feat

        return single_feature

    def __len__(self):
        return len(self.metadata)


class SVCTestCollator:
    """Zero-pads model inputs and targets based on number of frames per step"""

    def __init__(self, cfg):
        self.cfg = cfg

    def __call__(self, batch):
        packed_batch_features = dict()

        # mel: [b, T, n_mels]
        # frame_pitch, frame_energy: [1, T]
        # target_len: [1]
        # spk_id: [b, 1]
        # mask: [b, T, 1]
        for key in batch[0].keys():
            if key == "target_len":
                packed_batch_features["target_len"] = torch.LongTensor(
                    [b["target_len"] for b in batch]
                )
                masks = [
                    torch.ones((b["target_len"], 1), dtype=torch.long) for b in batch
                ]
                packed_batch_features["mask"] = pad_sequence(
                    masks, batch_first=True, padding_value=0
                )
            else:
                values = [torch.from_numpy(b[key]) for b in batch]
                packed_batch_features[key] = pad_sequence(
                    values, batch_first=True, padding_value=0
                )

        return packed_batch_features
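

# --------------------------------------------------------------------------- #
# Usage sketch (kept as a comment; not executed on import). It shows how the
# online dataset and collator are typically wired into a torch DataLoader.
# The `cfg` object, the dataset name, and the batch size are assumptions for
# illustration only; in Amphion they come from the experiment configuration
# and the training entry point.
#
#   from torch.utils.data import DataLoader
#
#   train_set = SVCOnlineDataset(cfg, dataset_name, is_valid=False)
#   train_loader = DataLoader(
#       train_set,
#       batch_size=16,
#       shuffle=True,
#       collate_fn=SVCOnlineCollator(cfg),
#   )
#   for batch in train_loader:
#       # Per SVCOnlineCollator.__call__: batch["wav"] is (B, T) float32,
#       # batch["mask"] is (B, n_frames, 1) long, batch["spk_id"] is (B, 1) int32.
#       ...
# --------------------------------------------------------------------------- #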