|
|
|
|
|
|
|
|
|
|
|
import argparse |
|
import threading |
|
from pathlib import Path |
|
import os |
|
import queue |
|
from time import time |
|
from tqdm import tqdm |
|
|
|
import blocks as B |
|
import cv2 |
|
import numpy as np |
|
import skimage.color as cl |
|
from skimage.transform import resize |
|
from raw_prc_pipeline import (expected_img_ext, expected_landscape_img_height, |
|
expected_landscape_img_width) |
|
from raw_prc_pipeline.pipeline import RawProcessingPipelineDemo |
|
from tqdm import tqdm |
|
from utils import fraction_from_json, json_read |
|
|
|
|
|
def parse_args(argv=None):
    """Parse the command-line options of the PNG processing script.

    Args:
        argv: Optional list of argument strings to parse instead of the
            process command line. ``None`` (the default) makes argparse read
            ``sys.argv[1:]``, which is what the function did before this
            parameter existed.

    Returns:
        argparse.Namespace with the attributes ``png_dir``, ``out_dir``,
        ``illumination_estimation``, ``tone_mapping``, ``gamma_correction``
        and ``denoise_mask``.
    """
    parser = argparse.ArgumentParser(
        description='Script for processing PNG images with given metadata files.')

    parser.add_argument('-p', '--png_dir', type=Path, required=True,
                        help='Path of the directory containing PNG images with metadata files.')
    # argparse passes string defaults through ``type``, so out_dir is a Path
    # even when the option is omitted.
    parser.add_argument('-o', '--out_dir', type=Path, default='./results',
                        help='Path to the directory where processed images will be saved. Images will be saved in JPG format.')

    parser.add_argument('-ie', '--illumination_estimation', type=str, default='',
                        help='Options for illumination estimation algorithms: "gw", "wp", "sog", "iwp".')
    parser.add_argument('-tm', '--tone_mapping', type=str, default='Flash',
                        help='Options for tone mapping algorithms: "Base", "Flash", "Storm", "Linear", "Drago", "Mantiuk", "Reinhard".')

    parser.add_argument('-gc', '--gamma_correction', type=float, default=1 / 1.4,
                        help='Global gamma correction.')
    parser.add_argument('-dm', '--denoise_mask', type=float, default=0.6,
                        help='Value to control denoising effect in bright regions. Should be between 0 and 1')
    args = parser.parse_args(argv)

    # Fallback kept from the original code. With the './results' default
    # above it is only reached if that default is ever changed to None.
    if args.out_dir is None:
        args.out_dir = args.png_dir

    return args
