Spaces:
Sleeping
Sleeping
from __future__ import division | |
import torch | |
import torch.nn as nn | |
import torch.nn.functional as F | |
from torch.autograd import Variable | |
import numpy as np | |
from PIL import Image | |
from yolo.utils import * | |
# from utils import * | |
def get_test_input_normal(input_image="dog-cycle-car.png", inp_dim=416):
    """
    Load a test image with PIL and turn it into a network input batch.

    Parameters:
        input_image: path of the image file to load
        inp_dim: side length of the square the image is resized to

    Returns a float tensor of shape (1, 3, inp_dim, inp_dim) whose values
    are scaled to [0, 1] and whose channels are in RGB order.
    """
    # The context manager releases the file handle; convert() returns a
    # fully loaded copy that stays valid after the file is closed.
    with Image.open(input_image) as raw:
        image = raw.convert("RGB")
    img = np.asarray(image.resize((inp_dim, inp_dim)))
    # PIL already delivers RGB, so no channel flip is applied here
    # (reversing the last axis would hand BGR data to the network).
    img_ = img.transpose((2, 0, 1))             # H X W X C -> C X H X W
    img_ = img_[np.newaxis, :, :, :] / 255.0    # Add a batch axis at 0 | Normalise
    img_ = torch.from_numpy(img_).float()       # Convert to float
    img_ = Variable(img_)                       # Convert to Variable
    return img_
def get_test_input():
    """
    Load "dog-cycle-car.png" with OpenCV and turn it into a network input.

    Returns a float tensor of shape (1, 3, 416, 416) with values scaled to
    [0, 1] and channels in RGB order.

    Raises FileNotFoundError if the image cannot be read.
    """
    image_path = "dog-cycle-car.png"
    img = cv2.imread(image_path)
    if img is None:
        # cv2.imread reports a missing/unreadable file by returning None
        # instead of raising, which would otherwise fail later in resize.
        raise FileNotFoundError("Could not read test image: " + image_path)
    img = cv2.resize(img, (416, 416))           # Resize to the input dimension
    img_ = img[:, :, ::-1].transpose((2, 0, 1)) # BGR -> RGB | H X W X C -> C X H X W
    img_ = img_[np.newaxis, :, :, :] / 255.0    # Add a batch axis at 0 | Normalise
    img_ = torch.from_numpy(img_).float()       # Convert to float
    img_ = Variable(img_)                       # Convert to Variable
    return img_
def parse_cfg(cfgfile):
    """
    Takes a configuration file

    Returns a list of blocks. Each block describes a block in the neural
    network to be built. A block is represented as a dictionary in the list:
    the section name (the text between "[" and "]") is stored under the key
    "type" and every "key = value" line of the section is stored as a
    string entry. Blank lines and lines starting with "#" are ignored.

    Raises ValueError if a non-section line contains no "=".
    """
    # The context manager makes sure the file handle is released.
    with open(cfgfile, 'r') as cfg:
        # Strip BEFORE filtering so that whitespace-only lines and indented
        # comments are dropped instead of reaching the parser below.
        lines = [raw.strip() for raw in cfg]
    lines = [line for line in lines if line and not line.startswith('#')]

    block = {}
    blocks = []
    for line in lines:
        if line.startswith("["):        # This marks the start of a new block
            if block:
                blocks.append(block)
                block = {}
            block["type"] = line[1:-1].strip()
        else:
            # Split on the first "=" only, so values may contain "=".
            key, value = line.split("=", 1)
            block[key.rstrip()] = value.lstrip()
    # Flush the last block; a file without content yields an empty list.
    if block:
        blocks.append(block)
    return blocks
# print('\n\n'.join([repr(x) for x in blocks])) | |
import pickle as pkl | |
class MaxPoolStride1(nn.Module):
    """
    Max pooling with stride 1 that preserves the spatial size of its input.

    The input is padded by ``kernel_size - 1`` on the right and bottom
    (replicating the border values) before a stride-1 max pool is applied,
    so an (N, C, H, W) input produces an (N, C, H, W) output.
    """

    def __init__(self, kernel_size):
        super(MaxPoolStride1, self).__init__()
        self.kernel_size = kernel_size
        self.pad = kernel_size - 1

    def forward(self, x):
        padded_x = F.pad(x, (0, self.pad, 0, self.pad), mode="replicate")
        # The stride is always 1; it must not be derived from the padding,
        # which only coincides with 1 when kernel_size == 2.
        pooled_x = F.max_pool2d(padded_x, self.kernel_size, stride=1)
        return pooled_x
