# NOTE: non-code header lines (web-UI scrape residue: status text, file size,
# commit hash 8aa4f1e, and a column of line numbers) were removed so that this
# file is valid Python.
import copy
import os.path as osp
from typing import Optional, Sequence

import cv2
import mmcv
import numpy as np
import torch
import torch.nn as nn
from einops import rearrange
from mmcv.transforms import Compose
from mmdet.apis import inference_detector  # noqa: F401 -- intentionally shadowed by the local redefinition below
from mmdet.engine import DetVisualizationHook
from mmdet.registry import HOOKS
from mmdet.structures import DetDataSample
from mmengine.dist import master_only
from mmengine.fileio import FileClient

from utils.io_utils import find_all_imgs, imglist2grid, square_pad_resize
def inference_detector(
    model: nn.Module,
    imgs,
    test_pipeline
):
    """Run inference with ``model`` on one image or a batch of images.

    Args:
        model (nn.Module): A detector exposing ``test_step``; it is called
            under ``torch.no_grad()``.
        imgs: A single image or a list/tuple of images. Each image is either
            an ``np.ndarray`` (loaded image) or a string/path consumed by the
            pipeline's loading transform as ``img_path``.
        test_pipeline: Pipeline config (sequence of transform dicts) to build
            the test-time ``Compose`` from. The caller's object is NOT
            modified.

    Returns:
        A single result when ``imgs`` was a single image, otherwise a list of
        results (one per image). An empty input list yields ``[]``.
    """
    if isinstance(imgs, (list, tuple)):
        is_batch = True
    else:
        imgs = [imgs]
        is_batch = False

    if not imgs:
        return []

    # BUGFIX: the previous shallow ``test_pipeline.copy()`` still shared the
    # per-transform dicts with the caller, so the ``type`` mutation below
    # leaked into the runner's config. Deep-copy to keep the change local.
    test_pipeline = copy.deepcopy(test_pipeline)
    if isinstance(imgs[0], np.ndarray):
        # Calling this method across libraries will result
        # in module unregistered error if not prefixed with mmdet.
        test_pipeline[0].type = 'mmdet.LoadImageFromNDArray'
    test_pipeline = Compose(test_pipeline)

    result_list = []
    for img in imgs:
        # Prepare data for the pipeline.  TODO: remove img_id.
        if isinstance(img, np.ndarray):
            data_ = dict(img=img, img_id=0)
        else:
            data_ = dict(img_path=img, img_id=0)
        # Build the model inputs via the data pipeline; wrap in single-item
        # lists because test_step expects batched inputs.
        data_ = test_pipeline(data_)
        data_['inputs'] = [data_['inputs']]
        data_['data_samples'] = [data_['data_samples']]
        # Forward the model without tracking gradients.
        with torch.no_grad():
            results = model.test_step(data_)[0]
        result_list.append(results)

    return result_list if is_batch else result_list[0]
