Spaces:
Running
on
Zero
Running
on
Zero
# Copyright (c) 2023 Amphion.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import torch | |
import torch.nn as nn | |
import functools | |
import torch.nn.functional as F | |
def hinge_d_loss(logits_real, logits_fake):
    """Return the hinge loss for a discriminator.

    Real logits contribute ``relu(1 - logit)`` and fake logits contribute
    ``relu(1 + logit)``; each term is averaged over all elements and the two
    averages are then combined with equal weight.

    Args:
        logits_real: Discriminator outputs for real samples.
        logits_fake: Discriminator outputs for generated samples.

    Returns:
        A scalar tensor, ``0.5 * (mean real penalty + mean fake penalty)``.
    """
    real_penalty = F.relu(1.0 - logits_real).mean()
    fake_penalty = F.relu(1.0 + logits_fake).mean()
    return 0.5 * (real_penalty + fake_penalty)
def vanilla_d_loss(logits_real, logits_fake):
    """Return the softplus-based (non-saturating) discriminator loss.

    Args:
        logits_real: Discriminator outputs for real samples.
        logits_fake: Discriminator outputs for generated samples.

    Returns:
        A scalar tensor equal to
        ``0.5 * (mean(softplus(-logits_real)) + mean(softplus(logits_fake)))``.
    """
    real_term = F.softplus(-logits_real).mean()
    fake_term = F.softplus(logits_fake).mean()
    return 0.5 * (real_term + fake_term)
def adopt_weight(weight, global_step, threshold=0, value=0.0):
    """Return ``value`` until ``global_step`` reaches ``threshold``, then ``weight``.

    Args:
        weight: Weight to use once ``global_step >= threshold``.
        global_step: Current step counter.
        threshold: First step at which ``weight`` is returned unchanged.
        value: Replacement returned while ``global_step < threshold``.

    Returns:
        Either ``value`` or ``weight``, as described above.
    """
    return value if global_step < threshold else weight
class ActNorm(nn.Module):
    """Per-channel affine layer ``scale * (x + loc)`` with data-dependent init.

    ``loc`` and ``scale`` are learnable parameters of shape
    ``(1, num_features, 1, 1)``. The first batch seen in training mode sets
    them from that batch's per-channel mean and standard deviation; the
    ``initialized`` buffer records that this has happened.
    """

    def __init__(
        self, num_features, logdet=False, affine=True, allow_reverse_init=False
    ):
        """Create the layer.

        Args:
            num_features: Number of channels (size of dimension 1 of the input).
            logdet: When true, ``forward`` also returns a log-determinant term.
            affine: Must be truthy; checked with an assertion.
            allow_reverse_init: Permit the data-dependent initialisation to be
                triggered from ``reverse``.
        """
        assert affine
        super().__init__()
        param_shape = (1, num_features, 1, 1)
        self.logdet = logdet
        self.allow_reverse_init = allow_reverse_init
        self.loc = nn.Parameter(torch.zeros(*param_shape))
        self.scale = nn.Parameter(torch.ones(*param_shape))
        # 0 until the data-dependent initialisation has run, 1 afterwards.
        self.register_buffer("initialized", torch.tensor(0, dtype=torch.uint8))

    def initialize(self, input):
        """Set ``loc``/``scale`` from the per-channel statistics of ``input``.

        ``input`` must be 4-D with channels on dimension 1. Afterwards
        ``loc = -mean`` and ``scale = 1 / (std + 1e-6)`` for every channel.
        """
        with torch.no_grad():
            channels = input.shape[1]
            # One row per channel, holding every other element of the batch.
            per_channel = input.permute(1, 0, 2, 3).reshape(channels, -1)
            mean = per_channel.mean(1).view(1, channels, 1, 1)
            std = per_channel.std(1).view(1, channels, 1, 1)

            self.loc.data.copy_(-mean)
            self.scale.data.copy_(1 / (std + 1e-6))

    def forward(self, input, reverse=False):
        """Apply ``scale * (input + loc)``, or the inverse when ``reverse``.

        A 2-D input is temporarily given two trailing singleton dimensions and
        the result is squeezed back to 2-D.

        Returns:
            The transformed tensor, or ``(tensor, logdet)`` when the layer was
            built with ``logdet=True``; ``logdet`` has one entry per batch item.
        """
        if reverse:
            return self.reverse(input)

        squeeze = len(input.shape) == 2
        if squeeze:
            input = input[:, :, None, None]

        batch, _, height, width = input.shape

        # Data-dependent initialisation happens once, on the first training batch.
        if self.training and self.initialized.item() == 0:
            self.initialize(input)
            self.initialized.fill_(1)

        h = self.scale * (input + self.loc)
        if squeeze:
            h = h.squeeze(-1).squeeze(-1)

        if not self.logdet:
            return h

        # Every spatial position shares the same scale, hence the height * width factor.
        log_abs_scale = torch.log(torch.abs(self.scale))
        logdet = height * width * torch.sum(log_abs_scale)
        logdet = logdet * torch.ones(batch).to(input)
        return h, logdet

    def reverse(self, output):
        """Invert ``forward``: return ``output / scale - loc``.

        Raises:
            RuntimeError: If the layer is in training mode, has not been
                initialised yet, and ``allow_reverse_init`` is false.
        """
        if self.training and self.initialized.item() == 0:
            if not self.allow_reverse_init:
                raise RuntimeError(
                    "Initializing ActNorm in reverse direction is "
                    "disabled by default. Use allow_reverse_init=True to enable."
                )
            self.initialize(output)
            self.initialized.fill_(1)

        squeeze = len(output.shape) == 2
        if squeeze:
            output = output[:, :, None, None]

        h = output / self.scale - self.loc
        return h.squeeze(-1).squeeze(-1) if squeeze else h
