Spaces:
Build error
Build error
import torch | |
from modules.tts.portaspeech.portaspeech_flow import PortaSpeechFlow | |
from tasks.tts.fs import FastSpeechTask | |
from tasks.tts.ps import PortaSpeechTask | |
from utils.audio.pitch.utils import denorm_f0 | |
from utils.commons.hparams import hparams | |
class PortaSpeechFlowTask(PortaSpeechTask):
    """Task that trains and evaluates a ``PortaSpeechFlow`` model.

    Extends ``PortaSpeechTask`` with optional post-flow handling: when
    ``hparams['use_post_flow']`` is set, the post flow is switched on once
    ``global_step`` reaches ``hparams['post_glow_training_start']``. With
    ``hparams['two_stage']`` enabled, the post flow gets its own optimizer
    and every training step is served by exactly one of the two optimizers.
    """

    def __init__(self):
        super().__init__()
        # Refreshed at the start of every training/validation step.
        self.training_post_glow = False

    def _post_glow_active(self):
        """Return a truthy value once the post-flow stage has started.

        True-ish only when the configured start step has been reached and
        ``hparams['use_post_flow']`` is enabled.
        """
        return self.global_step >= hparams['post_glow_training_start'] \
            and hparams['use_post_flow']

    def build_tts_model(self):
        """Instantiate ``self.model`` from the phoneme and word vocab sizes."""
        ph_dict_size = len(self.token_encoder)
        word_dict_size = len(self.word_encoder)
        self.model = PortaSpeechFlow(ph_dict_size, word_dict_size, hparams)

    def _training_step(self, sample, batch_idx, opt_idx):
        """Run one optimisation step for the optimizer selected by ``opt_idx``.

        Returns:
            ``(total_loss, loss_output)``, or ``None`` when this optimizer
            has nothing to do for the current stage (or the post-flow loss
            is ``None``).
        """
        self.training_post_glow = self._post_glow_active()
        if hparams['two_stage']:
            # Optimizer 0 is skipped once the post-flow stage is active;
            # optimizer 1 is skipped until then.
            skip_first = opt_idx == 0 and self.training_post_glow
            skip_second = opt_idx == 1 and not self.training_post_glow
            if skip_first or skip_second:
                return None
        loss_output, _ = self.run_model(sample)
        # Only tensors that still require grad contribute to the objective;
        # detached entries stay in ``loss_output`` without being optimised.
        total_loss = sum(
            v for v in loss_output.values()
            if isinstance(v, torch.Tensor) and v.requires_grad
        )
        loss_output['batch_size'] = sample['txt_tokens'].size(0)
        if 'postflow' in loss_output and loss_output['postflow'] is None:
            return None
        return total_loss, loss_output

    def run_model(self, sample, infer=False, *args, **kwargs):
        """Forward ``sample`` through the model.

        Args:
            sample: batch dict; the keys read here are ``txt_tokens``,
                ``word_tokens``, ``ph2word``, ``mel2word``, ``mel2ph``,
                ``word_lengths``, ``mels`` and, optionally, ``pitch``,
                ``spk_embed`` and ``spk_ids``.
            infer: when False, run the training forward pass and compute
                losses; when True, run inference only.
            **kwargs: ``infer_use_gt_dur`` overrides ``hparams['use_gt_dur']``
                for a single inference call.

        Returns:
            ``(losses, output)`` when ``infer`` is False, otherwise ``output``.
        """
        spk_embed = sample.get('spk_embed')
        spk_id = sample.get('spk_ids')
        if not infer:
            training_post_glow = self.training_post_glow
            output = self.model(sample['txt_tokens'],
                                sample['word_tokens'],
                                ph2word=sample['ph2word'],
                                mel2word=sample['mel2word'],
                                mel2ph=sample['mel2ph'],
                                word_len=sample['word_lengths'].max(),
                                tgt_mels=sample['mels'],
                                pitch=sample.get('pitch'),
                                spk_embed=spk_embed,
                                spk_id=spk_id,
                                infer=False,
                                forward_post_glow=training_post_glow,
                                two_stage=hparams['two_stage'],
                                global_step=self.global_step)
            losses = {}
            self.add_mel_loss(output['mel_out'], sample['mels'], losses)
            if (training_post_glow or not hparams['two_stage']) and hparams['use_post_flow']:
                losses['postflow'] = output['postflow']
                # Detached so they remain in the loss dict but are excluded
                # by the ``requires_grad`` filter in ``_training_step``.
                losses['l1'] = losses['l1'].detach()
                losses['ssim'] = losses['ssim'].detach()
            if not training_post_glow or not hparams['two_stage'] or not self.training:
                losses['kl'] = output['kl']
                if self.global_step < hparams['kl_start_steps']:
                    # Before ``kl_start_steps`` the KL term carries no gradient.
                    losses['kl'] = losses['kl'].detach()
                else:
                    losses['kl'] = torch.clamp(losses['kl'], min=hparams['kl_min'])
                losses['kl'] = losses['kl'] * hparams['lambda_kl']
                # NOTE(review): this file was recovered from text whose
                # indentation had been stripped; the duration-loss branch is
                # assumed to belong to this KL branch - confirm upstream.
                if hparams['dur_level'] == 'word':
                    self.add_dur_loss(
                        output['dur'], sample['mel2word'], sample['word_lengths'],
                        sample['txt_tokens'], losses)
                    self.get_attn_stats(output['attn'], sample, losses)
                else:
                    # Use the parent's add_dur_loss, which takes no
                    # word-length argument.
                    super().add_dur_loss(
                        output['dur'], sample['mel2ph'], sample['txt_tokens'], losses)
            return losses, output

        use_gt_dur = kwargs.get('infer_use_gt_dur', hparams['use_gt_dur'])
        # At inference the post flow is enabled 1000 steps later than in
        # training (presumably to let it warm up first - TODO confirm).
        forward_post_glow = self.global_step >= hparams['post_glow_training_start'] + 1000 \
            and hparams['use_post_flow']
        output = self.model(
            sample['txt_tokens'],
            sample['word_tokens'],
            ph2word=sample['ph2word'],
            word_len=sample['word_lengths'].max(),
            pitch=sample.get('pitch'),
            mel2ph=sample['mel2ph'] if use_gt_dur else None,
            # Honour the per-call ``infer_use_gt_dur`` override here as well,
            # so mel2ph and mel2word always agree on the duration source.
            mel2word=sample['mel2word'] if hparams['profile_infer'] or use_gt_dur else None,
            infer=True,
            forward_post_glow=forward_post_glow,
            spk_embed=spk_embed,
            spk_id=spk_id,
            two_stage=hparams['two_stage']
        )
        return output

    def validation_step(self, sample, batch_idx):
        """Refresh the post-flow stage flag, then defer to the parent."""
        self.training_post_glow = self._post_glow_active()
        return super().validation_step(sample, batch_idx)

    def save_valid_result(self, sample, batch_idx, model_out):
        """Log the parent's validation results plus the ``mel_out_fvae`` output.

        The extra audio and mel plot are written only when ``global_step`` is
        positive and ``hparams['use_post_flow']`` is enabled.
        """
        super().save_valid_result(sample, batch_idx, model_out)
        sr = hparams['audio_sample_rate']
        f0_gt = None
        if sample.get('f0') is not None:
            f0_gt = denorm_f0(sample['f0'][0].cpu(), sample['uv'][0].cpu())
        if self.global_step > 0:
            # save FVAE result
            if hparams['use_post_flow']:
                wav_pred = self.vocoder.spec2wav(model_out['mel_out_fvae'][0].cpu(), f0=f0_gt)
                self.logger.add_audio(f'wav_fvae_{batch_idx}', wav_pred, self.global_step, sr)
                self.plot_mel(batch_idx, sample['mels'], model_out['mel_out_fvae'][0],
                              f'mel_fvae_{batch_idx}', f0s=f0_gt)

    def build_optimizer(self, model):
        """Create the AdamW optimizer(s) for ``self.model``.

        With two-stage training and a post flow, parameters whose name
        contains ``'post_flow'`` are excluded from the main optimizer and
        ``self.model.post_flow`` gets a second optimizer using
        ``hparams['post_flow_lr']``.

        Returns:
            A list with one or two optimizers; the main one is always first.
        """
        betas = (hparams['optimizer_adam_beta1'], hparams['optimizer_adam_beta2'])
        if hparams['two_stage'] and hparams['use_post_flow']:
            self.optimizer = torch.optim.AdamW(
                [p for name, p in self.model.named_parameters() if 'post_flow' not in name],
                lr=hparams['lr'],
                betas=betas,
                weight_decay=hparams['weight_decay'])
            self.post_flow_optimizer = torch.optim.AdamW(
                self.model.post_flow.parameters(),
                lr=hparams['post_flow_lr'],
                betas=betas,
                weight_decay=hparams['weight_decay'])
            return [self.optimizer, self.post_flow_optimizer]
        self.optimizer = torch.optim.AdamW(
            self.model.parameters(),
            lr=hparams['lr'],
            betas=betas,
            weight_decay=hparams['weight_decay'])
        return [self.optimizer]

    def build_scheduler(self, optimizer):
        """Build a scheduler for the first optimizer only.

        Calls ``FastSpeechTask.build_scheduler`` explicitly rather than going
        through ``super()``.
        """
        return FastSpeechTask.build_scheduler(self, optimizer[0])