Spaces:
Runtime error
Runtime error
import torch | |
import torch.distributions as dist | |
from torch import nn | |
from modules.commons.normalizing_flow.glow_modules import Glow | |
from modules.tts.portaspeech.portaspeech import PortaSpeech | |
class PortaSpeechFlow(PortaSpeech): | |
def __init__(self, ph_dict_size, word_dict_size, hparams, out_dims=None): | |
super().__init__(ph_dict_size, word_dict_size, hparams, out_dims) | |
cond_hs = 80 | |
if hparams.get('use_txt_cond', True): | |
cond_hs = cond_hs + hparams['hidden_size'] | |
if hparams.get('use_latent_cond', False): | |
cond_hs = cond_hs + hparams['latent_size'] | |
if hparams['use_cond_proj']: | |
self.g_proj = nn.Conv1d(cond_hs, 160, 5, padding=2) | |
cond_hs = 160 | |
self.post_flow = Glow( | |
80, hparams['post_glow_hidden'], hparams['post_glow_kernel_size'], 1, | |
hparams['post_glow_n_blocks'], hparams['post_glow_n_block_layers'], | |
n_split=4, n_sqz=2, | |
gin_channels=cond_hs, | |
share_cond_layers=hparams['post_share_cond_layers'], | |
share_wn_layers=hparams['share_wn_layers'], | |
sigmoid_scale=hparams['sigmoid_scale'] | |
) | |
self.prior_dist = dist.Normal(0, 1) | |
def forward(self, txt_tokens, word_tokens, ph2word, word_len, mel2word=None, mel2ph=None, | |
spk_embed=None, spk_id=None, pitch=None, infer=False, tgt_mels=None, | |
forward_post_glow=True, two_stage=True, global_step=None): | |
is_training = self.training | |
train_fvae = not (forward_post_glow and two_stage) | |
if not train_fvae: | |
self.eval() | |
with torch.set_grad_enabled(mode=train_fvae): | |
ret = super(PortaSpeechFlow, self).forward( | |
txt_tokens, word_tokens, ph2word, word_len, mel2word, mel2ph, | |
spk_embed, spk_id, pitch, infer, tgt_mels, global_step) | |
if (forward_post_glow or not two_stage) and self.hparams['use_post_flow']: | |
self.run_post_glow(tgt_mels, infer, is_training, ret) | |
return ret | |
def run_post_glow(self, tgt_mels, infer, is_training, ret): | |
x_recon = ret['mel_out'].transpose(1, 2) | |
g = x_recon | |
B, _, T = g.shape | |
if self.hparams.get('use_txt_cond', True): | |
g = torch.cat([g, ret['decoder_inp'].transpose(1, 2)], 1) | |
if self.hparams.get('use_latent_cond', False): | |
g_z = ret['z_p'][:, :, :, None].repeat(1, 1, 1, 4).reshape(B, -1, T) | |
g = torch.cat([g, g_z], 1) | |
if self.hparams['use_cond_proj']: | |
g = self.g_proj(g) | |
prior_dist = self.prior_dist | |
if not infer: | |
if is_training: | |
self.post_flow.train() | |
nonpadding = ret['nonpadding'].transpose(1, 2) | |
y_lengths = nonpadding.sum(-1) | |
if self.hparams['detach_postflow_input']: | |
g = g.detach() | |
tgt_mels = tgt_mels.transpose(1, 2) | |
z_postflow, ldj = self.post_flow(tgt_mels, nonpadding, g=g) | |
ldj = ldj / y_lengths / 80 | |
ret['z_pf'], ret['ldj_pf'] = z_postflow, ldj | |
ret['postflow'] = -prior_dist.log_prob(z_postflow).mean() - ldj.mean() | |
if torch.isnan(ret['postflow']): | |
ret['postflow'] = None | |
else: | |
nonpadding = torch.ones_like(x_recon[:, :1, :]) | |
z_post = prior_dist.sample(x_recon.shape).to(g.device) * self.hparams['noise_scale'] | |
x_recon, _ = self.post_flow(z_post, nonpadding, g, reverse=True) | |
ret['mel_out'] = x_recon.transpose(1, 2) | |