def weights_init(m):
    """Re-initialise a module in place, chosen by its class name.

    Intended to be passed to ``nn.Module.apply``.

    * Class name contains ``"Conv"``: weights drawn from N(0, 0.02).
    * Class name contains ``"BatchNorm"``: weights drawn from N(1, 0.02) and
      biases set to 0.
    * Anything else is left untouched.
    """
    layer_type = type(m).__name__
    if "Conv" in layer_type:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif "BatchNorm" in layer_type:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)
class NLayerDiscriminator(nn.Module):
    """Defines a PatchGAN discriminator as in Pix2Pix
    --> see https://github.com/junyanz/pytorch-CycleGAN-and-pix2pix/blob/master/models/networks.py
    """

    def __init__(self, input_nc=3, ndf=64, n_layers=3, use_actnorm=False):
        """Construct a PatchGAN discriminator

        Parameters:
            input_nc (int)     -- the number of channels in input images
            ndf (int)          -- the number of filters in the first conv layer
            n_layers (int)     -- the number of stride-2 conv layers
                                  (including the un-normalised first one)
            use_actnorm (bool) -- normalise with ActNorm instead of
                                  nn.BatchNorm2d
        """
        super().__init__()
        norm_layer = ActNorm if use_actnorm else nn.BatchNorm2d
        # A conv followed by BatchNorm2d needs no bias of its own because the
        # norm layer already has an affine shift.
        use_bias = norm_layer is not nn.BatchNorm2d

        kw = 4
        padw = 1

        def conv_norm_act(in_mult, out_mult, stride):
            # One "conv -> norm -> LeakyReLU" stage; widths are multiples of ndf.
            return [
                nn.Conv2d(
                    ndf * in_mult,
                    ndf * out_mult,
                    kernel_size=kw,
                    stride=stride,
                    padding=padw,
                    bias=use_bias,
                ),
                norm_layer(ndf * out_mult),
                nn.LeakyReLU(0.2, True),
            ]

        sequence = [
            nn.Conv2d(input_nc, ndf, kernel_size=kw, stride=2, padding=padw),
            nn.LeakyReLU(0.2, True),
        ]

        nf_mult = 1
        for n in range(1, n_layers):  # gradually increase the number of filters
            nf_mult_prev, nf_mult = nf_mult, min(2**n, 8)
            sequence += conv_norm_act(nf_mult_prev, nf_mult, stride=2)

        # Last normalised stage keeps the spatial stride at 1.
        nf_mult_prev, nf_mult = nf_mult, min(2**n_layers, 8)
        sequence += conv_norm_act(nf_mult_prev, nf_mult, stride=1)

        # output 1 channel prediction map
        sequence.append(
            nn.Conv2d(ndf * nf_mult, 1, kernel_size=kw, stride=1, padding=padw)
        )
        self.main = nn.Sequential(*sequence)

    def forward(self, input):
        """Standard forward: run ``input`` through the layer stack."""
        return self.main(input)
