# Copyright (c) 2024 Amphion.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from typing import Optional, Tuple

import numpy as np
import scipy
import scipy.signal.windows
import torch
from torch import nn, view_as_complex, view_as_real
from torch.nn.utils import remove_weight_norm, weight_norm

try:
    # Private torchaudio helpers; only needed for the optional mel-scaled
    # initialisation in `IMDCTSymExpHead`, so a missing torchaudio must not
    # prevent the rest of the module from being imported.
    from torchaudio.functional.functional import _hz_to_mel, _mel_to_hz
except ImportError:
    _hz_to_mel = None
    _mel_to_hz = None


def safe_log(x: torch.Tensor, clip_val: float = 1e-7) -> torch.Tensor:
    """
    Computes the element-wise logarithm of the input tensor with clipping to avoid near-zero values.

    Args:
        x (Tensor): Input tensor.
        clip_val (float, optional): Minimum value to clip the input tensor. Defaults to 1e-7.

    Returns:
        Tensor: Element-wise logarithm of the input tensor with clipping applied.
    """
    return torch.log(torch.clip(x, min=clip_val))


def symlog(x: torch.Tensor) -> torch.Tensor:
    """Symmetric logarithm: ``sign(x) * log(1 + |x|)``."""
    return torch.sign(x) * torch.log1p(x.abs())


def symexp(x: torch.Tensor) -> torch.Tensor:
    """Symmetric exponential, the inverse of `symlog`: ``sign(x) * (exp(|x|) - 1)``."""
    return torch.sign(x) * (torch.exp(x.abs()) - 1)


