yolo_v4_tflite / core /dataset.py
SamMorgan
Adding more yolov4-tflite files
20e841b
#! /usr/bin/env python
# coding=utf-8
import os
import cv2
import random
import numpy as np
import tensorflow as tf
import core.utils as utils
from core.config import cfg
class Dataset(object):
    """Iterable that yields batches of images and YOLO training targets.

    Annotations are read and shuffled once at construction time. Every call
    to ``next()`` builds one batch; once ``len(self)`` batches have been
    produced the annotation list is reshuffled, the batch counter is reset
    and ``StopIteration`` is raised, so the same object can be iterated again.
    """

    def __init__(self, FLAGS, is_training: bool, dataset_type: str = "converted_coco"):
        """Read the configuration and load the annotation list.

        Args:
            FLAGS: Run-time flags. ``FLAGS.tiny`` is stored on the instance
                and the whole object is forwarded to ``utils.load_config``.
            is_training: Selects the ``cfg.TRAIN`` settings when true and the
                ``cfg.TEST`` settings otherwise.
            dataset_type: ``"converted_coco"`` or ``"yolo"``; see
                ``load_annotations`` for the two annotation layouts.
        """
        self.tiny = FLAGS.tiny
        # load_config returns four values; only strides and anchors are used.
        self.strides, self.anchors, _num_class, _xyscale = utils.load_config(FLAGS)
        self.dataset_type = dataset_type

        self.annot_path = (
            cfg.TRAIN.ANNOT_PATH if is_training else cfg.TEST.ANNOT_PATH
        )
        self.input_sizes = (
            cfg.TRAIN.INPUT_SIZE if is_training else cfg.TEST.INPUT_SIZE
        )
        self.batch_size = (
            cfg.TRAIN.BATCH_SIZE if is_training else cfg.TEST.BATCH_SIZE
        )
        self.data_aug = cfg.TRAIN.DATA_AUG if is_training else cfg.TEST.DATA_AUG

        self.train_input_sizes = cfg.TRAIN.INPUT_SIZE
        self.classes = utils.read_class_names(cfg.YOLO.CLASSES)
        self.num_classes = len(self.classes)
        self.anchor_per_scale = cfg.YOLO.ANCHOR_PER_SCALE
        # Capacity of the per-scale list of raw boxes returned with each label.
        self.max_bbox_per_scale = 150

        self.annotations = self.load_annotations()
        self.num_samples = len(self.annotations)
        self.num_batchs = int(np.ceil(self.num_samples / self.batch_size))
        self.batch_count = 0

    def load_annotations(self):
        """Read ``self.annot_path`` and return a shuffled list of annotations.

        Every returned entry is a string of the form
        ``"<image_path> x0,y0,x1,y1,cls [x0,y0,x1,y1,cls ...]"``.

        * ``"converted_coco"``: the file already has that layout; lines that
          carry no box after the image path are dropped.
        * ``"yolo"``: the file lists one image path per line. Boxes are read
          from the sibling ``<image stem>.txt`` file whose rows are
          ``cls center_x center_y width height`` and are converted to corner
          form. Blank lines in either file are skipped.

        Returns:
            list[str]: The annotation strings in random order.

        Raises:
            ValueError: If ``self.dataset_type`` is not a supported value.
        """
        with open(self.annot_path, "r") as f:
            txt = f.readlines()

        if self.dataset_type == "converted_coco":
            annotations = [
                line.strip()
                for line in txt
                if len(line.strip().split()[1:]) != 0
            ]
        elif self.dataset_type == "yolo":
            annotations = []
            for line in txt:
                image_path = line.strip()
                if not image_path:
                    continue
                root, _ = os.path.splitext(image_path)
                with open(root + ".txt") as fd:
                    boxes = fd.readlines()
                entries = []
                for box in boxes:
                    fields = box.split()
                    if not fields:
                        continue
                    class_num = int(fields[0])
                    center_x = float(fields[1])
                    center_y = float(fields[2])
                    half_width = float(fields[3]) / 2
                    half_height = float(fields[4]) / 2
                    entries.append(
                        " {},{},{},{},{}".format(
                            center_x - half_width,
                            center_y - half_height,
                            center_x + half_width,
                            center_y + half_height,
                            class_num,
                        )
                    )
                annotations.append(image_path + "".join(entries))
        else:
            raise ValueError(
                "unsupported dataset_type: {!r}".format(self.dataset_type)
            )

        np.random.shuffle(annotations)
        return annotations

    def __iter__(self):
        """Return ``self``; the dataset is its own iterator."""
        return self

    def __next__(self):
        """Build and return the next batch.

        Returns:
            tuple: ``(batch_image, (target_0, target_1, target_2))``. Each
            target is a ``(label_grid, boxes)`` pair of float32 arrays for
            the scale with the same index in ``self.strides``.

        Raises:
            StopIteration: When ``num_batchs`` batches have been produced.
                The annotations are reshuffled and the counter reset first.
        """
        with tf.device("/cpu:0"):
            # NOTE(review): the input size is always taken from cfg.TRAIN,
            # also when the dataset was built with is_training=False.
            self.train_input_size = cfg.TRAIN.INPUT_SIZE
            self.train_output_sizes = self.train_input_size // self.strides

            if self.batch_count >= self.num_batchs:
                self.batch_count = 0
                np.random.shuffle(self.annotations)
                raise StopIteration

            batch_image = np.zeros(
                (
                    self.batch_size,
                    self.train_input_size,
                    self.train_input_size,
                    3,
                ),
                dtype=np.float32,
            )
            # NOTE(review): three scales are indexed unconditionally even
            # though FLAGS.tiny is stored; confirm strides has three entries.
            batch_labels = [
                np.zeros(
                    (
                        self.batch_size,
                        self.train_output_sizes[i],
                        self.train_output_sizes[i],
                        self.anchor_per_scale,
                        5 + self.num_classes,
                    ),
                    dtype=np.float32,
                )
                for i in range(3)
            ]
            batch_bboxes = [
                np.zeros(
                    (self.batch_size, self.max_bbox_per_scale, 4),
                    dtype=np.float32,
                )
                for _ in range(3)
            ]

            for num in range(self.batch_size):
                # Wrap around so the last batch is padded with samples from
                # the start of the list, whatever the batch size is.
                index = (
                    self.batch_count * self.batch_size + num
                ) % self.num_samples
                annotation = self.annotations[index]
                image, bboxes = self.parse_annotation(annotation)
                targets = self.preprocess_true_boxes(bboxes)

                batch_image[num, :, :, :] = image
                for i in range(3):
                    batch_labels[i][num, :, :, :, :] = targets[i]
                    batch_bboxes[i][num, :, :] = targets[3 + i]

            self.batch_count += 1
            return (
                batch_image,
                (
                    (batch_labels[0], batch_bboxes[0]),
                    (batch_labels[1], batch_bboxes[1]),
                    (batch_labels[2], batch_bboxes[2]),
                ),
            )

    def random_horizontal_flip(self, image, bboxes):
        """Mirror the image and its boxes left-right with probability 0.5.

        ``bboxes`` is modified in place when the flip happens.

        Returns:
            tuple: ``(image, bboxes)``.
        """
        if random.random() < 0.5:
            _, w, _ = image.shape
            image = image[:, ::-1, :]
            # Swap x0/x1 while mirroring so that x0 <= x1 still holds.
            bboxes[:, [0, 2]] = w - bboxes[:, [2, 0]]
        return image, bboxes

    def random_crop(self, image, bboxes):
        """Crop the image, with probability 0.5, to a window holding all boxes.

        The window is chosen at random between the union of all boxes and the
        full image. ``bboxes`` is shifted in place into the cropped frame.
        Inputs without boxes are returned unchanged.

        Returns:
            tuple: ``(image, bboxes)``.
        """
        if len(bboxes) > 0 and random.random() < 0.5:
            h, w, _ = image.shape
            # Smallest rectangle (x0, y0, x1, y1) enclosing every box.
            max_bbox = np.concatenate(
                [
                    np.min(bboxes[:, 0:2], axis=0),
                    np.max(bboxes[:, 2:4], axis=0),
                ],
                axis=-1,
            )
            max_l_trans = max_bbox[0]
            max_u_trans = max_bbox[1]
            max_r_trans = w - max_bbox[2]
            max_d_trans = h - max_bbox[3]

            crop_xmin = max(
                0, int(max_bbox[0] - random.uniform(0, max_l_trans))
            )
            crop_ymin = max(
                0, int(max_bbox[1] - random.uniform(0, max_u_trans))
            )
            # Clamp to the image size with min(); max() would always yield
            # w / h and the right and bottom edges would never be cropped.
            crop_xmax = min(
                w, int(max_bbox[2] + random.uniform(0, max_r_trans))
            )
            crop_ymax = min(
                h, int(max_bbox[3] + random.uniform(0, max_d_trans))
            )

            image = image[crop_ymin:crop_ymax, crop_xmin:crop_xmax]
            bboxes[:, [0, 2]] = bboxes[:, [0, 2]] - crop_xmin
            bboxes[:, [1, 3]] = bboxes[:, [1, 3]] - crop_ymin
        return image, bboxes

    def random_translate(self, image, bboxes):
        """Shift the image and its boxes, with probability 0.5.

        The offset is drawn so that the union of all boxes stays inside the
        image. ``bboxes`` is shifted in place. Inputs without boxes are
        returned unchanged.

        Returns:
            tuple: ``(image, bboxes)``.
        """
        if len(bboxes) > 0 and random.random() < 0.5:
            h, w, _ = image.shape
            # Smallest rectangle (x0, y0, x1, y1) enclosing every box.
            max_bbox = np.concatenate(
                [
                    np.min(bboxes[:, 0:2], axis=0),
                    np.max(bboxes[:, 2:4], axis=0),
                ],
                axis=-1,
            )
            max_l_trans = max_bbox[0]
            max_u_trans = max_bbox[1]
            max_r_trans = w - max_bbox[2]
            max_d_trans = h - max_bbox[3]

            tx = random.uniform(-(max_l_trans - 1), (max_r_trans - 1))
            ty = random.uniform(-(max_u_trans - 1), (max_d_trans - 1))

            M = np.array([[1, 0, tx], [0, 1, ty]])
            image = cv2.warpAffine(image, M, (w, h))

            bboxes[:, [0, 2]] = bboxes[:, [0, 2]] + tx
            bboxes[:, [1, 3]] = bboxes[:, [1, 3]] + ty
        return image, bboxes

    def parse_annotation(self, annotation):
        """Load one sample and return the preprocessed image and boxes.

        Reads ``self.train_input_size``, which is set by ``__next__``.

        Args:
            annotation: One string as produced by ``load_annotations``.

        Returns:
            tuple: ``(image, bboxes)`` as returned by
            ``utils.image_preprocess``.

        Raises:
            KeyError: If the image path does not exist.
            ValueError: If the image cannot be decoded or
                ``self.dataset_type`` is not a supported value.
        """
        line = annotation.split()
        image_path = line[0]
        if not os.path.exists(image_path):
            raise KeyError("%s does not exist ... " % image_path)
        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread signals a decode failure by returning None.
            raise ValueError("%s could not be read as an image" % image_path)

        if self.dataset_type == "converted_coco":
            bboxes = np.array(
                [list(map(int, box.split(","))) for box in line[1:]]
            )
        elif self.dataset_type == "yolo":
            height, width, _ = image.shape
            # reshape keeps an image without boxes as a (0, 5) array.
            bboxes = np.array(
                [list(map(float, box.split(","))) for box in line[1:]],
                dtype=np.float64,
            ).reshape(-1, 5)
            # Coordinates are fractions of the image size; scale to pixels.
            bboxes = bboxes * np.array([width, height, width, height, 1])
            bboxes = bboxes.astype(np.int64)
        else:
            raise ValueError(
                "unsupported dataset_type: {!r}".format(self.dataset_type)
            )

        if self.data_aug:
            image, bboxes = self.random_horizontal_flip(
                np.copy(image), np.copy(bboxes)
            )
            image, bboxes = self.random_crop(np.copy(image), np.copy(bboxes))
            image, bboxes = self.random_translate(
                np.copy(image), np.copy(bboxes)
            )

        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image, bboxes = utils.image_preprocess(
            np.copy(image),
            [self.train_input_size, self.train_input_size],
            np.copy(bboxes),
        )
        return image, bboxes

    def preprocess_true_boxes(self, bboxes):
        """Encode ground-truth boxes as per-scale label grids.

        Reads ``self.train_output_sizes``, which is set by ``__next__``.

        For each box and each of the three scales, every anchor whose IoU
        with the box exceeds 0.3 is marked positive in the grid cell holding
        the box centre. If no anchor on any scale qualifies, the single
        anchor with the highest IoU is used instead.

        Args:
            bboxes: Iterable of ``(x0, y0, x1, y1, class_index)`` rows.

        Returns:
            tuple: ``(label_0, label_1, label_2, boxes_0, boxes_1, boxes_2)``.
            The last axis of a label holds ``x, y, w, h``, an objectness of
            1.0 and the smoothed one-hot class vector. Each ``boxes_i`` holds
            up to ``max_bbox_per_scale`` ``x, y, w, h`` rows.
        """
        label = [
            np.zeros(
                (
                    self.train_output_sizes[i],
                    self.train_output_sizes[i],
                    self.anchor_per_scale,
                    5 + self.num_classes,
                )
            )
            for i in range(3)
        ]
        bboxes_xywh = [np.zeros((self.max_bbox_per_scale, 4)) for _ in range(3)]
        bbox_count = np.zeros((3,))

        for bbox in bboxes:
            bbox_coor = bbox[:4]
            bbox_class_ind = bbox[4]

            # np.float was removed in NumPy 1.24; builtin float is the same
            # float64 dtype.
            onehot = np.zeros(self.num_classes, dtype=float)
            onehot[bbox_class_ind] = 1.0
            uniform_distribution = np.full(
                self.num_classes, 1.0 / self.num_classes
            )
            # Label smoothing: blend the one-hot vector with a uniform one.
            deta = 0.01
            smooth_onehot = onehot * (1 - deta) + deta * uniform_distribution

            # Corner form -> (centre_x, centre_y, width, height).
            bbox_xywh = np.concatenate(
                [
                    (bbox_coor[2:] + bbox_coor[:2]) * 0.5,
                    bbox_coor[2:] - bbox_coor[:2],
                ],
                axis=-1,
            )
            # One row per scale, expressed in grid cells of that scale.
            bbox_xywh_scaled = (
                1.0 * bbox_xywh[np.newaxis, :] / self.strides[:, np.newaxis]
            )

            iou = []
            exist_positive = False
            for i in range(3):
                # Anchors are centred on the cell that holds the box centre.
                anchors_xywh = np.zeros((self.anchor_per_scale, 4))
                anchors_xywh[:, 0:2] = (
                    np.floor(bbox_xywh_scaled[i, 0:2]).astype(np.int32) + 0.5
                )
                anchors_xywh[:, 2:4] = self.anchors[i]

                iou_scale = utils.bbox_iou(
                    bbox_xywh_scaled[i][np.newaxis, :], anchors_xywh
                )
                iou.append(iou_scale)
                iou_mask = iou_scale > 0.3

                if np.any(iou_mask):
                    xind, yind = np.floor(bbox_xywh_scaled[i, 0:2]).astype(
                        np.int32
                    )

                    label[i][yind, xind, iou_mask, :] = 0
                    label[i][yind, xind, iou_mask, 0:4] = bbox_xywh
                    label[i][yind, xind, iou_mask, 4:5] = 1.0
                    label[i][yind, xind, iou_mask, 5:] = smooth_onehot

                    # The modulo wraps the slot index, so boxes beyond the
                    # capacity overwrite the earliest entries.
                    bbox_ind = int(bbox_count[i] % self.max_bbox_per_scale)
                    bboxes_xywh[i][bbox_ind, :4] = bbox_xywh
                    bbox_count[i] += 1

                    exist_positive = True

            if not exist_positive:
                # Flattened index over (scale, anchor) of the best IoU.
                best_anchor_ind = np.argmax(np.array(iou).reshape(-1), axis=-1)
                best_detect = int(best_anchor_ind / self.anchor_per_scale)
                best_anchor = int(best_anchor_ind % self.anchor_per_scale)
                xind, yind = np.floor(
                    bbox_xywh_scaled[best_detect, 0:2]
                ).astype(np.int32)

                label[best_detect][yind, xind, best_anchor, :] = 0
                label[best_detect][yind, xind, best_anchor, 0:4] = bbox_xywh
                label[best_detect][yind, xind, best_anchor, 4:5] = 1.0
                label[best_detect][yind, xind, best_anchor, 5:] = smooth_onehot

                bbox_ind = int(
                    bbox_count[best_detect] % self.max_bbox_per_scale
                )
                bboxes_xywh[best_detect][bbox_ind, :4] = bbox_xywh
                bbox_count[best_detect] += 1

        label_sbbox, label_mbbox, label_lbbox = label
        sbboxes, mbboxes, lbboxes = bboxes_xywh
        return label_sbbox, label_mbbox, label_lbbox, sbboxes, mbboxes, lbboxes

    def __len__(self):
        """Return the number of batches in one pass over the annotations."""
        return self.num_batchs