""" | |
Implementation of YOLOv3 architecture | |
""" | |
import os | |
import pytorch_lightning as pl | |
import pandas as pd | |
import seaborn as sn | |
import torch | |
import torch.nn as nn | |
import torch.nn.functional as F | |
import torchvision | |
from IPython.core.display import display | |
#from pl_bolts.datamodules import CIFAR10DataModule | |
#from pl_bolts.transforms.dataset_normalizations import cifar10_normalization | |
from pytorch_lightning import LightningModule, Trainer, seed_everything | |
from pytorch_lightning.callbacks import LearningRateMonitor, ModelCheckpoint | |
from pytorch_lightning.callbacks.progress import TQDMProgressBar | |
from pytorch_lightning.loggers import CSVLogger | |
from torch.optim.lr_scheduler import OneCycleLR | |
from torchmetrics.functional import accuracy | |
import torch.cuda.amp as amp | |
from torch.utils.data import DataLoader | |
from loss import YoloLoss | |
from pytorch_lightning import LightningModule, Trainer | |
from torch.optim.lr_scheduler import OneCycleLR | |
from torch_lr_finder import LRFinder | |
import torch.nn as nn | |
from dataset import YOLODataset | |
import config | |
import torch | |
import torch.optim as optim | |
import os | |
#from model import YOLOv3 | |
from tqdm import tqdm | |
from utils import ( | |
mean_average_precision, | |
cells_to_bboxes, | |
get_evaluation_bboxes, | |
save_checkpoint, | |
load_checkpoint, | |
check_class_accuracy, | |
get_loaders, | |
plot_couple_examples | |
) | |
from loss import YoloLoss | |
import warnings | |
from pytorch_lightning import LightningModule | |
import torch | |
from loss import YoloLoss | |
import torch.nn as nn | |
import config | |
""" | |
Information about architecture config: | |
Tuple is structured by (filters, kernel_size, stride) | |
Every conv is a same convolution. | |
List is structured by "B" indicating a residual block followed by the number of repeats | |
"S" is for scale prediction block and computing the yolo loss | |
"U" is for upsampling the feature map and concatenating with a previous layer | |
""" | |
config_1 = [
    (32, 3, 1),
    (64, 3, 2),
    ["B", 1],
    (128, 3, 2),
    ["B", 2],
    (256, 3, 2),
    ["B", 8],
    (512, 3, 2),
    ["B", 8],
    (1024, 3, 2),
    ["B", 4],  # To this point is Darknet-53
    (512, 1, 1),
    (1024, 3, 1),
    "S",
    (256, 1, 1),
    "U",
    (256, 1, 1),
    (512, 3, 1),
    "S",
    (128, 1, 1),
    "U",
    (128, 1, 1),
    (256, 3, 1),
    "S",
]
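
# How _create_conv_layers reads config_1 (illustrative sketch, not executed):
#   (64, 3, 2)  -> CNNBlock(prev_channels, 64, kernel_size=3, stride=2, padding=1)
#   ["B", 2]    -> ResidualBlock(prev_channels, num_repeats=2)
#   "S"         -> ResidualBlock + 1x1 CNNBlock + ScalePrediction head
#   "U"         -> nn.Upsample(scale_factor=2); forward() then concatenates a route connection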


class CNNBlock(nn.Module):
    """Conv2d -> BatchNorm2d -> LeakyReLU(0.1); BN and activation are skipped when bn_act=False."""

    def __init__(self, in_channels, out_channels, bn_act=True, **kwargs):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, bias=not bn_act, **kwargs)
        self.bn = nn.BatchNorm2d(out_channels)
        self.leaky = nn.LeakyReLU(0.1)
        self.use_bn_act = bn_act

    def forward(self, x):
        if self.use_bn_act:
            return self.leaky(self.bn(self.conv(x)))
        else:
            return self.conv(x)
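
# Example (illustrative only): a stride-2 CNNBlock halves the spatial resolution.
#   block = CNNBlock(3, 32, kernel_size=3, stride=2, padding=1)
#   block(torch.randn(1, 3, 416, 416)).shape  # torch.Size([1, 32, 208, 208])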


class ResidualBlock(nn.Module):
    """num_repeats x (1x1 bottleneck -> 3x3 conv), optionally with a skip connection around each pair."""

    def __init__(self, channels, use_residual=True, num_repeats=1):
        super().__init__()
        self.layers = nn.ModuleList()
        for repeat in range(num_repeats):
            self.layers += [
                nn.Sequential(
                    CNNBlock(channels, channels // 2, kernel_size=1),
                    CNNBlock(channels // 2, channels, kernel_size=3, padding=1),
                )
            ]
        self.use_residual = use_residual
        self.num_repeats = num_repeats

    def forward(self, x):
        for layer in self.layers:
            if self.use_residual:
                x = x + layer(x)
            else:
                x = layer(x)
        return x
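
# Example (illustrative only): a ResidualBlock preserves channel count and spatial size,
# so it can be stacked freely inside the Darknet-53 backbone.
#   res = ResidualBlock(64, num_repeats=2)
#   res(torch.randn(1, 64, 104, 104)).shape  # torch.Size([1, 64, 104, 104])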


class ScalePrediction(nn.Module):
    """Detection head for one scale: 3 anchors per cell, each predicting
    (tx, ty, tw, th, objectness) plus num_classes class scores."""

    def __init__(self, in_channels, num_classes):
        super().__init__()
        self.pred = nn.Sequential(
            CNNBlock(in_channels, 2 * in_channels, kernel_size=3, padding=1),
            CNNBlock(
                2 * in_channels, (num_classes + 5) * 3, bn_act=False, kernel_size=1
            ),
        )
        self.num_classes = num_classes

    def forward(self, x):
        return (
            self.pred(x)
            .reshape(x.shape[0], 3, self.num_classes + 5, x.shape[2], x.shape[3])
            .permute(0, 1, 3, 4, 2)
        )
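
# Example (illustrative only): with num_classes=20 the head emits 3 * (20 + 5) = 75
# channels, reshaped to 25 values per anchor per grid cell.
#   sp = ScalePrediction(128, num_classes=20)
#   sp(torch.randn(2, 128, 13, 13)).shape  # torch.Size([2, 3, 13, 13, 25])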


class YOLOv3(LightningModule):
    def __init__(self, in_channels=3, num_classes=80):
        super().__init__()
        self.num_classes = num_classes
        self.in_channels = in_channels
        self.layers = self._create_conv_layers()

    def forward(self, x):
        outputs = []  # one prediction per scale (strides 32, 16, 8)
        route_connections = []
        for layer in self.layers:
            if isinstance(layer, ScalePrediction):
                outputs.append(layer(x))
                continue
            x = layer(x)
            # Route connections are taken after the two 8-repeat residual blocks and
            # consumed in reverse order after each upsample.
            if isinstance(layer, ResidualBlock) and layer.num_repeats == 8:
                route_connections.append(x)
            elif isinstance(layer, nn.Upsample):
                x = torch.cat([x, route_connections[-1]], dim=1)
                route_connections.pop()
        return outputs

    def _create_conv_layers(self):
        layers = nn.ModuleList()
        in_channels = self.in_channels
        for module in config_1:
            if isinstance(module, tuple):
                out_channels, kernel_size, stride = module
                layers.append(
                    CNNBlock(
                        in_channels,
                        out_channels,
                        kernel_size=kernel_size,
                        stride=stride,
                        padding=1 if kernel_size == 3 else 0,
                    )
                )
                in_channels = out_channels
            elif isinstance(module, list):
                num_repeats = module[1]
                layers.append(ResidualBlock(in_channels, num_repeats=num_repeats))
            elif isinstance(module, str):
                if module == "S":
                    layers += [
                        ResidualBlock(in_channels, use_residual=False, num_repeats=1),
                        CNNBlock(in_channels, in_channels // 2, kernel_size=1),
                        ScalePrediction(in_channels // 2, num_classes=self.num_classes),
                    ]
                    in_channels = in_channels // 2
                elif module == "U":
                    layers.append(nn.Upsample(scale_factor=2))
                    # After upsampling, forward() concatenates a route connection carrying
                    # twice as many channels, hence in_channels * 3 for the next conv.
                    in_channels = in_channels * 3
        return layers


class YoloVersion3(LightningModule):
    def __init__(self):
        super().__init__()
        self.save_hyperparameters()
        # Set our init args as class attributes
        self.learning_rate = config.LEARNING_RATE
        self.num_classes = config.NUM_CLASSES
        self.train_csv = config.DATASET + "/train.csv"
        self.test_csv = config.DATASET + "/test.csv"
        self.loss_fn = YoloLoss()
        self.scaler = amp.GradScaler()
        self.model = YOLOv3(num_classes=config.NUM_CLASSES).to(config.DEVICE)
        # config.ANCHORS is assumed to hold anchors normalized to the image size;
        # multiplying by the grid sizes in config.S rescales them to grid-cell units
        # for each of the three prediction scales.
        self.scaled_anchors = (
            torch.tensor(config.ANCHORS)
            * torch.tensor(config.S).unsqueeze(1).unsqueeze(1).repeat(1, 3, 2)
        ).to(config.DEVICE)
        self.training_step_outputs = []

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        y0, y1, y2 = y[0], y[1], y[2]
        out = self(x)
        # Sum the YOLO loss over the three prediction scales.
        loss = (
            self.loss_fn(out[0], y0, self.scaled_anchors[0])
            + self.loss_fn(out[1], y1, self.scaled_anchors[1])
            + self.loss_fn(out[2], y2, self.scaled_anchors[2])
        )
        # Log the training loss for visualization.
        self.log("train_loss", loss, on_epoch=True, prog_bar=True, logger=True)
        self.training_step_outputs.append(loss)
        return loss

    def on_train_epoch_end(self):
        print(f"\nCurrently epoch {self.current_epoch}")
        train_epoch_average = torch.stack(self.training_step_outputs).mean()
        self.training_step_outputs.clear()
        print(f"Train loss {train_epoch_average}")
        print("On Train loader:")
        class_accuracy, no_obj_accuracy, obj_accuracy = check_class_accuracy(
            self.model, self.train_loader, threshold=config.CONF_THRESHOLD
        )
        self.log("class_accuracy", class_accuracy, on_epoch=True, prog_bar=True, logger=True)
        self.log("no_obj_accuracy", no_obj_accuracy, on_epoch=True, prog_bar=True, logger=True)
        self.log("obj_accuracy", obj_accuracy, on_epoch=True, prog_bar=True, logger=True)
        # Plot a few example predictions every 6 epochs.
        if (self.current_epoch > 0) and ((self.current_epoch + 1) % 6 == 0):
            plot_couple_examples(self.model, self.test_loader, 0.6, 0.5, self.scaled_anchors)
        # mAP is computed on the test loader only after the last epoch.
        if (self.current_epoch > 0) and (self.current_epoch + 1 == self.trainer.max_epochs):
            check_class_accuracy(self.model, self.test_loader, threshold=config.CONF_THRESHOLD)
            pred_boxes, true_boxes = get_evaluation_bboxes(
                self.test_loader,
                self.model,
                iou_threshold=config.NMS_IOU_THRESH,
                anchors=config.ANCHORS,
                threshold=config.CONF_THRESHOLD,
            )
            mapval = mean_average_precision(
                pred_boxes,
                true_boxes,
                iou_threshold=config.MAP_IOU_THRESH,
                box_format="midpoint",
                num_classes=config.NUM_CLASSES,
            )
            print(f"MAP: {mapval.item()}")
            self.log("MAP", mapval.item(), on_epoch=True, prog_bar=True, logger=True)

    def configure_optimizers(self):
        optimizer = optim.Adam(
            self.parameters(),
            lr=config.LEARNING_RATE,
            weight_decay=config.WEIGHT_DECAY,
        )
        # OneCycleLR needs the number of batches per epoch, so set up the train dataloader first.
        self.trainer.fit_loop.setup_data()
        dataloader = self.trainer.train_dataloader
        EPOCHS = config.NUM_EPOCHS
        lr_scheduler = OneCycleLR(
            optimizer,
            max_lr=1e-3,
            steps_per_epoch=len(dataloader),
            epochs=EPOCHS,
            pct_start=5 / EPOCHS,  # warm up over roughly the first 5 epochs
            div_factor=100,
            three_phase=False,
            final_div_factor=100,
            anneal_strategy="linear",
        )
        # Step the scheduler every batch; returning it here ensures Lightning actually uses it.
        scheduler = {"scheduler": lr_scheduler, "interval": "step"}
        return [optimizer], [scheduler]
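
    # Resulting schedule (sketch, given the values above): the LR starts at
    # max_lr / div_factor = 1e-5, ramps linearly to 1e-3 over roughly the first 5 epochs,
    # then decays linearly towards initial_lr / final_div_factor = 1e-7 by the final step.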

    def setup(self, stage=None):
        self.train_loader, self.test_loader, self.train_eval_loader = get_loaders(
            train_csv_path=self.train_csv,
            test_csv_path=self.test_csv,
        )

    def train_dataloader(self):
        return self.train_loader

    def val_dataloader(self):
        return self.train_eval_loader

    def test_dataloader(self):
        return self.test_loader


# if __name__ == "__main__":
#     model = YoloVersion3()
#     checkpoint = ModelCheckpoint(filename='last_epoch', save_last=True)
#     lr_rate_monitor = LearningRateMonitor(logging_interval="epoch")
#     trainer = pl.Trainer(
#         max_epochs=config.NUM_EPOCHS,
#         deterministic=True,
#         logger=True,
#         default_root_dir="/content/drive/MyDrive/sunandini/Checkpoint/",
#         callbacks=[lr_rate_monitor],
#         enable_model_summary=False,
#         log_every_n_steps=1,
#         precision="16-mixed"
#     )
#     print("---- Training Started ---- Sunandini ----")
#     trainer.fit(model)
#     torch.save(model.state_dict(), 'YOLOv3.pth')

if __name__ == "__main__":
    num_classes = 20
    IMAGE_SIZE = 416
    model = YOLOv3(num_classes=num_classes)
    x = torch.randn((2, 3, IMAGE_SIZE, IMAGE_SIZE))
    out = model(x)
    # Sanity check: one prediction per scale at strides 32, 16 and 8.
    assert out[0].shape == (2, 3, IMAGE_SIZE // 32, IMAGE_SIZE // 32, num_classes + 5)
    assert out[1].shape == (2, 3, IMAGE_SIZE // 16, IMAGE_SIZE // 16, num_classes + 5)
    assert out[2].shape == (2, 3, IMAGE_SIZE // 8, IMAGE_SIZE // 8, num_classes + 5)
    print("Success!")