File size: 5,748 Bytes
c968fc3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# Copyright (c) 2023 Amphion.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import os
import json
import pickle
import glob
from collections import defaultdict
from tqdm import tqdm
# Train: male 20 hours, female 10 hours
TRAIN_MALE_MAX_SECONDS = 20 * 3600
TRAIN_FEMALE_MAX_SECONDS = 10 * 3600
TEST_MAX_NUM_EVERY_PERSON = 5
def select_sample_idxs():
chosen_speakers = get_chosen_speakers()
with open(os.path.join(vctk_dir, "train.json"), "r") as f:
raw_train = json.load(f)
with open(os.path.join(vctk_dir, "test.json"), "r") as f:
raw_test = json.load(f)
train_idxs, test_idxs = [], []
# =========== Test ===========
test_nums = defaultdict(int)
for utt in tqdm(raw_train):
idx = utt["index"]
singer = utt["Singer"]
if singer in chosen_speakers and test_nums[singer] < TEST_MAX_NUM_EVERY_PERSON:
test_nums[singer] += 1
test_idxs.append("train_{}".format(idx))
for utt in tqdm(raw_test):
idx = utt["index"]
singer = utt["Singer"]
if singer in chosen_speakers and test_nums[singer] < TEST_MAX_NUM_EVERY_PERSON:
test_nums[singer] += 1
test_idxs.append("test_{}".format(idx))
# =========== Train ===========
for utt in tqdm(raw_train):
idx = utt["index"]
singer = utt["Singer"]
if singer in chosen_speakers and "train_{}".format(idx) not in test_idxs:
train_idxs.append("train_{}".format(idx))
for utt in tqdm(raw_test):
idx = utt["index"]
singer = utt["Singer"]
if singer in chosen_speakers and "test_{}".format(idx) not in test_idxs:
train_idxs.append("test_{}".format(idx))
train_idxs.sort()
test_idxs.sort()
return train_idxs, test_idxs, raw_train, raw_test
def statistics_of_speakers():
speaker2time = defaultdict(float)
sex2time = defaultdict(float)
with open(os.path.join(vctk_dir, "train.json"), "r") as f:
train = json.load(f)
with open(os.path.join(vctk_dir, "test.json"), "r") as f:
test = json.load(f)
for utt in train + test:
# minutes
speaker2time[utt["Singer"]] += utt["Duration"]
# hours
sex2time[utt["Singer"].split("_")[0]] += utt["Duration"]
print(
"Female: {:.2f} hours, Male: {:.2f} hours.\n".format(
sex2time["female"] / 3600, sex2time["male"] / 3600
)
)
speaker2time = sorted(speaker2time.items(), key=lambda x: x[-1], reverse=True)
for singer, seconds in speaker2time:
print("{}\t{:.2f} mins".format(singer, seconds / 60))
return speaker2time
def get_chosen_speakers():
speaker2time = statistics_of_speakers()
chosen_time = defaultdict(float)
chosen_speaker = defaultdict(list)
train_constrait = {
"male": TRAIN_MALE_MAX_SECONDS,
"female": TRAIN_FEMALE_MAX_SECONDS,
}
for speaker, seconds in speaker2time:
sex = speaker.split("_")[0]
if chosen_time[sex] < train_constrait[sex]:
chosen_time[sex] += seconds
chosen_speaker[sex].append(speaker)
speaker2time = dict(speaker2time)
chosen_speaker = chosen_speaker["male"] + chosen_speaker["female"]
print("\n#Chosen speakers = {}".format(len(chosen_speaker)))
for spk in chosen_speaker:
print("{}\t{:.2f} mins".format(spk, speaker2time[spk] / 60))
return chosen_speaker
if __name__ == "__main__":
root_path = ""
vctk_dir = os.path.join(root_path, "vctk")
fewspeaker_dir = os.path.join(root_path, "vctkfewspeaker")
os.makedirs(fewspeaker_dir, exist_ok=True)
train_idxs, test_idxs, raw_train, raw_test = select_sample_idxs()
print("#Train = {}, #Test = {}".format(len(train_idxs), len(test_idxs)))
# There are no data leakage
assert len(set(train_idxs).intersection(set(test_idxs))) == 0
for idx in train_idxs + test_idxs:
# No test data of raw vctk
assert "test_" not in idx
for split, chosen_idxs in zip(["train", "test"], [train_idxs, test_idxs]):
print("{}: #chosen idx = {}\n".format(split, len(chosen_idxs)))
# Select features
feat_files = glob.glob("**/train.pkl", root_dir=vctk_dir, recursive=True)
for file in tqdm(feat_files):
raw_file = os.path.join(vctk_dir, file)
new_file = os.path.join(
fewspeaker_dir, file.replace("train.pkl", "{}.pkl".format(split))
)
new_dir = "/".join(new_file.split("/")[:-1])
os.makedirs(new_dir, exist_ok=True)
if "mel_min" in file or "mel_max" in file:
os.system("cp {} {}".format(raw_file, new_file))
continue
with open(raw_file, "rb") as f:
raw_feats = pickle.load(f)
print("file: {}, #raw_feats = {}".format(file, len(raw_feats)))
new_feats = []
for idx in chosen_idxs:
chosen_split_is_train, raw_idx = idx.split("_")
assert chosen_split_is_train == "train"
new_feats.append(raw_feats[int(raw_idx)])
with open(new_file, "wb") as f:
pickle.dump(new_feats, f)
print("New file: {}, #new_feats = {}".format(new_file, len(new_feats)))
# Utterance re-index
news_utts = [raw_train[int(idx.split("_")[-1])] for idx in chosen_idxs]
for i, utt in enumerate(news_utts):
utt["Dataset"] = "vctkfewsinger"
utt["index"] = i
with open(os.path.join(fewspeaker_dir, "{}.json".format(split)), "w") as f:
json.dump(news_utts, f, indent=4)
|