Spaces:
Sleeping
Sleeping
import torch | |
import lightning | |
# from torch.utils.data import Dataset | |
# from typing import Any, Dict | |
# import argparse | |
from pydantic import BaseModel | |
# from get_dataset_dictionaries import get_dict_pair | |
# import os | |
# import shutil | |
# import optuna | |
# from optuna.integration import PyTorchLightningPruningCallback | |
# from functools import partial | |
class FFNModule(torch.nn.Module): | |
""" | |
A pytorch module that regresses from a hidden state representation of a word | |
to its continuous linguistic feature norm vector. | |
It is a FFN with the general structure of: | |
input -> (linear -> nonlinearity -> dropout) x (num_layers - 1) -> linear -> output | |
""" | |
def __init__( | |
self, | |
input_size: int, | |
output_size: int, | |
hidden_size: int, | |
num_layers: int, | |
dropout: float, | |
): | |
super(FFNModule, self).__init__() | |
layers = [] | |
for _ in range(num_layers - 1): | |
layers.append(torch.nn.Linear(input_size, hidden_size)) | |
layers.append(torch.nn.ReLU()) | |
layers.append(torch.nn.Dropout(dropout)) | |
# changes input size to hidden size after first layer | |
input_size = hidden_size | |
layers.append(torch.nn.Linear(hidden_size, output_size)) | |
self.network = torch.nn.Sequential(*layers) | |
def forward(self, x): | |
return self.network(x) | |
class FFNParams(BaseModel): | |
input_size: int | |
output_size: int | |
hidden_size: int | |
num_layers: int | |
dropout: float | |
class TrainingParams(BaseModel): | |
num_epochs: int | |
batch_size: int | |
learning_rate: float | |
weight_decay: float | |
class FeatureNormPredictor(lightning.LightningModule): | |
def __init__(self, ffn_params : FFNParams, training_params : TrainingParams): | |
super().__init__() | |
self.save_hyperparameters() | |
self.ffn_params = ffn_params | |
self.training_params = training_params | |
self.model = FFNModule(**ffn_params.model_dump()) | |
self.loss_function = torch.nn.MSELoss() | |
self.training_params = training_params | |
def training_step(self, batch, batch_idx): | |
x,y = batch | |
outputs = self.model(x) | |
loss = self.loss_function(outputs, y) | |
self.log("train_loss", loss) | |
return loss | |
def validation_step(self, batch, batch_idx): | |
x,y = batch | |
outputs = self.model(x) | |
loss = self.loss_function(outputs, y) | |
self.log("val_loss", loss, on_epoch=True, prog_bar=True) | |
return loss | |
def test_step(self, batch, batch_idx): | |
return self.model(batch) | |
def predict(self, batch): | |
return self.model(batch) | |
def __call__(self, input): | |
return self.model(input) | |
def configure_optimizers(self): | |
optimizer = torch.optim.Adam( | |
self.parameters(), | |
lr=self.training_params.learning_rate, | |
weight_decay=self.training_params.weight_decay, | |
) | |
return optimizer | |
def save_model(self, path: str): | |
torch.save(self.model.state_dict(), path) | |
def load_model(self, path: str): | |
self.model.load_state_dict(torch.load(path)) | |
# class HiddenStateFeatureNormDataset(Dataset): | |
# def __init__( | |
# self, | |
# input_embeddings: Dict[str, torch.Tensor], | |
# feature_norms: Dict[str, torch.Tensor], | |
# ): | |
# # Invariant: input_embeddings and target_feature_norms have exactly the same keys | |
# # this should be done by the train/test split and upstream data processing | |
# assert(input_embeddings.keys() == feature_norms.keys()) | |
# self.words = list(input_embeddings.keys()) | |
# self.input_embeddings = torch.stack([ | |
# input_embeddings[word] for word in self.words | |
# ]) | |
# self.feature_norms = torch.stack([ | |
# feature_norms[word] for word in self.words | |
# ]) | |
# def __len__(self): | |
# return len(self.words) | |
# def __getitem__(self, idx): | |
# return self.input_embeddings[idx], self.feature_norms[idx] | |
# # this is used when not optimizing | |
# def train(args : Dict[str, Any]): | |
# # input_embeddings = torch.load(args.input_embeddings) | |
# # feature_norms = torch.load(args.feature_norms) | |
# # words = list(input_embeddings.keys()) | |
# input_embeddings, feature_norms, norm_list = get_dict_pair( | |
# args.norm, | |
# args.embedding_dir, | |
# args.lm_layer, | |
# translated= False if args.raw_buchanan else True, | |
# normalized= True if args.normal_buchanan else False | |
# ) | |
# norms_file = open(args.save_dir+"/"+args.save_model_name+'.txt','w') | |
# norms_file.write("\n".join(norm_list)) | |
# norms_file.close() | |
# words = list(input_embeddings.keys()) | |
# model = FeatureNormPredictor( | |
# FFNParams( | |
# input_size=input_embeddings[words[0]].shape[0], | |
# output_size=feature_norms[words[0]].shape[0], | |
# hidden_size=args.hidden_size, | |
# num_layers=args.num_layers, | |
# dropout=args.dropout, | |
# ), | |
# TrainingParams( | |
# num_epochs=args.num_epochs, | |
# batch_size=args.batch_size, | |
# learning_rate=args.learning_rate, | |
# weight_decay=args.weight_decay, | |
# ), | |
# ) | |
# # train/val split | |
# train_size = int(len(words) * 0.8) | |
# valid_size = len(words) - train_size | |
# train_words, validation_words = torch.utils.data.random_split(words, [train_size, valid_size]) | |
# # TODO: Methodology Decision: should we be normalizing the hidden states/feature norms? | |
# train_embeddings = {word: input_embeddings[word] for word in train_words} | |
# train_feature_norms = {word: feature_norms[word] for word in train_words} | |
# validation_embeddings = {word: input_embeddings[word] for word in validation_words} | |
# validation_feature_norms = {word: feature_norms[word] for word in validation_words} | |
# train_dataset = HiddenStateFeatureNormDataset(train_embeddings, train_feature_norms) | |
# train_dataloader = torch.utils.data.DataLoader( | |
# train_dataset, | |
# batch_size=args.batch_size, | |
# shuffle=True, | |
# ) | |
# validation_dataset = HiddenStateFeatureNormDataset(validation_embeddings, validation_feature_norms) | |
# validation_dataloader = torch.utils.data.DataLoader( | |
# validation_dataset, | |
# batch_size=args.batch_size, | |
# shuffle=True, | |
# ) | |
# callbacks = [ | |
# lightning.pytorch.callbacks.ModelCheckpoint( | |
# save_last=True, | |
# dirpath=args.save_dir, | |
# filename=args.save_model_name, | |
# ), | |
# ] | |
# if args.early_stopping is not None: | |
# callbacks.append(lightning.pytorch.callbacks.EarlyStopping( | |
# monitor="val_loss", | |
# patience=args.early_stopping, | |
# mode='min', | |
# min_delta=0.0 | |
# )) | |
# #TODO Design Decision - other trainer args? Is device necessary? | |
# # cpu is fine for the scale of this model - only a few layers and a few hundred words | |
# trainer = lightning.Trainer( | |
# max_epochs=args.num_epochs, | |
# callbacks=callbacks, | |
# accelerator="cpu", | |
# log_every_n_steps=7 | |
# ) | |
# trainer.fit(model, train_dataloader, validation_dataloader) | |
# trainer.validate(model, validation_dataloader) | |
# return model | |
# # this is used when optimizing | |
# def objective(trial: optuna.trial.Trial, args: Dict[str, Any]) -> float: | |
# # optimizing hidden size, batch size, and learning rate | |
# input_embeddings, feature_norms, norm_list = get_dict_pair( | |
# args.norm, | |
# args.embedding_dir, | |
# args.lm_layer, | |
# translated= False if args.raw_buchanan else True, | |
# normalized= True if args.normal_buchanan else False | |
# ) | |
# norms_file = open(args.save_dir+"/"+args.save_model_name+'.txt','w') | |
# norms_file.write("\n".join(norm_list)) | |
# norms_file.close() | |
# words = list(input_embeddings.keys()) | |
# input_size=input_embeddings[words[0]].shape[0] | |
# output_size=feature_norms[words[0]].shape[0] | |
# min_size = min(output_size, input_size) | |
# max_size = min(output_size, 2*input_size)if min_size == input_size else min(2*output_size, input_size) | |
# hidden_size = trial.suggest_int("hidden_size", min_size, max_size, log=True) | |
# batch_size = trial.suggest_int("batch_size", 16, 128, log=True) | |
# learning_rate = trial.suggest_float("learning_rate", 1e-6, 1, log=True) | |
# model = FeatureNormPredictor( | |
# FFNParams( | |
# input_size=input_size, | |
# output_size=output_size, | |
# hidden_size=hidden_size, | |
# num_layers=args.num_layers, | |
# dropout=args.dropout, | |
# ), | |
# TrainingParams( | |
# num_epochs=args.num_epochs, | |
# batch_size=batch_size, | |
# learning_rate=learning_rate, | |
# weight_decay=args.weight_decay, | |
# ), | |
# ) | |
# # train/val split | |
# train_size = int(len(words) * 0.8) | |
# valid_size = len(words) - train_size | |
# train_words, validation_words = torch.utils.data.random_split(words, [train_size, valid_size]) | |
# train_embeddings = {word: input_embeddings[word] for word in train_words} | |
# train_feature_norms = {word: feature_norms[word] for word in train_words} | |
# validation_embeddings = {word: input_embeddings[word] for word in validation_words} | |
# validation_feature_norms = {word: feature_norms[word] for word in validation_words} | |
# train_dataset = HiddenStateFeatureNormDataset(train_embeddings, train_feature_norms) | |
# train_dataloader = torch.utils.data.DataLoader( | |
# train_dataset, | |
# batch_size=args.batch_size, | |
# shuffle=True, | |
# ) | |
# validation_dataset = HiddenStateFeatureNormDataset(validation_embeddings, validation_feature_norms) | |
# validation_dataloader = torch.utils.data.DataLoader( | |
# validation_dataset, | |
# batch_size=args.batch_size, | |
# shuffle=True, | |
# ) | |
# callbacks = [ | |
# # all trial models will be saved in temporary directory | |
# lightning.pytorch.callbacks.ModelCheckpoint( | |
# save_last=True, | |
# dirpath=os.path.join(args.save_dir,'optuna_trials'), | |
# filename="{}".format(trial.number) | |
# ), | |
# ] | |
# if args.prune is not None: | |
# callbacks.append(PyTorchLightningPruningCallback( | |
# trial, | |
# monitor='val_loss' | |
# )) | |
# if args.early_stopping is not None: | |
# callbacks.append(lightning.pytorch.callbacks.EarlyStopping( | |
# monitor="val_loss", | |
# patience=args.early_stopping, | |
# mode='min', | |
# min_delta=0.0 | |
# )) | |
# # note that if optimizing is chosen, will automatically not implement vanilla early stopping | |
# #TODO Design Decision - other trainer args? Is device necessary? | |
# # cpu is fine for the scale of this model - only a few layers and a few hundred words | |
# trainer = lightning.Trainer( | |
# max_epochs=args.num_epochs, | |
# callbacks=callbacks, | |
# accelerator="cpu", | |
# log_every_n_steps=7, | |
# # enable_checkpointing=False | |
# ) | |
# trainer.fit(model, train_dataloader, validation_dataloader) | |
# trainer.validate(model, validation_dataloader) | |
# return trainer.callback_metrics['val_loss'].item() | |
# if __name__ == "__main__": | |
# # parse args | |
# parser = argparse.ArgumentParser() | |
# #TODO: Design Decision: Should we input paths, to the pre-extracted layers, or the model/layer we want to generate them from | |
# # required inputs | |
# parser.add_argument("--norm", type=str, required=True, help="feature norm set to use") | |
# parser.add_argument("--embedding_dir", type=str, required=True, help=" directory containing embeddings") | |
# parser.add_argument("--lm_layer", type=int, required=True, help="layer of embeddings to use") | |
# # if user selects optimize, hidden_size, batch_size and learning_rate will be optimized. | |
# parser.add_argument("--optimize", action="store_true", help="optimize hyperparameters for training") | |
# parser.add_argument("--prune", action="store_true", help="prune unpromising trials when optimizing") | |
# # optional hyperparameter specs | |
# parser.add_argument("--num_layers", type=int, default=2, help="number of layers in FFN") | |
# parser.add_argument("--hidden_size", type=int, default=100, help="hidden size of FFN") | |
# parser.add_argument("--dropout", type=float, default=0.1, help="dropout rate of FFN") | |
# # set this to at least 100 if doing early stopping | |
# parser.add_argument("--num_epochs", type=int, default=10, help="number of epochs to train for") | |
# parser.add_argument("--batch_size", type=int, default=32, help="batch size for training") | |
# parser.add_argument("--learning_rate", type=float, default=0.001, help="learning rate for training") | |
# parser.add_argument("--weight_decay", type=float, default=0.0, help="weight decay for training") | |
# parser.add_argument("--early_stopping", type=int, default=None, help="number of epochs to wait for early stopping") | |
# # optional dataset specs, for buchanan really | |
# parser.add_argument('--raw_buchanan', action="store_true", help="do not use translated values for buchanan") | |
# parser.add_argument('--normal_buchanan', action="store_true", help="use normalized features for buchanan") | |
# # required for output | |
# parser.add_argument("--save_dir", type=str, required=True, help="directory to save model to") | |
# parser.add_argument("--save_model_name", type=str, required=True, help="name of model to save") | |
# args = parser.parse_args() | |
# if args.early_stopping is not None: | |
# args.num_epochs = max(50, args.num_epochs) | |
# torch.manual_seed(10) | |
# if args.optimize: | |
# # call optimizer code here | |
# print("optimizing for learning rate, batch size, and hidden size") | |
# pruner = optuna.pruners.MedianPruner() if args.prune else optuna.pruners.NopPruner() | |
# sampler = optuna.samplers.TPESampler(seed=10) | |
# study = optuna.create_study(direction='minimize', pruner=pruner, sampler=sampler) | |
# study.optimize(partial(objective, args=args), n_trials = 100, timeout=600) | |
# other_params = { | |
# "num_layers": args.num_layers, | |
# "num_epochs": args.num_epochs, | |
# "dropout": args.dropout, | |
# "weight_decay": args.weight_decay, | |
# } | |
# print("Number of finished trials: {}".format(len(study.trials))) | |
# trial = study.best_trial | |
# print("Best trial: "+str(trial.number)) | |
# print(" Validation Loss: {}".format(trial.value)) | |
# print(" Optimized Params: ") | |
# for key, value in trial.params.items(): | |
# print(" {}: {}".format(key, value)) | |
# print(" User Defined Params: ") | |
# for key, value in other_params.items(): | |
# print(" {}: {}".format(key, value)) | |
# print('saving best trial') | |
# for filename in os.listdir(os.path.join(args.save_dir,'optuna_trials')): | |
# if filename == "{}.ckpt".format(trial.number): | |
# shutil.move(os.path.join(args.save_dir,'optuna_trials',filename), os.path.join(args.save_dir, "{}.ckpt".format(args.save_model_name))) | |
# shutil.rmtree(os.path.join(args.save_dir,'optuna_trials')) | |
# else: | |
# model = train(args) | |