class EmptyLayer(nn.Module):
    """
    Parameter-free placeholder module.

    It is registered for "route" and "shortcut" blocks so that they occupy
    a slot in the module list; the class itself defines no forward pass.
    """

    def __init__(self):
        super().__init__()
class DetectionLayer(nn.Module):
    """
    Module that stores the anchor boxes of one detection head.

    ``anchors`` is kept as given on the instance; ``forward`` hands the
    detached feature map to ``predict_transform`` together with them.
    """

    def __init__(self, anchors):
        super().__init__()
        self.anchors = anchors

    def forward(self, x, inp_dim, num_classes, confidence):
        # CUDA is resolved as a module-level global at call time.
        # NOTE(review): Darknet.forward calls predict_transform with five
        # arguments (no confidence) -- confirm which signature is correct.
        return predict_transform(
            x.data, inp_dim, self.anchors, num_classes, confidence, CUDA
        )
class Upsample(nn.Module):
    """
    Upsample a 4-D tensor by repeating every element ``stride`` times along
    both of its last two dimensions.
    """

    def __init__(self, stride=2):
        super().__init__()
        self.stride = stride

    def forward(self, x):
        factor = self.stride
        assert(x.data.dim() == 4)
        batch, channels, height, width = x.size()
        # Insert singleton axes after H and W, broadcast them to `factor`
        # and fold them back into the spatial dimensions.
        tiled = x.view(batch, channels, height, 1, width, 1).expand(
            batch, channels, height, factor, width, factor
        )
        return tiled.contiguous().view(
            batch, channels, height * factor, width * factor
        )
