# BayesCap / src / utils.py
# NOTE: removed stray HuggingFace page text that had been pasted above the
# imports ("udion's picture / BayesCap demo to EuroCrypt / 5bd623f / raw /
# history / blame / 40.2 kB") — it was not Python and broke the module at
# import time.
import random
from typing import Any, Optional
import numpy as np
import os
import cv2
from glob import glob
from PIL import Image, ImageDraw
from tqdm import tqdm
import kornia
import matplotlib.pyplot as plt
import seaborn as sns
import albumentations as albu
import functools
import math
import torch
import torch.nn as nn
from torch import Tensor
import torchvision as tv
import torchvision.models as models
from torchvision import transforms
from torchvision.transforms import functional as F
from losses import TempCombLoss
########### DeblurGAN function
def get_norm_layer(norm_type='instance'):
    """Return a constructor for the requested 2-D normalization layer.

    'batch'    -> BatchNorm2d with learnable affine parameters.
    'instance' -> InstanceNorm2d (no affine) tracking running statistics.

    Raises NotImplementedError for any other *norm_type*.
    """
    factories = {
        'batch': functools.partial(nn.BatchNorm2d, affine=True),
        'instance': functools.partial(nn.InstanceNorm2d, affine=False, track_running_stats=True),
    }
    if norm_type not in factories:
        raise NotImplementedError('normalization layer [%s] is not found' % norm_type)
    return factories[norm_type]
def _array_to_batch(x):
x = np.transpose(x, (2, 0, 1))
x = np.expand_dims(x, 0)
return torch.from_numpy(x)
def get_normalize():
    """Build a paired normalizer mapping two HWC images to [-1, 1] floats.

    The returned ``process(a, b)`` normalizes both images with
    mean=std=0.5 per channel (via albumentations) so the input and its
    'target' counterpart stay aligned.
    """
    pipeline = albu.Compose(
        [albu.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])],
        additional_targets={'target': 'image'},
    )

    def process(a, b):
        out = pipeline(image=a, target=b)
        return out['image'], out['target']

    return process
