|
|
|
|
|
|
|
import os |
|
import cv2 |
|
import random |
|
import numpy as np |
|
import tensorflow as tf |
|
import core.utils as utils |
|
from core.config import cfg |
|
|
|
|
|
class Dataset(object): |
|
"""implement Dataset here""" |
|
|
|
def __init__(self, FLAGS, is_training: bool, dataset_type: str = "converted_coco"): |
|
self.tiny = FLAGS.tiny |
|
self.strides, self.anchors, NUM_CLASS, XYSCALE = utils.load_config(FLAGS) |
|
self.dataset_type = dataset_type |
|
|
|
self.annot_path = ( |
|
cfg.TRAIN.ANNOT_PATH if is_training else cfg.TEST.ANNOT_PATH |
|
) |
|
self.input_sizes = ( |
|
cfg.TRAIN.INPUT_SIZE if is_training else cfg.TEST.INPUT_SIZE |
|
) |
|
self.batch_size = ( |
|
cfg.TRAIN.BATCH_SIZE if is_training else cfg.TEST.BATCH_SIZE |
|
) |
|
self.data_aug = cfg.TRAIN.DATA_AUG if is_training else cfg.TEST.DATA_AUG |
|
|
|
self.train_input_sizes = cfg.TRAIN.INPUT_SIZE |
|
self.classes = utils.read_class_names(cfg.YOLO.CLASSES) |
|
self.num_classes = len(self.classes) |
|
self.anchor_per_scale = cfg.YOLO.ANCHOR_PER_SCALE |
|
self.max_bbox_per_scale = 150 |
|
|
|
self.annotations = self.load_annotations() |
|
self.num_samples = len(self.annotations) |
|
self.num_batchs = int(np.ceil(self.num_samples / self.batch_size)) |
|
self.batch_count = 0 |
|
|
|
def load_annotations(self): |
|
with open(self.annot_path, "r") as f: |
|
txt = f.readlines() |
|
if self.dataset_type == "converted_coco": |
|
annotations = [ |
|
line.strip() |
|
for line in txt |
|
if len(line.strip().split()[1:]) != 0 |
|
] |
|
elif self.dataset_type == "yolo": |
|
annotations = [] |
|
for line in txt: |
|
image_path = line.strip() |
|
root, _ = os.path.splitext(image_path) |
|
with open(root + ".txt") as fd: |
|
boxes = fd.readlines() |
|
string = "" |
|
for box in boxes: |
|
box = box.strip() |
|
box = box.split() |
|
class_num = int(box[0]) |
|
center_x = float(box[1]) |
|
center_y = float(box[2]) |
|
half_width = float(box[3]) / 2 |
|
half_height = float(box[4]) / 2 |
|
string += " {},{},{},{},{}".format( |
|
center_x - half_width, |
|
center_y - half_height, |
|
center_x + half_width, |
|
center_y + half_height, |
|
class_num, |
|
) |
|
annotations.append(image_path + string) |
|
|
|
np.random.shuffle(annotations) |
|
return annotations |
|
|
|
def __iter__(self): |
|
return self |
|
|
|
def __next__(self): |
|
with tf.device("/cpu:0"): |
|
|
|
self.train_input_size = cfg.TRAIN.INPUT_SIZE |
|
self.train_output_sizes = self.train_input_size // self.strides |
|
|
|
batch_image = np.zeros( |
|
( |
|
self.batch_size, |
|
self.train_input_size, |
|
self.train_input_size, |
|
3, |
|
), |
|
dtype=np.float32, |
|
) |
|
|
|
batch_label_sbbox = np.zeros( |
|
( |
|
self.batch_size, |
|
self.train_output_sizes[0], |
|
self.train_output_sizes[0], |
|
self.anchor_per_scale, |
|
5 + self.num_classes, |
|
), |
|
dtype=np.float32, |
|
) |
|
batch_label_mbbox = np.zeros( |
|
( |
|
self.batch_size, |
|
self.train_output_sizes[1], |
|
self.train_output_sizes[1], |
|
self.anchor_per_scale, |
|
5 + self.num_classes, |
|
), |
|
dtype=np.float32, |
|
) |
|
batch_label_lbbox = np.zeros( |
|
( |
|
self.batch_size, |
|
self.train_output_sizes[2], |
|
self.train_output_sizes[2], |
|
self.anchor_per_scale, |
|
5 + self.num_classes, |
|
), |
|
dtype=np.float32, |
|
) |
|
|
|
batch_sbboxes = np.zeros( |
|
(self.batch_size, self.max_bbox_per_scale, 4), dtype=np.float32 |
|
) |
|
batch_mbboxes = np.zeros( |
|
(self.batch_size, self.max_bbox_per_scale, 4), dtype=np.float32 |
|
) |
|
batch_lbboxes = np.zeros( |
|
(self.batch_size, self.max_bbox_per_scale, 4), dtype=np.float32 |
|
) |
|
|
|
num = 0 |
|
if self.batch_count < self.num_batchs: |
|
while num < self.batch_size: |
|
index = self.batch_count * self.batch_size + num |
|
if index >= self.num_samples: |
|
index -= self.num_samples |
|
annotation = self.annotations[index] |
|
image, bboxes = self.parse_annotation(annotation) |
|
( |
|
label_sbbox, |
|
label_mbbox, |
|
label_lbbox, |
|
sbboxes, |
|
mbboxes, |
|
lbboxes, |
|
) = self.preprocess_true_boxes(bboxes) |
|
|
|
batch_image[num, :, :, :] = image |
|
batch_label_sbbox[num, :, :, :, :] = label_sbbox |
|
batch_label_mbbox[num, :, :, :, :] = label_mbbox |
|
batch_label_lbbox[num, :, :, :, :] = label_lbbox |
|
batch_sbboxes[num, :, :] = sbboxes |
|
batch_mbboxes[num, :, :] = mbboxes |
|
batch_lbboxes[num, :, :] = lbboxes |
|
num += 1 |
|
self.batch_count += 1 |
|
batch_smaller_target = batch_label_sbbox, batch_sbboxes |
|
batch_medium_target = batch_label_mbbox, batch_mbboxes |
|
batch_larger_target = batch_label_lbbox, batch_lbboxes |
|
|
|
return ( |
|
batch_image, |
|
( |
|
batch_smaller_target, |
|
batch_medium_target, |
|
batch_larger_target, |
|
), |
|
) |
|
else: |
|
self.batch_count = 0 |
|
np.random.shuffle(self.annotations) |
|
raise StopIteration |
|
|
|
def random_horizontal_flip(self, image, bboxes): |
|
if random.random() < 0.5: |
|
_, w, _ = image.shape |
|
image = image[:, ::-1, :] |
|
bboxes[:, [0, 2]] = w - bboxes[:, [2, 0]] |
|
|
|
return image, bboxes |
|
|
|
def random_crop(self, image, bboxes): |
|
if random.random() < 0.5: |
|
h, w, _ = image.shape |
|
max_bbox = np.concatenate( |
|
[ |
|
np.min(bboxes[:, 0:2], axis=0), |
|
np.max(bboxes[:, 2:4], axis=0), |
|
], |
|
axis=-1, |
|
) |
|
|
|
max_l_trans = max_bbox[0] |
|
max_u_trans = max_bbox[1] |
|
max_r_trans = w - max_bbox[2] |
|
max_d_trans = h - max_bbox[3] |
|
|
|
crop_xmin = max( |
|
0, int(max_bbox[0] - random.uniform(0, max_l_trans)) |
|
) |
|
crop_ymin = max( |
|
0, int(max_bbox[1] - random.uniform(0, max_u_trans)) |
|
) |
|
crop_xmax = max( |
|
w, int(max_bbox[2] + random.uniform(0, max_r_trans)) |
|
) |
|
crop_ymax = max( |
|
h, int(max_bbox[3] + random.uniform(0, max_d_trans)) |
|
) |
|
|
|
image = image[crop_ymin:crop_ymax, crop_xmin:crop_xmax] |
|
|
|
bboxes[:, [0, 2]] = bboxes[:, [0, 2]] - crop_xmin |
|
bboxes[:, [1, 3]] = bboxes[:, [1, 3]] - crop_ymin |
|
|
|
return image, bboxes |
|
|
|
def random_translate(self, image, bboxes): |
|
if random.random() < 0.5: |
|
h, w, _ = image.shape |
|
max_bbox = np.concatenate( |
|
[ |
|
np.min(bboxes[:, 0:2], axis=0), |
|
np.max(bboxes[:, 2:4], axis=0), |
|
], |
|
axis=-1, |
|
) |
|
|
|
max_l_trans = max_bbox[0] |
|
max_u_trans = max_bbox[1] |
|
max_r_trans = w - max_bbox[2] |
|
max_d_trans = h - max_bbox[3] |
|
|
|
tx = random.uniform(-(max_l_trans - 1), (max_r_trans - 1)) |
|
ty = random.uniform(-(max_u_trans - 1), (max_d_trans - 1)) |
|
|
|
M = np.array([[1, 0, tx], [0, 1, ty]]) |
|
image = cv2.warpAffine(image, M, (w, h)) |
|
|
|
bboxes[:, [0, 2]] = bboxes[:, [0, 2]] + tx |
|
bboxes[:, [1, 3]] = bboxes[:, [1, 3]] + ty |
|
|
|
return image, bboxes |
|
|
|
def parse_annotation(self, annotation): |
|
line = annotation.split() |
|
image_path = line[0] |
|
if not os.path.exists(image_path): |
|
raise KeyError("%s does not exist ... " % image_path) |
|
image = cv2.imread(image_path) |
|
if self.dataset_type == "converted_coco": |
|
bboxes = np.array( |
|
[list(map(int, box.split(","))) for box in line[1:]] |
|
) |
|
elif self.dataset_type == "yolo": |
|
height, width, _ = image.shape |
|
bboxes = np.array( |
|
[list(map(float, box.split(","))) for box in line[1:]] |
|
) |
|
bboxes = bboxes * np.array([width, height, width, height, 1]) |
|
bboxes = bboxes.astype(np.int64) |
|
|
|
if self.data_aug: |
|
image, bboxes = self.random_horizontal_flip( |
|
np.copy(image), np.copy(bboxes) |
|
) |
|
image, bboxes = self.random_crop(np.copy(image), np.copy(bboxes)) |
|
image, bboxes = self.random_translate( |
|
np.copy(image), np.copy(bboxes) |
|
) |
|
|
|
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) |
|
image, bboxes = utils.image_preprocess( |
|
np.copy(image), |
|
[self.train_input_size, self.train_input_size], |
|
np.copy(bboxes), |
|
) |
|
return image, bboxes |
|
|
|
|
|
def preprocess_true_boxes(self, bboxes): |
|
label = [ |
|
np.zeros( |
|
( |
|
self.train_output_sizes[i], |
|
self.train_output_sizes[i], |
|
self.anchor_per_scale, |
|
5 + self.num_classes, |
|
) |
|
) |
|
for i in range(3) |
|
] |
|
bboxes_xywh = [np.zeros((self.max_bbox_per_scale, 4)) for _ in range(3)] |
|
bbox_count = np.zeros((3,)) |
|
|
|
for bbox in bboxes: |
|
bbox_coor = bbox[:4] |
|
bbox_class_ind = bbox[4] |
|
|
|
onehot = np.zeros(self.num_classes, dtype=np.float) |
|
onehot[bbox_class_ind] = 1.0 |
|
uniform_distribution = np.full( |
|
self.num_classes, 1.0 / self.num_classes |
|
) |
|
deta = 0.01 |
|
smooth_onehot = onehot * (1 - deta) + deta * uniform_distribution |
|
|
|
bbox_xywh = np.concatenate( |
|
[ |
|
(bbox_coor[2:] + bbox_coor[:2]) * 0.5, |
|
bbox_coor[2:] - bbox_coor[:2], |
|
], |
|
axis=-1, |
|
) |
|
bbox_xywh_scaled = ( |
|
1.0 * bbox_xywh[np.newaxis, :] / self.strides[:, np.newaxis] |
|
) |
|
|
|
iou = [] |
|
exist_positive = False |
|
for i in range(3): |
|
anchors_xywh = np.zeros((self.anchor_per_scale, 4)) |
|
anchors_xywh[:, 0:2] = ( |
|
np.floor(bbox_xywh_scaled[i, 0:2]).astype(np.int32) + 0.5 |
|
) |
|
anchors_xywh[:, 2:4] = self.anchors[i] |
|
|
|
iou_scale = utils.bbox_iou( |
|
bbox_xywh_scaled[i][np.newaxis, :], anchors_xywh |
|
) |
|
iou.append(iou_scale) |
|
iou_mask = iou_scale > 0.3 |
|
|
|
if np.any(iou_mask): |
|
xind, yind = np.floor(bbox_xywh_scaled[i, 0:2]).astype( |
|
np.int32 |
|
) |
|
|
|
label[i][yind, xind, iou_mask, :] = 0 |
|
label[i][yind, xind, iou_mask, 0:4] = bbox_xywh |
|
label[i][yind, xind, iou_mask, 4:5] = 1.0 |
|
label[i][yind, xind, iou_mask, 5:] = smooth_onehot |
|
|
|
bbox_ind = int(bbox_count[i] % self.max_bbox_per_scale) |
|
bboxes_xywh[i][bbox_ind, :4] = bbox_xywh |
|
bbox_count[i] += 1 |
|
|
|
exist_positive = True |
|
|
|
if not exist_positive: |
|
best_anchor_ind = np.argmax(np.array(iou).reshape(-1), axis=-1) |
|
best_detect = int(best_anchor_ind / self.anchor_per_scale) |
|
best_anchor = int(best_anchor_ind % self.anchor_per_scale) |
|
xind, yind = np.floor( |
|
bbox_xywh_scaled[best_detect, 0:2] |
|
).astype(np.int32) |
|
|
|
label[best_detect][yind, xind, best_anchor, :] = 0 |
|
label[best_detect][yind, xind, best_anchor, 0:4] = bbox_xywh |
|
label[best_detect][yind, xind, best_anchor, 4:5] = 1.0 |
|
label[best_detect][yind, xind, best_anchor, 5:] = smooth_onehot |
|
|
|
bbox_ind = int( |
|
bbox_count[best_detect] % self.max_bbox_per_scale |
|
) |
|
bboxes_xywh[best_detect][bbox_ind, :4] = bbox_xywh |
|
bbox_count[best_detect] += 1 |
|
label_sbbox, label_mbbox, label_lbbox = label |
|
sbboxes, mbboxes, lbboxes = bboxes_xywh |
|
return label_sbbox, label_mbbox, label_lbbox, sbboxes, mbboxes, lbboxes |
|
|
|
def __len__(self): |
|
return self.num_batchs |
|
|