File size: 7,160 Bytes
67d041f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import torch
import numpy as np
import logging, yaml, os, sys, argparse, math
import matplotlib.pyplot as plt
from tqdm import tqdm
from librosa import griffinlim

from Modules.Modules import DiffSinger
from Datasets import Inference_Dataset as Dataset, Inference_Collater as Collater
from meldataset import spectral_de_normalize_torch
from Arg_Parser import Recursive_Parse

import matplotlib as mpl
# ์œ ๋‹ˆ์ฝ”๋“œ ๊นจ์งํ˜„์ƒ ํ•ด๊ฒฐ
mpl.rcParams['axes.unicode_minus'] = False
# ๋‚˜๋ˆ”๊ณ ๋”• ํฐํŠธ ์ ์šฉ
plt.rcParams["font.family"] = 'NanumGothic'

logging.basicConfig(
    level=logging.INFO, stream=sys.stdout,
    format= '%(asctime)s (%(module)s:%(lineno)d) %(levelname)s: %(message)s'
    )

class Inferencer:
    def __init__(
        self,
        hp_path: str,
        checkpoint_path: str,
        batch_size= 1
        ):
        self.device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
        
        self.hp = Recursive_Parse(yaml.load(
            open(hp_path, encoding='utf-8'),
            Loader=yaml.Loader
            ))

        self.model = DiffSinger(self.hp).to(self.device)
        if self.hp.Feature_Type == 'Mel':
            self.vocoder = torch.jit.load('vocoder.pts', map_location='cpu').to(self.device)

        if self.hp.Feature_Type == 'Spectrogram':
            self.feature_range_info_dict = yaml.load(open(self.hp.Spectrogram_Range_Info_Path), Loader=yaml.Loader)
        if self.hp.Feature_Type == 'Mel':
            self.feature_range_info_dict = yaml.load(open(self.hp.Mel_Range_Info_Path), Loader=yaml.Loader)
        self.index_singer_dict = {
            value: key
            for key, value in yaml.load(open(self.hp.Singer_Info_Path), Loader=yaml.Loader).items()
            }

        if self.hp.Feature_Type == 'Spectrogram':
            self.feature_size = self.hp.Sound.N_FFT // 2 + 1
        elif self.hp.Feature_Type == 'Mel':
            self.feature_size = self.hp.Sound.Mel_Dim
        else:
            raise ValueError('Unknown feature type: {}'.format(self.hp.Feature_Type))

        self.Load_Checkpoint(checkpoint_path)
        self.batch_size = batch_size

    def Dataset_Generate(self, message_times_list, lyrics, notes, singers, genres):
        token_dict = yaml.load(open(self.hp.Token_Path), Loader=yaml.Loader)
        singer_info_dict = yaml.load(open(self.hp.Singer_Info_Path), Loader=yaml.Loader)
        genre_info_dict = yaml.load(open(self.hp.Genre_Info_Path), Loader=yaml.Loader)

        return torch.utils.data.DataLoader(
            dataset= Dataset(
                token_dict= token_dict,
                singer_info_dict= singer_info_dict,
                genre_info_dict= genre_info_dict,
                durations= message_times_list,
                lyrics= lyrics,
                notes= notes,
                singers= singers,
                genres= genres,
                sample_rate= self.hp.Sound.Sample_Rate,
                frame_shift= self.hp.Sound.Frame_Shift,
                equality_duration= self.hp.Duration.Equality,
                consonant_duration= self.hp.Duration.Consonant_Duration
                ),
            shuffle= False,
            collate_fn= Collater(
                token_dict= token_dict
                ),
            batch_size= self.batch_size,
            num_workers= 0,
            pin_memory= True
            )

    def Load_Checkpoint(self, path):
        state_dict = torch.load(path, map_location= 'cpu')
        self.model.load_state_dict(state_dict['Model']['DiffSVS'])        
        self.steps = state_dict['Steps']

        self.model.eval()

        logging.info('Checkpoint loaded at {} steps.'.format(self.steps))

    @torch.inference_mode()
    def Inference_Step(self, tokens, notes, durations, lengths, singers, genres, singer_labels, ddim_steps):
        tokens = tokens.to(self.device, non_blocking=True)
        notes = notes.to(self.device, non_blocking=True)
        durations = durations.to(self.device, non_blocking=True)
        lengths = lengths.to(self.device, non_blocking=True)
        singers = singers.to(self.device, non_blocking=True)
        genres = genres.to(self.device, non_blocking=True)
        
        linear_predictions, diffusion_predictions, _, _ = self.model(
            tokens= tokens,
            notes= notes,
            durations= durations,
            lengths= lengths,
            genres= genres,
            singers= singers,
            ddim_steps= ddim_steps
            )
        linear_predictions = linear_predictions.clamp(-1.0, 1.0)
        diffusion_predictions = diffusion_predictions.clamp(-1.0, 1.0)

        linear_prediction_list, diffusion_prediction_list = [], []
        for linear_prediction, diffusion_prediction, singer in zip(linear_predictions, diffusion_predictions, singer_labels):
            feature_max = self.feature_range_info_dict[singer]['Max']
            feature_min = self.feature_range_info_dict[singer]['Min']
            linear_prediction_list.append((linear_prediction + 1.0) / 2.0 * (feature_max - feature_min) + feature_min)
            diffusion_prediction_list.append((diffusion_prediction + 1.0) / 2.0 * (feature_max - feature_min) + feature_min)
        linear_predictions = torch.stack(linear_prediction_list, dim= 0)
        diffusion_predictions = torch.stack(diffusion_prediction_list, dim= 0)
        
        if self.hp.Feature_Type == 'Mel':
            audios = self.vocoder(diffusion_predictions)
            if audios.ndim == 1:    # This is temporal because of the vocoder problem.
                audios = audios.unsqueeze(0)
            audios = [
                audio[:min(length * self.hp.Sound.Frame_Shift, audio.size(0))].cpu().numpy()
                for audio, length in zip(audios, lengths)
                ]
        elif self.hp.Feature_Type == 'Spectrogram':
            audios = []
            for prediction, length in zip(
                diffusion_predictions,
                lengths
                ):
                prediction = spectral_de_normalize_torch(prediction).cpu().numpy()
                audio = griffinlim(prediction)[:min(prediction.size(1), length) * self.hp.Sound.Frame_Shift]
                audio = (audio / np.abs(audio).max() * 32767.5).astype(np.int16)
                audios.append(audio)

        return audios

    def Inference_Epoch(self, message_times_list, lyrics, notes, singers, genres, ddim_steps= None, use_tqdm= True):
        dataloader = self.Dataset_Generate(
            message_times_list= message_times_list,
            lyrics= lyrics,
            notes= notes,
            singers= singers,
            genres= genres
            )
        if use_tqdm:
            dataloader = tqdm(
                dataloader,
                desc='[Inference]',
                total= math.ceil(len(dataloader.dataset) / self.batch_size)
                )
        audios = []
        for tokens, notes, durations, lengths, singers, genres, singer_labels, lyrics in dataloader:
            audios.extend(self.Inference_Step(tokens, notes, durations, lengths, singers, genres, singer_labels, ddim_steps))
        
        return audios