def preprocess(x: np.ndarray, mask: Optional[np.ndarray]):
    """Normalize an image (and optional 0/255 mask), then zero-pad both so
    that height and width become multiples of 32 (padding bottom/right).

    Returns (iterator over (image, mask) 1xCxHxW tensors, orig_h, orig_w).
    Note: an already-aligned dimension still gains a full extra 32-pixel
    band, mirroring the original DeblurGAN preprocessing.
    """
    x, _ = get_normalize()(x, x)
    if mask is None:
        mask = np.ones_like(x, dtype=np.float32)
    else:
        mask = np.round(mask.astype('float32') / 255)

    h, w, _ = x.shape
    block_size = 32
    target_h = (h // block_size + 1) * block_size
    target_w = (w // block_size + 1) * block_size
    pad_spec = ((0, target_h - h), (0, target_w - w), (0, 0))
    x = np.pad(x, pad_spec, mode='constant', constant_values=0)
    mask = np.pad(mask, pad_spec, mode='constant', constant_values=0)
    return map(_array_to_batch, (x, mask)), h, w
def postprocess(x: torch.Tensor) -> np.ndarray:
    """Convert a single-image batch of [-1, 1] CxHxW data to HWC uint8."""
    img, = x  # unpack the (only) image in the batch
    arr = img.detach().cpu().float().numpy()
    arr = (arr.transpose(1, 2, 0) + 1) / 2.0 * 255.0
    return arr.astype('uint8')
def sorted_glob(pattern):
    """Expand *pattern* like glob() but with a deterministic sorted order."""
    matches = glob(pattern)
    matches.sort()
    return matches
###########
def normalize(image: np.ndarray) -> np.ndarray:
    """Scale an 8-bit image array to float64 in [0, 1].

    Args:
        image (np.ndarray): Image as read by ``OpenCV.imread`` or
            ``skimage.io.imread`` (values in [0, 255]).

    Returns:
        Float64 array with data range [0, 1].
    """
    return np.asarray(image, dtype=np.float64) / 255.0
def unnormalize(image: np.ndarray) -> np.ndarray:
    """Scale a [0, 1] image array back to float64 in [0, 255].

    Args:
        image (np.ndarray): Normalized image data in [0, 1].

    Returns:
        Float64 array with data range [0, 255].
    """
    return 255.0 * image.astype(np.float64)
def image2tensor(image: np.ndarray, range_norm: bool, half: bool) -> torch.Tensor:
    """Convert a PIL image / HWC array to a CxHxW float tensor.

    Args:
        image (np.ndarray): Image data as read by ``PIL.Image``.
        range_norm (bool): If True, remap [0, 1] values to [-1, 1].
        half (bool): If True, cast the result to ``torch.half``.

    Returns:
        The converted (and optionally rescaled / half-precision) tensor.

    Examples:
        >>> image = Image.open("image.bmp")
        >>> tensor_image = image2tensor(image, range_norm=False, half=False)
    """
    out = F.to_tensor(image)
    if range_norm:
        out = out.mul_(2.0).sub_(1.0)
    return out.half() if half else out
def tensor2image(tensor: torch.Tensor, range_norm: bool, half: bool) -> Any:
    """Convert a 1xCxHxW tensor to an HxWxC uint8 array for PIL consumption.

    Note: operates in place on *tensor* (``squeeze_``/``add_``/``mul_``), so
    the caller's tensor is modified as a side effect.

    Args:
        tensor (torch.Tensor): Image tensor, optionally in [-1, 1].
        range_norm (bool): If True, remap [-1, 1] values to [0, 1] first.
        half (bool): If True, cast to ``torch.half`` before conversion.

    Returns:
        HxWxC uint8 numpy array.

    Examples:
        >>> tensor = torch.randn([1, 3, 128, 128])
        >>> image = tensor2image(tensor, range_norm=False, half=False)
    """
    if range_norm:
        tensor = tensor.add_(1.0).div_(2.0)
    if half:
        tensor = tensor.half()
    hwc = tensor.squeeze_(0).permute(1, 2, 0).mul_(255).clamp_(0, 255)
    return hwc.cpu().numpy().astype("uint8")
def convert_rgb_to_y(image: Any) -> Any:
    """Extract the luma (Y) channel from an RGB image.

    Accepts an HxWx3 ndarray or a 3xHxW (or 1x3xHxW) tensor and applies the
    BT.601 studio-swing weighting used throughout this file.

    Args:
        image: RGB image data read by ``PIL.Image''.

    Returns:
        Y channel with the input's spatial shape.
    """
    if type(image) == np.ndarray:
        r, g, b = image[:, :, 0], image[:, :, 1], image[:, :, 2]
        return 16. + (64.738 * r + 129.057 * g + 25.064 * b) / 256.
    elif type(image) == torch.Tensor:
        if len(image.shape) == 4:
            image = image.squeeze_(0)
        r, g, b = image[0, :, :], image[1, :, :], image[2, :, :]
        return 16. + (64.738 * r + 129.057 * g + 25.064 * b) / 256.
    else:
        raise Exception("Unknown Type", type(image))
def convert_rgb_to_ycbcr(image: Any) -> Any:
    """Convert an RGB image (HWC ndarray or CHW tensor) to YCbCr (BT.601).

    Args:
        image: RGB image data read by ``PIL.Image''.

    Returns:
        HxWx3 YCbCr data (ndarray in -> ndarray out, tensor in -> tensor out).

    Raises:
        Exception: If *image* is neither an ndarray nor a torch tensor.
    """
    if type(image) == np.ndarray:
        y = 16. + (64.738 * image[:, :, 0] + 129.057 * image[:, :, 1] + 25.064 * image[:, :, 2]) / 256.
        cb = 128. + (-37.945 * image[:, :, 0] - 74.494 * image[:, :, 1] + 112.439 * image[:, :, 2]) / 256.
        cr = 128. + (112.439 * image[:, :, 0] - 94.154 * image[:, :, 1] - 18.285 * image[:, :, 2]) / 256.
        return np.array([y, cb, cr]).transpose([1, 2, 0])
    elif type(image) == torch.Tensor:
        if len(image.shape) == 4:
            image = image.squeeze(0)
        y = 16. + (64.738 * image[0, :, :] + 129.057 * image[1, :, :] + 25.064 * image[2, :, :]) / 256.
        cb = 128. + (-37.945 * image[0, :, :] - 74.494 * image[1, :, :] + 112.439 * image[2, :, :]) / 256.
        cr = 128. + (112.439 * image[0, :, :] - 94.154 * image[1, :, :] - 18.285 * image[2, :, :]) / 256.
        # BUGFIX: torch.cat on 2-D planes concatenated rows into a (3H, W)
        # tensor and the 3-D permute then crashed; stack builds (3, H, W).
        return torch.stack([y, cb, cr], 0).permute(1, 2, 0)
    else:
        raise Exception("Unknown Type", type(image))
def convert_ycbcr_to_rgb(image: Any) -> Any:
    """Convert a YCbCr image (HWC ndarray or CHW tensor) back to RGB (BT.601).

    Args:
        image: YCbCr image data read by ``PIL.Image''.

    Returns:
        HxWx3 RGB data (ndarray in -> ndarray out, tensor in -> tensor out).

    Raises:
        Exception: If *image* is neither an ndarray nor a torch tensor.
    """
    if type(image) == np.ndarray:
        r = 298.082 * image[:, :, 0] / 256. + 408.583 * image[:, :, 2] / 256. - 222.921
        g = 298.082 * image[:, :, 0] / 256. - 100.291 * image[:, :, 1] / 256. - 208.120 * image[:, :, 2] / 256. + 135.576
        b = 298.082 * image[:, :, 0] / 256. + 516.412 * image[:, :, 1] / 256. - 276.836
        return np.array([r, g, b]).transpose([1, 2, 0])
    elif type(image) == torch.Tensor:
        if len(image.shape) == 4:
            image = image.squeeze(0)
        r = 298.082 * image[0, :, :] / 256. + 408.583 * image[2, :, :] / 256. - 222.921
        g = 298.082 * image[0, :, :] / 256. - 100.291 * image[1, :, :] / 256. - 208.120 * image[2, :, :] / 256. + 135.576
        b = 298.082 * image[0, :, :] / 256. + 516.412 * image[1, :, :] / 256. - 276.836
        # BUGFIX: was torch.cat, which merged the 2-D planes row-wise into
        # (3H, W) and made the 3-D permute fail; stack restores (3, H, W).
        return torch.stack([r, g, b], 0).permute(1, 2, 0)
    else:
        raise Exception("Unknown Type", type(image))
def center_crop(lr: Any, hr: Any, image_size: int, upscale_factor: int) -> [Any, Any]:
    """Crop matching center patches from an LR/HR ``PIL.Image`` pair.

    The HR patch is *image_size* square; the LR box covers the same region
    with every coordinate divided by *upscale_factor*.

    Args:
        lr: Low-resolution image data read by ``PIL.Image``.
        hr: High-resolution image data read by ``PIL.Image``.
        image_size (int): Side length of the HR crop.
        upscale_factor (int): Magnification factor between LR and HR.

    Returns:
        (cropped LR image, cropped HR image).
    """
    w, h = hr.size
    left = (w - image_size) // 2
    top = (h - image_size) // 2
    box = (left, top, left + image_size, top + image_size)
    hr_patch = hr.crop(box)
    lr_patch = lr.crop(tuple(c // upscale_factor for c in box))
    return lr_patch, hr_patch
def random_crop(lr: Any, hr: Any, image_size: int, upscale_factor: int) -> [Any, Any]:
    """Crop a random, spatially-aligned patch pair from an LR/HR image pair.

    The HR patch is *image_size* square at a uniformly random position; the
    LR box is the same region divided by *upscale_factor*.

    Args:
        lr: Low-resolution image data read by ``PIL.Image``.
        hr: High-resolution image data read by ``PIL.Image``.
        image_size (int): Side length of the HR crop.
        upscale_factor (int): Magnification factor between LR and HR.

    Returns:
        (cropped LR image, cropped HR image).
    """
    w, h = hr.size
    left = torch.randint(0, w - image_size + 1, size=(1,)).item()
    top = torch.randint(0, h - image_size + 1, size=(1,)).item()
    box = (left, top, left + image_size, top + image_size)
    hr_patch = hr.crop(box)
    lr_patch = lr.crop(tuple(c // upscale_factor for c in box))
    return lr_patch, hr_patch
def random_rotate(lr: Any, hr: Any, angle: int) -> [Any, Any]:
    """Rotate an LR/HR pair by +angle or -angle (same random sign for both).

    Args:
        lr: Low-resolution image data read by ``PIL.Image``.
        hr: High-resolution image data read by ``PIL.Image``.
        angle (int): Rotation magnitude; the sign is chosen at random.

    Returns:
        The rotated (LR, HR) pair.
    """
    signed_angle = random.choice((+angle, -angle))
    return F.rotate(lr, signed_angle), F.rotate(hr, signed_angle)
def random_horizontally_flip(lr: Any, hr: Any, p=0.5) -> [Any, Any]:
    """Jointly horizontal-flip an LR/HR pair at random.

    NOTE(review): the flip fires when ``rand() > p``, i.e. with probability
    1 - p (equivalent for the default p=0.5); behavior kept as-is.

    Args:
        lr: Low-resolution image data read by ``PIL.Image``.
        hr: High-resolution image data read by ``PIL.Image``.
        p (optional, float): Threshold on the random draw. (Default: 0.5)

    Returns:
        The (possibly flipped) (LR, HR) pair.
    """
    if torch.rand(1).item() > p:
        return F.hflip(lr), F.hflip(hr)
    return lr, hr
def random_vertically_flip(lr: Any, hr: Any, p=0.5) -> [Any, Any]:
    """Jointly vertical-flip an LR/HR pair at random.

    NOTE(review): as with the horizontal variant, the flip fires when
    ``rand() > p`` — probability 1 - p; behavior kept as-is.

    Args:
        lr: Low-resolution image data read by ``PIL.Image``.
        hr: High-resolution image data read by ``PIL.Image``.
        p (optional, float): Threshold on the random draw. (Default: 0.5)

    Returns:
        The (possibly flipped) (LR, HR) pair.
    """
    if torch.rand(1).item() > p:
        return F.vflip(lr), F.vflip(hr)
    return lr, hr
def random_adjust_brightness(lr: Any, hr: Any) -> [Any, Any]:
    """Apply one shared random brightness factor (0.5x–2x) to both images.

    Args:
        lr: Low-resolution image data read by ``PIL.Image``.
        hr: High-resolution image data read by ``PIL.Image``.

    Returns:
        The brightness-adjusted (LR, HR) pair.
    """
    # One factor for both images so the pair stays photometrically aligned.
    factor = random.uniform(0.5, 2)
    return F.adjust_brightness(lr, factor), F.adjust_brightness(hr, factor)
def random_adjust_contrast(lr: Any, hr: Any) -> [Any, Any]:
    """Apply one shared random contrast factor (0.5x–2x) to both images.

    Args:
        lr: Low-resolution image data read by ``PIL.Image``.
        hr: High-resolution image data read by ``PIL.Image``.

    Returns:
        The contrast-adjusted (LR, HR) pair.
    """
    # One factor for both images so the pair stays photometrically aligned.
    factor = random.uniform(0.5, 2)
    return F.adjust_contrast(lr, factor), F.adjust_contrast(hr, factor)
#### metrics to compute -- assumes single images, i.e., tensor of 3 dims
def img_mae(x1, x2):
    """Mean absolute error between two image tensors (scalar tensor)."""
    return (x1 - x2).abs().mean()
def img_mse(x1, x2):
    """Mean squared error between two image tensors (scalar tensor)."""
    diff = x1 - x2
    return diff.mul(diff).mean()
def img_psnr(x1, x2):
    """Peak signal-to-noise ratio between two image tensors (peak = 1.0)."""
    score = kornia.metrics.psnr(x1, x2, 1)
    return score
def img_ssim(x1, x2):
    """Mean SSIM (5x5 window) between two single images (3-D tensors)."""
    ssim_map = kornia.metrics.ssim(x1.unsqueeze(0), x2.unsqueeze(0), 5)
    return ssim_map.mean()
def show_SR_w_uncer(xLR, xHR, xSR, xSRvar, elim=(0,0.01), ulim=(0,0.15)):
    '''
    Display LR / HR / SR images plus the squared-error map and the
    predicted-uncertainty map in a single 1x5 figure.
    xLR/SR/HR: 3xHxW
    xSRvar: 1xHxW
    '''
    def _chw_to_hwc(t):
        # CxHxW tensor -> HxWxC layout for imshow
        return t.transpose(0, 2).transpose(0, 1)

    plt.figure(figsize=(30, 10))
    for slot, img in enumerate((xLR, xHR, xSR), start=1):
        plt.subplot(1, 5, slot)
        plt.imshow(_chw_to_hwc(img.to('cpu').data.clip(0, 1)))
        plt.axis('off')
    plt.subplot(1, 5, 4)
    error_map = torch.mean(torch.pow(torch.abs(xSR - xHR), 2), dim=0).to('cpu').data.unsqueeze(0)
    print('error', error_map.min(), error_map.max())
    plt.imshow(_chw_to_hwc(error_map), cmap='jet')
    plt.clim(elim[0], elim[1])
    plt.axis('off')
    plt.subplot(1, 5, 5)
    print('uncer', xSRvar.min(), xSRvar.max())
    plt.imshow(_chw_to_hwc(xSRvar.to('cpu').data), cmap='hot')
    plt.clim(ulim[0], ulim[1])
    plt.axis('off')
    plt.subplots_adjust(wspace=0, hspace=0)
    plt.show()
def show_SR_w_err(xLR, xHR, xSR, elim=(0,0.01), task=None, xMask=None):
    '''
    Display LR / HR / SR plus the squared-error map in a 1x4 figure.
    Task 'm' renders the images in grayscale with a fixed 0-0.9 color
    range; task 'inpainting' restricts the error map to the mask.
    xLR/SR/HR: 3xHxW
    '''
    def _chw_to_hwc(t):
        # CxHxW tensor -> HxWxC layout for imshow
        return t.transpose(0, 2).transpose(0, 1)

    plt.figure(figsize=(30, 10))
    grayscale = (task == 'm')
    for slot, img in enumerate((xLR, xHR, xSR), start=1):
        plt.subplot(1, 4, slot)
        if grayscale:
            plt.imshow(_chw_to_hwc(img.to('cpu').data.clip(0, 1)), cmap='gray')
            plt.clim(0, 0.9)
        else:
            plt.imshow(_chw_to_hwc(img.to('cpu').data.clip(0, 1)))
        plt.axis('off')
    plt.subplot(1, 4, 4)
    error_map = torch.mean(torch.pow(torch.abs(xSR - xHR), 2), dim=0).to('cpu').data.unsqueeze(0)
    if task == 'inpainting':
        # only score the masked (inpainted) region
        error_map = error_map * xMask.to('cpu').data
    print('error', error_map.min(), error_map.max())
    plt.imshow(_chw_to_hwc(error_map), cmap='jet')
    plt.clim(elim[0], elim[1])
    plt.axis('off')
    plt.subplots_adjust(wspace=0, hspace=0)
    plt.show()
def show_uncer4(xSRvar1, xSRvar2, xSRvar3, xSRvar4, ulim=(0,0.15)):
    '''
    Display four uncertainty maps side by side with a shared color range.
    xSRvar: 1xHxW
    '''
    plt.figure(figsize=(30, 10))
    for slot, var_map in enumerate((xSRvar1, xSRvar2, xSRvar3, xSRvar4), start=1):
        plt.subplot(1, 4, slot)
        print('uncer', var_map.min(), var_map.max())
        plt.imshow(var_map.to('cpu').data.transpose(0, 2).transpose(0, 1), cmap='hot')
        plt.clim(ulim[0], ulim[1])
        plt.axis('off')
    plt.subplots_adjust(wspace=0, hspace=0)
    plt.show()
def get_UCE(list_err, list_yout_var, num_bins=100):
    """Uncertainty Calibration Error over equal-width error bins.

    Bins the per-point errors into *num_bins* equal-width bins spanning
    [min(err), max(err)], accumulates the mean error and mean predicted
    variance per bin, and sums the point-weighted |mean_err - mean_var|.

    Args:
        list_err: per-point errors (e.g. flattened per-pixel MSE values).
        list_yout_var: matching predicted variances (same length).
        num_bins (int): number of equal-width bins.

    Returns:
        (bin_stats, uce): per-bin statistics dict (keys 'start_idx',
        'end_idx', 'num_points', 'mean_err', 'mean_var', 'uce_bin') and
        the scalar UCE.
    """
    err_min = np.min(list_err)
    err_max = np.max(list_err)
    err_len = (err_max - err_min) / num_bins
    num_points = len(list_err)
    bin_stats = {}
    for i in range(num_bins):
        bin_stats[i] = {
            'start_idx': err_min + i * err_len,
            'end_idx': err_min + (i + 1) * err_len,
            'num_points': 0,
            'mean_err': 0,
            'mean_var': 0,
        }
    for e, v in zip(list_err, list_yout_var):
        # Direct O(1) bin-index computation instead of scanning every bin
        # per point.  BUGFIX: the original used a strict upper bound on the
        # last bin, silently dropping points at err_max; clamp them into it.
        if err_len > 0:
            i = min(int((e - err_min) / err_len), num_bins - 1)
        else:
            i = 0  # all errors identical -> single degenerate bin
        bin_stats[i]['num_points'] += 1
        bin_stats[i]['mean_err'] += e
        bin_stats[i]['mean_var'] += v
    uce = 0
    eps = 1e-8
    for i in range(num_bins):
        # eps keeps empty bins from dividing by zero (their terms are 0)
        bin_stats[i]['mean_err'] /= bin_stats[i]['num_points'] + eps
        bin_stats[i]['mean_var'] /= bin_stats[i]['num_points'] + eps
        bin_stats[i]['uce_bin'] = (bin_stats[i]['num_points'] / num_points) \
            * (np.abs(bin_stats[i]['mean_err'] - bin_stats[i]['mean_var']))
        uce += bin_stats[i]['uce_bin']
    return bin_stats, uce
##################### training BayesCap
def train_BayesCap(
    NetC,
    NetG,
    train_loader,
    eval_loader,
    Cri=None,
    device='cuda',
    dtype=torch.cuda.FloatTensor,
    init_lr=1e-4,
    num_epochs=100,
    eval_every=1,
    ckpt_path='../ckpt/BayesCap',
    T1=1e0,
    T2=5e-2,
    task=None,
):
    """Train the BayesCap head *NetC* on top of a frozen generator *NetG*.

    Each batch is passed through the frozen generator; its output feeds
    NetC, which predicts (mu, alpha, beta).  The loss is minimized against
    the generator output for task 'depth' (no ground truth available) and
    against the target otherwise.  The latest weights are saved every epoch
    to ``ckpt_path + '_last.pth'`` and the best-by-SSIM weights to
    ``ckpt_path + '_best.pth'``.

    Args:
        NetC: BayesCap network (the one being trained).
        NetG: frozen generator network (kept in eval mode).
        train_loader / eval_loader: (input, target) batch loaders.
        Cri: loss callable; defaults to a fresh ``TempCombLoss()``.
            (Was a mutable default instantiated at import time.)
        device: compute device.
        dtype: tensor type applied to each batch.  BUGFIX: the default was
            ``torch.cuda.FloatTensor()`` — an *instance* created at import
            time (crashes on CUDA-less machines and is not a valid argument
            to ``Tensor.type``); now the class itself, consistent with
            ``eval_BayesCap``.
        init_lr / num_epochs / eval_every: optimization schedule.
        ckpt_path: checkpoint path prefix.
        T1, T2: temperature weights forwarded to the loss.
        task: 'inpainting', 'depth', or None for SR-style tasks.
    """
    if Cri is None:
        Cri = TempCombLoss()
    NetC.to(device)
    NetC.train()
    NetG.to(device)
    NetG.eval()
    optimizer = torch.optim.Adam(list(NetC.parameters()), lr=init_lr)
    optim_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, num_epochs)
    score = -1e8
    all_loss = []
    for eph in range(num_epochs):
        eph_loss = 0
        with tqdm(train_loader, unit='batch') as tepoch:
            for (idx, batch) in enumerate(tepoch):
                if idx > 2000:
                    break  # cap the number of batches per epoch
                tepoch.set_description('Epoch {}'.format(eph))
                ##
                xLR, xHR = batch[0].to(device), batch[1].to(device)
                xLR, xHR = xLR.type(dtype), xHR.type(dtype)
                if task == 'inpainting':
                    # NOTE(review): random_mask is not defined in this file;
                    # assumed to be provided elsewhere in the project.
                    xMask = random_mask(xLR.shape[0], (xLR.shape[2], xLR.shape[3]))
                    xMask = xMask.to(device).type(dtype)
                # pass them through the (frozen) generator
                with torch.no_grad():
                    if task == 'inpainting':
                        _, xSR1 = NetG(xLR, xMask)
                    elif task == 'depth':
                        xSR1 = NetG(xLR)[("disp", 0)]
                    else:
                        xSR1 = NetG(xLR)
                xSR = xSR1.clone()
                xSRC_mu, xSRC_alpha, xSRC_beta = NetC(xSR)
                optimizer.zero_grad()
                if task == 'depth':
                    # depth has no GT here: fit the generator output itself
                    loss = Cri(xSRC_mu, xSRC_alpha, xSRC_beta, xSR, T1=T1, T2=T2)
                else:
                    loss = Cri(xSRC_mu, xSRC_alpha, xSRC_beta, xHR, T1=T1, T2=T2)
                loss.backward()
                optimizer.step()
                ##
                eph_loss += loss.item()
                tepoch.set_postfix(loss=loss.item())
            eph_loss /= len(train_loader)
            all_loss.append(eph_loss)
            print('Avg. loss: {}'.format(eph_loss))
        # always save the latest weights, evaluate periodically
        torch.save(NetC.state_dict(), ckpt_path + '_last.pth')
        if eph % eval_every == 0:
            curr_score = eval_BayesCap(
                NetC,
                NetG,
                eval_loader,
                device=device,
                dtype=dtype,
                task=task,
            )
            print('current score: {} | Last best score: {}'.format(curr_score, score))
            if curr_score >= score:
                score = curr_score
                torch.save(NetC.state_dict(), ckpt_path + '_best.pth')
        optim_scheduler.step()
#### get different uncertainty maps
def get_uncer_BayesCap(
    NetC,
    NetG,
    xin,
    task=None,
    xMask=None,
):
    """Per-pixel predictive variance from the BayesCap head.

    Runs *xin* through the generator and the BayesCap head, then converts
    the (alpha, beta) maps to a variance via a^2 * Gamma(3/b) / Gamma(1/b)
    (generalized-Gaussian variance; small constants guard the divisions).
    """
    with torch.no_grad():
        if task == 'inpainting':
            _, gen_out = NetG(xin, xMask)
        else:
            gen_out = NetG(xin)
        out_mu, out_alpha, out_beta = NetC(gen_out)
    a_map = (1 / (out_alpha + 1e-5)).to('cpu').data
    b_map = out_beta.to('cpu').data
    xSRvar = (a_map ** 2) * (torch.exp(torch.lgamma(3 / (b_map + 1e-2))) / torch.exp(torch.lgamma(1 / (b_map + 1e-2))))
    return xSRvar
def get_uncer_TTDAp(
    NetG,
    xin,
    p_mag=0.05,
    num_runs=50,
    task=None,
    xMask=None,
):
    """Test-time-augmentation uncertainty: variance of the generator output
    across *num_runs* noisy perturbations of the input.

    Args:
        NetG: generator network (callable).
        xin: input batch (1xCxHxW).
        p_mag: relative magnitude of the additive Gaussian perturbation
            (scaled by the input's max value).
        num_runs: number of perturbed forward passes.
        task / xMask: 'inpainting' routes the mask into the generator.

    Returns:
        1x1xHxW tensor of per-pixel variances, averaged over channels.
    """
    outputs = []
    with torch.no_grad():
        for _ in range(num_runs):
            perturbed = xin + p_mag * xin.max() * torch.randn_like(xin)
            if task == 'inpainting':
                _, out = NetG(perturbed, xMask)
            else:
                out = NetG(perturbed)
            outputs.append(out)
    # NOTE: the mean over runs was computed but never used; dropped to save
    # a full reduction per call.
    stacked = torch.cat(outputs, dim=0)
    xSRvar = torch.mean(torch.var(stacked, dim=0), dim=0).unsqueeze(0).unsqueeze(1)
    return xSRvar
def get_uncer_DO(
    NetG,
    xin,
    dop=0.2,
    num_runs=50,
    task=None,
    xMask=None,
):
    """Monte-Carlo-dropout uncertainty: variance of the generator output
    across *num_runs* stochastic forward passes with dropout rate *dop*.

    Args:
        NetG: generator network accepting a ``dop`` keyword.
        xin: input batch (1xCxHxW).
        dop: dropout probability forwarded to the generator.
        num_runs: number of stochastic forward passes.
        task / xMask: 'inpainting' routes the mask into the generator.

    Returns:
        1x1xHxW tensor of per-pixel variances, averaged over channels.
    """
    outputs = []
    with torch.no_grad():
        for _ in range(num_runs):
            if task == 'inpainting':
                _, out = NetG(xin, xMask, dop=dop)
            else:
                out = NetG(xin, dop=dop)
            outputs.append(out)
    # NOTE: the mean over runs was computed but never used; dropped to save
    # a full reduction per call.
    stacked = torch.cat(outputs, dim=0)
    xSRvar = torch.mean(torch.var(stacked, dim=0), dim=0).unsqueeze(0).unsqueeze(1)
    return xSRvar
################### Different eval functions
def eval_BayesCap(
    NetC,
    NetG,
    eval_loader,
    device='cuda',
    dtype=torch.cuda.FloatTensor,
    task=None,
    xMask=None,
):
    """Evaluate a BayesCap head against its generator on *eval_loader*.

    Scores the BayesCap mean prediction with SSIM/PSNR/MSE/MAE against the
    target (for 'depth' the generator output is used as the target since no
    ground truth is available here), displays each sample with its
    uncertainty map, prints the averages and returns the mean SSIM.

    Args:
        NetC / NetG: BayesCap head and (frozen) generator.
        eval_loader: (input, target) batch loader.
        device / dtype: compute device and tensor type.
        task: 'inpainting', 'depth', or None.
        xMask: fixed inpainting mask; a random one is drawn when None.

    Returns:
        Mean SSIM over all evaluated images.
    """
    NetC.to(device)
    NetC.eval()
    NetG.to(device)
    NetG.eval()
    mean_ssim = 0
    mean_psnr = 0
    mean_mse = 0
    mean_mae = 0
    num_imgs = 0
    list_error = []
    list_var = []
    with tqdm(eval_loader, unit='batch') as tepoch:
        for (idx, batch) in enumerate(tepoch):
            tepoch.set_description('Validating ...')
            ##
            xLR, xHR = batch[0].to(device), batch[1].to(device)
            xLR, xHR = xLR.type(dtype), xHR.type(dtype)
            if task == 'inpainting':
                # BUGFIX: was `xMask==None` — identity (`is None`), not
                # equality, is the correct and tensor-safe missing-mask test.
                if xMask is None:
                    xMask = random_mask(xLR.shape[0], (xLR.shape[2], xLR.shape[3]))
                    xMask = xMask.to(device).type(dtype)
                else:
                    xMask = xMask.to(device).type(dtype)
            # pass them through the network
            with torch.no_grad():
                if task == 'inpainting':
                    _, xSR = NetG(xLR, xMask)
                elif task == 'depth':
                    xSR = NetG(xLR)[("disp", 0)]
                else:
                    xSR = NetG(xLR)
                xSRC_mu, xSRC_alpha, xSRC_beta = NetC(xSR)
            # generalized-Gaussian variance: a^2 * Gamma(3/b) / Gamma(1/b)
            a_map = (1 / (xSRC_alpha + 1e-5)).to('cpu').data
            b_map = xSRC_beta.to('cpu').data
            xSRvar = (a_map ** 2) * (torch.exp(torch.lgamma(3 / (b_map + 1e-2))) / torch.exp(torch.lgamma(1 / (b_map + 1e-2))))
            n_batch = xSRC_mu.shape[0]
            if task == 'depth':
                xHR = xSR  # no ground truth: compare against generator output
            for j in range(n_batch):
                num_imgs += 1
                mean_ssim += img_ssim(xSRC_mu[j], xHR[j])
                mean_psnr += img_psnr(xSRC_mu[j], xHR[j])
                mean_mse += img_mse(xSRC_mu[j], xHR[j])
                mean_mae += img_mae(xSRC_mu[j], xHR[j])
                show_SR_w_uncer(xLR[j], xHR[j], xSR[j], xSRvar[j])
                error_map = torch.mean(torch.pow(torch.abs(xSR[j] - xHR[j]), 2), dim=0).to('cpu').data.reshape(-1)
                var_map = xSRvar[j].to('cpu').data.reshape(-1)
                list_error.extend(list(error_map.numpy()))
                list_var.extend(list(var_map.numpy()))
        ##
        mean_ssim /= num_imgs
        mean_psnr /= num_imgs
        mean_mse /= num_imgs
        mean_mae /= num_imgs
        print(
            'Avg. SSIM: {} | Avg. PSNR: {} | Avg. MSE: {} | Avg. MAE: {}'.format
            (
                mean_ssim, mean_psnr, mean_mse, mean_mae
            )
        )
    return mean_ssim
def eval_TTDA_p(
    NetG,
    eval_loader,
    device='cuda',
    dtype=torch.cuda.FloatTensor,
    p_mag=0.05,
    num_runs=50,
    task = None,
    xMask = None,
):
    """Evaluate the generator with test-time-augmentation uncertainty.

    For each batch the clean generator output is scored (SSIM/PSNR/MSE/MAE
    against the target) and an uncertainty map is estimated as the variance
    of *num_runs* forward passes on noise-perturbed inputs; each image is
    displayed via show_SR_w_uncer.  Prints the averages and returns the
    mean SSIM.

    NOTE(review): the perturbation loop calls ``NetG(xLR + noise)`` without
    the mask, so the 'inpainting' branch only affects the clean pass —
    confirm this is intended.
    """
    NetG.to(device)
    NetG.eval()
    # running sums, divided by the image count at the end
    mean_ssim = 0
    mean_psnr = 0
    mean_mse = 0
    mean_mae = 0
    num_imgs = 0
    with tqdm(eval_loader, unit='batch') as tepoch:
        for (idx, batch) in enumerate(tepoch):
            tepoch.set_description('Validating ...')
            ##
            xLR, xHR = batch[0].to(device), batch[1].to(device)
            xLR, xHR = xLR.type(dtype), xHR.type(dtype)
            # pass them through the network
            list_xSR = []
            with torch.no_grad():
                if task=='inpainting':
                    _, xSR = NetG(xLR, xMask)
                else:
                    xSR = NetG(xLR)
                for z in range(num_runs):
                    # perturbed pass: additive noise scaled by the input peak
                    xSRz = NetG(xLR+p_mag*xLR.max()*torch.randn_like(xLR))
                    list_xSR.append(xSRz)
            # mean over runs (unused below) and channel-averaged per-pixel
            # variance -> 1x1xHxW uncertainty map
            xSRmean = torch.mean(torch.cat(list_xSR, dim=0), dim=0).unsqueeze(0)
            xSRvar = torch.mean(torch.var(torch.cat(list_xSR, dim=0), dim=0), dim=0).unsqueeze(0).unsqueeze(1)
            n_batch = xSR.shape[0]
            for j in range(n_batch):
                num_imgs += 1
                mean_ssim += img_ssim(xSR[j], xHR[j])
                mean_psnr += img_psnr(xSR[j], xHR[j])
                mean_mse += img_mse(xSR[j], xHR[j])
                mean_mae += img_mae(xSR[j], xHR[j])
                show_SR_w_uncer(xLR[j], xHR[j], xSR[j], xSRvar[j])
        mean_ssim /= num_imgs
        mean_psnr /= num_imgs
        mean_mse /= num_imgs
        mean_mae /= num_imgs
        print(
            'Avg. SSIM: {} | Avg. PSNR: {} | Avg. MSE: {} | Avg. MAE: {}'.format
            (
                mean_ssim, mean_psnr, mean_mse, mean_mae
            )
        )
    return mean_ssim
def eval_DO(
    NetG,
    eval_loader,
    device='cuda',
    dtype=torch.cuda.FloatTensor,
    dop=0.2,
    num_runs=50,
    task=None,
    xMask=None,
):
    """Evaluate the generator with Monte-Carlo-dropout uncertainty.

    For each batch the clean generator output is scored (SSIM/PSNR/MSE/MAE
    against the target) and an uncertainty map is estimated as the variance
    of *num_runs* stochastic passes with dropout rate *dop*; each image is
    displayed via show_SR_w_uncer.  Prints the averages and returns the
    mean SSIM.

    NOTE(review): the dropout loop calls ``NetG(xLR, dop=dop)`` without the
    mask, so the 'inpainting' branch only affects the clean pass — confirm
    this is intended.
    """
    NetG.to(device)
    NetG.eval()
    # running sums, divided by the image count at the end
    mean_ssim = 0
    mean_psnr = 0
    mean_mse = 0
    mean_mae = 0
    num_imgs = 0
    with tqdm(eval_loader, unit='batch') as tepoch:
        for (idx, batch) in enumerate(tepoch):
            tepoch.set_description('Validating ...')
            ##
            xLR, xHR = batch[0].to(device), batch[1].to(device)
            xLR, xHR = xLR.type(dtype), xHR.type(dtype)
            # pass them through the network
            list_xSR = []
            with torch.no_grad():
                if task == 'inpainting':
                    _, xSR = NetG(xLR, xMask)
                else:
                    xSR = NetG(xLR)
                for z in range(num_runs):
                    # stochastic pass with dropout enabled
                    xSRz = NetG(xLR, dop=dop)
                    list_xSR.append(xSRz)
            # mean over runs (unused below) and channel-averaged per-pixel
            # variance -> 1x1xHxW uncertainty map
            xSRmean = torch.mean(torch.cat(list_xSR, dim=0), dim=0).unsqueeze(0)
            xSRvar = torch.mean(torch.var(torch.cat(list_xSR, dim=0), dim=0), dim=0).unsqueeze(0).unsqueeze(1)
            n_batch = xSR.shape[0]
            for j in range(n_batch):
                num_imgs += 1
                mean_ssim += img_ssim(xSR[j], xHR[j])
                mean_psnr += img_psnr(xSR[j], xHR[j])
                mean_mse += img_mse(xSR[j], xHR[j])
                mean_mae += img_mae(xSR[j], xHR[j])
                show_SR_w_uncer(xLR[j], xHR[j], xSR[j], xSRvar[j])
        ##
        mean_ssim /= num_imgs
        mean_psnr /= num_imgs
        mean_mse /= num_imgs
        mean_mae /= num_imgs
        print(
            'Avg. SSIM: {} | Avg. PSNR: {} | Avg. MSE: {} | Avg. MAE: {}'.format
            (
                mean_ssim, mean_psnr, mean_mse, mean_mae
            )
        )
    return mean_ssim
############### compare all function
def compare_all(
    NetC,
    NetG,
    eval_loader,
    p_mag = 0.05,
    dop = 0.2,
    num_runs = 100,
    device='cuda',
    dtype=torch.cuda.FloatTensor,
    task=None,
):
    """Visually compare the three uncertainty estimators on every sample.

    For each batch: run the generator and BayesCap head, compute the TTDA
    (xSRvar1), dropout (xSRvar2) and BayesCap (xSRvar3) uncertainty maps,
    then display per-task figures via show_SR_w_err / show_uncer4.  The
    various exponents/blends applied below are display-tuning only.

    NOTE(review): task codes 's' (SR), 'd' (depth), 'inpainting' and 'm'
    (medical/grayscale) are inferred from usage; any other value shows
    nothing for the batch.
    """
    NetC.to(device)
    NetC.eval()
    NetG.to(device)
    NetG.eval()
    with tqdm(eval_loader, unit='batch') as tepoch:
        for (idx, batch) in enumerate(tepoch):
            tepoch.set_description('Comparing ...')
            ##
            xLR, xHR = batch[0].to(device), batch[1].to(device)
            xLR, xHR = xLR.type(dtype), xHR.type(dtype)
            if task == 'inpainting':
                # NOTE(review): random_mask is not defined in this file;
                # assumed to be provided elsewhere in the project.
                xMask = random_mask(xLR.shape[0], (xLR.shape[2], xLR.shape[3]))
                xMask = xMask.to(device).type(dtype)
            # pass them through the network
            with torch.no_grad():
                if task == 'inpainting':
                    _, xSR = NetG(xLR, xMask)
                else:
                    xSR = NetG(xLR)
                xSRC_mu, xSRC_alpha, xSRC_beta = NetC(xSR)
            # three uncertainty estimates for the same input
            if task == 'inpainting':
                xSRvar1 = get_uncer_TTDAp(NetG, xLR, p_mag=p_mag, num_runs=num_runs, task='inpainting', xMask=xMask)
                xSRvar2 = get_uncer_DO(NetG, xLR, dop=dop, num_runs=num_runs, task='inpainting', xMask=xMask)
                xSRvar3 = get_uncer_BayesCap(NetC, NetG, xLR, task='inpainting', xMask=xMask)
            else:
                xSRvar1 = get_uncer_TTDAp(NetG, xLR, p_mag=p_mag, num_runs=num_runs)
                xSRvar2 = get_uncer_DO(NetG, xLR, dop=dop, num_runs=num_runs)
                xSRvar3 = get_uncer_BayesCap(NetC, NetG, xLR)
            print('bdg', xSRvar1.shape, xSRvar2.shape, xSRvar3.shape)
            n_batch = xSR.shape[0]
            for j in range(n_batch):
                if task=='s':
                    show_SR_w_err(xLR[j], xHR[j], xSR[j])
                    show_uncer4(xSRvar1[j], torch.sqrt(xSRvar1[j]), torch.pow(xSRvar1[j], 0.48), torch.pow(xSRvar1[j], 0.42))
                    show_uncer4(xSRvar2[j], torch.sqrt(xSRvar2[j]), torch.pow(xSRvar3[j], 1.5), xSRvar3[j])
                if task=='d':
                    # blend SR with GT for a cleaner depth visualization
                    show_SR_w_err(xLR[j], xHR[j], 0.5*xSR[j]+0.5*xHR[j])
                    show_uncer4(xSRvar1[j], torch.sqrt(xSRvar1[j]), torch.pow(xSRvar1[j], 0.48), torch.pow(xSRvar1[j], 0.42))
                    show_uncer4(xSRvar2[j], torch.sqrt(xSRvar2[j]), torch.pow(xSRvar3[j], 0.8), xSRvar3[j])
                if task=='inpainting':
                    # show the masked input and restrict the error to the mask
                    show_SR_w_err(xLR[j]*(1-xMask[j]), xHR[j], xSR[j], elim=(0,0.25), task='inpainting', xMask=xMask[j])
                    show_uncer4(xSRvar1[j], torch.sqrt(xSRvar1[j]), torch.pow(xSRvar1[j], 0.45), torch.pow(xSRvar1[j], 0.4))
                    show_uncer4(xSRvar2[j], torch.sqrt(xSRvar2[j]), torch.pow(xSRvar3[j], 0.8), xSRvar3[j])
                if task=='m':
                    show_SR_w_err(xLR[j], xHR[j], xSR[j], elim=(0,0.04), task='m')
                    show_uncer4(0.4*xSRvar1[j]+0.6*xSRvar2[j], torch.sqrt(xSRvar1[j]), torch.pow(xSRvar1[j], 0.48), torch.pow(xSRvar1[j], 0.42), ulim=(0.02,0.15))
                    show_uncer4(xSRvar2[j], torch.sqrt(xSRvar2[j]), torch.pow(xSRvar3[j], 1.5), xSRvar3[j], ulim=(0.02,0.15))
################# Degrading Identity
def degrage_BayesCap_p(
    NetC,
    NetG,
    eval_loader,
    device='cuda',
    dtype=torch.cuda.FloatTensor,
    num_runs=50,
):
    """Degrading-identity experiment: perturb the BayesCap *input* and track
    how reconstruction quality (SSIM/PSNR) and calibration (UCE) degrade.

    For each perturbation magnitude in ``p_mag_list``, the generator output
    is corrupted with Gaussian noise before being fed to NetC; the SSIM
    between the BayesCap mean and the clean generator output, plus two UCE
    variants (against the SR error and against the BayesCap error), are
    collected and finally plotted against the magnitude.

    NOTE(review): the function name is a typo of "degrade"; kept for
    interface compatibility.  *num_runs* is accepted but never used, and
    ``list_c`` is collected nowhere — presumably leftovers.
    """
    NetC.to(device)
    NetC.eval()
    NetG.to(device)
    NetG.eval()
    # increasing input-perturbation magnitudes to sweep
    p_mag_list = [0, 0.05, 0.1, 0.15, 0.2]
    list_s = []
    list_p = []
    list_u1 = []
    list_u2 = []
    list_c = []
    for p_mag in p_mag_list:
        mean_ssim = 0
        mean_psnr = 0
        mean_mse = 0
        mean_mae = 0
        num_imgs = 0
        list_error = []
        list_error2 = []
        list_var = []
        with tqdm(eval_loader, unit='batch') as tepoch:
            for (idx, batch) in enumerate(tepoch):
                tepoch.set_description('Validating ...')
                ##
                xLR, xHR = batch[0].to(device), batch[1].to(device)
                xLR, xHR = xLR.type(dtype), xHR.type(dtype)
                # pass them through the network
                with torch.no_grad():
                    xSR = NetG(xLR)
                    # perturb the BayesCap INPUT (not the generator input)
                    xSRC_mu, xSRC_alpha, xSRC_beta = NetC(xSR + p_mag*xSR.max()*torch.randn_like(xSR))
                # generalized-Gaussian variance: a^2 * Gamma(3/b) / Gamma(1/b)
                a_map = (1/(xSRC_alpha + 1e-5)).to('cpu').data
                b_map = xSRC_beta.to('cpu').data
                xSRvar = (a_map**2)*(torch.exp(torch.lgamma(3/(b_map + 1e-2)))/torch.exp(torch.lgamma(1/(b_map + 1e-2))))
                n_batch = xSRC_mu.shape[0]
                for j in range(n_batch):
                    num_imgs += 1
                    # identity metrics: BayesCap mean vs clean SR output
                    mean_ssim += img_ssim(xSRC_mu[j], xSR[j])
                    mean_psnr += img_psnr(xSRC_mu[j], xSR[j])
                    mean_mse += img_mse(xSRC_mu[j], xSR[j])
                    mean_mae += img_mae(xSRC_mu[j], xSR[j])
                    # per-pixel errors for the two UCE variants
                    error_map = torch.mean(torch.pow(torch.abs(xSR[j]-xHR[j]),2), dim=0).to('cpu').data.reshape(-1)
                    error_map2 = torch.mean(torch.pow(torch.abs(xSRC_mu[j]-xHR[j]),2), dim=0).to('cpu').data.reshape(-1)
                    var_map = xSRvar[j].to('cpu').data.reshape(-1)
                    list_error.extend(list(error_map.numpy()))
                    list_error2.extend(list(error_map2.numpy()))
                    list_var.extend(list(var_map.numpy()))
            ##
            mean_ssim /= num_imgs
            mean_psnr /= num_imgs
            mean_mse /= num_imgs
            mean_mae /= num_imgs
            print(
                'Avg. SSIM: {} | Avg. PSNR: {} | Avg. MSE: {} | Avg. MAE: {}'.format
                (
                    mean_ssim, mean_psnr, mean_mse, mean_mae
                )
            )
            # subsample every 100th pixel to keep UCE computation cheap
            uce1 = get_UCE(list_error[::100], list_var[::100], num_bins=200)[1]
            uce2 = get_UCE(list_error2[::100], list_var[::100], num_bins=200)[1]
            print('UCE1: ', uce1)
            print('UCE2: ', uce2)
            list_s.append(mean_ssim.item())
            list_p.append(mean_psnr.item())
            list_u1.append(uce1)
            list_u2.append(uce2)
    plt.plot(list_s)
    plt.show()
    plt.plot(list_p)
    plt.show()
    plt.plot(list_u1, label='wrt SR output')
    plt.plot(list_u2, label='wrt BayesCap output')
    plt.legend()
    plt.show()
    sns.set_style('darkgrid')
    fig,ax = plt.subplots()
    # make a plot
    ax.plot(p_mag_list, list_s, color="red", marker="o")
    # set x-axis label
    ax.set_xlabel("Reducing faithfulness of BayesCap Reconstruction",fontsize=10)
    # set y-axis label
    ax.set_ylabel("SSIM btwn BayesCap and SRGAN outputs", color="red",fontsize=10)
    # twin object for two different y-axis on the sample plot
    ax2=ax.twinx()
    # make a plot with different y-axis using second axis object
    ax2.plot(p_mag_list, list_u1, color="blue", marker="o", label='UCE wrt to error btwn SRGAN output and GT')
    ax2.plot(p_mag_list, list_u2, color="orange", marker="o", label='UCE wrt to error btwn BayesCap output and GT')
    ax2.set_ylabel("UCE", color="green", fontsize=10)
    plt.legend(fontsize=10)
    plt.tight_layout()
    plt.show()
################# DeepFill_v2
# ----------------------------------------
# PATH processing
# ----------------------------------------
def text_readlines(filename):
    """Read a text file into a list of lines with newlines stripped.

    Returns [] if the file cannot be opened.

    BUGFIX: the original sliced off the last character of every line
    unconditionally, which corrupted a final line lacking a trailing
    newline; now only the line terminator is stripped.  The handle is also
    closed via a context manager instead of a bare open/close pair.
    """
    try:
        with open(filename, 'r') as file:
            content = file.readlines()
    except IOError:
        return []
    return [line.rstrip('\n') for line in content]
def savetxt(name, loss_log):
    """Persist a sequence of numbers (e.g. a loss log) to a text file."""
    np.savetxt(name, np.array(loss_log))
def get_files(path):
    """Walk *path* recursively and return the full path of every file."""
    found = []
    for root, _dirs, files in os.walk(path):
        found.extend(os.path.join(root, name) for name in files)
    return found
def get_names(path):
    """Walk *path* recursively and return only the file names (no dirs)."""
    found = []
    for _root, _dirs, files in os.walk(path):
        found.extend(files)
    return found
def text_save(content, filename, mode = 'a'):
    """Write each element of *content* to *filename*, one per line.

    Appends by default (``mode='a'``).  Uses a context manager so the
    handle is closed even if a write fails (the original leaked the handle
    on error).
    """
    with open(filename, mode) as file:
        for item in content:
            file.write(str(item) + '\n')
def check_path(path):
    """Create *path* (including parents) if it does not already exist.

    BUGFIX: uses ``exist_ok=True`` instead of an exists-check followed by
    makedirs, which could raise if another process created the directory
    between the two calls.
    """
    os.makedirs(path, exist_ok=True)
# ----------------------------------------
# Validation and Sample at training
# ----------------------------------------
def save_sample_png(sample_folder, sample_name, img_list, name_list, pixel_max_cnt = 255):
    """Save each NxCxHxW tensor in *img_list* as '<sample_name>_<name>.jpg'.

    The first batch element of each tensor is rescaled from [0, 1] to
    [0, 255] (last layer is sigmoid activated), clipped, converted
    RGB->BGR and written with cv2.
    """
    for i, img in enumerate(img_list):
        # Recover normalization: * 255 because last layer is sigmoid activated
        scaled = img * 255
        # Work on a detached copy so the caller's tensor stays untouched
        frame = scaled.clone().data.permute(0, 2, 3, 1)[0, :, :, :].cpu().numpy()
        frame = np.clip(frame, 0, pixel_max_cnt).astype(np.uint8)
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        # Save to certain path
        save_img_path = os.path.join(sample_folder, sample_name + '_' + name_list[i] + '.jpg')
        cv2.imwrite(save_img_path, frame)
def psnr(pred, target, pixel_max_cnt = 255):
    """PSNR in dB between two tensors with the given peak value."""
    sq_err = torch.mul(target - pred, target - pred)
    rmse = (torch.mean(sq_err).item()) ** 0.5
    return 20 * np.log10(pixel_max_cnt / rmse)
def grey_psnr(pred, target, pixel_max_cnt = 255):
    """PSNR after summing over the channel dimension (peak = 3 * max)."""
    pred_sum = torch.sum(pred, dim = 0)
    target_sum = torch.sum(target, dim = 0)
    sq_err = torch.mul(target_sum - pred_sum, target_sum - pred_sum)
    rmse = (torch.mean(sq_err).item()) ** 0.5
    return 20 * np.log10(pixel_max_cnt * 3 / rmse)
def ssim(pred, target):
	"""
	Structural similarity between two NCHW tensors (first batch item only).

	:param pred: predicted image tensor [N, C, H, W]
	:param target: reference image tensor [N, C, H, W]
	:return: scalar SSIM score
	"""
	# BUG FIX: this module never imports skimage, so the original raised
	# NameError at call time; additionally skimage.measure.compare_ssim was
	# removed from scikit-image (moved to skimage.metrics in 0.16).
	try:
		from skimage.metrics import structural_similarity  # scikit-image >= 0.16
	except ImportError:
		from skimage.measure import compare_ssim as structural_similarity
	pred_np = pred.clone().data.permute(0, 2, 3, 1).cpu().numpy()[0]
	target_np = target.clone().data.permute(0, 2, 3, 1).cpu().numpy()[0]
	try:
		# `channel_axis` replaced the `multichannel` flag (removed in newer releases).
		return structural_similarity(target_np, pred_np, channel_axis=-1)
	except TypeError:
		return structural_similarity(target_np, pred_np, multichannel=True)
## for contextual attention
def extract_image_patches(images, ksizes, strides, rates, padding='same'):
	"""
	Extract patches from images and put them in the C output dimension.

	:param images: [batch, channels, in_rows, in_cols] 4-D input tensor
	:param ksizes: [ksize_rows, ksize_cols] sliding-window size
	:param strides: [stride_rows, stride_cols]
	:param rates: [dilation_rows, dilation_cols]
	:param padding: 'same' (zero-pad so every pixel is covered) or 'valid'
	:return: tensor of shape [N, C*k*k, L], L is the number of patches
	"""
	assert len(images.size()) == 4
	assert padding in ['same', 'valid']
	# Cleanup: the original carried an unreachable else-branch raising
	# NotImplementedError -- the assert above already rejects other values --
	# and unpacked batch/channel/height/width without using them.
	if padding == 'same':
		images = same_padding(images, ksizes, strides, rates)
	unfold = torch.nn.Unfold(kernel_size=ksizes,
							 dilation=rates,
							 padding=0,
							 stride=strides)
	return unfold(images)  # [N, C*k*k, L]
def same_padding(images, ksizes, strides, rates):
	"""
	Zero-pad a 4-D NCHW tensor so a dilated sliding window covers every pixel
	(TensorFlow-style 'SAME' padding).

	:param images: [batch, channels, rows, cols] tensor
	:param ksizes: [k_rows, k_cols] window size
	:param strides: [s_rows, s_cols]
	:param rates: [d_rows, d_cols] dilation factors
	:return: the zero-padded tensor
	"""
	assert len(images.size()) == 4
	_, _, rows, cols = images.size()
	# Output extent for stride s is ceil(dim / s).
	out_rows = (rows + strides[0] - 1) // strides[0]
	out_cols = (cols + strides[1] - 1) // strides[1]
	# A dilated k-tap window spans (k - 1) * d + 1 input pixels.
	span_rows = (ksizes[0] - 1) * rates[0] + 1
	span_cols = (ksizes[1] - 1) * rates[1] + 1
	pad_rows = max(0, (out_rows - 1) * strides[0] + span_rows - rows)
	pad_cols = max(0, (out_cols - 1) * strides[1] + span_cols - cols)
	# Split each total as evenly as possible; the extra pixel goes bottom/right.
	top = pad_rows // 2
	left = pad_cols // 2
	pad = (left, pad_cols - left, top, pad_rows - top)
	return torch.nn.ZeroPad2d(pad)(images)
def reduce_mean(x, axis=None, keepdim=False):
	"""
	Mean of `x` over the given axes (all axes when omitted), reducing the
	highest dimension first so earlier indices stay valid.

	:param x: input tensor
	:param axis: None (all dims), a single int, or an iterable of ints
	:param keepdim: keep reduced dimensions as size 1
	:return: the reduced tensor
	"""
	# BUG FIX: `if not axis:` also fired for empty lists and made an int axis
	# crash in sorted(); test for None explicitly and accept a bare int.
	if axis is None:
		axis = range(len(x.shape))
	elif isinstance(axis, int):
		axis = [axis]
	for i in sorted(axis, reverse=True):
		x = torch.mean(x, dim=i, keepdim=keepdim)
	return x
def reduce_std(x, axis=None, keepdim=False):
	"""
	Standard deviation of `x` over the given axes (all axes when omitted),
	applied per-dimension from the highest index down.

	:param x: input tensor
	:param axis: None (all dims), a single int, or an iterable of ints
	:param keepdim: keep reduced dimensions as size 1
	:return: the reduced tensor
	"""
	# BUG FIX: `if not axis:` also fired for empty lists and made an int axis
	# crash in sorted(); test for None explicitly and accept a bare int.
	if axis is None:
		axis = range(len(x.shape))
	elif isinstance(axis, int):
		axis = [axis]
	for i in sorted(axis, reverse=True):
		x = torch.std(x, dim=i, keepdim=keepdim)
	return x
def reduce_sum(x, axis=None, keepdim=False):
	"""
	Sum of `x` over the given axes (all axes when omitted), reducing the
	highest dimension first so earlier indices stay valid.

	:param x: input tensor
	:param axis: None (all dims), a single int, or an iterable of ints
	:param keepdim: keep reduced dimensions as size 1
	:return: the reduced tensor
	"""
	# BUG FIX: `if not axis:` also fired for empty lists and made an int axis
	# crash in sorted(); test for None explicitly and accept a bare int.
	if axis is None:
		axis = range(len(x.shape))
	elif isinstance(axis, int):
		axis = [axis]
	for i in sorted(axis, reverse=True):
		x = torch.sum(x, dim=i, keepdim=keepdim)
	return x
def random_mask(num_batch=1, mask_shape=(256,256)):
	"""
	Generate random inpainting masks: the union of one random rectangle and
	several free-form brush strokes, per batch item.

	:param num_batch: number of masks stacked along the batch dimension
	:param mask_shape: (height, width) of each mask
	:return: float tensor of shape [num_batch, 1, H, W]; nonzero marks masked pixels
	"""
	list_mask = []
	for _ in range(num_batch):
		# ---- rectangle mask ----
		image_height = mask_shape[0]
		image_width = mask_shape[1]
		max_delta_height = image_height//8
		max_delta_width = image_width//8
		height = image_height//4
		width = image_width//4
		max_t = image_height - height
		max_l = image_width - width
		t = random.randint(0, max_t)
		l = random.randint(0, max_l)
		# Shrink the rectangle by a random margin on each side.
		h = random.randint(0, max_delta_height//2)
		w = random.randint(0, max_delta_width//2)
		mask = torch.zeros((1, 1, image_height, image_width))
		mask[:, :, t+h:t+height-h, l+w:l+width-w] = 1
		rect_mask = mask
		# ---- brush-stroke mask ----
		min_num_vertex = 4
		max_num_vertex = 12
		mean_angle = 2 * math.pi / 5
		angle_range = 2 * math.pi / 15
		min_width = 12
		max_width = 40
		H, W = image_height, image_width
		average_radius = math.sqrt(H*H+W*W) / 8
		mask = Image.new('L', (W, H), 0)
		for _ in range(np.random.randint(1, 4)):
			num_vertex = np.random.randint(min_num_vertex, max_num_vertex)
			angle_min = mean_angle - np.random.uniform(0, angle_range)
			angle_max = mean_angle + np.random.uniform(0, angle_range)
			angles = []
			vertex = []
			# Alternate turn direction so the stroke zig-zags.
			for i in range(num_vertex):
				if i % 2 == 0:
					angles.append(2*math.pi - np.random.uniform(angle_min, angle_max))
				else:
					angles.append(np.random.uniform(angle_min, angle_max))
			# BUG FIX: PIL's Image.size is (width, height); the original
			# unpacked it as (h, w), swapping axes for non-square masks.
			w, h = mask.size
			vertex.append((int(np.random.randint(0, w)), int(np.random.randint(0, h))))
			for i in range(num_vertex):
				r = np.clip(
					np.random.normal(loc=average_radius, scale=average_radius//2),
					0, 2*average_radius)
				new_x = np.clip(vertex[-1][0] + r * math.cos(angles[i]), 0, w)
				new_y = np.clip(vertex[-1][1] + r * math.sin(angles[i]), 0, h)
				vertex.append((int(new_x), int(new_y)))
			draw = ImageDraw.Draw(mask)
			width = int(np.random.uniform(min_width, max_width))
			draw.line(vertex, fill=255, width=width)
			# Round the joints with filled circles.
			for v in vertex:
				draw.ellipse((v[0] - width//2,
							  v[1] - width//2,
							  v[0] + width//2,
							  v[1] + width//2),
							 fill=255)
		# BUG FIX: Image.transpose returns a NEW image; the original discarded
		# the result, so the random flips were never applied.
		if np.random.normal() > 0:
			mask = mask.transpose(Image.FLIP_LEFT_RIGHT)
		if np.random.normal() > 0:
			mask = mask.transpose(Image.FLIP_TOP_BOTTOM)
		mask = transforms.ToTensor()(mask)
		mask = mask.reshape((1, 1, H, W))
		brush_mask = mask
		# Union of the two masks via an element-wise max.
		mask = torch.cat([rect_mask, brush_mask], dim=1).max(dim=1, keepdim=True)[0]
		list_mask.append(mask)
	mask = torch.cat(list_mask, dim=0)
	return mask