|
|
|
|
|
class PNGProcessing():
    """Turn a raw PNG capture and its JSON metadata file into a JPG image.

    Instances are callable: ``processor(png_path, out_path)`` reads the image
    and the ``.json`` file next to it, runs ``pipeline_exec`` and writes the
    result to ``out_path``.
    """

    def __init__(self, ie_method, tone_mapping, gamma_correction, denoise_mask):
        """Build the underlying pipeline and store the tuning values.

        Args:
            ie_method: Passed to ``RawProcessingPipelineDemo`` as
                ``illumination_estimation``.
            tone_mapping: Accepted for interface compatibility; this class
                does not read it.
            gamma_correction: Stored as ``self.gamma_correction``; not read
                by ``pipeline_exec``.
            denoise_mask: Stored as ``self.denoise_mask``; not read by
                ``pipeline_exec``.
        """
        self.pipeline_params = {
            'illumination_estimation': ie_method,
            'out_landscape_width': expected_landscape_img_width,
            'out_landscape_height': expected_landscape_img_height
        }

        self.pipeline = RawProcessingPipelineDemo(**self.pipeline_params)
        self.gamma_correction = gamma_correction
        self.denoise_mask = denoise_mask

    def pipeline_exec(self, raw_image, metadata):
        """Run the full processing chain on one raw image.

        The steps run in a fixed order: the pipeline's ``normalize`` and
        ``demosaic``, a resize to output shape (768, 1024), the pipeline's
        ``white_balance``, ``xyz_transform`` and ``srgb_transform``, then the
        ``blocks`` helpers (denoise, LCC, global mean contrast, s-curve,
        imadjust, a brightness-dependent correction, sharpening, white
        balance) and finally the pipeline's ``to_uint8`` and
        ``fix_orientation``.

        Args:
            raw_image: Image array as loaded by ``__call__``.
            metadata: Metadata object handed unchanged to the pipeline steps.

        Returns:
            The value returned by ``self.pipeline.fix_orientation``.
        """
        normalized_image = self.pipeline.normalize(raw_image, metadata)

        demosaiced_image = self.pipeline.demosaic(normalized_image, metadata)

        # Work on a fixed-size image from here on.
        demosaiced_image = resize(demosaiced_image, (768, 1024), preserve_range=True, anti_aliasing=True)

        wb_image = self.pipeline.white_balance(demosaiced_image, metadata)

        xyz_image = self.pipeline.xyz_transform(wb_image, metadata)
        srgb_image = self.pipeline.srgb_transform(xyz_image, metadata)

        denoised_image = B.denoise_raw(
            srgb_image, l_w=1, ch_w=7)

        light_enhancer = B.LCC(2)
        light_image = light_enhancer(denoised_image).clip(0)

        contrast_image = B.global_mean_contrast(light_image, beta=1.5).clip(0)

        gamma_image = B.scurve(contrast_image, alpha=0, lmbd=(1 / 1.8)).clip(0)

        black_adj_image = B.imadjust(gamma_image, 0.99).clip(0)

        # Mean of the third HSV channel (value) selects which tonal
        # correction, if any, is applied. It is computed once because the
        # image it is derived from does not change between the comparisons.
        mean_value = cl.rgb2hsv(black_adj_image)[:, :, 2].mean()
        if mean_value < 0.2:
            black_adj_image = B.scurve_central(black_adj_image, lmbd=(1 / 1.8)).clip(0)
        elif mean_value < 0.25:
            black_adj_image = B.scurve_central(black_adj_image, lmbd=(1 / 1.4)).clip(0)
        elif mean_value > 0.4:
            black_adj_image = B.gamma_correction(black_adj_image, 1.6).clip(0)

        sharp_image = B.sharpening(black_adj_image, sigma=1).clip(0)

        wb_image = B.white_balance(sharp_image, denoise_first=True).clip(0)

        uint8_image = self.pipeline.to_uint8(wb_image, metadata)

        resulted_image = self.pipeline.fix_orientation(uint8_image, metadata)

        return resulted_image

    def __call__(self, png_path: Path, out_path: Path):
        """Process one PNG file and write the result to ``out_path``.

        Args:
            png_path: Path of the input image. The metadata is read from the
                file with the same name and a ``.json`` suffix.
            out_path: Path the processed image is written to.

        Returns:
            Seconds spent inside ``pipeline_exec`` (file reading and writing
            are not included in the measurement).

        Raises:
            OSError: If the input image cannot be read or the output image
                cannot be written.
        """
        raw_image = cv2.imread(str(png_path), cv2.IMREAD_UNCHANGED)
        # cv2.imread signals failure by returning None instead of raising.
        if raw_image is None:
            raise OSError(f'Could not read image: {png_path}')

        metadata = json_read(png_path.with_suffix(
            '.json'), object_hook=fraction_from_json)

        start = time()
        output_image = self.pipeline_exec(raw_image, metadata)
        end = time()

        # Swap channel order (RGB -> BGR) before handing the image to
        # cv2.imwrite.
        output_image = cv2.cvtColor(output_image, cv2.COLOR_RGB2BGR)
        written = cv2.imwrite(str(out_path), output_image, [
            cv2.IMWRITE_JPEG_QUALITY, 100])
        # cv2.imwrite reports failure through its return value; do not lose
        # the output silently.
        if not written:
            raise OSError(f'Could not write image: {out_path}')
        return end - start
|
|
|
|
|
def process(png_processor, out_dir, png_paths):
    """Run ``png_processor`` on every input path and collect its results.

    Args:
        png_processor: Callable invoked as ``png_processor(png_path,
            out_path)``; its return value is recorded as the runtime.
        out_dir: Path object that each output file name is joined to.
        png_paths: Sized collection of input path objects.

    Returns:
        List of the values returned by ``png_processor``, in input order.
    """
    # Output files keep the input file name with the expected extension.
    out_paths = [
        out_dir / png_path.with_suffix(expected_img_ext).name for png_path in png_paths]
    times = []
    # The context manager closes the progress bar even when a processor call
    # raises part-way through the batch.
    with tqdm(total=len(png_paths), ncols=100) as pbar:
        for png_path, out_path in zip(png_paths, out_paths):
            times.append(png_processor(png_path, out_path))
            pbar.update()
    return times
|
|
|
|
|
def main(png_dir, out_dir, illumination_estimation, tone_mapping, gamma_correction, denoise_mask):
    """Process every ``*.png`` file in ``png_dir`` and report the mean runtime.

    Args:
        png_dir: Path object of the directory searched for ``*.png`` files.
        out_dir: Output directory; created if it does not exist.
        illumination_estimation: Forwarded to ``PNGProcessing``.
        tone_mapping: Forwarded to ``PNGProcessing``.
        gamma_correction: Forwarded to ``PNGProcessing``.
        denoise_mask: Forwarded to ``PNGProcessing``.
    """
    os.makedirs(out_dir, exist_ok=True)

    # glob yields entries in filesystem order; sort so runs are reproducible.
    png_paths = sorted(png_dir.glob('*.png'))

    png_processor = PNGProcessing(
        illumination_estimation, tone_mapping, gamma_correction, denoise_mask)

    times = process(png_processor, out_dir, png_paths)
    if not times:
        # np.mean of an empty list is nan and emits a RuntimeWarning.
        print(f'No PNG images found in {png_dir}.')
        return
    print(f'Average time: {np.mean(times)} seconds.')
|
|
|
|
|
if __name__ == '__main__':
    # Parse the command line and hand every option to main() by keyword.
    cli_options = vars(parse_args())
    main(**cli_options)
|
|