|
|
|
from typing import Optional, Sequence |
|
from torch import nn, Tensor |
|
from transformers import PretrainedConfig, PreTrainedModel, AutoConfig, AutoModel |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AutoEncoderConfig(PretrainedConfig):
    """Hyper-parameter container for the ``AutoEncoder`` model.

    Every constructor argument is stored unchanged as an attribute of the
    same name; any extra keyword arguments are forwarded to
    ``PretrainedConfig``.

    Attributes:
        input_dim: Width of the data fed to the encoder (default: 128).
        latent_dim: Width of the latent representation (default: 64).
        layer_types: Kind of layer to stack: 'linear', 'lstm', 'gru' or
            'rnn' (default: 'linear').
        dropout_rate: Dropout probability inserted after every layer except
            the last one of each section (default: 0.1).
        num_layers: Number of layers in the encoder and in the decoder
            (default: 3).
        compression_rate: Factor applied to the width from one layer to the
            next, never going below ``latent_dim`` (default: 0.5).
        bidirectional: Whether recurrent layers are bidirectional
            (default: False).
    """

    model_type = "autoencoder"

    def __init__(
        self,
        input_dim: int = 128,
        latent_dim: int = 64,
        layer_types: str = "linear",
        dropout_rate: float = 0.1,
        num_layers: int = 3,
        compression_rate: float = 0.5,
        bidirectional: bool = False,
        **kwargs,
    ):
        # Let the base class consume the generic Hugging Face options first.
        super().__init__(**kwargs)

        # Data / latent widths.
        self.input_dim = input_dim
        self.latent_dim = latent_dim

        # Architecture of each section.
        self.layer_types = layer_types
        self.dropout_rate = dropout_rate
        self.num_layers = num_layers
        self.compression_rate = compression_rate
        self.bidirectional = bidirectional
|
|
|
def create_layers(
    model_section: str,
    layer_types: str,
    input_dim: int,
    latent_dim: int,
    num_layers: int,
    dropout_rate: float,
    compression_rate: float,
    bidirectional: bool,
) -> nn.Sequential:
    """Build the layer stack for one half (encoder or decoder) of the autoencoder.

    The encoder widths start at ``input_dim`` and are multiplied by
    ``compression_rate`` at every layer (truncated to int, never below
    ``latent_dim``); the last layer always ends at ``latent_dim``. The decoder
    uses the mirrored sequence, going from ``latent_dim`` back to
    ``input_dim``.

    Args:
        model_section: ``"decoder"`` builds the mirrored (expanding) stack;
            any other value builds the encoder stack.
        layer_types: One of ``'linear'``, ``'lstm'``, ``'rnn'`` or ``'gru'``.
        input_dim: Width of the autoencoder input.
        latent_dim: Width of the latent representation.
        num_layers: Number of layers to create (at least 1).
        dropout_rate: Dropout probability inserted after every layer except
            the last one. ``None`` disables dropout.
        compression_rate: Per-layer factor applied to the width.
        bidirectional: Whether recurrent layers are bidirectional. Ignored
            for ``'linear'``.

    Returns:
        An ``nn.Sequential`` containing the layers (and dropout modules).

    Raises:
        ValueError: If ``layer_types`` is not supported, if ``num_layers`` is
            smaller than 1, or if a bidirectional recurrent layer would need
            an odd output width.
    """
    recurrent_classes = {"lstm": nn.LSTM, "rnn": nn.RNN, "gru": nn.GRU}

    # Fail loudly instead of silently returning a stack of Dropout modules.
    if layer_types != "linear" and layer_types not in recurrent_classes:
        raise ValueError(
            f"Unsupported layer_types {layer_types!r}; expected one of "
            f"{['linear', *recurrent_classes]}."
        )
    if num_layers < 1:
        raise ValueError(f"num_layers must be at least 1, got {num_layers}.")

    # Encoder-side widths: in_widths[i] -> out_widths[i] for layer i.
    in_widths = []
    out_widths = []
    width = input_dim
    for _ in range(num_layers):
        in_widths.append(width)
        width = max(int(width * compression_rate), latent_dim)
        out_widths.append(width)
    # The final encoder layer must land exactly on the latent width.
    out_widths[-1] = latent_dim

    # The decoder is the encoder mirrored: swap roles and reverse the order.
    if model_section == "decoder":
        in_widths, out_widths = out_widths[::-1], in_widths[::-1]

    is_bidirectional_rnn = bool(bidirectional) and layer_types in recurrent_classes
    if is_bidirectional_rnn:
        # A bidirectional layer emits 2 * hidden_size features. Each direction
        # therefore gets half of the nominal width so that consecutive layers
        # (and encoder/decoder) keep matching sizes; that requires even widths.
        odd_widths = [w for w in out_widths if w % 2]
        if odd_widths:
            raise ValueError(
                "Bidirectional layers need even output widths, got "
                f"{odd_widths} for the {model_section} section."
            )

    layers = []
    last_idx = num_layers - 1
    for idx, (fan_in, fan_out) in enumerate(zip(in_widths, out_widths)):
        if layer_types == "linear":
            layers.append(nn.Linear(fan_in, fan_out))
        else:
            hidden_size = fan_out // 2 if is_bidirectional_rnn else fan_out
            layers.append(
                recurrent_classes[layer_types](
                    fan_in,
                    hidden_size,
                    batch_first=True,
                    bidirectional=bidirectional,
                )
            )
        # No dropout after the last layer of the section.
        if idx != last_idx and dropout_rate is not None:
            layers.append(nn.Dropout(dropout_rate))

    return nn.Sequential(*layers)
|
|
|
class AutoEncoder(PreTrainedModel):
    """Encoder/decoder model built from an ``AutoEncoderConfig``.

    Subclasses ``PreTrainedModel`` so it plugs into the Hugging Face model
    machinery. Both halves are produced by ``create_layers`` from the same
    configuration values; only the section name differs.

    Args:
        config (AutoEncoderConfig): Configuration holding all model
            hyper-parameters.
    """

    config_class = AutoEncoderConfig

    def __init__(self, config: AutoEncoderConfig):
        super().__init__(config)

        # Both sections share every hyper-parameter.
        section_kwargs = dict(
            layer_types=config.layer_types,
            input_dim=config.input_dim,
            latent_dim=config.latent_dim,
            num_layers=config.num_layers,
            dropout_rate=config.dropout_rate,
            compression_rate=config.compression_rate,
            bidirectional=config.bidirectional,
        )
        self.encoder = create_layers("encoder", **section_kwargs)
        self.decoder = create_layers("decoder", **section_kwargs)

    def forward(self, x: Tensor) -> Tensor:
        """Encode ``x`` and decode it again.

        Args:
            x (Tensor): Input tensor. For recurrent layer types the layers
                are created with ``batch_first=True``.

        Returns:
            The tensor produced by the last decoder layer.
        """
        # Non-recurrent stacks can be called directly as nn.Sequential.
        if self.config.layer_types not in ['lstm', 'rnn', 'gru']:
            return self.decoder(self.encoder(x))

        # Recurrent modules return (output, hidden_state); nn.Sequential
        # cannot unpack that, so the layers are applied one by one and only
        # the output sequence is passed on.
        for section in (self.encoder, self.decoder):
            for layer in section:
                result = layer(x)
                if isinstance(layer, (nn.LSTM, nn.RNN, nn.GRU)):
                    x = result[0]
                else:
                    x = result
        return x
|
|