Spaces:
Running
Running
# Copyright (c) 2023 Amphion. | |
# | |
# This source code is licensed under the MIT license found in the | |
# LICENSE file in the root directory of this source tree. | |
import copy | |
from functools import partial | |
from typing import Any, Callable, List, Optional, Union | |
import torch | |
from torch import Tensor, nn | |
from torch.nn import functional as F | |
from modules.norms import AdaptiveLayerNorm, LayerNorm, BalancedBasicNorm, IdentityNorm | |
from modules.transformer import MultiheadAttention | |
from modules.general.scaling import BalancedDoubleSwish | |
class TransformerEncoderLayer(nn.Module): | |
__constants__ = ["batch_first", "norm_first"] | |
def __init__( | |
self, | |
d_model: int, | |
nhead: int, | |
dim_feedforward: int = 2048, | |
dropout: float = 0.1, | |
activation: Union[str, Callable[[Tensor], Tensor]] = F.relu, | |
batch_first: bool = False, | |
norm_first: bool = False, | |
device=None, | |
dtype=None, | |
linear1_self_attention_cls: nn.Module = nn.Linear, | |
linear2_self_attention_cls: nn.Module = nn.Linear, | |
linear1_feedforward_cls: nn.Module = nn.Linear, | |
linear2_feedforward_cls: nn.Module = nn.Linear, | |
layer_norm_cls: nn.Module = LayerNorm, | |
layer_norm_eps: float = 1e-5, | |
adaptive_layer_norm=False, | |
) -> None: | |
factory_kwargs = {"device": device, "dtype": dtype} | |
super(TransformerEncoderLayer, self).__init__() | |
self.self_attn = MultiheadAttention( | |
d_model, | |
nhead, | |
dropout=dropout, | |
batch_first=batch_first, | |
linear1_cls=linear1_self_attention_cls, | |
linear2_cls=linear2_self_attention_cls, | |
**factory_kwargs, | |
) | |
# Implementation of Feedforward model | |
self.linear1 = linear1_feedforward_cls( | |
d_model, dim_feedforward, **factory_kwargs | |
) | |
self.dropout = nn.Dropout(dropout) | |
self.linear2 = linear2_feedforward_cls( | |
dim_feedforward, d_model, **factory_kwargs | |
) | |
self.norm_first = norm_first | |
self.dropout1 = nn.Dropout(dropout) | |
self.dropout2 = nn.Dropout(dropout) | |
if isinstance(activation, str): | |
activation = _get_activation_fn(activation) | |
elif isinstance(activation, partial): | |
activation = activation(d_model) | |
elif activation == BalancedDoubleSwish: | |
activation = BalancedDoubleSwish(d_model) | |
self.activation = activation | |
norm1 = layer_norm_cls(d_model, eps=layer_norm_eps, **factory_kwargs) | |
if layer_norm_cls == IdentityNorm: | |
norm2 = BalancedBasicNorm(d_model, eps=layer_norm_eps, **factory_kwargs) | |
else: | |
norm2 = layer_norm_cls(d_model, eps=layer_norm_eps, **factory_kwargs) | |
if adaptive_layer_norm: | |
self.norm1 = AdaptiveLayerNorm(d_model, norm1) | |
self.norm2 = AdaptiveLayerNorm(d_model, norm2) | |
else: | |
self.norm1 = norm1 | |
self.norm2 = norm2 | |
def __setstate__(self, state): | |
super(TransformerEncoderLayer, self).__setstate__(state) | |
if not hasattr(self, "activation"): | |
self.activation = F.relu | |
def forward( | |
self, | |
src: Tensor, | |
src_mask: Optional[Tensor] = None, | |
src_key_padding_mask: Optional[Tensor] = None, | |
) -> Tensor: | |
r"""Pass the input through the encoder layer. | |
Args: | |
src: the sequence to the encoder layer (required). | |
src_mask: the mask for the src sequence (optional). | |
src_key_padding_mask: the mask for the src keys per batch (optional). | |
Shape: | |
see the docs in Transformer class. | |
""" | |
x, stage_embedding = src, None | |
is_src_tuple = False | |
if isinstance(src, tuple): | |
x, stage_embedding = src | |
is_src_tuple = True | |
if src_key_padding_mask is not None: | |
_skpm_dtype = src_key_padding_mask.dtype | |
if _skpm_dtype != torch.bool and not torch.is_floating_point( | |
src_key_padding_mask | |
): | |
raise AssertionError( | |
"only bool and floating types of key_padding_mask are supported" | |
) | |
if self.norm_first: | |
x = x + self._sa_block( | |
self.norm1(x, stage_embedding), | |
src_mask, | |
src_key_padding_mask, | |
) | |
x = x + self._ff_block(self.norm2(x, stage_embedding)) | |
else: | |
x = self.norm1( | |
x + self._sa_block(x, src_mask, src_key_padding_mask), | |
stage_embedding, | |
) | |
x = self.norm2(x + self._ff_block(x), stage_embedding) | |
if is_src_tuple: | |
return (x, stage_embedding) | |
return x | |
def _sa_block( | |
self, | |
x: Tensor, | |
attn_mask: Optional[Tensor], | |
key_padding_mask: Optional[Tensor], | |
) -> Tensor: | |
x = self.self_attn( | |
x, | |
x, | |
x, | |
attn_mask=attn_mask, | |
key_padding_mask=key_padding_mask, | |
need_weights=False, | |
)[0] | |
return self.dropout1(x) | |
def _ff_block(self, x: Tensor) -> Tensor: | |
x = self.linear2(self.dropout(self.activation(self.linear1(x)))) | |
return self.dropout2(x) | |
class TransformerEncoder(nn.Module): | |
"""TransformerEncoder is a stack of N encoder layers.""" | |
def __init__(self, encoder_layer, num_layers, norm=None): | |
super(TransformerEncoder, self).__init__() | |
self.layers = _get_clones(encoder_layer, num_layers) | |
self.num_layers = num_layers | |
self.norm = norm | |
def forward( | |
self, | |
src: Tensor, | |
mask: Optional[Tensor] = None, | |
src_key_padding_mask: Optional[Tensor] = None, | |
return_layer_states: bool = False, | |
) -> Tensor: | |
# Pass the input through the encoder layers | |
output = src | |
layer_states = [] if return_layer_states else None | |
for mod in self.layers: | |
output = self._apply_module( | |
mod, output, mask, src_key_padding_mask, layer_states | |
) | |
if self.norm is not None: | |
output = self.norm(output) | |
return (layer_states, output) if return_layer_states else output | |
def _apply_module(self, module, output, mask, key_padding_mask, layer_states): | |
# Apply a single transformer module | |
output = module(output, src_mask=mask, src_key_padding_mask=key_padding_mask) | |
if layer_states is not None: | |
layer_states.append(output) | |
return output | |
class TransformerDecoderLayer(nn.Module): | |
__constants__ = ["batch_first", "norm_first"] | |
def __init__( | |
self, | |
d_model: int, | |
nhead: int, | |
dim_feedforward: int = 2048, | |
dropout: float = 0.1, | |
activation: Union[str, Callable[[Tensor], Tensor]] = F.relu, | |
linear1_self_attention_cls: nn.Module = nn.Linear, | |
linear2_self_attention_cls: nn.Module = nn.Linear, | |
linear1_feedforward_cls: nn.Module = nn.Linear, | |
linear2_feedforward_cls: nn.Module = nn.Linear, | |
batch_first: bool = False, | |
norm_first: bool = False, | |
device=None, | |
dtype=None, | |
layer_norm_cls: nn.Module = LayerNorm, | |
layer_norm_eps: float = 1e-5, | |
adaptive_layer_norm=False, | |
) -> None: | |
factory_kwargs = {"device": device, "dtype": dtype} | |
super(TransformerDecoderLayer, self).__init__() | |
self.self_attn = MultiheadAttention( | |
d_model, | |
nhead, | |
dropout=dropout, | |
batch_first=batch_first, | |
linear1_cls=linear1_self_attention_cls, | |
linear2_cls=linear2_self_attention_cls, | |
**factory_kwargs, | |
) | |
self.multihead_attn = MultiheadAttention( | |
d_model, | |
nhead, | |
dropout=dropout, | |
batch_first=batch_first, | |
linear1_cls=linear1_self_attention_cls, | |
linear2_cls=linear2_self_attention_cls, | |
**factory_kwargs, | |
) | |
self.linear1 = linear1_feedforward_cls( | |
d_model, dim_feedforward, **factory_kwargs | |
) | |
self.dropout = nn.Dropout(dropout) | |
self.linear2 = linear2_feedforward_cls( | |
dim_feedforward, d_model, **factory_kwargs | |
) | |
self.norm_first = norm_first | |
self.dropout1 = nn.Dropout(dropout) | |
self.dropout2 = nn.Dropout(dropout) | |
self.dropout3 = nn.Dropout(dropout) | |
self.activation = self._get_activation_fn(activation) | |
self.norm1, self.norm2, self.norm3 = self._init_norm_layers( | |
d_model, layer_norm_cls, layer_norm_eps, adaptive_layer_norm, factory_kwargs | |
) | |
def forward( | |
self, | |
tgt: Tensor, | |
memory: Tensor, | |
tgt_mask: Optional[Tensor] = None, | |
memory_mask: Optional[Tensor] = None, | |
tgt_key_padding_mask: Optional[Tensor] = None, | |
memory_key_padding_mask: Optional[Tensor] = None, | |
) -> Tensor: | |
r"""Pass the inputs (and mask) through the decoder layer. | |
Args: | |
tgt: the sequence to the decoder layer (required). | |
memory: the sequence from the last layer of the encoder (required). | |
tgt_mask: the mask for the tgt sequence (optional). | |
memory_mask: the mask for the memory sequence (optional). | |
tgt_key_padding_mask: the mask for the tgt keys per batch (optional). | |
memory_key_padding_mask: the mask for the memory keys per batch (optional). | |
Shape: | |
see the docs in Transformer class. | |
""" | |
tgt_is_tuple = False | |
if isinstance(tgt, tuple): | |
x, stage_embedding = tgt | |
tgt_is_tuple = True | |
else: | |
x, stage_embedding = tgt, None | |
if self.norm_first: | |
x = x + self._sa_block( | |
self.norm1(x, stage_embedding), tgt_mask, tgt_key_padding_mask | |
) | |
x = x + self._mha_block( | |
self.norm2(x, stage_embedding), | |
memory, | |
memory_mask, | |
memory_key_padding_mask, | |
) | |
x = x + self._ff_block(self.norm3(x, stage_embedding)) | |
else: | |
x = self.norm1( | |
x + self._sa_block(x, tgt_mask, tgt_key_padding_mask), | |
stage_embedding, | |
) | |
x = self.norm2( | |
x + self._mha_block(x, memory, memory_mask, memory_key_padding_mask), | |
stage_embedding, | |
) | |
x = self.norm3(x + self._ff_block(x), stage_embedding) | |
if tgt_is_tuple: | |
return (x, stage_embedding) | |
return x | |
def _sa_block( | |
self, | |
x: Tensor, | |
attn_mask: Optional[Tensor], | |
key_padding_mask: Optional[Tensor], | |
) -> Tensor: | |
x = self.self_attn( | |
x, | |
x, | |
x, | |
attn_mask=attn_mask, | |
key_padding_mask=key_padding_mask, | |
need_weights=False, | |
)[0] | |
return self.dropout1(x) | |
def _mha_block( | |
self, | |
x: Tensor, | |
mem: Tensor, | |
attn_mask: Optional[Tensor], | |
key_padding_mask: Optional[Tensor], | |
) -> Tensor: | |
x = self.multihead_attn( | |
x, | |
mem, | |
mem, | |
attn_mask=attn_mask, | |
key_padding_mask=key_padding_mask, | |
need_weights=False, | |
)[0] | |
return self.dropout2(x) | |
def _ff_block(self, x: Tensor) -> Tensor: | |
x = self.linear2(self.dropout(self.activation(self.linear1(x)))) | |
return self.dropout3(x) | |
def _get_activation_fn(self, activation): | |
if isinstance(activation, str): | |
return _get_activation_fn(activation) | |
elif callable(activation): | |
return activation | |
else: | |
raise ValueError("Unsupported activation type") | |
def _init_norm_layers( | |
self, | |
d_model, | |
layer_norm_cls, | |
layer_norm_eps, | |
adaptive_layer_norm, | |
factory_kwargs, | |
): | |
if adaptive_layer_norm: | |
return ( | |
AdaptiveLayerNorm( | |
d_model, | |
layer_norm_cls(d_model, eps=layer_norm_eps, **factory_kwargs), | |
), | |
AdaptiveLayerNorm( | |
d_model, | |
layer_norm_cls(d_model, eps=layer_norm_eps, **factory_kwargs), | |
), | |
AdaptiveLayerNorm( | |
d_model, | |
layer_norm_cls(d_model, eps=layer_norm_eps, **factory_kwargs), | |
), | |
) | |
else: | |
return ( | |
layer_norm_cls(d_model, eps=layer_norm_eps, **factory_kwargs), | |
layer_norm_cls(d_model, eps=layer_norm_eps, **factory_kwargs), | |
( | |
layer_norm_cls(d_model, eps=layer_norm_eps, **factory_kwargs) | |
if layer_norm_cls != IdentityNorm | |
else BalancedBasicNorm( | |
d_model, eps=layer_norm_eps, **factory_kwargs | |
) | |
), | |
) | |
def _get_clones(module, N): | |
return nn.ModuleList([copy.deepcopy(module) for i in range(N)]) | |
def _get_activation_fn(activation: str) -> Callable[[Tensor], Tensor]: | |
if activation == "relu": | |
return F.relu | |
elif activation == "gelu": | |
return F.gelu | |
raise RuntimeError("activation should be relu/gelu, not {}".format(activation)) | |
class Transpose(nn.Identity): | |
"""(N, T, D) -> (N, D, T)""" | |
def forward(self, input: torch.Tensor) -> torch.Tensor: | |
return input.transpose(1, 2) | |