File size: 7,331 Bytes
9f88559 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
import torch
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm
from torchvision import transforms
import torchvision
from torch.utils.data import DataLoader
class SEAttention(nn.Module):
    """Squeeze-and-Excitation channel attention (Hu et al., 2018).

    Squeezes spatial information with global average pooling, learns a
    per-channel importance weight through a bottleneck MLP, and rescales
    the input feature map channel-wise.
    """

    def __init__(self, in_channels, reduction_ratio=16):
        super(SEAttention, self).__init__()
        # Squeeze: collapse each channel's H x W map to a single scalar.
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        # Excitation: bottleneck MLP producing one (0, 1) weight per channel.
        bottleneck = in_channels // reduction_ratio
        self.fc = nn.Sequential(
            nn.Linear(in_channels, bottleneck),
            nn.ReLU(inplace=True),
            nn.Linear(bottleneck, in_channels),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Return `x` rescaled channel-wise by the learned attention weights."""
        identity = x  # keep the original map for the rescaling step
        weights = self.avg_pool(x)                      # (N, C, 1, 1)
        weights = torch.flatten(weights, start_dim=1)   # (N, C)
        weights = self.fc(weights)                      # (N, C), each in (0, 1)
        weights = weights.view(-1, weights.size(1), 1, 1)
        # Broadcast each channel's weight over the spatial dimensions.
        return identity * weights.expand_as(identity)
class BasicBlock(nn.Module):
    """ResNet-18/34 basic residual block: two 3x3 convs plus a shortcut.

    Args:
        in_channels: number of input feature channels.
        out_channels: number of output feature channels.
        stride: two-element sequence; stride[0] is applied by the first
            conv (and the projection shortcut), stride[1] by the second.
        padding: padding for both 3x3 convs.
    """

    # NOTE: default is a tuple rather than the original mutable list
    # (same values; indexing behaves identically).
    def __init__(self, in_channels, out_channels, stride=(1, 1), padding=1) -> None:
        super(BasicBlock, self).__init__()
        # Residual branch: conv-BN-ReLU-conv-BN.
        self.layer = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride[0], padding=padding, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),  # in-place to save memory
            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride[1], padding=padding, bias=False),
            nn.BatchNorm2d(out_channels)
        )
        # Shortcut branch: identity when the shapes already match,
        # otherwise a 1x1 projection conv to align channels/spatial size.
        # BUG FIX: the original tested `stride != 1`, but `stride` is a
        # sequence, so the test was always true and even plain stride-1,
        # same-channel blocks got a (non-identity) projection shortcut,
        # defeating the residual identity path.
        self.shortcut = nn.Sequential()
        if stride[0] != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride[0], bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        """Return relu(residual(x) + shortcut(x))."""
        out = self.layer(x)
        out += self.shortcut(x)
        out = F.relu(out)
        return out
class ResNet18(nn.Module):
    """ResNet-18 backbone with an SE attention module after each stage.

    NOTE(review): `conv1` applies BatchNorm followed directly by max
    pooling with no ReLU in between; the reference ResNet inserts a ReLU
    there. Left unchanged to preserve this model's behavior — confirm
    whether the omission is intentional.
    """

    def __init__(self, BasicBlock, num_classes=10) -> None:
        super(ResNet18, self).__init__()
        self.in_channels = 64
        # Stem: a lone 7x7 conv stage (no residual connection).
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )
        # Four residual stages (conv2_x .. conv5_x); the first block of
        # every stage after conv2 downsamples with stride 2.
        self.conv2 = self._make_layer(BasicBlock, 64, [[1, 1], [1, 1]])
        self.conv3 = self._make_layer(BasicBlock, 128, [[2, 1], [1, 1]])
        self.conv4 = self._make_layer(BasicBlock, 256, [[2, 1], [1, 1]])
        self.conv5 = self._make_layer(BasicBlock, 512, [[2, 1], [1, 1]])
        # Classifier head.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)
        # One SE attention module per stage output width.
        self.senet64 = SEAttention(64)
        self.senet128 = SEAttention(128)
        self.senet256 = SEAttention(256)
        self.senet512 = SEAttention(512)

    def _make_layer(self, block, out_channels, strides):
        """Stack one `block` per entry in `strides` into a sequential stage."""
        stage = []
        for stride_pair in strides:
            stage.append(block(self.in_channels, out_channels, stride_pair))
            self.in_channels = out_channels
        return nn.Sequential(*stage)

    def forward(self, x):
        """Run the backbone and return (N, num_classes) logits."""
        feats = self.conv1(x)
        feats = self.senet64(self.conv2(feats))
        feats = self.senet128(self.conv3(feats))
        feats = self.senet256(self.conv4(feats))
        feats = self.senet512(self.conv5(feats))
        feats = self.avgpool(feats)
        feats = feats.reshape(x.shape[0], -1)
        return self.fc(feats)
# Preprocessing shared by the train and val datasets: fixed-size resize,
# tensor conversion, and normalization with ImageNet channel statistics.
transform = transforms.Compose(
    [
        transforms.Resize((512, 512)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
)
def make_dir(path):
    """Create directory `path` (including parents) if it does not exist.

    Uses `exist_ok=True` instead of the original exists-then-create pair,
    which was racy: the directory could appear between the check and the
    `os.makedirs` call and raise FileExistsError.
    """
    import os
    os.makedirs(path, exist_ok=True)
make_dir('models')

batch_size = 16
# Datasets/loaders: ImageFolder infers the two class labels (cat, dog)
# from the subdirectory names under each root.
train_set = torchvision.datasets.ImageFolder(root='data/cat_vs_dog/train', transform=transform)
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True,
                          num_workers=4)  # batch size trades off optimization quality vs. speed/memory
val_dataset = torchvision.datasets.ImageFolder(root='data/cat_vs_dog/val', transform=transform)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True,
                        num_workers=4)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# net = torchvision.models.resnet18(weights=True)
model = ResNet18(BasicBlock)
# Replace the default 10-way head with a 2-way (cat vs. dog) classifier.
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 2)
criterion = nn.CrossEntropyLoss()
model = model.to(device)
optimizer = torch.optim.AdamW(lr=0.0001, params=model.parameters())
epochs = 100  # fixed typo: was `eposhs`
for epoch in range(epochs):
    print(f'--------------------{epoch}--------------------')
    # ---- training ----
    # BUG FIX: the original never restored train mode after calling
    # model.eval() for validation, so from epoch 2 onward training ran
    # with BatchNorm layers frozen in eval mode.
    model.train()
    correct_train = 0
    sum_loss_train = 0.0
    total_correct_train = 0
    for inputs, labels in tqdm(train_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)
        output = model(inputs)
        loss = criterion(output, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        sum_loss_train = sum_loss_train + loss.item()
        total_correct_train = total_correct_train + labels.size(0)
        _, predicted = torch.max(output.data, 1)
        correct_train = correct_train + (predicted == labels).sum().item()
    acc_train = correct_train / total_correct_train
    print('训练准确率是{:.3f}%:'.format(acc_train*100) )
    # ---- validation ----
    model.eval()
    correct_val = 0
    sum_loss_val = 0.0
    total_correct_val = 0
    # BUG FIX: the original ran a second, redundant forward pass per batch
    # and built autograd graphs (plus a pointless optimizer.zero_grad())
    # during validation; no_grad() removes the wasted compute and memory.
    with torch.no_grad():
        for inputs, labels in tqdm(val_loader):
            inputs = inputs.to(device)
            labels = labels.to(device)
            output = model(inputs)
            loss = criterion(output, labels)
            sum_loss_val = sum_loss_val + loss.item()
            total_correct_val = total_correct_val + labels.size(0)
            _, predicted = torch.max(output.data, 1)
            correct_val = correct_val + (predicted == labels).sum().item()
    acc_val = correct_val / total_correct_val
    print('验证准确率是{:.3f}%:'.format(acc_val*100) )
    # NOTE(review): torch.save(model, ...) pickles whole objects; saving
    # model.state_dict() is the more portable convention — confirm before
    # changing, since existing checkpoints were saved this way.
    torch.save(model, 'models/{}-{:.5f}_{:.3f}%_{:.5f}_{:.3f}%.pth'.format(epoch, sum_loss_train, acc_train * 100, sum_loss_val, acc_val * 100))
|