Spaces:
Sleeping
Sleeping
import os | |
import clip | |
import pytorch_lightning as pl | |
import torch | |
import torch.nn as nn | |
import torch.nn.functional as F | |
from rewards.base_reward import BaseRewardLoss | |
class AestheticLoss(BaseRewardLoss):
    """Aesthetic-score reward loss built on CLIP ViT-L/14 image embeddings.

    An image is embedded with CLIP, the embedding is L2-normalised and fed to
    an ``MLP`` scoring head.  The loss is ``10.0 - mean(score)``, so
    minimising it pushes the predicted score up.
    """

    def __init__(
        self,
        weigthing: float,
        dtype: torch.dtype,
        device: torch.device,
        cache_dir: str,
        memsave: bool = False,
    ):
        """Load CLIP and the scoring head, then freeze both networks.

        Args:
            weigthing: Weight forwarded to ``BaseRewardLoss.__init__``
                (spelling kept so existing keyword callers keep working).
            dtype: Dtype both networks are cast to.
            device: Device both networks are moved to.
            cache_dir: Passed to ``clip.load`` as ``download_root``.
            memsave: When true, both networks are wrapped with
                ``memsave_torch.nn.convert_to_memory_saving``.
        """
        self.clip_model, self.preprocess_fn = clip.load(
            "ViT-L/14", device=device, download_root=cache_dir
        )
        self.clip_model = self.clip_model.to(device, dtype=dtype)
        self.mlp = MLP(768).to(device, dtype=dtype)

        # map_location lets a checkpoint that was saved on another device
        # (e.g. a GPU) be restored on whatever `device` the caller chose.
        state_dict = torch.load(
            f"{os.getcwd()}/ckpts/aesthetic-model.pth",
            map_location=device,
        )  # load the model you trained previously or the model available in this repo
        self.mlp.load_state_dict(state_dict)

        if memsave:
            import memsave_torch.nn

            self.mlp = memsave_torch.nn.convert_to_memory_saving(self.mlp)
            self.clip_model = memsave_torch.nn.convert_to_memory_saving(
                self.clip_model
            ).to(device, dtype=dtype)

        # Both networks are used for inference only.  The scoring head
        # contains Dropout layers, so it must be in eval mode as well,
        # otherwise the reward is randomly perturbed on every call.
        self.clip_model.eval()
        self.mlp.eval()

        self.freeze_parameters(self.clip_model.parameters())
        self.freeze_parameters(self.mlp.parameters())
        super().__init__("Aesthetic", weigthing)

        # __call__ branches on this flag, so it has to live on the instance.
        self.memsave = memsave

    def get_image_features(self, image: torch.Tensor) -> torch.Tensor:
        """Return CLIP image embeddings scaled to unit L2 norm.

        Rows whose norm is exactly zero are divided by 1 instead, so they
        stay zero rather than turning into NaN.
        """
        with torch.autocast("cuda"):
            clip_img_features = self.clip_model.encode_image(image)
            l2 = torch.norm(clip_img_features, p=2, dim=-1, keepdim=True)
            # Swap zero norms for 1.0 to avoid a division by zero below.
            l2 = torch.where(
                l2 == 0,
                torch.tensor(
                    1.0, device=clip_img_features.device, dtype=clip_img_features.dtype
                ),
                l2,
            )
            clip_img_features = clip_img_features / l2
        return clip_img_features

    def get_text_features(self, prompt: str) -> torch.Tensor:
        """Return ``None``; this reward does not use the prompt."""
        return None

    def compute_loss(
        self, image_features: torch.Tensor, text_features: torch.Tensor
    ) -> torch.Tensor:
        """Return ``None``; the loss is computed directly in ``__call__``."""
        return None

    def __call__(self, image: torch.Tensor, prompt: torch.Tensor) -> torch.Tensor:
        """Compute the aesthetic loss ``10.0 - mean(score)`` for ``image``.

        Args:
            image: Image tensor handed to ``clip_model.encode_image``
                (presumably already CLIP-preprocessed; verify against caller).
            prompt: Unused; accepted for interface compatibility.

        Returns:
            Scalar tensor, lower for images the head scores higher.
        """
        if self.memsave:
            image = image.to(torch.float32)
        image_features = self.get_image_features(image)

        # Feed the head in the dtype its weights actually have.  This equals
        # the former hard-coded float16 when the loss was built with
        # dtype=float16 and avoids a matmul dtype mismatch for other dtypes.
        head_dtype = next(self.mlp.parameters()).dtype
        # process_features is inherited from BaseRewardLoss (not shown here).
        image_features_normed = self.process_features(image_features.to(head_dtype))

        aesthetic_loss = 10.0 - self.mlp(image_features_normed).mean()
        return aesthetic_loss
class MLP(pl.LightningModule):
    """Regression head mapping an embedding vector to one scalar.

    The network is a chain of linear layers with dropout in between and no
    activation functions (the ReLUs are disabled).  Training and validation
    both minimise the mean-squared error against the ``ycol`` batch entry.
    """

    def __init__(self, input_size, xcol="emb", ycol="avg_rating"):
        """Build the layer stack.

        Args:
            input_size: Width of the input embedding.
            xcol: Batch key holding the inputs.
            ycol: Batch key holding the regression targets.
        """
        super().__init__()
        self.input_size = input_size
        self.xcol = xcol
        self.ycol = ycol

        # The position of each module inside the Sequential determines its
        # state-dict key (layers.0, layers.2, ...), so the order is fixed.
        stack = [
            nn.Linear(self.input_size, 1024),
            nn.Dropout(0.2),
            nn.Linear(1024, 128),
            nn.Dropout(0.2),
            nn.Linear(128, 64),
            nn.Dropout(0.1),
            nn.Linear(64, 16),
            nn.Linear(16, 1),
        ]
        self.layers = nn.Sequential(*stack)

    def forward(self, x):
        """Run ``x`` through the layer stack."""
        return self.layers(x)

    def _mse_on_batch(self, batch):
        """Return the MSE between predictions and column-shaped targets."""
        inputs = batch[self.xcol]
        targets = batch[self.ycol].reshape(-1, 1)
        predictions = self.layers(inputs)
        return F.mse_loss(predictions, targets)

    def training_step(self, batch, batch_idx):
        """Return the MSE loss for one training batch."""
        return self._mse_on_batch(batch)

    def validation_step(self, batch, batch_idx):
        """Return the MSE loss for one validation batch."""
        return self._mse_on_batch(batch)

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters (lr=1e-3)."""
        return torch.optim.Adam(self.parameters(), lr=1e-3)