# VADER / Core / weather_scorer.py
# QinOwen
# add-vader-videocrafter
# 824b515
# Copied from Cheng An Hsieh et al.: https://github.com/RewardMultiverse/reward-multiverse
import torch
import torch.nn as nn
import torchvision
from transformers import CLIPModel, CLIPProcessor
class SimpleCNN(nn.Module):
    """Four-stage convolutional classifier that works directly on RGB images.

    Every stage is ``Conv2d(3x3, stride 1, padding 1) -> ReLU -> MaxPool2d(2)``,
    so each stage keeps the channel progression 3 -> 16 -> 32 -> 64 -> 128 and
    halves the spatial resolution (a factor of 16 over the four stages).  The
    flattened feature map feeds a 1000-unit hidden layer and then the
    ``num_class``-way output layer.

    Args:
        num_class: Number of output units.  Must be provided; the ``None``
            default is kept only for signature compatibility.
        input_size: Spatial size of the images the network will receive,
            either a single int (square images) or a ``(height, width)``
            pair.  The default of 512 reproduces the previously hard-coded
            ``128 * 32 * 32`` input width of ``fc1``.

    Raises:
        TypeError: If ``num_class`` is ``None``.
        ValueError: If ``input_size`` is too small to survive four poolings.
    """

    # Each MaxPool2d(2, 2) floors the size in half; four of them floor-divide by 16.
    _DOWNSAMPLE = 16

    def __init__(self, num_class=None, input_size=512):
        super().__init__()
        if num_class is None:
            # Fail before allocating the (potentially very large) fc1 layer.
            raise TypeError("SimpleCNN requires an integer num_class")

        if isinstance(input_size, int):
            height = width = input_size
        else:
            height, width = input_size
        feat_h = height // self._DOWNSAMPLE
        feat_w = width // self._DOWNSAMPLE
        if feat_h < 1 or feat_w < 1:
            raise ValueError(
                "input_size must be at least 16 in both dimensions, got "
                f"{input_size!r}"
            )

        # Attribute names (layer1..layer4, fc1, fc2) are part of the
        # state_dict layout and must not change.
        self.layer1 = self._stage(3, 16)
        self.layer2 = self._stage(16, 32)
        self.layer3 = self._stage(32, 64)
        self.layer4 = self._stage(64, 128)
        self.fc1 = nn.Linear(128 * feat_h * feat_w, 1000)
        self.fc2 = nn.Linear(1000, num_class)

    @staticmethod
    def _stage(in_channels, out_channels):
        """Build one conv -> ReLU -> 2x2 max-pool stage."""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

    def forward(self, x):
        """Return the ``fc2`` outputs for a batch of images ``x``."""
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)
        # Collapse everything except the batch dimension for the dense layers.
        x = x.reshape(x.size(0), -1)
        x = torch.relu(self.fc1(x))
        return self.fc2(x)
class MLP(nn.Module):
    """Regression head: maps a 768-dimensional embedding to one sigmoid score.

    The stack is a chain of ``nn.Linear`` layers (768 -> 1024 -> 128 -> 64 ->
    16 -> 1) with dropout after the 1024-, 128- and 64-wide layers and a final
    ``nn.Sigmoid``.  There are no non-linearities between the linear layers.
    Positions inside ``self.layers`` determine the state_dict keys, so the
    ordering below must stay fixed.

    A classification variant would end in ``nn.Linear(16, 2)`` with no sigmoid.
    """

    def __init__(self):
        super().__init__()
        widths = (768, 1024, 128, 64, 16, 1)
        # Dropout probability applied right after the layer with this output width.
        dropout_after = {1024: 0.2, 128: 0.2, 64: 0.1}

        stack = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack.append(nn.Linear(fan_in, fan_out))
            if fan_out in dropout_after:
                stack.append(nn.Dropout(dropout_after[fan_out]))
        stack.append(nn.Sigmoid())

        self.layers = nn.Sequential(*stack)  # regression

    def forward(self, embed):
        """Return the sigmoid score(s) for ``embed``."""
        scores = self.layers(embed)
        return scores
class MLP_Resnet(nn.Module):
    """Linear head taking a 1000-dimensional feature vector to ``num_class`` outputs.

    The head is four stacked ``nn.Linear`` layers (1000 -> 128 -> 64 -> 16 ->
    ``num_class``) with no activation or dropout in between.

    Args:
        num_class: Width of the final linear layer.
    """

    def __init__(self, num_class):
        super().__init__()
        dims = [1000, 128, 64, 16, num_class]
        # Consecutive pairs of ``dims`` give each layer's (in, out) widths.
        linears = [nn.Linear(fan_in, fan_out) for fan_in, fan_out in zip(dims, dims[1:])]
        self.layers = nn.Sequential(*linears)

    def forward(self, embed):
        """Apply the linear stack to ``embed`` and return its raw outputs."""
        outputs = self.layers(embed)
        return outputs