# | |
class ReOrgLayer(nn.Module):
    """
    Move ``stride x stride`` spatial neighbourhoods into the channel axis.

    A (B, C, H, W) input becomes (B, C * stride * stride, H // stride,
    W // stride). With i, j the offsets inside a neighbourhood, the output
    satisfies

        out[b, (i * stride + j) * C + c, h, w]
            == x[b, c, h * stride + i, w * stride + j]

    Raises AssertionError if the input is not 4-D or if H or W is not
    divisible by the stride.
    """

    def __init__(self, stride=2):
        super(ReOrgLayer, self).__init__()
        self.stride = stride

    def forward(self, x):
        assert(x.data.dim() == 4)
        B, C, H, W = x.data.shape
        hs = self.stride
        ws = self.stride
        assert(H % hs == 0), "The stride " + str(self.stride) + " is not a proper divisor of height " + str(H)
        assert(W % ws == 0), "The stride " + str(self.stride) + " is not a proper divisor of width " + str(W)
        # Split H and W into (block index, offset inside block) ...
        x = x.view(B, C, H // hs, hs, W // ws, ws)
        # ... then order the axes as (B, i, j, C, h, w) so that flattening
        # (i, j, C) yields the channel layout described in the docstring.
        x = x.permute(0, 3, 5, 1, 2, 4).contiguous()
        return x.view(B, hs * ws * C, H // hs, W // ws)
def create_modules(blocks):
    """
    Build the PyTorch modules described by a list of configuration blocks.

    Parameters:
        blocks: list of block dictionaries (string values) whose first
            entry is the "net" block; every other entry needs a "type" of
            convolutional, upsample, route, shortcut, maxpool or yolo.

    Returns a tuple ``(net_info, module_list)`` where ``net_info`` is
    ``blocks[0]`` and ``module_list`` is an ``nn.ModuleList`` holding one
    ``nn.Sequential`` per non-"net" block, in order.

    Side effect: the "layers" entry of every route block is replaced by a
    list of strings, which Darknet.forward reads back.

    Raises AssertionError for an unknown block type.
    """
    net_info = blocks[0]     # Captures the information about the input and pre-processing
    module_list = nn.ModuleList()
    index = 0                # indexing blocks helps with implementing route layers (skip connections)
    prev_filters = 3
    output_filters = []

    for x in blocks:
        if x["type"] == "net":
            continue

        module = nn.Sequential()
        # Blocks that do not change the channel count (upsample, shortcut,
        # maxpool, yolo) inherit it from the previous layer. Setting it
        # here also covers such a block appearing first in the network.
        filters = prev_filters

        # If it's a convolutional layer
        if x["type"] == "convolutional":
            activation = x["activation"]
            try:
                batch_normalize = int(x["batch_normalize"])
            except (KeyError, ValueError):
                batch_normalize = 0
            # The weight loader reads conv biases exactly when batch norm
            # is off, so the bias follows the flag's value rather than the
            # mere presence of the key.
            bias = not batch_normalize

            filters = int(x["filters"])
            padding = int(x["pad"])
            kernel_size = int(x["size"])
            stride = int(x["stride"])
            pad = (kernel_size - 1) // 2 if padding else 0

            # Add the convolutional layer
            conv = nn.Conv2d(prev_filters, filters, kernel_size, stride, pad, bias=bias)
            module.add_module("conv_{0}".format(index), conv)

            # Add the Batch Norm Layer
            if batch_normalize:
                bn = nn.BatchNorm2d(filters)
                module.add_module("batch_norm_{0}".format(index), bn)

            # Only "leaky" adds an activation module; anything else is
            # treated as linear.
            if activation == "leaky":
                activn = nn.LeakyReLU(0.1, inplace=True)
                module.add_module("leaky_{0}".format(index), activn)

        # If it's an upsampling layer
        elif x["type"] == "upsample":
            stride = int(x["stride"])
            # Honour the configured stride instead of a hard-coded factor.
            upsample = nn.Upsample(scale_factor=stride, mode="nearest")
            module.add_module("upsample_{}".format(index), upsample)

        # If it is a route layer
        elif x["type"] == "route":
            layers = x["layers"]
            if isinstance(layers, str):
                layers = layers.split(',')
            # Stored back as a list of strings; tolerates being run twice
            # on the same blocks and stray empty entries.
            layers = [str(entry).strip() for entry in layers if str(entry).strip()]
            x["layers"] = layers

            # Start of a route
            start = int(layers[0])
            # end, if there exists one.
            end = int(layers[1]) if len(layers) > 1 else 0

            # Positive annotation: convert absolute indices to offsets
            if start > 0:
                start = start - index
            if end > 0:
                end = end - index

            route = EmptyLayer()
            module.add_module("route_{0}".format(index), route)

            if end < 0:
                # Two routed feature maps are concatenated along channels
                filters = output_filters[index + start] + output_filters[index + end]
            else:
                filters = output_filters[index + start]

        # shortcut corresponds to skip connection
        elif x["type"] == "shortcut":
            shortcut = EmptyLayer()
            module.add_module("shortcut_{}".format(index), shortcut)

        elif x["type"] == "maxpool":
            stride = int(x["stride"])
            size = int(x["size"])
            if stride != 1:
                maxpool = nn.MaxPool2d(size, stride)
            else:
                maxpool = MaxPoolStride1(size)
            module.add_module("maxpool_{}".format(index), maxpool)

        # Yolo is the detection layer
        elif x["type"] == "yolo":
            mask = [int(m) for m in x["mask"].split(",")]
            values = [int(a) for a in x["anchors"].split(",")]
            pairs = [(values[i], values[i + 1]) for i in range(0, len(values), 2)]
            # Keep only the anchor pairs selected by the mask
            anchors = [pairs[m] for m in mask]

            detection = DetectionLayer(anchors)
            module.add_module("Detection_{}".format(index), detection)

        else:
            # Raised explicitly so the check survives ``python -O``.
            raise AssertionError("Unknown block type: {0!r}".format(x["type"]))

        module_list.append(module)
        prev_filters = filters
        output_filters.append(filters)
        index += 1

    return (net_info, module_list)
class Darknet(nn.Module):
    """
    Network assembled from a Darknet-style configuration file.

    Attributes:
        blocks: block dictionaries returned by parse_cfg (index 0 is "net")
        net_info: the "net" block
        module_list: one nn.Sequential per non-"net" block
        header: the five int32 header words of the last loaded weight file
        seen: header word 3 of the last loaded weight file
    """

    def __init__(self, cfgfile):
        super(Darknet, self).__init__()
        self.blocks = parse_cfg(cfgfile)
        self.net_info, self.module_list = create_modules(self.blocks)
        self.header = torch.IntTensor([0, 0, 0, 0])
        self.seen = 0

    def get_blocks(self):
        """Return the parsed configuration blocks."""
        return self.blocks

    def get_module_list(self):
        """Return the nn.ModuleList built from the configuration."""
        return self.module_list

    def forward(self, x, CUDA):
        """
        Run the network on ``x``.

        Parameters:
            x: input tensor fed to the first layer
            CUDA: flag forwarded unchanged to predict_transform

        Returns the outputs of all yolo blocks concatenated along
        dimension 1, or an empty list if no yolo block produced a tensor.
        """
        detections = []
        modules = self.blocks[1:]
        outputs = {}   # We cache the outputs for the route layer
        write = 0

        for i, block in enumerate(modules):
            module_type = block["type"]

            if module_type in ("convolutional", "upsample", "maxpool"):
                x = self.module_list[i](x)
                outputs[i] = x

            elif module_type == "route":
                layers = [int(a) for a in block["layers"]]
                # Positive entries are absolute layer indices; turn them
                # into offsets relative to the current layer.
                if layers[0] > 0:
                    layers[0] = layers[0] - i
                if len(layers) == 1:
                    x = outputs[i + layers[0]]
                else:
                    if layers[1] > 0:
                        layers[1] = layers[1] - i
                    map1 = outputs[i + layers[0]]
                    map2 = outputs[i + layers[1]]
                    x = torch.cat((map1, map2), 1)
                outputs[i] = x

            elif module_type == "shortcut":
                from_ = int(block["from"])
                x = outputs[i - 1] + outputs[i + from_]
                outputs[i] = x

            elif module_type == 'yolo':
                anchors = self.module_list[i][0].anchors
                # Get the input dimensions
                inp_dim = int(self.net_info["height"])
                # Get the number of classes
                num_classes = int(block["classes"])

                # Output the result
                x = x.data
                x = predict_transform(x, inp_dim, anchors, num_classes, CUDA)

                # Skip this head when predict_transform hands back an int
                # instead of a tensor.
                if isinstance(x, int):
                    continue

                if not write:
                    detections = x
                    write = 1
                else:
                    detections = torch.cat((detections, x), 1)

                outputs[i] = outputs[i - 1]

        return detections

    def load_weights_url(self, weightfile):
        """
        Load weights from the in-memory buffer returned by get_data_s3.

        The buffer starts with five int32 header words followed by the
        float32 weights.
        """
        fp = get_data_s3(weightfile)
        raw = fp.getvalue()

        # The first 5 values are header information
        # 1. Major version number
        # 2. Minor Version Number
        # 3. Subversion number
        # 4,5. Images seen by the network (during training)
        # copy() gives torch a writable array to wrap.
        header = np.frombuffer(raw, dtype=np.int32, count=5).copy()
        self.header = torch.from_numpy(header)
        self.seen = self.header[3]

        # Skip the header bytes: reading from offset 0 would interpret the
        # header as weights and shift every tensor by five values.
        weights = np.frombuffer(raw, dtype=np.float32, offset=header.nbytes)
        self._load_conv_weights(weights)

    def load_weights(self, weightfile):
        """
        Load weights from a file on disk.

        The file starts with five int32 header words followed by the
        float32 weights.
        """
        # The context manager closes the file once both reads are done.
        with open(weightfile, "rb") as fp:
            # The first 5 values are header information
            # 1. Major version number
            # 2. Minor Version Number
            # 3. Subversion number
            # 4,5. Images seen by the network (during training)
            header = np.fromfile(fp, dtype=np.int32, count=5)
            weights = np.fromfile(fp, dtype=np.float32)

        self.header = torch.from_numpy(header)
        self.seen = self.header[3]
        self._load_conv_weights(weights)

    def _load_conv_weights(self, weights):
        """Copy a flat float32 array into the convolutional blocks, in order."""
        ptr = 0
        for i, model in enumerate(self.module_list):
            block = self.blocks[i + 1]
            # Only convolutional blocks carry weights; ignore the rest.
            if block["type"] != "convolutional":
                continue

            try:
                batch_normalize = int(block["batch_normalize"])
            except (KeyError, ValueError):
                batch_normalize = 0

            conv = model[0]
            if batch_normalize:
                bn = model[1]
                # Stored order: BN biases, BN weights, running mean,
                # running variance.
                targets = (bn.bias.data, bn.weight.data,
                           bn.running_mean, bn.running_var)
            else:
                targets = (conv.bias.data,)

            # The convolution kernel always follows the per-channel values.
            for target in targets + (conv.weight.data,):
                count = target.numel()
                chunk = torch.from_numpy(weights[ptr:ptr + count])
                ptr += count
                # Reshape the flat slice to the parameter's shape and copy
                target.copy_(chunk.view_as(target))
if __name__ == '__main__':
    # Smoke test: build the network, load its weights and run one image.
    model = Darknet("yolov3.cfg")
    model.load_weights_url("yolov3.weights")

    # CUDA stays a module-level name because DetectionLayer.forward reads it.
    CUDA = torch.cuda.is_available()
    print(' cuda : ' , CUDA )

    # Neither the model nor the input is moved to the GPU, and the forward
    # pass is given False for its CUDA flag regardless of availability.
    sample = get_test_input()
    detections = model( sample , False )

    print (detections)
    print( 'shape' , detections.shape )