AnimeIns_CPU / utils /mmdet_custom_hooks.py
ljsabc's picture
add utils
8aa4f1e
raw
history blame contribute delete
No virus
8.35 kB
from mmengine.fileio import FileClient
from mmengine.dist import master_only
from einops import rearrange
import torch
import mmcv
import numpy as np
import os.path as osp
import cv2
from typing import Optional, Sequence
import torch.nn as nn
from mmdet.apis import inference_detector
from mmcv.transforms import Compose
from mmdet.engine import DetVisualizationHook
from mmdet.registry import HOOKS
from mmdet.structures import DetDataSample
from utils.io_utils import find_all_imgs, square_pad_resize, imglist2grid
def inference_detector(
model: nn.Module,
imgs,
test_pipeline
):
if isinstance(imgs, (list, tuple)):
is_batch = True
else:
imgs = [imgs]
is_batch = False
if len(imgs) == 0:
return []
test_pipeline = test_pipeline.copy()
if isinstance(imgs[0], np.ndarray):
# Calling this method across libraries will result
# in module unregistered error if not prefixed with mmdet.
test_pipeline[0].type = 'mmdet.LoadImageFromNDArray'
test_pipeline = Compose(test_pipeline)
result_list = []
for img in imgs:
# prepare data
if isinstance(img, np.ndarray):
# TODO: remove img_id.
data_ = dict(img=img, img_id=0)
else:
# TODO: remove img_id.
data_ = dict(img_path=img, img_id=0)
# build the data pipeline
data_ = test_pipeline(data_)
data_['inputs'] = [data_['inputs']]
data_['data_samples'] = [data_['data_samples']]
# forward the model
with torch.no_grad():
results = model.test_step(data_)[0]
result_list.append(results)
if not is_batch:
return result_list[0]
else:
return result_list
@HOOKS.register_module()
class InstanceSegVisualizationHook(DetVisualizationHook):
def __init__(self, visualize_samples: str = '',
read_rgb: bool = False,
draw: bool = False,
interval: int = 50,
score_thr: float = 0.3,
show: bool = False,
wait_time: float = 0.,
test_out_dir: Optional[str] = None,
file_client_args: dict = dict(backend='disk')):
super().__init__(draw, interval, score_thr, show, wait_time, test_out_dir, file_client_args)
self.vis_samples = []
if osp.exists(visualize_samples):
self.channel_order = channel_order = 'rgb' if read_rgb else 'bgr'
samples = find_all_imgs(visualize_samples, abs_path=True)
for imgp in samples:
img = mmcv.imread(imgp, channel_order=channel_order)
img, _, _, _ = square_pad_resize(img, 640)
self.vis_samples.append(img)
def before_val(self, runner) -> None:
total_curr_iter = runner.iter
self._visualize_data(total_curr_iter, runner)
return super().before_val(runner)
# def after_val_iter(self, runner: Runner, batch_idx: int, data_batch: dict,
# outputs: Sequence[DetDataSample]) -> None:
# """Run after every ``self.interval`` validation iterations.
# Args:
# runner (:obj:`Runner`): The runner of the validation process.
# batch_idx (int): The index of the current batch in the val loop.
# data_batch (dict): Data from dataloader.
# outputs (Sequence[:obj:`DetDataSample`]]): A batch of data samples
# that contain annotations and predictions.
# """
# # if self.draw is False:
# # return
# if self.file_client is None:
# self.file_client = FileClient(**self.file_client_args)
# # There is no guarantee that the same batch of images
# # is visualized for each evaluation.
# total_curr_iter = runner.iter + batch_idx
# # # Visualize only the first data
# # img_path = outputs[0].img_path
# # img_bytes = self.file_client.get(img_path)
# # img = mmcv.imfrombytes(img_bytes, channel_order='rgb')
# if total_curr_iter % self.interval == 0 and self.vis_samples:
# self._visualize_data(total_curr_iter, runner)
@master_only
def _visualize_data(self, total_curr_iter, runner):
tgt_size = 384
runner.model.eval()
outputs = inference_detector(runner.model, self.vis_samples, test_pipeline=runner.cfg.test_pipeline)
vis_results = []
for img, output in zip(self.vis_samples, outputs):
vis_img = self.add_datasample(
'val_img',
img,
data_sample=output,
show=self.show,
wait_time=self.wait_time,
pred_score_thr=self.score_thr,
draw_gt=False,
step=total_curr_iter)
vis_results.append(cv2.resize(vis_img, (tgt_size, tgt_size), interpolation=cv2.INTER_AREA))
drawn_img = imglist2grid(vis_results, tgt_size)
if drawn_img is None:
return
drawn_img = cv2.cvtColor(drawn_img, cv2.COLOR_BGR2RGB)
visualizer = self._visualizer
visualizer.set_image(drawn_img)
visualizer.add_image('val_img', drawn_img, total_curr_iter)
@master_only
def add_datasample(
self,
name: str,
image: np.ndarray,
data_sample: Optional['DetDataSample'] = None,
draw_gt: bool = True,
draw_pred: bool = True,
show: bool = False,
wait_time: float = 0,
# TODO: Supported in mmengine's Viusalizer.
out_file: Optional[str] = None,
pred_score_thr: float = 0.3,
step: int = 0) -> np.ndarray:
image = image.clip(0, 255).astype(np.uint8)
visualizer = self._visualizer
classes = visualizer.dataset_meta.get('classes', None)
palette = visualizer.dataset_meta.get('palette', None)
gt_img_data = None
pred_img_data = None
if data_sample is not None:
data_sample = data_sample.cpu()
if draw_gt and data_sample is not None:
gt_img_data = image
if 'gt_instances' in data_sample:
gt_img_data = visualizer._draw_instances(image,
data_sample.gt_instances,
classes, palette)
if 'gt_panoptic_seg' in data_sample:
assert classes is not None, 'class information is ' \
'not provided when ' \
'visualizing panoptic ' \
'segmentation results.'
gt_img_data = visualizer._draw_panoptic_seg(
gt_img_data, data_sample.gt_panoptic_seg, classes)
if draw_pred and data_sample is not None:
pred_img_data = image
if 'pred_instances' in data_sample:
pred_instances = data_sample.pred_instances
pred_instances = pred_instances[
pred_instances.scores > pred_score_thr]
pred_img_data = visualizer._draw_instances(image, pred_instances,
classes, palette)
if 'pred_panoptic_seg' in data_sample:
assert classes is not None, 'class information is ' \
'not provided when ' \
'visualizing panoptic ' \
'segmentation results.'
pred_img_data = visualizer._draw_panoptic_seg(
pred_img_data, data_sample.pred_panoptic_seg.numpy(),
classes)
if gt_img_data is not None and pred_img_data is not None:
drawn_img = np.concatenate((gt_img_data, pred_img_data), axis=1)
elif gt_img_data is not None:
drawn_img = gt_img_data
elif pred_img_data is not None:
drawn_img = pred_img_data
else:
# Display the original image directly if nothing is drawn.
drawn_img = image
return drawn_img