###################################################
# Night challenge 2024
###################################################
"""Batch-process raw PNG images (with JSON metadata side-cars) into JPEGs.

Each ``<name>.png`` in the input directory is expected to have a matching
``<name>.json`` metadata file next to it.  Every image is run through the raw
processing pipeline followed by a chain of enhancement blocks and written to
the output directory.
"""

import argparse
import os
import queue
import threading
from pathlib import Path
from time import time

import cv2
import numpy as np
import skimage.color as cl
from skimage.transform import resize
from tqdm import tqdm

import blocks as B
from raw_prc_pipeline import (expected_img_ext,
                              expected_landscape_img_height,
                              expected_landscape_img_width)
from raw_prc_pipeline.pipeline import RawProcessingPipelineDemo
from utils import fraction_from_json, json_read


def parse_args():
    """Parse the command-line arguments of the script.

    Returns:
        argparse.Namespace with the attributes ``png_dir``, ``out_dir``,
        ``illumination_estimation``, ``tone_mapping``, ``gamma_correction``
        and ``denoise_mask`` (exactly the keyword arguments of ``main``).
    """
    parser = argparse.ArgumentParser(
        description='Script for processing PNG images with given metadata files.')

    # folder params
    parser.add_argument('-p', '--png_dir', type=Path, required=True,
                        help='Path of the directory containing PNG images with metadata files.')
    # argparse runs string defaults through ``type``, so ``out_dir`` is always
    # a Path and never None.
    parser.add_argument('-o', '--out_dir', type=Path, default='./results',
                        help='Path to the directory where processed images will be saved. Images will be saved in JPG format.')

    # raw processing params
    parser.add_argument('-ie', '--illumination_estimation', type=str, default='',
                        help='Options for illumination estimation algorithms: "gw", "wp", "sog", "iwp".')
    parser.add_argument('-tm', '--tone_mapping', type=str, default='Flash',
                        help='Options for tone mapping algorithms: "Base", "Flash", "Storm", "Linear", "Drago", "Mantiuk", "Reinhard".')

    # srgb processing params
    parser.add_argument('-gc', '--gamma_correction', type=float, default=1 / 1.4,
                        help='Global gamma correction.')
    parser.add_argument('-dm', '--denoise_mask', type=float, default=0.6,
                        help='Value to control denoising effect in bright regions. Should be between 0 and 1')

    return parser.parse_args()