@HOOKS.register_module()
class InstanceSegVisualizationHook(DetVisualizationHook):
    """Visualization hook that runs the model on a fixed set of sample images
    before every validation loop and logs the drawn predictions as one grid
    image to the visualizer backend.

    Args:
        visualize_samples (str): Directory of sample images. If the path does
            not exist, no samples are loaded and nothing is drawn.
        read_rgb (bool): Read sample images in RGB channel order instead of
            the default BGR.
        draw (bool): Passed through to ``DetVisualizationHook``.
        interval (int): Passed through to ``DetVisualizationHook``.
        score_thr (float): Minimum prediction score for drawing instances.
        show (bool): Whether to display drawn images in a window.
        wait_time (float): Display wait time (seconds) when showing.
        test_out_dir (str, optional): Passed through to
            ``DetVisualizationHook``.
        file_client_args (dict, optional): File client arguments. Defaults to
            ``dict(backend='disk')``. A ``None`` sentinel is used instead of a
            mutable default argument.
    """

    def __init__(self, visualize_samples: str = '',
                 read_rgb: bool = False,
                 draw: bool = False,
                 interval: int = 50,
                 score_thr: float = 0.3,
                 show: bool = False,
                 wait_time: float = 0.,
                 test_out_dir: Optional[str] = None,
                 file_client_args: Optional[dict] = None):
        # BUGFIX: avoid the shared mutable default ``dict(backend='disk')``;
        # materialize a fresh dict per instantiation.
        if file_client_args is None:
            file_client_args = dict(backend='disk')
        super().__init__(draw, interval, score_thr, show, wait_time,
                         test_out_dir, file_client_args)
        self.vis_samples = []
        # BUGFIX: set unconditionally so the attribute always exists, even
        # when ``visualize_samples`` does not point at an existing directory.
        self.channel_order = 'rgb' if read_rgb else 'bgr'
        if osp.exists(visualize_samples):
            samples = find_all_imgs(visualize_samples, abs_path=True)
            for imgp in samples:
                img = mmcv.imread(imgp, channel_order=self.channel_order)
                # Pad to square and resize to 640x640 so every sample has a
                # uniform shape for batching/visualization.
                img, _, _, _ = square_pad_resize(img, 640)
                self.vis_samples.append(img)

    def before_val(self, runner) -> None:
        """Visualize the fixed sample set once at the start of validation."""
        # NOTE: per-iteration visualization (an ``after_val_iter`` override)
        # was removed as dead commented-out code; visualization happens only
        # here, keyed by the current training iteration.
        total_curr_iter = runner.iter
        self._visualize_data(total_curr_iter, runner)
        return super().before_val(runner)

    @master_only
    def _visualize_data(self, total_curr_iter, runner):
        """Run inference on ``self.vis_samples``, draw predictions, assemble
        a grid and push it to the visualizer (rank 0 only)."""
        tgt_size = 384
        runner.model.eval()
        outputs = inference_detector(runner.model, self.vis_samples,
                                     test_pipeline=runner.cfg.test_pipeline)
        vis_results = []
        for img, output in zip(self.vis_samples, outputs):
            vis_img = self.add_datasample(
                'val_img',
                img,
                data_sample=output,
                show=self.show,
                wait_time=self.wait_time,
                pred_score_thr=self.score_thr,
                draw_gt=False,
                step=total_curr_iter)
            # Downscale each drawn sample to a uniform tile size.
            vis_results.append(cv2.resize(vis_img, (tgt_size, tgt_size),
                                          interpolation=cv2.INTER_AREA))
        drawn_img = imglist2grid(vis_results, tgt_size)
        if drawn_img is None:
            return
        # Visualizer backends expect RGB.
        drawn_img = cv2.cvtColor(drawn_img, cv2.COLOR_BGR2RGB)
        visualizer = self._visualizer
        visualizer.set_image(drawn_img)
        visualizer.add_image('val_img', drawn_img, total_curr_iter)

    @master_only
    def add_datasample(
            self,
            name: str,
            image: np.ndarray,
            data_sample: Optional['DetDataSample'] = None,
            draw_gt: bool = True,
            draw_pred: bool = True,
            show: bool = False,
            wait_time: float = 0,
            # TODO: Supported in mmengine's Viusalizer.
            out_file: Optional[str] = None,
            pred_score_thr: float = 0.3,
            step: int = 0) -> np.ndarray:
        """Draw GT and/or predicted instances (and panoptic segmentation)
        from ``data_sample`` onto ``image`` and return the drawn array.

        GT and prediction renderings, when both requested and available, are
        concatenated side by side; otherwise whichever exists is returned,
        falling back to the (clipped) input image when nothing is drawn.
        """
        image = image.clip(0, 255).astype(np.uint8)
        visualizer = self._visualizer
        classes = visualizer.dataset_meta.get('classes', None)
        palette = visualizer.dataset_meta.get('palette', None)
        gt_img_data = None
        pred_img_data = None

        if data_sample is not None:
            # Move tensors to CPU once before drawing.
            data_sample = data_sample.cpu()

        if draw_gt and data_sample is not None:
            gt_img_data = image
            if 'gt_instances' in data_sample:
                gt_img_data = visualizer._draw_instances(image,
                                                         data_sample.gt_instances,
                                                         classes, palette)

            if 'gt_panoptic_seg' in data_sample:
                assert classes is not None, 'class information is ' \
                                            'not provided when ' \
                                            'visualizing panoptic ' \
                                            'segmentation results.'
                gt_img_data = visualizer._draw_panoptic_seg(
                    gt_img_data, data_sample.gt_panoptic_seg, classes)

        if draw_pred and data_sample is not None:
            pred_img_data = image
            if 'pred_instances' in data_sample:
                # Only draw predictions above the score threshold.
                pred_instances = data_sample.pred_instances
                pred_instances = pred_instances[
                    pred_instances.scores > pred_score_thr]
                pred_img_data = visualizer._draw_instances(image, pred_instances,
                                                           classes, palette)
            if 'pred_panoptic_seg' in data_sample:
                assert classes is not None, 'class information is ' \
                                            'not provided when ' \
                                            'visualizing panoptic ' \
                                            'segmentation results.'
                pred_img_data = visualizer._draw_panoptic_seg(
                    pred_img_data, data_sample.pred_panoptic_seg.numpy(),
                    classes)

        if gt_img_data is not None and pred_img_data is not None:
            drawn_img = np.concatenate((gt_img_data, pred_img_data), axis=1)
        elif gt_img_data is not None:
            drawn_img = gt_img_data
        elif pred_img_data is not None:
            drawn_img = pred_img_data
        else:
            # Display the original image directly if nothing is drawn.
            drawn_img = image

        return drawn_img
# (end of file -- stray scrape artifact removed)