class AutoencoderLossWithDiscriminator(nn.Module):
    """Autoencoder training loss: L1-based NLL + KL + adversarial hinge term.

    The module owns the discriminator and a learnable scalar ``logvar`` used
    to scale the reconstruction error. ``forward`` returns either the
    autoencoder/generator losses or the discriminator losses, depending on
    ``optimizer_idx``.
    """

    def __init__(self, cfg):
        """Build the loss from a config object.

        Attributes read from ``cfg`` here: ``kl_weight``, ``logvar_init``,
        ``disc_in_channels``, ``disc_num_layers``, ``use_actnorm``,
        ``disc_start``, ``disc_weight`` and ``disc_factor``.
        ``calculate_adaptive_weight`` additionally reads
        ``min_adapt_d_weight`` and ``max_adapt_d_weight``.
        """
        super().__init__()
        self.cfg = cfg
        self.kl_weight = cfg.kl_weight
        # Learnable scalar log-variance used to scale the reconstruction error.
        self.logvar = nn.Parameter(torch.ones(size=()) * cfg.logvar_init)
        self.discriminator = NLayerDiscriminator(
            input_nc=cfg.disc_in_channels,
            n_layers=cfg.disc_num_layers,
            use_actnorm=cfg.use_actnorm,
        ).apply(weights_init)
        self.discriminator_iter_start = cfg.disc_start
        self.discriminator_weight = cfg.disc_weight
        self.disc_factor = cfg.disc_factor
        self.disc_loss = hinge_d_loss

    def calculate_adaptive_weight(self, nll_loss, g_loss, last_layer):
        """Balance the adversarial term against the reconstruction term.

        The weight is the ratio of the gradient norms of ``nll_loss`` and
        ``g_loss`` with respect to ``last_layer``, clamped to
        ``[cfg.min_adapt_d_weight, cfg.max_adapt_d_weight]``, detached, and
        finally multiplied by ``self.discriminator_weight``.

        Raises:
            RuntimeError: Propagated from ``torch.autograd.grad`` when a
                gradient cannot be computed.
        """
        nll_grads = torch.autograd.grad(nll_loss, last_layer, retain_graph=True)[0]
        g_grads = torch.autograd.grad(g_loss, last_layer, retain_graph=True)[0]

        # The 1e-4 keeps the ratio finite when the generator gradient vanishes.
        d_weight = torch.norm(nll_grads) / (torch.norm(g_grads) + 1e-4)
        d_weight = torch.clamp(
            d_weight, self.cfg.min_adapt_d_weight, self.cfg.max_adapt_d_weight
        ).detach()
        return d_weight * self.discriminator_weight

    def forward(
        self,
        inputs,
        reconstructions,
        posteriors,
        optimizer_idx,
        global_step,
        last_layer,
        split="train",
        weights=None,
    ):
        """Compute the losses for one optimisation step.

        Args:
            inputs: Target tensor.
            reconstructions: Autoencoder output, same shape as ``inputs``.
            posteriors: Object exposing ``kl()``, which returns a tensor whose
                first dimension is used as the normaliser for the KL term.
            optimizer_idx: ``0`` for the autoencoder/generator update, ``1``
                for the discriminator update.
            global_step: Current step; the adversarial factor is zero while it
                is below ``self.discriminator_iter_start``.
            last_layer: Tensor with respect to which the adaptive weight's
                gradients are taken.
            split: Unused by this method.
            weights: Optional multiplier applied to the NLL before averaging.

        Returns:
            For ``optimizer_idx == 0`` a dict with keys ``loss``, ``kl_loss``,
            ``rec_loss``, ``nll_loss``, ``g_loss``, ``d_weight`` and
            ``disc_factor``. For ``optimizer_idx == 1`` a dict with keys
            ``d_loss``, ``logits_real`` and ``logits_fake``. ``None`` for any
            other ``optimizer_idx``.

        Raises:
            RuntimeError: If the adaptive weight cannot be computed while the
                module is in training mode.
        """
        rec_loss = torch.abs(
            inputs.contiguous() - reconstructions.contiguous()
        )  # l1 loss

        nll_loss = rec_loss / torch.exp(self.logvar) + self.logvar
        weighted_nll_loss = nll_loss if weights is None else weights * nll_loss

        # NLL terms are averaged over every element ...
        weighted_nll_loss = torch.mean(weighted_nll_loss)
        nll_loss = torch.mean(nll_loss)

        # ... whereas the KL term is summed and divided by its first dimension.
        kl_loss = posteriors.kl()
        kl_loss = torch.sum(kl_loss) / kl_loss.shape[0]

        # now the GAN part
        if optimizer_idx == 0:
            logits_fake = self.discriminator(reconstructions.contiguous())
            g_loss = -torch.mean(logits_fake)

            if self.disc_factor > 0.0:
                try:
                    d_weight = self.calculate_adaptive_weight(
                        nll_loss, g_loss, last_layer=last_layer
                    )
                except RuntimeError:
                    # Outside training there may be no autograd graph to
                    # differentiate, so fall back to a zero weight. While
                    # training this is a real failure: surface the original
                    # error instead of masking it.
                    if self.training:
                        raise
                    d_weight = torch.tensor(0.0)
            else:
                d_weight = torch.tensor(0.0)

            disc_factor = adopt_weight(
                self.disc_factor, global_step, threshold=self.discriminator_iter_start
            )

            total_loss = (
                weighted_nll_loss
                + self.kl_weight * kl_loss
                + d_weight * disc_factor * g_loss
            )

            return {
                "loss": total_loss,
                "kl_loss": kl_loss,
                "rec_loss": rec_loss.mean(),
                "nll_loss": nll_loss,
                "g_loss": g_loss,
                "d_weight": d_weight,
                "disc_factor": torch.tensor(disc_factor),
            }

        if optimizer_idx == 1:
            # Both batches are detached so only the discriminator gets gradients.
            logits_real = self.discriminator(inputs.contiguous().detach())
            logits_fake = self.discriminator(reconstructions.contiguous().detach())

            disc_factor = adopt_weight(
                self.disc_factor, global_step, threshold=self.discriminator_iter_start
            )
            d_loss = disc_factor * self.disc_loss(logits_real, logits_fake)

            return {
                "d_loss": d_loss,
                "logits_real": logits_real.mean(),
                "logits_fake": logits_fake.mean(),
            }

        # Any other optimizer index yields no losses.
        return None