class PNGProcessing:
    """Callable that converts one raw PNG (plus its JSON metadata) to a JPEG.

    Args:
        ie_method: Illumination estimation option forwarded to
            ``RawProcessingPipelineDemo``.
        tone_mapping: Tone mapping option.  Stored on the instance only; it is
            not forwarded to the pipeline (the corresponding pipeline
            parameter is disabled below).
        gamma_correction: Global gamma value.  Stored on the instance;
            ``pipeline_exec`` does not read it.
        denoise_mask: Denoise mask value.  Stored on the instance;
            ``pipeline_exec`` does not read it.
    """

    def __init__(self, ie_method, tone_mapping, gamma_correction, denoise_mask):
        self.pipeline_params = {
            'illumination_estimation': ie_method,
            # 'tone_mapping': tone_mapping,
            'out_landscape_width': expected_landscape_img_width,
            'out_landscape_height': expected_landscape_img_height,
        }
        self.pipeline = RawProcessingPipelineDemo(**self.pipeline_params)
        # Kept so the argument is not silently dropped; currently unused.
        self.tone_mapping = tone_mapping
        self.gamma_correction = gamma_correction
        self.denoise_mask = denoise_mask

    def pipeline_exec(self, raw_image, metadata):
        """Run the full processing chain on a single raw image.

        The raw pipeline stages (normalize, demosaic, white balance, XYZ and
        sRGB transforms) are followed by the enhancement blocks from
        ``blocks`` and finally by uint8 conversion and orientation fixing.

        Args:
            raw_image: Image array as loaded from the PNG file.
            metadata: Metadata object parsed from the JSON side-car; it is
                passed unchanged to every pipeline stage.

        Returns:
            The result of ``self.pipeline.fix_orientation`` applied to the
            uint8-converted image.
        """
        normalized_image = self.pipeline.normalize(raw_image, metadata)
        demosaiced_image = self.pipeline.demosaic(normalized_image, metadata)
        # check the original demosaicing to see if results are the same
        # NOTE: the working resolution is hard-coded to 768x1024 here.
        demosaiced_image = resize(demosaiced_image, (768, 1024),
                                  preserve_range=True, anti_aliasing=True)
        wb_image = self.pipeline.white_balance(demosaiced_image, metadata)
        xyz_image = self.pipeline.xyz_transform(wb_image, metadata)
        srgb_image = self.pipeline.srgb_transform(xyz_image, metadata)

        denoised_image = B.denoise_raw(srgb_image, l_w=1, ch_w=7)
        # alt: srgb_image, l_w=4.5, ch_w=20)
        # alt: srgb_image, l_w=1.659923974475318, ch_w=5.459274910995606)

        light_enhancer = B.LCC(2)
        # alt: light_enhancer = B.LCC(sigma=6.463076463115174)
        light_image = light_enhancer(denoised_image).clip(0)

        contrast_image = B.global_mean_contrast(light_image, beta=1.5).clip(0)
        # alt: B.global_mean_contrast(light_image, beta=0.8653634653721171).clip(0)

        gamma_image = B.scurve(contrast_image, alpha=0, lmbd=(1 / 1.8)).clip(0)
        # alt: B.scurve(contrast_image, alpha=0.7050463096367395, lmbd=0.9740931227248038).clip(0)

        black_adj_image = B.imadjust(gamma_image, 0.99).clip(0)
        # alt: B.imadjust(gamma_image, 0.9957007298972433, 0.01697803128505186).clip(0)

        # Pick an extra tone adjustment from the mean of the HSV value
        # channel.  The mean is taken once, before any adjustment, and then
        # reused by every branch test.
        mean_value = cl.rgb2hsv(black_adj_image)[:, :, 2].mean()
        if mean_value < 0.2:
            black_adj_image = B.scurve_central(
                black_adj_image, lmbd=(1 / 1.8)).clip(0)
            # alt: B.scurve_central(black_adj_image, lmbd=0.6913136563678325).clip(0)
        elif mean_value < 0.25:
            black_adj_image = B.scurve_central(
                black_adj_image, lmbd=(1 / 1.4)).clip(0)
            # alt: B.scurve_central(black_adj_image, lmbd=0.3612134419536918).clip(0)
        elif mean_value > 0.4:
            black_adj_image = B.gamma_correction(black_adj_image, 1.6).clip(0)
            # alt: B.gamma_correction(black_adj_image, 3.5208650132731356).clip(0)
        # Means in [0.25, 0.4] get no extra adjustment.

        sharp_image = B.sharpening(black_adj_image, sigma=1).clip(0)
        # alt: B.sharpening(black_adj_image, 1.5389081796026578, 0.05456721376794549).clip(0)

        wb_image = B.white_balance(sharp_image, denoise_first=True).clip(0)
        # alt: B.white_balance(sharp_image, 0.740831363817609, 0.004044358054560114).clip(0)

        uint8_image = self.pipeline.to_uint8(wb_image, metadata)
        # resized_image = self.pipeline.resize(uint8_image, metadata)
        resulted_image = self.pipeline.fix_orientation(uint8_image, metadata)
        return resulted_image

    def __call__(self, png_path: Path, out_path: Path) -> float:
        """Process ``png_path`` and write the result to ``out_path``.

        Args:
            png_path: Path of the raw PNG; the metadata is read from the same
                path with a ``.json`` suffix.
            out_path: Destination file for the processed image.

        Returns:
            Wall-clock seconds spent inside ``pipeline_exec`` (file reading
            and writing are not included).

        Raises:
            OSError: If the PNG cannot be read or the result cannot be written.
        """
        # parse raw img
        raw_image = cv2.imread(str(png_path), cv2.IMREAD_UNCHANGED)
        if raw_image is None:
            # cv2.imread signals failure by returning None, not by raising.
            raise OSError(f'Failed to read image: {png_path}')

        # parse metadata
        metadata = json_read(png_path.with_suffix('.json'),
                             object_hook=fraction_from_json)

        start = time()
        output_image = self.pipeline_exec(raw_image, metadata)
        end = time()

        # save results (OpenCV writes channels in BGR order)
        output_image = cv2.cvtColor(output_image, cv2.COLOR_RGB2BGR)
        written = cv2.imwrite(str(out_path), output_image,
                              [cv2.IMWRITE_JPEG_QUALITY, 100])
        if not written:
            # cv2.imwrite reports failure through its boolean return value.
            raise OSError(f'Failed to write image: {out_path}')

        return end - start


def process(png_processor, out_dir, png_paths):
    """Run ``png_processor`` over every path, showing a progress bar.

    Args:
        png_processor: Callable taking ``(png_path, out_path)`` and returning
            the processing time in seconds.
        out_dir: Directory in which the output files are placed.
        png_paths: Paths of the input PNG files.

    Returns:
        List with one runtime per input path, in input order.
    """
    out_paths = [out_dir / png_path.with_suffix(expected_img_ext).name
                 for png_path in png_paths]

    times = []
    # The context manager closes the bar even if a processor call raises.
    with tqdm(total=len(png_paths), ncols=100) as pbar:
        for png_path, out_path in zip(png_paths, out_paths):
            times.append(png_processor(png_path, out_path))
            pbar.update()
    return times


def main(png_dir, out_dir, illumination_estimation, tone_mapping,
         gamma_correction, denoise_mask):
    """Process every ``*.png`` in ``png_dir`` and report the average runtime.

    Args:
        png_dir: Directory containing the PNG images and JSON metadata.
        out_dir: Directory for the results; created if it does not exist.
        illumination_estimation: Forwarded to ``PNGProcessing``.
        tone_mapping: Forwarded to ``PNGProcessing``.
        gamma_correction: Forwarded to ``PNGProcessing``.
        denoise_mask: Forwarded to ``PNGProcessing``.
    """
    # out_dir.mkdir(exist_ok=True)
    os.makedirs(out_dir, exist_ok=True)

    # Sorted so the processing order is deterministic across runs.
    png_paths = sorted(png_dir.glob('*.png'))
    if not png_paths:
        # Averaging an empty list would print "nan" and emit a RuntimeWarning.
        print(f'No PNG images found in {png_dir}.')
        return

    png_processor = PNGProcessing(
        illumination_estimation, tone_mapping, gamma_correction, denoise_mask)
    times = process(png_processor, out_dir, png_paths)

    print(f'Average time: {np.mean(times)} seconds.')


if __name__ == '__main__':
    args = parse_args()
    main(**vars(args))