# In[1] 导入所需工具包
import time
from math import floor, ceil

import torch
import torch.nn as nn
import torchvision
from torch.nn import functional as F
from torch.utils.data import DataLoader, TensorDataset
from torchvision import datasets, transforms
# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# In[1] Hyper-parameters
num_epochs = 60
batch_size = 1000
learning_rate = 0.001

# In[2] Load the MNIST training and test data
# Tensor conversion followed by normalisation with mean 0.1307 / std 0.3081.
# NOTE(review): `transform` is never handed to the datasets below -- the
# loaders are built from the raw image tensors, scaled to [0, 1] only.  The
# name is kept so anything that refers to it keeps working.
transform = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize((0.1307,), (0.3081,)),
])

train_set = torchvision.datasets.MNIST(root='MNIST', train=True, download=True)
# NOTE(review): the left operand of this division was missing and has been
# reconstructed.  Assumes `train_set.data` is the (N, 28, 28) integer image
# tensor; the channel axis is added because the network's first convolution
# takes one input channel, and the values are rescaled to [0, 1] -- confirm.
train_data = train_set.data.unsqueeze(1).float() / 255.0
train_labels = train_set.targets
train_dataset = TensorDataset(train_data, train_labels)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_set = torchvision.datasets.MNIST(root='MNIST', train=False, download=True)
# Same preprocessing as the training images (same reconstruction caveat).
test_data = test_set.data.unsqueeze(1).float() / 255.0
test_labels = test_set.targets
test_dataset = TensorDataset(test_data, test_labels)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)
# In[1] 3x3 convolution factory
def conv3x3(in_channels, out_channels, stride=1):
    """Build a 3x3 ``nn.Conv2d`` with padding 1 and a bias term.

    Args:
        in_channels: number of input channels.
        out_channels: number of output channels.
        stride: convolution stride (default 1).

    Returns:
        The newly constructed ``nn.Conv2d`` layer.
    """
    conv_options = dict(kernel_size=3, stride=stride, padding=1, bias=True)
    return nn.Conv2d(in_channels, out_channels, **conv_options)
# In[1] Residual block
class ResidualBlock(nn.Module):
    """Two 3x3 conv + batch-norm layers wrapped by a skip connection.

    Args:
        in_channels: channels of the incoming feature map.
        out_channels: channels produced by both convolutions.
        stride: stride of the first convolution; the second always uses 1.
        downsample: optional module applied to the block input before it is
            added to the convolution branch.  ``None`` adds the input as is.
    """

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(ResidualBlock, self).__init__()
        self.conv1 = conv3x3(in_channels, out_channels, stride)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(out_channels, out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        """Return ``relu(bn2(conv2(relu(bn1(conv1(x))))) + shortcut(x))``.

        ``shortcut`` is ``self.downsample`` when one was supplied, otherwise
        the identity.
        """
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        # Compare against None explicitly: truthiness of a module goes through
        # its __len__ when it has one, which is not what is being asked here.
        residual = x if self.downsample is None else self.downsample(x)
        out += residual
        return self.relu(out)
# In[1] Residual network
class ResNet(nn.Module):
    """Small ResNet: a 3x3 stem, three residual stages and a two-layer head.

    Args:
        block: residual block class, instantiated as
            ``block(in_channels, out_channels, stride, downsample)``.
        layers: sequence of three ints -- how many blocks each stage holds.
        num_classes: size of the output layer (default 10).

    The stem takes single-channel images.  ``fc1`` is fixed at 3136 input
    features, i.e. 64 channels x 7 x 7, which is what the two stride-2 stages
    leave of a 28x28 input; other input sizes will not fit the head.
    """

    def __init__(self, block, layers, num_classes=10):
        super(ResNet, self).__init__()
        # Running channel count; make_layer() updates it after each stage.
        self.in_channels = 16
        self.conv = conv3x3(1, 16)
        # NOTE(review): this assignment was fused into the previous line with
        # its target missing; `bn` is the reconstructed attribute name.
        self.bn = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace=True)
        # Stage 1: 16 -> 16 channels at stride 1, so no downsampling shortcut.
        self.layer1 = self.make_layer(block, 16, layers[0], stride=1)
        # Stages 2 and 3 change the channel count and use stride 2, so their
        # first block receives a downsampling shortcut.
        self.layer2 = self.make_layer(block, 32, layers[1], 2)
        self.layer3 = self.make_layer(block, 64, layers[2], 2)
        # Defined but not applied in forward(); kept as an attribute.
        self.avg_pool = nn.AvgPool2d(8)
        self.fc1 = nn.Linear(3136, 128)
        self.normfc12 = nn.LayerNorm(128, eps=1e-5)
        self.fc2 = nn.Linear(128, num_classes)

    def make_layer(self, block, out_channels, blocks, stride=1):
        """Build one stage made of ``blocks`` residual blocks.

        Only the first block may change resolution or channel count; it gets
        a conv + batch-norm shortcut whenever ``stride != 1`` or the channel
        count changes.  The remaining blocks map ``out_channels`` to
        ``out_channels`` at stride 1.

        Side effect: sets ``self.in_channels`` to ``out_channels``.

        Returns:
            An ``nn.Sequential`` holding the stage's blocks in order.
        """
        downsample = None
        if (stride != 1) or (self.in_channels != out_channels):
            # NOTE(review): the tail of this Sequential was missing; the
            # batch-norm after the shortcut convolution is reconstructed.
            downsample = nn.Sequential(
                conv3x3(self.in_channels, out_channels, stride=stride),
                nn.BatchNorm2d(out_channels))
        stage = [block(self.in_channels, out_channels, stride, downsample)]
        # Everything after the first block sees out_channels on its input.
        self.in_channels = out_channels
        stage.extend(block(out_channels, out_channels)
                     for _ in range(1, blocks))
        return nn.Sequential(*stage)

    def forward(self, x):
        """Map a batch of images to ``num_classes`` raw scores per sample.

        Assumes ``x`` is ``(batch, 1, 28, 28)`` -- see the class docstring.
        """
        out = self.conv(x)
        out = self.bn(out)
        out = self.relu(out)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        # No average pooling here: fc1 consumes the whole flattened feature
        # map.  out.size(0) is the batch dimension.
        out = out.view(out.size(0), -1)
        out = self.fc1(out)
        out = self.normfc12(out)
        out = self.relu(out)
        out = self.fc2(out)
        return out
