#!/usr/bin/env python3
"""
Codes for oil-painting style transfer.
"""
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
import numpy as np
from PIL import Image
import math
# import cv2
import render_utils
import time
def get_single_layer_lists(param, decision, ori_img, render_size_x, render_size_y, h, w, meta_brushes, dilation, erosion, stroke_num):
    """
    Rasterize every predicted stroke of one layer and keep the ones whose
    decision flag is set.

    Args:
        param: stroke parameters, flat shape [h*w*stroke_num, 8]; columns
            5..7 are the R, G, B values (see the error computation below).
        decision: boolean tensor with one keep/skip flag per stroke.
        ori_img: target image, NCHW; assumed divisible into the h x w patch
            grid (shape [1, 3, h*patch_y, w*patch_x]) — TODO confirm with caller.
        render_size_x, render_size_y: per-stroke render resolution.
        h, w: number of patches along y and x.
        meta_brushes: brush textures consumed by render_utils.param2stroke.
        dilation, erosion: morphological ops applied per stroke.
        stroke_num: strokes predicted per patch.

    Returns:
        (xid_list, yid_list, valid_foregrounds, valid_alphas, error_list,
        selected_param): patch coordinates, rendered strokes, alpha masks,
        per-stroke reconstruction errors, and parameters — all filtered by
        `decision`.
    """
    # Rasterize all strokes at once; alpha is simply the nonzero mask.
    valid_foregrounds = render_utils.param2stroke(param[:, :], render_size_y, render_size_x, meta_brushes)
    valid_alphas = (valid_foregrounds > 0).astype('float32')
    valid_foregrounds = valid_foregrounds.reshape([-1, stroke_num, 1, render_size_y, render_size_x])
    valid_alphas = valid_alphas.reshape([-1, stroke_num, 1, render_size_y, render_size_x])
    # Dilate foregrounds and erode alphas stroke-by-stroke.
    temp = [dilation(valid_foregrounds[:, i, :, :, :]) for i in range(stroke_num)]
    valid_foregrounds = paddle.stack(temp, axis=1)
    valid_foregrounds = valid_foregrounds.reshape([-1, 1, render_size_y, render_size_x])
    temp = [erosion(valid_alphas[:, i, :, :, :]) for i in range(stroke_num)]
    valid_alphas = paddle.stack(temp, axis=1)
    valid_alphas = valid_alphas.reshape([-1, 1, render_size_y, render_size_x])
    # A patch corresponds to the central 4/5 of the rendered stroke window.
    patch_y = 4 * render_size_y // 5
    patch_x = 4 * render_size_x // 5
    # Split the target image into the h x w grid of patches.
    img_patch = ori_img.reshape([1, 3, h, ori_img.shape[2]//h, w, ori_img.shape[3]//w])
    img_patch = img_patch.transpose([0, 2, 4, 1, 3, 5])[0]
    xid_list = []
    yid_list = []
    error_list = []
    # Recover (x_id, y_id) patch coordinates for each kept stroke; the flat
    # index is laid out as ((y_id * w) + x_id) * stroke_num + k.
    for flag_idx, flag in enumerate(decision.cpu().numpy()):
        if flag:
            flag_idx = flag_idx // stroke_num
            x_id = flag_idx % w
            flag_idx = flag_idx // w
            y_id = flag_idx % h
            xid_list.append(x_id)
            yid_list.append(y_id)
    # Compare the central region of every stroke against its target patch.
    inner_fores = valid_foregrounds[:, :, render_size_y // 10:9 * render_size_y // 10,
                                    render_size_x // 10:9 * render_size_x // 10]
    inner_alpha = valid_alphas[:, :, render_size_y // 10:9 * render_size_y // 10,
                               render_size_x // 10:9 * render_size_x // 10]
    inner_fores = inner_fores.reshape([h * w, stroke_num, 1, patch_y, patch_x])
    inner_alpha = inner_alpha.reshape([h * w, stroke_num, 1, patch_y, patch_x])
    inner_real = img_patch.reshape([h * w, 3, patch_y, patch_x]).unsqueeze(1)
    # Per-stroke color error: |R*stroke - target| summed over channels,
    # masked by alpha and averaged over the covered pixels.
    R = param[:, 5]
    G = param[:, 6]
    B = param[:, 7]#, G, B = param[5:]
    R = R.reshape([-1, stroke_num]).unsqueeze(-1).unsqueeze(-1).unsqueeze(-1)
    G = G.reshape([-1, stroke_num]).unsqueeze(-1).unsqueeze(-1).unsqueeze(-1)
    B = B.reshape([-1, stroke_num]).unsqueeze(-1).unsqueeze(-1).unsqueeze(-1)
    error_R = R * inner_fores - inner_real[:, :, 0:1, :, :]
    error_G = G * inner_fores - inner_real[:, :, 1:2, :, :]
    error_B = B * inner_fores - inner_real[:, :, 2:3, :, :]
    error = paddle.abs(error_R) + paddle.abs(error_G)+ paddle.abs(error_B)
    error = error * inner_alpha
    error = paddle.sum(error, axis=(2, 3, 4)) / paddle.sum(inner_alpha, axis=(2, 3, 4))
    # Keep only the entries whose decision flag is set (boolean indexing in numpy).
    error_list = error.reshape([-1]).numpy()[decision.numpy()]
    error_list = list(error_list)
    valid_foregrounds = paddle.to_tensor(valid_foregrounds.numpy()[decision.numpy()])
    valid_alphas = paddle.to_tensor(valid_alphas.numpy()[decision.numpy()])
    selected_param = paddle.to_tensor(param.numpy()[decision.numpy()])
    return xid_list, yid_list, valid_foregrounds, valid_alphas, error_list, selected_param
def get_single_stroke_on_full_image_A(x_id, y_id, valid_foregrounds, valid_alphas, param, original_img,
                                      render_size_x, render_size_y, patch_x, patch_y):
    """
    Place one window-A stroke onto a full-image-sized canvas.

    The rendered stroke is colored with the stroke's RGB parameters,
    zero-padded so it lands on patch (x_id, y_id), then cropped by 1/10 of
    the render size on each side (a patch is the central 4/5 of the rendered
    window).

    Args:
        x_id, y_id: patch-grid coordinates of the stroke.
        valid_foregrounds: rendered grayscale stroke for this patch.
        valid_alphas: its alpha mask, same spatial size.
        param: 8 stroke parameters; param[5:] are the R, G, B values.
        original_img: target image (NCHW), used only for its spatial size.
        render_size_x, render_size_y: stroke render resolution.
        patch_x, patch_y: patch size on the full image.

    Returns:
        (tmp_foreground, tmp_alpha): colored stroke and alpha mask, padded
        and cropped to the full-image canvas.
    """
    # Fix: the original allocated a throwaway zeros_like(original_img) here
    # that was immediately overwritten by the pad result — removed.
    patch_y_num = original_img.shape[2] // patch_y
    patch_x_num = original_img.shape[3] // patch_x
    # Colorize: replicate the grayscale brush over 3 channels, scale by RGB.
    brush = valid_foregrounds.unsqueeze(0)
    color_map = param[5:]
    brush = brush.tile([1, 3, 1, 1])
    color_map = color_map.unsqueeze(-1).unsqueeze(-1).unsqueeze(0)
    brush = brush * color_map
    # Zero-padding that positions the stroke window over its patch.
    pad_l = x_id * patch_x
    pad_r = (patch_x_num - x_id - 1) * patch_x
    pad_t = y_id * patch_y
    pad_b = (patch_y_num - y_id - 1) * patch_y
    pads = [pad_l, pad_r, pad_t, pad_b]
    tmp_foreground = nn.functional.pad(brush, pads)
    # Crop the 1/10 border of the render window on every side. Note the end
    # index is -render_size // 10 (floor of the negative), matching the
    # start offset only when render_size is a multiple of 10.
    tmp_foreground = tmp_foreground[:, :, render_size_y // 10:-render_size_y // 10,
                                    render_size_x // 10:-render_size_x // 10]
    tmp_alpha = nn.functional.pad(valid_alphas.unsqueeze(0), pads)
    tmp_alpha = tmp_alpha[:, :, render_size_y // 10:-render_size_y // 10, render_size_x // 10:-render_size_x // 10]
    return tmp_foreground, tmp_alpha
def get_single_stroke_on_full_image_B(x_id, y_id, valid_foregrounds, valid_alphas, param,
                                      original_img, render_size_x, render_size_y, patch_x, patch_y):
    """
    Place one window-B (half-patch-shifted grid) stroke onto a
    full-image-sized canvas.

    The stroke is colored with its RGB parameters, zero-padded onto an
    expanded canvas (half a patch plus the render margin on each side), then
    cropped back to the original image size.

    Args:
        x_id, y_id: patch-grid coordinates of the stroke on the B grid.
        valid_foregrounds: rendered grayscale stroke for this patch.
        valid_alphas: its alpha mask, same spatial size.
        param: 8 stroke parameters; param[5:] are the R, G, B values.
        original_img: target image (NCHW), used only for its spatial size.
        render_size_x, render_size_y: stroke render resolution.
        patch_x, patch_y: patch size on the full image.

    Returns:
        (foreground, alpha): colored stroke and alpha mask at full image size.
    """
    # Expansion of the canvas on each side: half a patch plus the 1/10 margin.
    x_expand = patch_x // 2 + render_size_x // 10
    y_expand = patch_y // 2 + render_size_y // 10
    # Padding that positions the render window on the expanded canvas.
    left = x_id * patch_x
    right = original_img.shape[3] + 2 * x_expand - (x_id * patch_x + render_size_x)
    top = y_id * patch_y
    bottom = original_img.shape[2] + 2 * y_expand - (y_id * patch_y + render_size_y)
    pads = [left, right, top, bottom]
    # Colorize the grayscale brush across 3 channels with the stroke RGB.
    rgb = param[5:].unsqueeze(-1).unsqueeze(-1).unsqueeze(0)
    colored = valid_foregrounds.unsqueeze(0).tile([1, 3, 1, 1]) * rgb
    foreground = nn.functional.pad(colored, pads)
    foreground = foreground[:, :, y_expand:-y_expand, x_expand:-x_expand]
    alpha = nn.functional.pad(valid_alphas.unsqueeze(0), pads)
    alpha = alpha[:, :, y_expand:-y_expand, x_expand:-x_expand]
    return foreground, alpha
def stroke_net_predict(img_patch, result_patch, patch_size, net_g, stroke_num):
    """
    Predict stroke parameters and keep/skip decisions for a batch of patches.

    Args:
        img_patch: unfolded target patches, [N, 3*patch_size*patch_size, h*w].
        result_patch: unfolded current-canvas patches, same layout.
        patch_size: side length of a square patch.
        net_g: stroke predictor network; returns (shape_param, stroke_decision).
        stroke_num: strokes predicted per patch.

    Returns:
        param: flat stroke parameters, shape [N*h*w*stroke_num, 8]
            (shape params followed by sampled R, G, B).
        decision: flat boolean keep/skip flags, one per stroke.
    """
    # Unfold layout -> NCHW patch batches.
    target = img_patch.transpose([0, 2, 1]).reshape([-1, 3, patch_size, patch_size])
    canvas = result_patch.transpose([0, 2, 1]).reshape([-1, 3, patch_size, patch_size])
    #*----- Stroke Predictor -----*#
    shape_param, stroke_decision = net_g(target, canvas)
    stroke_decision = (stroke_decision > 0).astype('float32')
    #*----- sampling color -----*#
    # Each stroke's color is the target image sampled at the stroke center;
    # grid coords in [0, 1] are mapped to grid_sample's [-1, 1] range.
    batch = target.shape[0]
    grid = shape_param[:, :, :2].reshape([batch * stroke_num, 1, 1, 2])
    tiled = target.unsqueeze(1).tile([1, stroke_num, 1, 1, 1]).reshape([
        batch * stroke_num, 3, patch_size, patch_size])
    color = nn.functional.grid_sample(tiled, 2 * grid - 1, align_corners=False).reshape([
        batch, stroke_num, 3])
    param = paddle.concat([shape_param, color], axis=-1).reshape([-1, 8])
    decision = stroke_decision.reshape([-1]).astype('bool')
    # Rescale positions/sizes from patch space into the padded render window.
    param[:, :2] = param[:, :2] / 1.25 + 0.1
    param[:, 2:4] = param[:, 2:4] / 1.25
    return param, decision
def sort_strokes(params, decision, scores):
    """
    Reorder strokes within each patch by ascending score.

    Args:
        params: stroke parameters, [N, strokes, 8].
        decision: keep/skip flags, [N, strokes, 1].
        scores: per-stroke ranking scores, [N, strokes].

    Returns:
        (sorted_params, sorted_decision): parameters and flags gathered in
        ascending-score order along axis 1.
    """
    _, order = paddle.sort(scores, axis=1, descending=False)
    # Gather each of the 8 parameter channels in the sorted order.
    picked = [paddle.gather(params[:, :, channel], axis=1, index=order)
              for channel in range(8)]
    sorted_params = paddle.stack(picked, axis=2)
    sorted_decision = paddle.gather(decision.squeeze(2), axis=1, index=order)
    return sorted_params, sorted_decision
def render_serial(original_img, net_g, meta_brushes):
    """
    Serially paint `original_img` with brush strokes, coarse to fine.

    For each pyramid layer the image and canvas are resized, cut into
    32x32 patches on two half-patch-shifted grids (windows A and B),
    strokes are predicted per patch, ranked by reconstruction error, and
    alpha-blended one by one onto the canvas. Intermediate canvases are
    snapshotted into the returned frame list.

    Args:
        original_img: target image, NCHW; assumed float in [0, 1] —
            TODO confirm with caller.
        net_g: stroke predictor network (see stroke_net_predict).
        meta_brushes: brush textures for stroke rasterization.

    Returns:
        final_frame_list: list of uint8 HWC frames with the channel axis
        reversed (RGB->BGR if the input is RGB — presumably for cv2-style
        saving; verify against the consumer).
    """
    patch_size = 32
    stroke_num = 8
    H, W = original_img.shape[-2:]
    # Number of extra pyramid layers so the finest layer's patch grid
    # roughly covers the full resolution.
    K = max(math.ceil(math.log2(max(H, W) / patch_size)), 0)
    dilation = render_utils.Dilation2d(m=1)
    erosion = render_utils.Erosion2d(m=1)
    # Snapshots captured per layer.
    # NOTE(review): only 5 entries — frames_per_layer[layer] would raise
    # IndexError if K + 1 > 5 (images larger than 512); confirm intended limits.
    frames_per_layer = [20, 20, 30, 40, 60]
    final_frame_list = []
    with paddle.no_grad():
        #* ----- read in image and init canvas ----- *#
        final_result = paddle.zeros_like(original_img)
        for layer in range(0, K + 1):
            t0 = time.time()
            layer_size = patch_size * (2 ** layer)
            img = nn.functional.interpolate(original_img, (layer_size, layer_size))
            result = nn.functional.interpolate(final_result, (layer_size, layer_size))
            # Non-overlapping patch grid (window type A).
            img_patch = nn.functional.unfold(img, [patch_size, patch_size],
                                             strides=[patch_size, patch_size])
            result_patch = nn.functional.unfold(result, [patch_size, patch_size],
                                                strides=[patch_size, patch_size])
            h = (img.shape[2] - patch_size) // patch_size + 1
            w = (img.shape[3] - patch_size) // patch_size + 1
            # Render window is 1.25x the patch so strokes can overflow edges.
            render_size_y = int(1.25 * H // h)
            render_size_x = int(1.25 * W // w)
            #* -------------------------------------------------------------*#
            #* -------------generate strokes on window type A---------------*#
            #* -------------------------------------------------------------*#
            param, decision = stroke_net_predict(img_patch, result_patch, patch_size, net_g, stroke_num)
            expand_img = original_img
            wA_xid_list, wA_yid_list, wA_fore_list, wA_alpha_list, wA_error_list, wA_params = \
                get_single_layer_lists(param, decision, original_img, render_size_x, render_size_y, h, w,
                                       meta_brushes, dilation, erosion, stroke_num)
            #* -------------------------------------------------------------*#
            #* -------------generate strokes on window type B---------------*#
            #* -------------------------------------------------------------*#
            #*----- generate input canvas and target patches -----*#
            # Window B: same grid shifted by half a patch (via padding).
            wB_error_list = []
            img = nn.functional.pad(img, [patch_size // 2, patch_size // 2,
                                          patch_size // 2, patch_size // 2])
            result = nn.functional.pad(result, [patch_size // 2, patch_size // 2,
                                                patch_size // 2, patch_size // 2])
            img_patch = nn.functional.unfold(img, [patch_size, patch_size],
                                             strides=[patch_size, patch_size])
            result_patch = nn.functional.unfold(result, [patch_size, patch_size],
                                                strides=[patch_size, patch_size])
            h += 1
            w += 1
            param, decision = stroke_net_predict(img_patch, result_patch, patch_size, net_g, stroke_num)
            patch_y = 4 * render_size_y // 5
            patch_x = 4 * render_size_x // 5
            # Pad the target so the shifted B grid lines up with it.
            expand_img = nn.functional.pad(original_img, [patch_x // 2, patch_x // 2,
                                                          patch_y // 2, patch_y // 2])
            wB_xid_list, wB_yid_list, wB_fore_list, wB_alpha_list, wB_error_list, wB_params = \
                get_single_layer_lists(param, decision, expand_img, render_size_x, render_size_y, h, w,
                                       meta_brushes, dilation, erosion, stroke_num)
            #* -------------------------------------------------------------*#
            #* -------------rank strokes and plot stroke one by one---------*#
            #* -------------------------------------------------------------*#
            numA = len(wA_error_list)
            numB = len(wB_error_list)
            # Paint strokes in ascending-error order; indices < numA are
            # window-A strokes, the rest are window-B.
            total_error_list = wA_error_list + wB_error_list
            sort_list = list(np.argsort(total_error_list))
            sample = 0
            # Indices at which to snapshot a frame for this layer.
            samples = np.linspace(0, len(sort_list) - 2, frames_per_layer[layer]).astype(int)
            for ii in sort_list:
                ii = int(ii)
                if ii < numA:
                    x_id = wA_xid_list[ii]
                    y_id = wA_yid_list[ii]
                    valid_foregrounds = wA_fore_list[ii]
                    valid_alphas = wA_alpha_list[ii]
                    sparam = wA_params[ii]
                    tmp_foreground, tmp_alpha = get_single_stroke_on_full_image_A(x_id, y_id,
                        valid_foregrounds, valid_alphas, sparam, original_img, render_size_x, render_size_y, patch_x, patch_y)
                else:
                    x_id = wB_xid_list[ii - numA]
                    y_id = wB_yid_list[ii - numA]
                    valid_foregrounds = wB_fore_list[ii - numA]
                    valid_alphas = wB_alpha_list[ii - numA]
                    sparam = wB_params[ii - numA]
                    tmp_foreground, tmp_alpha = get_single_stroke_on_full_image_B(x_id, y_id,
                        valid_foregrounds, valid_alphas, sparam, original_img, render_size_x, render_size_y, patch_x, patch_y)
                # Alpha-composite the stroke over the running canvas.
                final_result = tmp_foreground * tmp_alpha + (1 - tmp_alpha) * final_result
                if sample in samples:
                    saveframe = (final_result.numpy().squeeze().transpose([1,2,0])[:,:,::-1] * 255).astype(np.uint8)
                    final_frame_list.append(saveframe)
                    #saveframe = cv2.resize(saveframe, (ow, oh))
                sample += 1
            print("layer %d cost: %.02f" %(layer, time.time() - t0))
        # Final snapshot after the last layer.
        saveframe = (final_result.numpy().squeeze().transpose([1,2,0])[:,:,::-1] * 255).astype(np.uint8)
        final_frame_list.append(saveframe)
    return final_frame_list