class STFT(nn.Module):
    """
    Short Time Fourier Transform returning log-magnitude and phase.

    Args:
        n_fft (int): Size of Fourier transform.
        hop_length (int): The distance between neighboring sliding window frames.
        win_length (int): The size of window frame and STFT filter.
        center (bool, optional): Forwarded to `torch.stft`. When False, the input is
            reflect-padded by ``(win_length - hop_length) // 2`` on both sides instead.
            Defaults to True.
    """

    def __init__(
        self,
        n_fft: int,
        hop_length: int,
        win_length: int,
        center: bool = True,
    ):
        super().__init__()
        self.center = center
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.win_length = win_length
        window = torch.hann_window(win_length)
        self.register_buffer("window", window)

    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Args:
            x (Tensor): Input waveform of shape (B, T * hop_length).

        Returns:
            Tuple[Tensor, Tensor]: ``(log_mag, phase)``, each of shape
            (B, n_fft // 2 + 1, T).
        """
        if not self.center:
            pad = self.win_length - self.hop_length
            x = torch.nn.functional.pad(x, (pad // 2, pad // 2), mode="reflect")

        # `return_complex=False` is deprecated in torch.stft; request the complex
        # result and view it as real to obtain the same (..., 2) layout.
        stft_spec = view_as_real(
            torch.stft(
                x,
                self.n_fft,
                hop_length=self.hop_length,
                win_length=self.win_length,
                window=self.window,
                center=self.center,
                return_complex=True,
            )
        )  # (B, n_fft // 2 + 1, T, 2)

        rea = stft_spec[..., 0]  # (B, n_fft // 2 + 1, T)
        imag = stft_spec[..., 1]  # (B, n_fft // 2 + 1, T)

        # Small offset keeps the logarithm finite for silent bins.
        log_mag = torch.log(
            torch.sqrt(torch.pow(rea, 2) + torch.pow(imag, 2)) + 1e-5
        )  # (B, n_fft // 2 + 1, T)
        phase = torch.atan2(imag, rea)  # (B, n_fft // 2 + 1, T)

        return log_mag, phase


class ISTFT(nn.Module):
    """
    Custom implementation of ISTFT since torch.istft doesn't allow custom padding (other than `center=True`) with
    windowing. This is because the NOLA (Nonzero Overlap Add) check fails at the edges.
    See issue: https://github.com/pytorch/pytorch/issues/62323
    Specifically, in the context of neural vocoding we are interested in "same" padding analogous to CNNs.
    The NOLA constraint is met as we trim padded samples anyway.

    Args:
        n_fft (int): Size of Fourier transform.
        hop_length (int): The distance between neighboring sliding window frames.
        win_length (int): The size of window frame and STFT filter.
        padding (str, optional): Type of padding. Options are "center" or "same". Defaults to "same".
    """

    def __init__(
        self, n_fft: int, hop_length: int, win_length: int, padding: str = "same"
    ):
        super().__init__()
        if padding not in ["center", "same"]:
            raise ValueError("Padding must be 'center' or 'same'.")
        self.padding = padding
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.win_length = win_length
        window = torch.hann_window(win_length)
        self.register_buffer("window", window)

    def forward(self, spec: torch.Tensor) -> torch.Tensor:
        """
        Compute the Inverse Short Time Fourier Transform (ISTFT) of a complex spectrogram.

        Args:
            spec (Tensor): Input complex spectrogram of shape (B, N, T), where B is the batch size,
                            N is the number of frequency bins, and T is the number of time frames.

        Returns:
            Tensor: Reconstructed time-domain signal of shape (B, L), where L is the length of the output signal.
        """
        if self.padding == "center":
            # Fallback to pytorch native implementation
            return torch.istft(
                spec,
                self.n_fft,
                self.hop_length,
                self.win_length,
                self.window,
                center=True,
            )
        elif self.padding == "same":
            pad = (self.win_length - self.hop_length) // 2
        else:
            raise ValueError("Padding must be 'center' or 'same'.")

        assert spec.dim() == 3, "Expected a 3D tensor as input"
        B, N, T = spec.shape

        # Inverse FFT
        ifft = torch.fft.irfft(spec, self.n_fft, dim=1, norm="backward")
        ifft = ifft * self.window[None, :, None]

        # Overlap and Add
        output_size = (T - 1) * self.hop_length + self.win_length
        # Explicit end index: `pad:-pad` would yield an empty slice when pad == 0.
        end = output_size - pad
        y = torch.nn.functional.fold(
            ifft,
            output_size=(1, output_size),
            kernel_size=(1, self.win_length),
            stride=(1, self.hop_length),
        )[:, 0, 0, pad:end]

        # Window envelope
        window_sq = self.window.square().expand(1, T, -1).transpose(1, 2)
        window_envelope = torch.nn.functional.fold(
            window_sq,
            output_size=(1, output_size),
            kernel_size=(1, self.win_length),
            stride=(1, self.hop_length),
        ).squeeze()[pad:end]

        # Normalize
        assert (window_envelope > 1e-11).all()
        y = y / window_envelope

        return y


class MDCT(nn.Module):
    """
    Modified Discrete Cosine Transform (MDCT) module.

    Args:
        frame_len (int): Length of the MDCT frame.
        padding (str, optional): Type of padding. Options are "center" or "same". Defaults to "same".
    """

    def __init__(self, frame_len: int, padding: str = "same"):
        super().__init__()
        if padding not in ["center", "same"]:
            raise ValueError("Padding must be 'center' or 'same'.")
        self.padding = padding
        self.frame_len = frame_len
        N = frame_len // 2
        n0 = (N + 1) / 2
        # `scipy.signal.cosine` was removed from SciPy; the window lives in
        # `scipy.signal.windows`.
        window = torch.from_numpy(scipy.signal.windows.cosine(frame_len)).float()
        self.register_buffer("window", window)

        pre_twiddle = torch.exp(-1j * torch.pi * torch.arange(frame_len) / frame_len)
        post_twiddle = torch.exp(-1j * torch.pi * n0 * (torch.arange(N) + 0.5) / N)
        # view_as_real: NCCL Backend does not support ComplexFloat data type
        # https://github.com/pytorch/pytorch/issues/71613
        self.register_buffer("pre_twiddle", view_as_real(pre_twiddle))
        self.register_buffer("post_twiddle", view_as_real(post_twiddle))

    def forward(self, audio: torch.Tensor) -> torch.Tensor:
        """
        Apply the Modified Discrete Cosine Transform (MDCT) to the input audio.

        Args:
            audio (Tensor): Input audio waveform of shape (B, T), where B is the batch size
                and T is the length of the audio.

        Returns:
            Tensor: MDCT coefficients of shape (B, L, N), where L is the number of output frames
                and N is the number of frequency bins.
        """
        if self.padding == "center":
            audio = torch.nn.functional.pad(
                audio, (self.frame_len // 2, self.frame_len // 2)
            )
        elif self.padding == "same":
            # hop_length is 1/2 frame_len
            audio = torch.nn.functional.pad(
                audio, (self.frame_len // 4, self.frame_len // 4)
            )
        else:
            raise ValueError("Padding must be 'center' or 'same'.")

        # Frames overlap by half a frame.
        x = audio.unfold(-1, self.frame_len, self.frame_len // 2)
        N = self.frame_len // 2
        x = x * self.window.expand(x.shape)
        X = torch.fft.fft(
            x * view_as_complex(self.pre_twiddle).expand(x.shape), dim=-1
        )[..., :N]
        res = X * view_as_complex(self.post_twiddle).expand(X.shape) * np.sqrt(1 / N)
        return torch.real(res) * np.sqrt(2)


class IMDCT(nn.Module):
    """
    Inverse Modified Discrete Cosine Transform (IMDCT) module.

    Args:
        frame_len (int): Length of the MDCT frame.
        padding (str, optional): Type of padding. Options are "center" or "same". Defaults to "same".
    """

    def __init__(self, frame_len: int, padding: str = "same"):
        super().__init__()
        if padding not in ["center", "same"]:
            raise ValueError("Padding must be 'center' or 'same'.")
        self.padding = padding
        self.frame_len = frame_len
        N = frame_len // 2
        n0 = (N + 1) / 2
        # `scipy.signal.cosine` was removed from SciPy; the window lives in
        # `scipy.signal.windows`.
        window = torch.from_numpy(scipy.signal.windows.cosine(frame_len)).float()
        self.register_buffer("window", window)

        pre_twiddle = torch.exp(1j * torch.pi * n0 * torch.arange(N * 2) / N)
        post_twiddle = torch.exp(1j * torch.pi * (torch.arange(N * 2) + n0) / (N * 2))
        # Stored as real views, mirroring MDCT (complex buffers are not
        # supported by the NCCL backend).
        self.register_buffer("pre_twiddle", view_as_real(pre_twiddle))
        self.register_buffer("post_twiddle", view_as_real(post_twiddle))

    def forward(self, X: torch.Tensor) -> torch.Tensor:
        """
        Apply the Inverse Modified Discrete Cosine Transform (IMDCT) to the input MDCT coefficients.

        Args:
            X (Tensor): Input MDCT coefficients of shape (B, L, N), where B is the batch size,
                L is the number of frames, and N is the number of frequency bins.

        Returns:
            Tensor: Reconstructed audio waveform of shape (B, T), where T is the length of the audio.
        """
        B, L, N = X.shape
        # Extend the N coefficients to a length-2N spectrum with odd symmetry.
        Y = torch.zeros((B, L, N * 2), dtype=X.dtype, device=X.device)
        Y[..., :N] = X
        Y[..., N:] = -1 * torch.conj(torch.flip(X, dims=(-1,)))
        y = torch.fft.ifft(
            Y * view_as_complex(self.pre_twiddle).expand(Y.shape), dim=-1
        )
        y = (
            torch.real(y * view_as_complex(self.post_twiddle).expand(y.shape))
            * np.sqrt(N)
            * np.sqrt(2)
        )
        result = y * self.window.expand(y.shape)

        # Overlap-add the half-overlapping frames.
        output_size = (1, (L + 1) * N)
        audio = torch.nn.functional.fold(
            result.transpose(1, 2),
            output_size=output_size,
            kernel_size=(1, self.frame_len),
            stride=(1, self.frame_len // 2),
        )[:, 0, 0, :]

        if self.padding == "center":
            pad = self.frame_len // 2
        elif self.padding == "same":
            pad = self.frame_len // 4
        else:
            raise ValueError("Padding must be 'center' or 'same'.")

        # Explicit end index: `pad:-pad` would yield an empty slice when pad == 0.
        audio = audio[:, pad : audio.shape[-1] - pad]
        return audio


class FourierHead(nn.Module):
    """Base class for inverse fourier modules."""

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x (Tensor): Input tensor of shape (B, L, H), where B is the batch size,
                        L is the sequence length, and H denotes the model dimension.

        Returns:
            Tensor: Reconstructed time-domain audio signal of shape (B, T), where T is the length of the output signal.
        """
        raise NotImplementedError("Subclasses must implement the forward method.")


class ISTFTHead(FourierHead):
    """
    ISTFT Head module for predicting STFT complex coefficients.

    Args:
        dim (int): Hidden dimension of the model.
        n_fft (int): Size of Fourier transform.
        hop_length (int): The distance between neighboring sliding window frames, which should align with
                          the resolution of the input features.
        padding (str, optional): Type of padding. Options are "center" or "same". Defaults to "same".
    """

    def __init__(self, dim: int, n_fft: int, hop_length: int, padding: str = "same"):
        super().__init__()
        # n_fft // 2 + 1 magnitudes plus the same number of phases.
        out_dim = n_fft + 2
        self.out = torch.nn.Linear(dim, out_dim)
        self.istft = ISTFT(
            n_fft=n_fft, hop_length=hop_length, win_length=n_fft, padding=padding
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward pass of the ISTFTHead module.

        Args:
            x (Tensor): Input tensor of shape (B, L, H), where B is the batch size,
                        L is the sequence length, and H denotes the model dimension.

        Returns:
            Tensor: Reconstructed time-domain audio signal of shape (B, T), where T is the length of the output signal.
        """
        x = self.out(x).transpose(1, 2)
        mag, p = x.chunk(2, dim=1)
        mag = torch.exp(mag)
        mag = torch.clip(
            mag, max=1e2
        )  # safeguard to prevent excessively large magnitudes
        # Phase wrapping happens here: cos/sin give the real and imaginary
        # parts directly, so there is no need to recompute the phase with
        # atan2 and exponentiate it again.
        real = torch.cos(p)
        imag = torch.sin(p)
        S = mag * (real + 1j * imag)
        audio = self.istft(S)
        return audio


class IMDCTSymExpHead(FourierHead):
    """
    IMDCT Head module for predicting MDCT coefficients with symmetric exponential function

    Args:
        dim (int): Hidden dimension of the model.
        mdct_frame_len (int): Length of the MDCT frame.
        padding (str, optional): Type of padding. Options are "center" or "same". Defaults to "same".
        sample_rate (int, optional): The sample rate of the audio. If provided, the last layer will be initialized
                                     based on perceptual scaling. Requires torchaudio. Defaults to None.
        clip_audio (bool, optional): Whether to clip the audio output within the range of [-1.0, 1.0]. Defaults to False.

    Raises:
        ImportError: If `sample_rate` is given but torchaudio could not be imported.
    """

    def __init__(
        self,
        dim: int,
        mdct_frame_len: int,
        padding: str = "same",
        sample_rate: Optional[int] = None,
        clip_audio: bool = False,
    ):
        super().__init__()
        out_dim = mdct_frame_len // 2
        self.out = nn.Linear(dim, out_dim)
        self.imdct = IMDCT(frame_len=mdct_frame_len, padding=padding)
        self.clip_audio = clip_audio

        if sample_rate is not None:
            if _hz_to_mel is None or _mel_to_hz is None:
                raise ImportError(
                    "torchaudio is required to initialise IMDCTSymExpHead "
                    "with `sample_rate`."
                )
            # optionally init the last layer following mel-scale
            m_max = _hz_to_mel(sample_rate // 2)
            m_pts = torch.linspace(0, m_max, out_dim)
            f_pts = _mel_to_hz(m_pts)
            scale = 1 - (f_pts / f_pts.max())

            with torch.no_grad():
                self.out.weight.mul_(scale.view(-1, 1))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward pass of the IMDCTSymExpHead module.

        Args:
            x (Tensor): Input tensor of shape (B, L, H), where B is the batch size,
                        L is the sequence length, and H denotes the model dimension.

        Returns:
            Tensor: Reconstructed time-domain audio signal of shape (B, T), where T is the length of the output signal.
        """
        x = self.out(x)
        x = symexp(x)
        x = torch.clip(
            x, min=-1e2, max=1e2
        )  # safeguard to prevent excessively large magnitudes
        audio = self.imdct(x)
        if self.clip_audio:
            # Clip the reconstructed waveform, not the MDCT coefficients.
            audio = torch.clip(audio, min=-1.0, max=1.0)

        return audio


class IMDCTCosHead(FourierHead):
    """
    IMDCT Head module for predicting MDCT coefficients with parametrizing MDCT = exp(m) · cos(p)

    Args:
        dim (int): Hidden dimension of the model.
        mdct_frame_len (int): Length of the MDCT frame.
        padding (str, optional): Type of padding. Options are "center" or "same". Defaults to "same".
        clip_audio (bool, optional): Whether to clip the audio output within the range of [-1.0, 1.0]. Defaults to False.
    """

    def __init__(
        self,
        dim: int,
        mdct_frame_len: int,
        padding: str = "same",
        clip_audio: bool = False,
    ):
        super().__init__()
        self.clip_audio = clip_audio
        # mdct_frame_len // 2 magnitudes plus the same number of phases.
        self.out = nn.Linear(dim, mdct_frame_len)
        self.imdct = IMDCT(frame_len=mdct_frame_len, padding=padding)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward pass of the IMDCTCosHead module.

        Args:
            x (Tensor): Input tensor of shape (B, L, H), where B is the batch size,
                        L is the sequence length, and H denotes the model dimension.

        Returns:
            Tensor: Reconstructed time-domain audio signal of shape (B, T), where T is the length of the output signal.
        """
        x = self.out(x)
        m, p = x.chunk(2, dim=2)
        m = torch.exp(m).clip(
            max=1e2
        )  # safeguard to prevent excessively large magnitudes
        audio = self.imdct(m * torch.cos(p))
        if self.clip_audio:
            # Clip the reconstructed waveform, not the raw linear output.
            audio = torch.clip(audio, min=-1.0, max=1.0)
        return audio


class ConvNeXtBlock(nn.Module):
    """ConvNeXt Block adapted from https://github.com/facebookresearch/ConvNeXt to 1D audio signal.

    Args:
        dim (int): Number of input channels.
        intermediate_dim (int): Dimensionality of the intermediate layer.
        layer_scale_init_value (float, optional): Initial value for the layer scale. None (or a
            non-positive value) means no scaling. Defaults to None.
        adanorm_num_embeddings (int, optional): Number of embeddings for AdaLayerNorm.
            None means non-conditional LayerNorm. Defaults to None.
    """

    def __init__(
        self,
        dim: int,
        intermediate_dim: int,
        layer_scale_init_value: Optional[float] = None,
        adanorm_num_embeddings: Optional[int] = None,
    ):
        super().__init__()
        self.dwconv = nn.Conv1d(
            dim, dim, kernel_size=7, padding=3, groups=dim
        )  # depthwise conv
        self.adanorm = adanorm_num_embeddings is not None
        if adanorm_num_embeddings:
            self.norm = AdaLayerNorm(adanorm_num_embeddings, dim, eps=1e-6)
        else:
            self.norm = nn.LayerNorm(dim, eps=1e-6)
        self.pwconv1 = nn.Linear(
            dim, intermediate_dim
        )  # pointwise/1x1 convs, implemented with linear layers
        self.act = nn.GELU()
        self.pwconv2 = nn.Linear(intermediate_dim, dim)
        # `None > 0` raises TypeError, so check for None explicitly before
        # comparing; this makes the documented "None means no scaling" work.
        use_layer_scale = (
            layer_scale_init_value is not None and layer_scale_init_value > 0
        )
        self.gamma = (
            nn.Parameter(layer_scale_init_value * torch.ones(dim), requires_grad=True)
            if use_layer_scale
            else None
        )

    def forward(
        self, x: torch.Tensor, cond_embedding_id: Optional[torch.Tensor] = None
    ) -> torch.Tensor:
        """
        Args:
            x (Tensor): Input of shape (B, C, T).
            cond_embedding_id (Tensor, optional): Conditioning ids; required when the block
                was built with `adanorm_num_embeddings`.

        Returns:
            Tensor: Output of shape (B, C, T).
        """
        residual = x
        x = self.dwconv(x)
        x = x.transpose(1, 2)  # (B, C, T) -> (B, T, C)
        if self.adanorm:
            assert cond_embedding_id is not None
            x = self.norm(x, cond_embedding_id)
        else:
            x = self.norm(x)
        x = self.pwconv1(x)
        x = self.act(x)
        x = self.pwconv2(x)
        if self.gamma is not None:
            x = self.gamma * x
        x = x.transpose(1, 2)  # (B, T, C) -> (B, C, T)

        x = residual + x
        return x


class AdaLayerNorm(nn.Module):
    """
    Adaptive Layer Normalization module with learnable embeddings per `num_embeddings` classes

    Args:
        num_embeddings (int): Number of embeddings.
        embedding_dim (int): Dimension of the embeddings.
        eps (float, optional): Epsilon passed to layer normalization. Defaults to 1e-6.
    """

    def __init__(self, num_embeddings: int, embedding_dim: int, eps: float = 1e-6):
        super().__init__()
        self.eps = eps
        self.dim = embedding_dim
        self.scale = nn.Embedding(
            num_embeddings=num_embeddings, embedding_dim=embedding_dim
        )
        self.shift = nn.Embedding(
            num_embeddings=num_embeddings, embedding_dim=embedding_dim
        )
        # Start as an identity affine transform (scale 1, shift 0).
        torch.nn.init.ones_(self.scale.weight)
        torch.nn.init.zeros_(self.shift.weight)

    def forward(self, x: torch.Tensor, cond_embedding_id: torch.Tensor) -> torch.Tensor:
        """
        Normalize `x` over its last dimension, then apply the scale/shift looked up
        for `cond_embedding_id`.

        NOTE(review): the looked-up scale/shift must broadcast against `x`; a single
        id of shape (1,) works for (B, T, C) inputs — confirm against callers.
        """
        scale = self.scale(cond_embedding_id)
        shift = self.shift(cond_embedding_id)
        x = nn.functional.layer_norm(x, (self.dim,), eps=self.eps)
        x = x * scale + shift
        return x


class ResBlock1(nn.Module):
    """
    ResBlock adapted from HiFi-GAN V1 (https://github.com/jik876/hifi-gan) with dilated 1D convolutions,
    but without upsampling layers.

    Args:
        dim (int): Number of input channels.
        kernel_size (int, optional): Size of the convolutional kernel. Defaults to 3.
        dilation (tuple[int], optional): Dilation factors for the dilated convolutions.
            Defaults to (1, 3, 5).
        lrelu_slope (float, optional): Negative slope of the LeakyReLU activation function.
            Defaults to 0.1.
        layer_scale_init_value (float, optional): Initial value for the layer scale. None means no scaling.
            Defaults to None.
    """

    def __init__(
        self,
        dim: int,
        kernel_size: int = 3,
        dilation: Tuple[int, int, int] = (1, 3, 5),
        lrelu_slope: float = 0.1,
        layer_scale_init_value: Optional[float] = None,
    ):
        super().__init__()
        self.lrelu_slope = lrelu_slope

        def make_conv(rate: int) -> nn.Module:
            # Weight-normalised, length-preserving convolution.
            return weight_norm(
                nn.Conv1d(
                    dim,
                    dim,
                    kernel_size,
                    1,
                    dilation=rate,
                    padding=self.get_padding(kernel_size, rate),
                )
            )

        def make_gamma() -> Optional[nn.Parameter]:
            if layer_scale_init_value is None:
                return None
            return nn.Parameter(
                layer_scale_init_value * torch.ones(dim, 1), requires_grad=True
            )

        # Exactly three dilated convs, each followed by an undilated one.
        self.convs1 = nn.ModuleList(
            [make_conv(rate) for rate in (dilation[0], dilation[1], dilation[2])]
        )
        self.convs2 = nn.ModuleList([make_conv(1) for _ in range(3)])
        self.gamma = nn.ParameterList([make_gamma() for _ in range(3)])

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the three residual (LeakyReLU -> conv -> LeakyReLU -> conv) stages."""
        for c1, c2, gamma in zip(self.convs1, self.convs2, self.gamma):
            xt = torch.nn.functional.leaky_relu(x, negative_slope=self.lrelu_slope)
            xt = c1(xt)
            xt = torch.nn.functional.leaky_relu(xt, negative_slope=self.lrelu_slope)
            xt = c2(xt)
            if gamma is not None:
                xt = gamma * xt
            x = xt + x
        return x

    def remove_weight_norm(self):
        """Strip weight normalization from every convolution in the block."""
        for conv in self.convs1:
            remove_weight_norm(conv)
        for conv in self.convs2:
            remove_weight_norm(conv)

    @staticmethod
    def get_padding(kernel_size: int, dilation: int = 1) -> int:
        """Return the padding that keeps the sequence length unchanged."""
        return int((kernel_size * dilation - dilation) / 2)


class Backbone(nn.Module):
    """Base class for the generator's backbone. It preserves the same temporal resolution across all layers."""

    def forward(self, x: torch.Tensor, **kwargs) -> torch.Tensor:
        """
        Args:
            x (Tensor): Input tensor of shape (B, C, L), where B is the batch size,
                        C denotes output features, and L is the sequence length.

        Returns:
            Tensor: Output of shape (B, L, H), where B is the batch size, L is the sequence length,
                    and H denotes the model dimension.
        """
        raise NotImplementedError("Subclasses must implement the forward method.")


class VocosBackbone(Backbone):
    """
    Vocos backbone module built with ConvNeXt blocks. Supports additional conditioning with Adaptive Layer Normalization

    Args:
        input_channels (int): Number of input features channels.
        dim (int): Hidden dimension of the model.
        intermediate_dim (int): Intermediate dimension used in ConvNeXtBlock.
        num_layers (int): Number of ConvNeXtBlock layers.
        layer_scale_init_value (float, optional): Initial value for layer scaling. Defaults to `1 / num_layers`.
        adanorm_num_embeddings (int, optional): Number of embeddings for AdaLayerNorm.
                                                None means non-conditional model. Defaults to None.
    """

    def __init__(
        self,
        input_channels: int,
        dim: int,
        intermediate_dim: int,
        num_layers: int,
        layer_scale_init_value: Optional[float] = None,
        adanorm_num_embeddings: Optional[int] = None,
    ):
        super().__init__()
        self.input_channels = input_channels
        self.embed = nn.Conv1d(input_channels, dim, kernel_size=7, padding=3)
        self.adanorm = adanorm_num_embeddings is not None
        if adanorm_num_embeddings:
            self.norm = AdaLayerNorm(adanorm_num_embeddings, dim, eps=1e-6)
        else:
            self.norm = nn.LayerNorm(dim, eps=1e-6)
        layer_scale_init_value = layer_scale_init_value or 1 / num_layers
        self.convnext = nn.ModuleList(
            [
                ConvNeXtBlock(
                    dim=dim,
                    intermediate_dim=intermediate_dim,
                    layer_scale_init_value=layer_scale_init_value,
                    adanorm_num_embeddings=adanorm_num_embeddings,
                )
                for _ in range(num_layers)
            ]
        )
        self.final_layer_norm = nn.LayerNorm(dim, eps=1e-6)
        self.apply(self._init_weights)

    def _init_weights(self, m):
        """Truncated-normal weights and zero biases for conv and linear layers."""
        if isinstance(m, (nn.Conv1d, nn.Linear)):
            nn.init.trunc_normal_(m.weight, std=0.02)
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)

    def forward(self, x: torch.Tensor, **kwargs) -> torch.Tensor:
        """
        Args:
            x (Tensor): Input features of shape (B, C, L).
            **kwargs: `bandwidth_id` (Tensor) is read as the conditioning id and is
                required when the backbone was built with `adanorm_num_embeddings`.

        Returns:
            Tensor: Output of shape (B, L, H).
        """
        bandwidth_id = kwargs.get("bandwidth_id", None)
        x = self.embed(x)
        if self.adanorm:
            assert bandwidth_id is not None
            x = self.norm(x.transpose(1, 2), cond_embedding_id=bandwidth_id)
        else:
            x = self.norm(x.transpose(1, 2))
        x = x.transpose(1, 2)
        for conv_block in self.convnext:
            x = conv_block(x, cond_embedding_id=bandwidth_id)
        x = self.final_layer_norm(x.transpose(1, 2))
        return x


class VocosResNetBackbone(Backbone):
    """
    Vocos backbone module built with ResBlocks.

    Args:
        input_channels (int): Number of input features channels.
        dim (int): Hidden dimension of the model.
        num_blocks (int): Number of ResBlock1 blocks.
        layer_scale_init_value (float, optional): Initial value for layer scaling.
            Defaults to `1 / num_blocks / 3`.
    """

    def __init__(
        self,
        input_channels: int,
        dim: int,
        num_blocks: int,
        layer_scale_init_value: Optional[float] = None,
    ):
        super().__init__()
        self.input_channels = input_channels
        self.embed = weight_norm(
            nn.Conv1d(input_channels, dim, kernel_size=3, padding=1)
        )
        layer_scale_init_value = layer_scale_init_value or 1 / num_blocks / 3
        self.resnet = nn.Sequential(
            *[
                ResBlock1(dim=dim, layer_scale_init_value=layer_scale_init_value)
                for _ in range(num_blocks)
            ]
        )

    def forward(self, x: torch.Tensor, **kwargs) -> torch.Tensor:
        """Map (B, C, L) features to (B, L, H); extra keyword arguments are ignored."""
        x = self.embed(x)
        x = self.resnet(x)
        x = x.transpose(1, 2)
        return x


class Vocos(nn.Module):
    """
    Vocos generator: a `VocosBackbone` followed by an `ISTFTHead`.

    Args:
        input_channels (int): Number of input features channels. Defaults to 256.
        dim (int): Hidden dimension of the model. Defaults to 384.
        intermediate_dim (int): Intermediate dimension used in ConvNeXtBlock. Defaults to 1152.
        num_layers (int): Number of ConvNeXtBlock layers. Defaults to 8.
        adanorm_num_embeddings (int, optional): Number of embeddings for AdaLayerNorm.
            None means non-conditional model. Defaults to 4.
        n_fft (int): Size of Fourier transform. Defaults to 800.
        hop_size (int): Hop length of the ISTFT head. Defaults to 200.
        padding (str): Type of padding. Options are "center" or "same". Defaults to "same".
    """

    def __init__(
        self,
        input_channels: int = 256,
        dim: int = 384,
        intermediate_dim: int = 1152,
        num_layers: int = 8,
        adanorm_num_embeddings: Optional[int] = 4,
        n_fft: int = 800,
        hop_size: int = 200,
        padding: str = "same",
    ):
        super().__init__()

        self.backbone = VocosBackbone(
            input_channels=input_channels,
            dim=dim,
            intermediate_dim=intermediate_dim,
            num_layers=num_layers,
            adanorm_num_embeddings=adanorm_num_embeddings,
        )
        self.head = ISTFTHead(dim, n_fft, hop_size, padding)

    def forward(
        self, x: torch.Tensor, bandwidth_id: Optional[torch.Tensor] = None
    ) -> torch.Tensor:
        """
        Args:
            x (Tensor): Input features of shape (B, C, L).
            bandwidth_id (Tensor, optional): Conditioning id forwarded to the backbone.
                Required when the model was built with `adanorm_num_embeddings`
                (the default), because the backbone asserts it is not None.

        Returns:
            Tensor: Waveform of shape (B, 1, T).
        """
        x = self.backbone(x, bandwidth_id=bandwidth_id)
        x = self.head(x)

        return x[:, None, :]