# In[1] Model, loss function and optimizer
# [2, 2, 2] is the number of residual blocks in each of the three stages.
model = ResNet(block=ResidualBlock, layers=[2, 2, 2])
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
# In[1] Helper that changes the learning rate through the optimizer
def update_lr(optimizer, lr):
    """Set ``lr`` as the learning rate of every parameter group.

    Args:
        optimizer: object exposing ``param_groups``, an iterable of dicts.
        lr: the new learning-rate value stored under each group's 'lr' key.
    """
    for group in optimizer.param_groups:
        group.update(lr=lr)
# In[1] Evaluation helper
def test(model, test_loader):
    """Print, and return, the model's accuracy in percent on ``test_loader``.

    Args:
        model: the network to evaluate; called as ``model(images)``.
        test_loader: iterable yielding ``(images, labels)`` batches.

    Returns:
        ``100 * correct / total`` as a float, or ``0.0`` when the loader
        yields no samples.

    Batches are moved to the module-level ``device``.  The model is switched
    to evaluation mode for the duration of the call and its previous
    train/eval mode is restored afterwards, so the function can be called in
    the middle of training.
    """
    was_training = model.training
    # Batch-norm layers must use their running statistics while scoring.
    model.eval()
    correct = 0
    total = 0
    try:
        with torch.no_grad():
            for images, labels in test_loader:
                images = images.to(device)
                labels = labels.to(device)
                outputs = model(images)
                # Index of the largest score along dim 1 is the prediction.
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
    finally:
        model.train(was_training)
    # Guard against an empty loader instead of dividing by zero.
    accuracy = 100 * correct / total if total else 0.0
    print('Accuracy of the model on the test images: {} %'.format(accuracy))
    return accuracy
# In[1] Train the model, lowering the learning rate on a fixed schedule
total_step = len(train_loader)
curr_lr = learning_rate
for epoch in range(num_epochs):
    in_epoch = time.time()
    # Make sure batch-norm is in training mode whatever ran before.
    model.train()
    for i, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward and optimize.  NOTE(review): these three statements were
        # missing below the comment; without them the weights never change.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Log every 100 steps and on the last step of the epoch, so epochs
        # shorter than 100 steps still report a loss.
        if (i + 1) % 100 == 0 or (i + 1) == total_step:
            print("Epoch [{}/{}], Step [{}/{}] Loss: {:.4f}"
                  .format(epoch + 1, num_epochs, i + 1, total_step, loss.item()))

    # Score the test set once per epoch; the timing below includes it.
    test(model, test_loader)
    out_epoch = time.time()
    elapsed = out_epoch - in_epoch
    print(f"use {elapsed // 60}min{elapsed % 60}s")

    # Divide the learning rate by 3 after every 20th epoch.
    if (epoch + 1) % 20 == 0:
        curr_lr /= 3
        update_lr(optimizer, curr_lr)
# In[1] Final evaluation, then save the trained weights
# Evaluation mode so batch-norm uses its running statistics; test() prints
# 'Accuracy of the model on the test images: ... %' for the test set.
model.eval()
test(model, test_loader)
# NOTE(review): only the trailing path argument of the save call was still
# present; saving the state dict is the reconstructed form -- confirm.
torch.save(model.state_dict(), '../resnet.ckpt')