import math
import os
import glob
from pathlib import Path

import numpy as np
import cv2
import torch
import torch.nn.functional as F
from torchvision.utils import make_grid
from tqdm import tqdm


def warp_flow(img, flow):
    '''
    Applies to img the transformation described by flow.
    '''
    hf, wf = flow.shape[:2]

    # Turn the relative flow into an absolute sampling map (note: flow is modified in place).
    flow[:, :, 0] += np.arange(wf)
    flow[:, :, 1] += np.arange(hf)[:, np.newaxis]
    res = cv2.remap(img, flow, None, cv2.INTER_LINEAR)
    return res


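# Example usage (a minimal sketch): warping with an all-zero flow is the identity
# transform, since the flow stores per-pixel displacements. The array below is
# synthetic and purely illustrative.
def _example_warp_flow():
    img = (np.random.rand(32, 32, 3) * 255).astype(np.uint8)
    flow = np.zeros((32, 32, 2), dtype=np.float32)  # zero displacement everywhere
    warped = warp_flow(img, flow)
    return warped  # identical to img up to interpolation

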
def estimate_invflow(img0, img1, me_algo):
    '''
    Estimates the inverse optical flow between img0 and img1 using the me_algo algorithm.
    '''
    if me_algo == "DeepFlow":
        of_estim = cv2.optflow.createOptFlow_DeepFlow()
    else:
        raise ValueError("Incorrect motion estimation algorithm")

    # Note the argument order: the flow is estimated on the pair (img1, img0).
    flow = of_estim.calc(img1, img0, None)
    return flow


def align_frames(img_to_align, img_source, mc_alg='DeepFlow'):
    '''
    Applies to img_to_align a transformation which aligns it with img_source.
    Args:
        img_to_align: HxWxC image
        img_source: HxWxC image
        mc_alg: motion estimation algorithm; only 'DeepFlow' is currently supported (default).
    Returns:
        HxWxC aligned image and the estimated flow
    '''
    if img_to_align.ndim == 2:
        img0 = img_to_align
        img1 = img_source
    else:
        # Estimate the motion on a single channel (the green channel for RGB/BGR inputs).
        img0 = img_to_align[:, :, 1]
        img1 = img_source[:, :, 1]

    flow = estimate_invflow(img0, img1, mc_alg)

    # Warp the full image with the estimated flow.
    out_img = warp_flow(img_to_align, flow.astype(np.float32))

    return out_img, flow


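# Example usage (a minimal sketch): align one frame to a reference frame. The file
# names below are hypothetical; DeepFlow requires the opencv-contrib-python package.
def _example_align_frames():
    img_to_align = cv2.imread('frame0.png')  # hypothetical path
    img_source = cv2.imread('frame1.png')    # hypothetical path
    aligned, flow = align_frames(img_to_align, img_source, mc_alg='DeepFlow')
    cv2.imwrite('frame0_aligned.png', aligned)

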
def SIFT(img1gray, img2gray):
    '''
    Estimates the homography from img1gray to img2gray using SIFT keypoints,
    FLANN matching and RANSAC. Returns None if too few good matches are found.
    '''
    # SIFT lives in the main namespace since OpenCV 4.4 (cv2.SIFT_create());
    # older contrib builds expose it as cv2.xfeatures2d.SIFT_create().
    sift = cv2.xfeatures2d.SIFT_create()

    kp1, des1 = sift.detectAndCompute(img1gray, None)
    kp2, des2 = sift.detectAndCompute(img2gray, None)

    # FLANN-based matching with a KD-tree index.
    FLANN_INDEX_KDTREE = 1
    index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5)
    search_params = dict(checks=10)
    flann = cv2.FlannBasedMatcher(index_params, search_params)
    matches = flann.knnMatch(des1, des2, k=2)

    matchesMask = [[0, 0] for i in range(len(matches))]

    # Lowe's ratio test to keep only distinctive matches.
    good = []
    for i, (m, n) in enumerate(matches):
        if m.distance < 0.65 * n.distance:
            good.append(m)
            matchesMask[i] = [1, 0]

    MIN_MATCH_COUNT = 9
    print(f'SIFT: {len(good)} good matches')

    if len(good) > MIN_MATCH_COUNT:
        src_pts = np.float32([kp1[m.queryIdx].pt for m in good]).reshape(-1, 1, 2)
        dst_pts = np.float32([kp2[m.trainIdx].pt for m in good]).reshape(-1, 1, 2)
        M, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 3)
    else:
        print('Not enough good matches to estimate a homography.')
        return None

    return M


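# Example usage (a minimal sketch): estimate the homography between two grayscale
# frames and warp the first onto the second. The file names are hypothetical.
def _example_sift_homography():
    img1 = cv2.imread('ref.png', cv2.IMREAD_GRAYSCALE)    # hypothetical path
    img2 = cv2.imread('query.png', cv2.IMREAD_GRAYSCALE)  # hypothetical path
    M = SIFT(img1, img2)
    if M is not None:
        h, w = img2.shape
        warped = cv2.warpPerspective(img1, M, (w, h))
        cv2.imwrite('ref_warped.png', warped)

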
def match_colors(im_ref, im_q, im_test):
    '''
    Estimates, per batch element, a linear CxC color transform that maps im_q to
    im_ref (least squares over all pixels) and applies it to im_test.
    Tensors are expected in (B, C, H, W) layout.
    '''
    im_ref_mean_re = im_ref.view(*im_ref.shape[:2], -1)
    im_q_mean_re = im_q.view(*im_q.shape[:2], -1)

    # Solve a CxC least-squares color transform for each batch element.
    c_mat_all = []
    for ir, iq in zip(im_ref_mean_re, im_q_mean_re):
        c = torch.linalg.lstsq(iq.t(), ir.t())
        c = c.solution[:im_ref_mean_re.size(1)]
        c_mat_all.append(c)

    c_mat = torch.stack(c_mat_all, dim=0)

    # Apply the estimated transform to the test image.
    im_test_re = im_test.view(*im_test.shape[:2], -1)
    im_t_conv = torch.matmul(im_test_re.permute(0, 2, 1), c_mat).permute(0, 2, 1)
    im_t_conv = im_t_conv.view(im_test.shape)

    return im_t_conv


def color_correction(gt, in_put, output, scale_factor=2):
    '''
    Estimates a per-channel linear transform that maps `in_put` to `gt` and
    applies it to `output`. Note that `scale_factor` is currently unused.
    '''
    output_cor = match_channel_colors(gt, in_put, output)
    return output_cor


def match_channel_colors(im_ref, im_q, im_test):
    '''
    Same as match_colors, but estimates an independent scalar least-squares
    transform for each channel instead of a full CxC color matrix.
    Tensors are expected in (B, C, H, W) layout.
    '''
    im_ref_reshape = im_ref.view(*im_ref.shape[:2], -1)
    im_q_reshape = im_q.view(*im_q.shape[:2], -1)
    im_test_reshape = im_test.view(*im_test.shape[:2], -1)

    im_t_conv_list = []
    for i in range(im_ref.size(1)):
        # Solve a per-channel, per-batch-element least-squares fit.
        c_mat_all = []
        for ir_batch, iq_batch in zip(im_ref_reshape[:, i:i+1, :], im_q_reshape[:, i:i+1, :]):
            c = torch.linalg.lstsq(iq_batch.t(), ir_batch.t())
            c = c.solution[:1]
            c_mat_all.append(c)

        c_mat = torch.stack(c_mat_all, dim=0)

        # Apply the estimated transform to the corresponding channel of im_test.
        im_t_conv = torch.matmul(im_test_reshape[:, i:i+1, :].permute(0, 2, 1), c_mat).permute(0, 2, 1)
        im_t_conv = im_t_conv.view(*im_t_conv.shape[:2], *im_test.shape[-2:])
        im_t_conv_list.append(im_t_conv)

    im_t_conv = torch.cat(im_t_conv_list, dim=1)

    return im_t_conv


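# Example usage (a minimal sketch): per-channel color correction of a network output
# towards a ground-truth tensor. The tensors below are random and purely illustrative;
# in practice they would be (B, C, H, W) image batches.
def _example_color_correction():
    gt = torch.rand(1, 3, 64, 64)
    degraded_input = torch.rand(1, 3, 64, 64)
    network_output = torch.rand(1, 3, 64, 64)
    corrected = color_correction(gt, degraded_input, network_output)
    return corrected.shape  # torch.Size([1, 3, 64, 64])

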
def img2tensor(imgs, bgr2rgb=True, float32=True):
    """Numpy array to tensor.

    Args:
        imgs (list[ndarray] | ndarray): Input images.
        bgr2rgb (bool): Whether to change bgr to rgb.
        float32 (bool): Whether to change to float32.

    Returns:
        list[tensor] | tensor: Tensor images. If the result contains only one
            element, the tensor is returned directly.
    """

    def _totensor(img, bgr2rgb, float32):
        if img.shape[2] == 3 and bgr2rgb:
            if img.dtype == 'float64':
                img = img.astype('float32')
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        # HWC -> CHW
        img = torch.from_numpy(img.transpose(2, 0, 1))
        if float32:
            img = img.float()
        return img

    if isinstance(imgs, list):
        return [_totensor(img, bgr2rgb, float32) for img in imgs]
    else:
        return _totensor(imgs, bgr2rgb, float32)


def tensor2img(tensor, rgb2bgr=True, out_type=np.uint8, min_max=(0, 1)):
    """Convert torch Tensors into image numpy arrays.

    After clamping to [min, max], values will be normalized to [0, 1].

    Args:
        tensor (Tensor or list[Tensor]): Accept shapes:
            1) 4D mini-batch Tensor of shape (B x 3/1 x H x W);
            2) 3D Tensor of shape (3/1 x H x W);
            3) 2D Tensor of shape (H x W).
            Tensor channel should be in RGB order.
        rgb2bgr (bool): Whether to change rgb to bgr.
        out_type (numpy type): output types. If ``np.uint8``, transform outputs
            to uint8 type with range [0, 255]; otherwise, float type with
            range [0, 1]. Default: ``np.uint8``.
        min_max (tuple[int]): min and max values for clamp.

    Returns:
        (ndarray or list[ndarray]): 3D ndarray of shape (H x W x C) OR 2D ndarray
            of shape (H x W). The channel order is BGR.
    """
    if not (torch.is_tensor(tensor) or (isinstance(tensor, list) and all(torch.is_tensor(t) for t in tensor))):
        raise TypeError(f'tensor or list of tensors expected, got {type(tensor)}')

    if torch.is_tensor(tensor):
        tensor = [tensor]
    result = []
    for _tensor in tensor:
        _tensor = _tensor.squeeze(0).float().detach().cpu().clamp_(*min_max)
        _tensor = (_tensor - min_max[0]) / (min_max[1] - min_max[0])

        n_dim = _tensor.dim()
        if n_dim == 4:
            # Arrange the mini-batch as an image grid before conversion.
            img_np = make_grid(_tensor, nrow=int(math.sqrt(_tensor.size(0))), normalize=False).numpy()
            img_np = img_np.transpose(1, 2, 0)
            if rgb2bgr:
                img_np = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR)
        elif n_dim == 3:
            img_np = _tensor.numpy()
            img_np = img_np.transpose(1, 2, 0)
            if img_np.shape[2] == 1:  # gray image
                img_np = np.squeeze(img_np, axis=2)
            else:
                if rgb2bgr:
                    img_np = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR)
        elif n_dim == 2:
            img_np = _tensor.numpy()
        else:
            raise TypeError(f'Only support 4D, 3D or 2D tensor. But received with dimension: {n_dim}')
        if out_type == np.uint8:
            # Unlike MATLAB, np.uint8() will not round by default.
            img_np = (img_np * 255.0).round()
        img_np = img_np.astype(out_type)
        result.append(img_np)
    if len(result) == 1:
        result = result[0]
    return result
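

# Example usage (a minimal sketch): round-trip a BGR float image through
# img2tensor / tensor2img. The random array below is purely illustrative.
def _example_tensor_roundtrip():
    img = np.random.rand(32, 32, 3).astype(np.float32)  # BGR image in [0, 1]
    t = img2tensor(img, bgr2rgb=True, float32=True)      # (3, 32, 32) RGB tensor
    back = tensor2img(t, rgb2bgr=True, out_type=np.uint8, min_max=(0, 1))
    return back.shape  # (32, 32, 3), uint8, BGR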