# YOLOv5 🚀 by Ultralytics, GPL-3.0 license
"""
TensorFlow, Keras and TFLite versions of YOLOv5
Authored by https://github.com/zldrobit in PR https://github.com/ultralytics/yolov5/pull/1127

Usage:
    $ python models/tf.py --weights yolov5s.pt

Export:
    $ python path/to/export.py --weights yolov5s.pt --include saved_model pb tflite tfjs
"""

import argparse
import logging
import sys
from copy import deepcopy
from pathlib import Path

FILE = Path(__file__).resolve()
ROOT = FILE.parents[1]  # yolov5/ dir
sys.path.append(ROOT.as_posix())  # add yolov5/ to path

import numpy as np
import tensorflow as tf
import torch
import torch.nn as nn
from tensorflow import keras

from models.common import Conv, Bottleneck, SPP, DWConv, Focus, BottleneckCSP, Concat, autopad, C3
from models.experimental import MixConv2d, CrossConv, attempt_load
from models.yolo import Detect
from utils.general import colorstr, make_divisible, set_logging
from utils.activations import SiLU

LOGGER = logging.getLogger(__name__)


class TFBN(keras.layers.Layer):
    # TensorFlow BatchNormalization wrapper
    def __init__(self, w=None):
        super(TFBN, self).__init__()
        self.bn = keras.layers.BatchNormalization(
            beta_initializer=keras.initializers.Constant(w.bias.numpy()),
            gamma_initializer=keras.initializers.Constant(w.weight.numpy()),
            moving_mean_initializer=keras.initializers.Constant(w.running_mean.numpy()),
            moving_variance_initializer=keras.initializers.Constant(w.running_var.numpy()),
            epsilon=w.eps)

    def call(self, inputs):
        return self.bn(inputs)


class TFPad(keras.layers.Layer):
    def __init__(self, pad):
        super(TFPad, self).__init__()
        self.pad = tf.constant([[0, 0], [pad, pad], [pad, pad], [0, 0]])

    def call(self, inputs):
        return tf.pad(inputs, self.pad, mode='constant', constant_values=0)


class TFConv(keras.layers.Layer):
    # Standard convolution
    def __init__(self, c1, c2, k=1, s=1, p=None, g=1, act=True, w=None):
        # ch_in, ch_out, weights, kernel, stride, padding, groups
        super(TFConv, self).__init__()
        assert g == 1, "TF v2.2 Conv2D does not support 'groups' argument"
        assert isinstance(k, int), "Convolution with multiple kernels is not allowed."
        # TensorFlow convolution padding is inconsistent with PyTorch (e.g. k=3 s=2 'SAME' padding)
        # see https://stackoverflow.com/questions/52975843/comparing-conv2d-with-padding-between-tensorflow-and-pytorch
        conv = keras.layers.Conv2D(
            c2, k, s, 'SAME' if s == 1 else 'VALID', use_bias=False,
            kernel_initializer=keras.initializers.Constant(w.conv.weight.permute(2, 3, 1, 0).numpy()))
        self.conv = conv if s == 1 else keras.Sequential([TFPad(autopad(k, p)), conv])
        self.bn = TFBN(w.bn) if hasattr(w, 'bn') else tf.identity

        # YOLOv5 activations
        if isinstance(w.act, nn.LeakyReLU):
            self.act = (lambda x: keras.activations.relu(x, alpha=0.1)) if act else tf.identity
        elif isinstance(w.act, nn.Hardswish):
            self.act = (lambda x: x * tf.nn.relu6(x + 3) * 0.166666667) if act else tf.identity
        elif isinstance(w.act, (nn.SiLU, SiLU)):
            self.act = (lambda x: keras.activations.swish(x)) if act else tf.identity
        else:
            raise Exception(f'no matching TensorFlow activation found for {w.act}')

    def call(self, inputs):
        return self.act(self.bn(self.conv(inputs)))

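# Note on TFConv padding above: for s == 1, Keras 'SAME' padding matches PyTorch autopad(k) exactly;
# for s > 1 it does not, hence the explicit TFPad + 'VALID' combination. A minimal sketch of the
# equivalence (hypothetical shapes, assuming k=3, s=2 on a 640x640 input):
#   x = tf.zeros((1, 640, 640, 3))
#   y = TFPad(autopad(3, None))(x)                 # pad 1 px per side -> (1, 642, 642, 3)
#   y = keras.layers.Conv2D(16, 3, 2, 'VALID')(y)  # -> (1, 320, 320, 16), same as PyTorch k=3 s=2 p=1
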
class TFFocus(keras.layers.Layer):
    # Focus wh information into c-space
    def __init__(self, c1, c2, k=1, s=1, p=None, g=1, act=True, w=None):
        # ch_in, ch_out, kernel, stride, padding, groups
        super(TFFocus, self).__init__()
        self.conv = TFConv(c1 * 4, c2, k, s, p, g, act, w.conv)

    def call(self, inputs):  # x(b,w,h,c) -> y(b,w/2,h/2,4c)
        # inputs = inputs / 255.  # normalize 0-255 to 0-1
        return self.conv(tf.concat([inputs[:, ::2, ::2, :],
                                    inputs[:, 1::2, ::2, :],
                                    inputs[:, ::2, 1::2, :],
                                    inputs[:, 1::2, 1::2, :]], 3))


class TFBottleneck(keras.layers.Layer):
    # Standard bottleneck
    def __init__(self, c1, c2, shortcut=True, g=1, e=0.5, w=None):  # ch_in, ch_out, shortcut, groups, expansion
        super(TFBottleneck, self).__init__()
        c_ = int(c2 * e)  # hidden channels
        self.cv1 = TFConv(c1, c_, 1, 1, w=w.cv1)
        self.cv2 = TFConv(c_, c2, 3, 1, g=g, w=w.cv2)
        self.add = shortcut and c1 == c2

    def call(self, inputs):
        return inputs + self.cv2(self.cv1(inputs)) if self.add else self.cv2(self.cv1(inputs))


class TFConv2d(keras.layers.Layer):
    # Substitution for PyTorch nn.Conv2D
    def __init__(self, c1, c2, k, s=1, g=1, bias=True, w=None):
        super(TFConv2d, self).__init__()
        assert g == 1, "TF v2.2 Conv2D does not support 'groups' argument"
        self.conv = keras.layers.Conv2D(
            c2, k, s, 'VALID', use_bias=bias,
            kernel_initializer=keras.initializers.Constant(w.weight.permute(2, 3, 1, 0).numpy()),
            bias_initializer=keras.initializers.Constant(w.bias.numpy()) if bias else None)

    def call(self, inputs):
        return self.conv(inputs)


class TFBottleneckCSP(keras.layers.Layer):
    # CSP Bottleneck https://github.com/WongKinYiu/CrossStagePartialNetworks
    def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5, w=None):
        # ch_in, ch_out, number, shortcut, groups, expansion
        super(TFBottleneckCSP, self).__init__()
        c_ = int(c2 * e)  # hidden channels
        self.cv1 = TFConv(c1, c_, 1, 1, w=w.cv1)
        self.cv2 = TFConv2d(c1, c_, 1, 1, bias=False, w=w.cv2)
        self.cv3 = TFConv2d(c_, c_, 1, 1, bias=False, w=w.cv3)
        self.cv4 = TFConv(2 * c_, c2, 1, 1, w=w.cv4)
        self.bn = TFBN(w.bn)
        self.act = lambda x: keras.activations.relu(x, alpha=0.1)
        self.m = keras.Sequential([TFBottleneck(c_, c_, shortcut, g, e=1.0, w=w.m[j]) for j in range(n)])

    def call(self, inputs):
        y1 = self.cv3(self.m(self.cv1(inputs)))
        y2 = self.cv2(inputs)
        return self.cv4(self.act(self.bn(tf.concat((y1, y2), axis=3))))


class TFC3(keras.layers.Layer):
    # CSP Bottleneck with 3 convolutions
    def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5, w=None):
        # ch_in, ch_out, number, shortcut, groups, expansion
        super(TFC3, self).__init__()
        c_ = int(c2 * e)  # hidden channels
        self.cv1 = TFConv(c1, c_, 1, 1, w=w.cv1)
        self.cv2 = TFConv(c1, c_, 1, 1, w=w.cv2)
        self.cv3 = TFConv(2 * c_, c2, 1, 1, w=w.cv3)
        self.m = keras.Sequential([TFBottleneck(c_, c_, shortcut, g, e=1.0, w=w.m[j]) for j in range(n)])

    def call(self, inputs):
        return self.cv3(tf.concat((self.m(self.cv1(inputs)), self.cv2(inputs)), axis=3))


class TFSPP(keras.layers.Layer):
    # Spatial pyramid pooling layer used in YOLOv3-SPP
    def __init__(self, c1, c2, k=(5, 9, 13), w=None):
        super(TFSPP, self).__init__()
        c_ = c1 // 2  # hidden channels
        self.cv1 = TFConv(c1, c_, 1, 1, w=w.cv1)
        self.cv2 = TFConv(c_ * (len(k) + 1), c2, 1, 1, w=w.cv2)
        self.m = [keras.layers.MaxPool2D(pool_size=x, strides=1, padding='SAME') for x in k]

    def call(self, inputs):
        x = self.cv1(inputs)
        return self.cv2(tf.concat([x] + [m(x) for m in self.m], 3))

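# TFDetect below decodes raw head outputs with the standard YOLOv5 box transform. For a grid cell
# offset (cx, cy), anchor (aw, ah) and sigmoid outputs (tx, ty, tw, th):
#   x = (2 * tx - 0.5 + cx) * stride        y = (2 * ty - 0.5 + cy) * stride
#   w = (2 * tw) ** 2 * aw                  h = (2 * th) ** 2 * ah
# e.g. tx = ty = 0.5 at cell (10, 10) with stride 8 centers the box at (84, 84) pixels.
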
class TFDetect(keras.layers.Layer):
    def __init__(self, nc=80, anchors=(), ch=(), imgsz=(640, 640), w=None):  # detection layer
        super(TFDetect, self).__init__()
        self.stride = tf.convert_to_tensor(w.stride.numpy(), dtype=tf.float32)
        self.nc = nc  # number of classes
        self.no = nc + 5  # number of outputs per anchor
        self.nl = len(anchors)  # number of detection layers
        self.na = len(anchors[0]) // 2  # number of anchors
        self.grid = [tf.zeros(1)] * self.nl  # init grid
        self.anchors = tf.convert_to_tensor(w.anchors.numpy(), dtype=tf.float32)
        self.anchor_grid = tf.reshape(tf.convert_to_tensor(w.anchor_grid.numpy(), dtype=tf.float32),
                                      [self.nl, 1, -1, 1, 2])
        self.m = [TFConv2d(x, self.no * self.na, 1, w=w.m[i]) for i, x in enumerate(ch)]
        self.training = False  # set to False after building model
        self.imgsz = imgsz
        for i in range(self.nl):
            ny, nx = self.imgsz[0] // self.stride[i], self.imgsz[1] // self.stride[i]
            self.grid[i] = self._make_grid(nx, ny)

    def call(self, inputs):
        z = []  # inference output
        x = []
        for i in range(self.nl):
            x.append(self.m[i](inputs[i]))
            # x(bs,20,20,255) to x(bs,3,20,20,85)
            ny, nx = self.imgsz[0] // self.stride[i], self.imgsz[1] // self.stride[i]
            x[i] = tf.transpose(tf.reshape(x[i], [-1, ny * nx, self.na, self.no]), [0, 2, 1, 3])

            if not self.training:  # inference
                y = tf.sigmoid(x[i])
                xy = (y[..., 0:2] * 2. - 0.5 + self.grid[i]) * self.stride[i]  # xy
                wh = (y[..., 2:4] * 2) ** 2 * self.anchor_grid[i]
                # Normalize xywh to 0-1 to reduce calibration error
                xy /= tf.constant([[self.imgsz[1], self.imgsz[0]]], dtype=tf.float32)
                wh /= tf.constant([[self.imgsz[1], self.imgsz[0]]], dtype=tf.float32)
                y = tf.concat([xy, wh, y[..., 4:]], -1)
                z.append(tf.reshape(y, [-1, self.na * ny * nx, self.no]))

        return x if self.training else (tf.concat(z, 1), x)

    @staticmethod
    def _make_grid(nx=20, ny=20):
        # yv, xv = torch.meshgrid([torch.arange(ny), torch.arange(nx)])
        # return torch.stack((xv, yv), 2).view((1, 1, ny, nx, 2)).float()
        xv, yv = tf.meshgrid(tf.range(nx), tf.range(ny))
        return tf.cast(tf.reshape(tf.stack([xv, yv], 2), [1, 1, ny * nx, 2]), dtype=tf.float32)


class TFUpsample(keras.layers.Layer):
    def __init__(self, size, scale_factor, mode, w=None):  # warning: all arguments needed including 'w'
        super(TFUpsample, self).__init__()
        assert scale_factor == 2, "scale_factor must be 2"
        self.upsample = lambda x: tf.image.resize(x, (x.shape[1] * 2, x.shape[2] * 2), method=mode)
        # self.upsample = keras.layers.UpSampling2D(size=scale_factor, interpolation=mode)
        # with default arguments: align_corners=False, half_pixel_centers=False
        # self.upsample = lambda x: tf.raw_ops.ResizeNearestNeighbor(images=x,
        #                                                            size=(x.shape[1] * 2, x.shape[2] * 2))

    def call(self, inputs):
        return self.upsample(inputs)


class TFConcat(keras.layers.Layer):
    def __init__(self, dimension=1, w=None):
        super(TFConcat, self).__init__()
        assert dimension == 1, "convert only NCHW to NHWC concat"
        self.d = 3

    def call(self, inputs):
        return tf.concat(inputs, self.d)

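# parse_model() below walks the YOLOv5 YAML spec (backbone + head) and builds the matching TF*
# layer for each PyTorch module, copying weights from model.model[i]. For reference, a typical
# backbone row from yolov5s.yaml reads:
#   [-1, 3, C3, [128]]   # from=-1 (previous layer), n=3 (scaled by depth_multiple), module, args
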
def parse_model(d, ch, model, imgsz):  # model_dict, input_channels(3)
    LOGGER.info('\n%3s%18s%3s%10s %-40s%-30s' % ('', 'from', 'n', 'params', 'module', 'arguments'))
    anchors, nc, gd, gw = d['anchors'], d['nc'], d['depth_multiple'], d['width_multiple']
    na = (len(anchors[0]) // 2) if isinstance(anchors, list) else anchors  # number of anchors
    no = na * (nc + 5)  # number of outputs = anchors * (classes + 5)

    layers, save, c2 = [], [], ch[-1]  # layers, savelist, ch out
    for i, (f, n, m, args) in enumerate(d['backbone'] + d['head']):  # from, number, module, args
        m_str = m
        m = eval(m) if isinstance(m, str) else m  # eval strings
        for j, a in enumerate(args):
            try:
                args[j] = eval(a) if isinstance(a, str) else a  # eval strings
            except:
                pass

        n = max(round(n * gd), 1) if n > 1 else n  # depth gain
        if m in [nn.Conv2d, Conv, Bottleneck, SPP, DWConv, MixConv2d, Focus, CrossConv, BottleneckCSP, C3]:
            c1, c2 = ch[f], args[0]
            c2 = make_divisible(c2 * gw, 8) if c2 != no else c2

            args = [c1, c2, *args[1:]]
            if m in [BottleneckCSP, C3]:
                args.insert(2, n)
                n = 1
        elif m is nn.BatchNorm2d:
            args = [ch[f]]
        elif m is Concat:
            c2 = sum([ch[-1 if x == -1 else x + 1] for x in f])
        elif m is Detect:
            args.append([ch[x + 1] for x in f])
            if isinstance(args[1], int):  # number of anchors
                args[1] = [list(range(args[1] * 2))] * len(f)
            args.append(imgsz)
        else:
            c2 = ch[f]

        tf_m = eval('TF' + m_str.replace('nn.', ''))
        m_ = keras.Sequential([tf_m(*args, w=model.model[i][j]) for j in range(n)]) if n > 1 \
            else tf_m(*args, w=model.model[i])  # module

        torch_m_ = nn.Sequential(*[m(*args) for _ in range(n)]) if n > 1 else m(*args)  # module
        t = str(m)[8:-2].replace('__main__.', '')  # module type
        np = sum([x.numel() for x in torch_m_.parameters()])  # number params
        m_.i, m_.f, m_.type, m_.np = i, f, t, np  # attach index, 'from' index, type, number params
        LOGGER.info('%3s%18s%3s%10.0f %-40s%-30s' % (i, f, n, np, t, args))  # print
        save.extend(x % i for x in ([f] if isinstance(f, int) else f) if x != -1)  # append to savelist
        layers.append(m_)
        ch.append(c2)
    return keras.Sequential(layers), sorted(save)


class TFModel:
    def __init__(self, cfg='yolov5s.yaml', ch=3, nc=None, model=None, imgsz=(640, 640)):  # model, channels, classes
        super(TFModel, self).__init__()
        if isinstance(cfg, dict):
            self.yaml = cfg  # model dict
        else:  # is *.yaml
            import yaml  # for torch hub
            self.yaml_file = Path(cfg).name
            with open(cfg) as f:
                self.yaml = yaml.load(f, Loader=yaml.FullLoader)  # model dict

        # Define model
        if nc and nc != self.yaml['nc']:
            print('Overriding %s nc=%g with nc=%g' % (cfg, self.yaml['nc'], nc))
            self.yaml['nc'] = nc  # override yaml value
        self.model, self.savelist = parse_model(deepcopy(self.yaml), ch=[ch], model=model, imgsz=imgsz)

    def predict(self, inputs, tf_nms=False, agnostic_nms=False, topk_per_class=100, topk_all=100,
                iou_thres=0.45, conf_thres=0.25):
        y = []  # outputs
        x = inputs
        for i, m in enumerate(self.model.layers):
            if m.f != -1:  # if not from previous layer
                x = y[m.f] if isinstance(m.f, int) else [x if j == -1 else y[j] for j in m.f]  # from earlier layers

            x = m(x)  # run
            y.append(x if m.i in self.savelist else None)  # save output

        # Add TensorFlow NMS
        if tf_nms:
            boxes = self._xywh2xyxy(x[0][..., :4])
            probs = x[0][:, :, 4:5]
            classes = x[0][:, :, 5:]
            scores = probs * classes
            if agnostic_nms:
                nms = AgnosticNMS()((boxes, classes, scores), topk_all, iou_thres, conf_thres)
                return nms, x[1]
            else:
                boxes = tf.expand_dims(boxes, 2)
                nms = tf.image.combined_non_max_suppression(
                    boxes, scores, topk_per_class, topk_all, iou_thres, conf_thres, clip_boxes=False)
                return nms, x[1]

        return x[0]  # output only first tensor [1,6300,85] = [xywh, conf, class0, class1, ...]
        # x = x[0][0]  # [x(1,6300,85), ...] to x(6300,85)
        # xywh = x[..., :4]  # x(6300,4) boxes
        # conf = x[..., 4:5]  # x(6300,1) confidences
        # cls = tf.reshape(tf.cast(tf.argmax(x[..., 5:], axis=1), tf.float32), (-1, 1))  # x(6300,1) classes
        # return tf.concat([conf, cls, xywh], 1)

    @staticmethod
    def _xywh2xyxy(xywh):
        # Convert nx4 boxes from [x, y, w, h] to [x1, y1, x2, y2] where xy1=top-left, xy2=bottom-right
        x, y, w, h = tf.split(xywh, num_or_size_splits=4, axis=-1)
        return tf.concat([x - w / 2, y - h / 2, x + w / 2, y + h / 2], axis=-1)

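# AgnosticNMS below mirrors the fixed-size outputs of tf.image.combined_non_max_suppression: each
# image's detections are padded to exactly topk_all rows so tf.map_fn can stack the per-image
# results into one batch tensor, with valid_detections holding the count of real rows per image.
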
class AgnosticNMS(keras.layers.Layer):
    # TF Agnostic NMS
    def call(self, input, topk_all, iou_thres, conf_thres):
        # wrap map_fn to avoid TypeSpec related error https://stackoverflow.com/a/65809989/3036450
        return tf.map_fn(lambda x: self._nms(x, topk_all, iou_thres, conf_thres), input,
                         fn_output_signature=(tf.float32, tf.float32, tf.float32, tf.int32),
                         name='agnostic_nms')

    @staticmethod
    def _nms(x, topk_all=100, iou_thres=0.45, conf_thres=0.25):  # agnostic NMS
        boxes, classes, scores = x
        class_inds = tf.cast(tf.argmax(classes, axis=-1), tf.float32)
        scores_inp = tf.reduce_max(scores, -1)
        selected_inds = tf.image.non_max_suppression(
            boxes, scores_inp, max_output_size=topk_all, iou_threshold=iou_thres, score_threshold=conf_thres)
        selected_boxes = tf.gather(boxes, selected_inds)
        padded_boxes = tf.pad(selected_boxes,
                              paddings=[[0, topk_all - tf.shape(selected_boxes)[0]], [0, 0]],
                              mode="CONSTANT", constant_values=0.0)
        selected_scores = tf.gather(scores_inp, selected_inds)
        padded_scores = tf.pad(selected_scores,
                               paddings=[[0, topk_all - tf.shape(selected_boxes)[0]]],
                               mode="CONSTANT", constant_values=-1.0)
        selected_classes = tf.gather(class_inds, selected_inds)
        padded_classes = tf.pad(selected_classes,
                                paddings=[[0, topk_all - tf.shape(selected_boxes)[0]]],
                                mode="CONSTANT", constant_values=-1.0)
        valid_detections = tf.shape(selected_inds)[0]
        return padded_boxes, padded_scores, padded_classes, valid_detections


def representative_dataset_gen(dataset, ncalib=100):
    # Representative dataset generator for use with converter.representative_dataset, returns a generator of np arrays
    for n, (path, img, im0s, vid_cap) in enumerate(dataset):
        input = np.transpose(img, [1, 2, 0])  # CHW to HWC
        input = np.expand_dims(input, axis=0).astype(np.float32)
        input /= 255.0
        yield [input]
        if n >= ncalib:
            break

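# A minimal sketch of wiring representative_dataset_gen() into INT8 TFLite conversion (hypothetical
# usage; 'dataset' is assumed to be a LoadImages instance from utils.datasets, as in export.py):
#   converter = tf.lite.TFLiteConverter.from_keras_model(keras_model)
#   converter.optimizations = [tf.lite.Optimize.DEFAULT]
#   converter.representative_dataset = lambda: representative_dataset_gen(dataset, ncalib=100)
#   tflite_model = converter.convert()
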
def run(weights=ROOT / 'yolov5s.pt',  # weights path
        imgsz=(640, 640),  # inference size h,w
        batch_size=1,  # batch size
        dynamic=False,  # dynamic batch size
        ):
    # PyTorch model
    im = torch.zeros((batch_size, 3, *imgsz))  # BCHW image
    model = attempt_load(weights, map_location=torch.device('cpu'), inplace=True, fuse=False)
    y = model(im)  # inference
    model.info()

    # TensorFlow model
    im = tf.zeros((batch_size, *imgsz, 3))  # BHWC image
    tf_model = TFModel(cfg=model.yaml, model=model, nc=model.nc, imgsz=imgsz)
    y = tf_model.predict(im)  # inference

    # Keras model
    im = keras.Input(shape=(*imgsz, 3), batch_size=None if dynamic else batch_size)
    keras_model = keras.Model(inputs=im, outputs=tf_model.predict(im))
    keras_model.summary()


def parse_opt():
    parser = argparse.ArgumentParser()
    parser.add_argument('--weights', type=str, default=ROOT / 'yolov5s.pt', help='weights path')
    parser.add_argument('--imgsz', '--img', '--img-size', nargs='+', type=int, default=[640], help='inference size h,w')
    parser.add_argument('--batch-size', type=int, default=1, help='batch size')
    parser.add_argument('--dynamic', action='store_true', help='dynamic batch size')
    opt = parser.parse_args()
    opt.imgsz *= 2 if len(opt.imgsz) == 1 else 1  # expand
    return opt


def main(opt):
    set_logging()
    print(colorstr('tf.py: ') + ', '.join(f'{k}={v}' for k, v in vars(opt).items()))
    run(**vars(opt))


if __name__ == "__main__":
    opt = parse_opt()
    main(opt)
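# Example invocations (sketch; the weights path is a placeholder):
#   $ python models/tf.py --weights yolov5s.pt --imgsz 320
# or programmatically:
#   run(weights='yolov5s.pt', imgsz=(320, 320), batch_size=1, dynamic=False)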