def weather_loss_fn(target=None,  # TODO: use config.task to decide returned loss_fn
                    grad_scale=0,
                    device=None,
                    accelerator=None,
                    torch_dtype=None,
                    reward_model_resume_from=None,
                    num_of_labels=None):
    """Build a loss closure that pulls ``WeatherScorer`` outputs toward ``target``.

    A frozen ``WeatherScorer`` in eval mode is created once and captured by
    the returned closure.

    Args:
        target: Scalar every reward is regressed toward.  It is passed to
            ``torch.full``, so it must be a number when the closure is called.
        grad_scale: Multiplier applied to the summed squared error.  Note the
            default of 0 zeroes the returned loss.
        device: Device the scorer is moved to.
        accelerator: Optional object exposing ``mixed_precision`` and
            ``autocast()``.  When it is ``None`` the scorer is called without
            an autocast context.
        torch_dtype: dtype handed to the scorer and used when moving it.
        reward_model_resume_from: Checkpoint path forwarded to the scorer as
            ``model_path``.
        num_of_labels: Forwarded to the scorer as ``num_class``.

    Returns:
        A function ``loss_fn(im_pix_un)`` returning ``(loss * grad_scale,
        rewards)``, where ``rewards`` is the raw scorer output and ``loss`` is
        the sum-reduced MSE between ``rewards`` and ``target``.
    """
    scorer = WeatherScorer(dtype=torch_dtype, model_path=reward_model_resume_from, num_class=num_of_labels).to(device, dtype=torch_dtype)
    scorer.requires_grad_(False)
    scorer.eval()

    # Regression objective.  A classification variant would use
    # nn.CrossEntropyLoss(reduction="sum") with torch.long targets instead.
    criterion = torch.nn.MSELoss(reduction="sum")

    def loss_fn(im_pix_un):
        # ``accelerator`` defaults to None, so guard before reading its attributes.
        use_autocast = accelerator is not None and accelerator.mixed_precision == "fp16"
        if use_autocast:
            with accelerator.autocast():
                rewards = scorer(im_pix_un)
        else:
            rewards = scorer(im_pix_un)

        # One target value per row of ``rewards``, created directly on the
        # rewards' device and dtype.
        target_tensors = torch.full(
            (rewards.shape[0],), target, dtype=rewards.dtype, device=rewards.device
        )
        loss = criterion(rewards, target_tensors)
        return loss * grad_scale, rewards

    return loss_fn
class WeatherModel(nn.Module):
    """ResNet-18 backbone followed by an ``MLP_Resnet`` head.

    The backbone is fetched through ``torch.hub`` from ``pytorch/vision``
    v0.10.0 with pretrained weights, which may require network access on
    first use.  Its output is fed straight into the ``MLP_Resnet`` head.

    Args:
        num_class: Forwarded to ``MLP_Resnet`` as its output width.
    """

    def __init__(self, num_class=None):
        super().__init__()
        self.embed_model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=True)
        self.score_model = MLP_Resnet(num_class)

    def forward(self, im):
        """Return the head's outputs for the image batch ``im``.

        Defined as ``forward`` (not ``__call__``) so that ``nn.Module``'s own
        ``__call__`` machinery, including registered hooks, keeps working;
        calling the instance directly behaves as before.
        """
        return self.score_model(self.embed_model(im))
class WeatherScorer(nn.Module):  # Reward model
    """Reward model: frozen CLIP image encoder followed by an ``MLP`` score head.

    Images are resized to ``target_size``, normalized per channel, encoded
    with CLIP, L2-normalized, and passed through the score head.

    Args:
        dtype: Optional dtype recorded on the instance as ``self.dtype``.
        model_path: Optional path to a state_dict for the score head.  When
            given, the head is loaded, frozen and put in eval mode; otherwise
            the head is left trainable.
        num_class: Unused by the CLIP + MLP configuration; kept for the
            alternative ``WeatherModel`` head.
    """

    def __init__(self, dtype=None, model_path=None, num_class=None):
        super().__init__()
        self.clip = CLIPModel.from_pretrained("openai/clip-vit-large-patch14")
        self.clip.requires_grad_(False)
        self.clip.eval()

        self.score_generator = MLP()
        # self.score_generator = WeatherModel(num_class)  # resnet + mlp

        if model_path:
            # map_location="cpu" lets a checkpoint saved on a GPU load on a
            # CPU-only host; load_state_dict copies into the existing
            # parameters, so the module's own device is unaffected.
            # NOTE(review): torch.load unpickles the file -- only load trusted
            # checkpoints (consider weights_only=True where the installed
            # torch supports it).
            state_dict = torch.load(model_path, map_location="cpu")
            self.score_generator.load_state_dict(state_dict)
            self.score_generator.requires_grad_(False)
            self.score_generator.eval()
        else:
            self.score_generator.requires_grad_(True)

        if dtype is not None:
            self.dtype = dtype

        self.target_size = (224, 224)  # resnet 224, cnn 512 (use 224 for both...?)
        # Per-channel mean/std applied to the resized images before encoding.
        self.normalize = torchvision.transforms.Normalize(
            mean=[0.48145466, 0.4578275, 0.40821073],
            std=[0.26862954, 0.26130258, 0.27577711],
        )

    def set_device(self, device, inference_type):
        """Move CLIP to ``device`` with dtype ``inference_type``; move the head to ``device`` only."""
        self.clip.to(device, dtype=inference_type)  # uncomment for mlp
        self.score_generator.to(device)  # dtype = inference_dtype

    def forward(self, images):
        """Score a batch of images.

        Defined as ``forward`` (not ``__call__``) so ``nn.Module.__call__``
        and any registered hooks keep working; ``scorer(images)`` behaves as
        before.

        Args:
            images: Image batch accepted by ``torchvision.transforms.Resize``.
                Presumably float tensors of shape (batch, 3, H, W) -- confirm
                against the caller.

        Returns:
            The head's output with dimension 1 squeezed out.
        """
        im_pix = torchvision.transforms.Resize(self.target_size)(images)
        # Normalization may promote the dtype; cast back to the input's dtype.
        im_pix = self.normalize(im_pix).to(images.dtype)
        embed = self.clip.get_image_features(pixel_values=im_pix)
        # Unit-normalize each embedding along its feature axis.
        embed = embed / torch.linalg.vector_norm(embed, dim=-1, keepdim=True)
        return self.score_generator(embed).squeeze(1)  # CLIP + MLP
        # return self.score_generator(im_pix).squeeze(1)  # for simpleCNN