import torch
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm
from torchvision import transforms
import torchvision
from torch.utils.data import DataLoader


class SEAttention(nn.Module):
    def __init__(self, in_channels, reduction_ratio=16):
        super(SEAttention, self).__init__()
        # Global average pooling (squeeze step)
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        # Fully connected bottleneck (excitation step)
        self.fc = nn.Sequential(
            nn.Linear(in_channels, in_channels // reduction_ratio),
            nn.ReLU(inplace=True),
            nn.Linear(in_channels // reduction_ratio, in_channels),
            nn.Sigmoid()
        )

    def forward(self, x):
        # Compute per-channel importance from the global average, then reweight the input
        module_input = x  # keep the original feature map for the reweighting
        x = self.avg_pool(x)
        x = torch.flatten(x, start_dim=1)
        x = self.fc(x)
        x = x.view(-1, x.size(1), 1, 1)
        # Scale each channel of the input feature map by its learned importance
        x = module_input * x.expand_as(module_input)
        return x


class BasicBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=[1, 1], padding=1) -> None:
        super(BasicBlock, self).__init__()
        # Residual branch: two 3x3 convolutions
        self.layer = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride[0], padding=padding, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),  # in-place ReLU saves memory
            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride[1], padding=padding, bias=False),
            nn.BatchNorm2d(out_channels)
        )
        # Shortcut branch: a 1x1 convolution is needed when the spatial size or
        # channel count changes; otherwise the identity is used
        self.shortcut = nn.Sequential()
        if stride[0] != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride[0], bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        out = self.layer(x)
        out += self.shortcut(x)
        out = F.relu(out)
        return out


class ResNet18(nn.Module):
    def __init__(self, block, num_classes=10) -> None:
        super(ResNet18, self).__init__()
        self.in_channels = 64
        # Stem: a single 7x7 convolution, no residual connection here
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )
        # conv2_x .. conv5_x: two BasicBlocks per stage; the first block of
        # conv3_x-conv5_x downsamples with stride 2
        self.conv2 = self._make_layer(block, 64, [[1, 1], [1, 1]])
        self.conv3 = self._make_layer(block, 128, [[2, 1], [1, 1]])
        self.conv4 = self._make_layer(block, 256, [[2, 1], [1, 1]])
        self.conv5 = self._make_layer(block, 512, [[2, 1], [1, 1]])
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)
        # One SE attention module after each stage
        self.senet64 = SEAttention(64)
        self.senet128 = SEAttention(128)
        self.senet256 = SEAttention(256)
        self.senet512 = SEAttention(512)

    # Stack repeated residual blocks to build one stage
    def _make_layer(self, block, out_channels, strides):
        layers = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride))
            self.in_channels = out_channels
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv1(x)
        out = self.conv2(out)
        out = self.senet64(out)
        out = self.conv3(out)
        out = self.senet128(out)
        out = self.conv4(out)
        out = self.senet256(out)
        out = self.conv5(out)
        out = self.senet512(out)
        out = self.avgpool(out)
        out = out.reshape(x.shape[0], -1)
        out = self.fc(out)
        return out
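# --- Optional sanity check (not in the original script) ---
# A minimal sketch that verifies tensor shapes through SEAttention and the full
# network before training. The 224x224 input size and the _sanity_check_shapes
# name are illustrative assumptions; the training pipeline below resizes images
# to 512x512.
def _sanity_check_shapes():
    with torch.no_grad():
        feat = torch.randn(2, 64, 56, 56)
        se = SEAttention(64)
        # SE attention only reweights channels, so the output shape matches the input
        assert se(feat).shape == feat.shape
        net = ResNet18(BasicBlock, num_classes=2)
        x = torch.randn(2, 3, 224, 224)
        # one logit per class for each image in the batch
        assert net(x).shape == (2, 2)

# _sanity_check_shapes()  # uncomment to run the check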
transform = transforms.Compose([
    transforms.Resize((512, 512)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])


def make_dir(path):
    import os
    if not os.path.exists(path):
        os.makedirs(path)


make_dir('models')

# Batch size is the number of samples processed per training step; it affects both
# optimization quality and training speed.
batch_size = 16
train_set = torchvision.datasets.ImageFolder(root='data/cat_vs_dog/train', transform=transform)
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=4)
val_dataset = torchvision.datasets.ImageFolder(root='data/cat_vs_dog/val', transform=transform)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True, num_workers=4)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Alternative: start from a torchvision ResNet-18 instead of the custom model above
# net = torchvision.models.resnet18(weights=True)
model = ResNet18(BasicBlock)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 2)  # two output classes: cat and dog
criterion = nn.CrossEntropyLoss()
model = model.to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=0.0001)

epochs = 100
for epoch in range(epochs):
    print(f'--------------------{epoch}--------------------')

    # Training phase
    model.train()
    correct_train = 0
    sum_loss_train = 0
    total_train = 0
    for inputs, labels in tqdm(train_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)
        output = model(inputs)
        loss = criterion(output, labels)
        sum_loss_train += loss.item()
        total_train += labels.size(0)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        _, predicted = torch.max(output.data, 1)
        correct_train += (predicted == labels).sum().item()
    acc_train = correct_train / total_train
    print('Training accuracy: {:.3f}%'.format(acc_train * 100))

    # Validation phase: no gradients, no optimizer updates
    model.eval()
    correct_val = 0
    sum_loss_val = 0
    total_val = 0
    with torch.no_grad():
        for inputs, labels in tqdm(val_loader):
            inputs = inputs.to(device)
            labels = labels.to(device)
            output = model(inputs)
            loss = criterion(output, labels)
            sum_loss_val += loss.item()
            total_val += labels.size(0)
            _, predicted = torch.max(output.data, 1)
            correct_val += (predicted == labels).sum().item()
    acc_val = correct_val / total_val
    print('Validation accuracy: {:.3f}%'.format(acc_val * 100))

    torch.save(model, 'models/{}-{:.5f}_{:.3f}%_{:.5f}_{:.3f}%.pth'.format(
        epoch, sum_loss_train, acc_train * 100, sum_loss_val, acc_val * 100))
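# --- Optional inference sketch (not in the original script) ---
# A minimal example of using one of the checkpoints saved above to classify a single
# image; predict_image and both paths are illustrative placeholders. torch.save()
# above pickles the whole model object, so the class definitions in this file must
# be importable when the checkpoint is loaded (newer PyTorch versions may also need
# weights_only=False in torch.load).
def predict_image(checkpoint_path, image_path):
    from PIL import Image
    net = torch.load(checkpoint_path, map_location=device)
    net.eval()
    image = Image.open(image_path).convert('RGB')
    batch = transform(image).unsqueeze(0).to(device)  # reuse the training-time transform
    with torch.no_grad():
        predicted = net(batch).argmax(dim=1).item()
    # ImageFolder assigns class indices alphabetically, e.g. 0 -> 'cat', 1 -> 'dog'
    return train_set.classes[predicted]

# Example with hypothetical paths:
# print(predict_image('models/0-....pth', 'data/cat_vs_dog/val/cat/cat.0.jpg'))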