transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

train_transforms = transforms.Compose([
    # transforms.Resize(224),
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(train_mean, train_std),
])

test_transforms = transforms.Compose([
    transforms.ToTensor(),
    # transforms.Resize(224),
    transforms.Normalize(train_mean, train_std),
])
Training dataset
Each image is padded by 4 pixels, randomly cropped back to 32x32, and randomly flipped horizontally. The result is then standardized with the per-channel mean and standard deviation of the training set.
The test dataset only gets the standardization step.
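The values of train_mean and train_std are not shown in this log; a minimal sketch of one common way to compute them from the raw CIFAR-10 training images (the variable names and the single giant batch are just for illustration):

import torch
import torchvision
import torchvision.transforms as transforms

# Load the raw training images once, without augmentation, to measure statistics.
raw_train = torchvision.datasets.CIFAR10(root='./data', train=True, download=True,
                                          transform=transforms.ToTensor())
loader = torch.utils.data.DataLoader(raw_train, batch_size=50000, shuffle=False)
images, _ = next(iter(loader))            # shape: (50000, 3, 32, 32)
train_mean = images.mean(dim=(0, 2, 3))   # per-channel mean
train_std = images.std(dim=(0, 2, 3))     # per-channel std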
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1):
        super(BasicBlock, self).__init__()
        # Two 3x3 convolutions; the first one may downsample via stride.
        self.features = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(out_channels)
        )
        # Identity shortcut, or a 1x1 projection when the shape changes.
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != self.expansion * out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, self.expansion * out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion * out_channels),
            )

    def forward(self, x):
        out = self.features(x)
        out += self.shortcut(x)
        out = torch.relu(out)
        return out
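As a quick illustration of what the shortcut does, a strided BasicBlock halves the spatial resolution and projects the channels so the addition in forward() lines up; the sizes below are arbitrary examples, not taken from the training code:

block = BasicBlock(64, 128, stride=2)
x = torch.randn(1, 64, 32, 32)
print(block(x).shape)   # torch.Size([1, 128, 16, 16]); the 1x1 shortcut projects x to match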
class Bottleneck(nn.Module):
    expansion = 4

    def __init__(self, in_channels, zip_channels, stride=1):
        super(Bottleneck, self).__init__()
        out_channels = self.expansion * zip_channels
        # 1x1 reduce -> 3x3 (possibly strided) -> 1x1 expand.
        self.features = nn.Sequential(
            nn.Conv2d(in_channels, zip_channels, kernel_size=1, bias=False),
            nn.BatchNorm2d(zip_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(zip_channels, zip_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(zip_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(zip_channels, out_channels, kernel_size=1, bias=False),
            nn.BatchNorm2d(out_channels)
        )
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        out = self.features(x)
        out += self.shortcut(x)
        out = torch.relu(out)
        return out
class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet, self).__init__()
        self.in_channels = 64
        # 3x3 stem (no 7x7 conv / max-pool, since CIFAR-10 images are only 32x32).
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True)
        )
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        # After three stride-2 stages a 32x32 input is reduced to 4x4, so a 4x4
        # average pool leaves a 1x1 map with 512 * block.expansion channels.
        self.avg_pool = nn.AvgPool2d(kernel_size=4)
        self.classifier = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, num_blocks, stride):
        # Only the first block of a stage may downsample; the rest keep stride 1.
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride))
            self.in_channels = out_channels * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.features(x)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.avg_pool(out)
        out = out.view(out.size(0), -1)
        out = self.classifier(out)
        return out
def ResNet18():
    return ResNet(BasicBlock, [2, 2, 2, 2])

def ResNet34():
    return ResNet(BasicBlock, [3, 4, 6, 3])

def ResNet50():
    return ResNet(Bottleneck, [3, 4, 6, 3])

def ResNet101():
    return ResNet(Bottleneck, [3, 4, 23, 3])

def ResNet152():
    return ResNet(Bottleneck, [3, 8, 36, 3])
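To confirm that the 4x4 average pool and the 512 * expansion classifier input line up for CIFAR-10-sized images, a dummy forward pass can be run; net_check and the batch size of 2 are just for illustration:

net_check = ResNet34()
dummy = torch.randn(2, 3, 32, 32)   # two fake CIFAR-10 images
out = net_check(dummy)
print(out.shape)                     # expected: torch.Size([2, 10])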
net = ResNet34().to(device)  # ResNet-34
print(net)

if device == 'cuda':
    net = nn.DataParallel(net)
    torch.backends.cudnn.benchmark = True
lr = 0.1
momentum = 0.9
weight_decay = 0.0001

lr = 0.1
momentum = 0.9
weight_decay = 0.0005

To suppress overfitting, the weight decay was increased from 0.0001 to 0.0005.
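As a reminder of what this knob does: with SGD, weight decay adds an L2 penalty, so every step shrinks the weights slightly before the gradient is applied. A toy, hand-rolled version of one update step (the tensors here are random stand-ins, independent of the training code):

import torch

# One SGD step with momentum and weight decay, written out by hand.
# PyTorch's optim.SGD folds the decay term into the gradient: g <- g + weight_decay * w.
lr, momentum, weight_decay = 0.1, 0.9, 0.0005
w = torch.randn(10)      # toy parameter
grad = torch.randn(10)   # toy gradient from some loss
buf = torch.zeros(10)    # momentum buffer

grad = grad + weight_decay * w   # L2 penalty pulls the weights toward zero
buf = momentum * buf + grad
w = w - lr * buf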
for epoch in range(start_epoch, 100):
    loss = train(epoch)
    print('Total loss: %.6f' % loss)
    start_epoch = epoch
    lr_scheduler.step()

for epoch in range(start_epoch, 200):  # epoch count raised to 200
    loss = train(epoch)
    print('Total loss: %.6f' % loss)
    start_epoch = epoch
    lr_scheduler.step()

The number of training epochs was increased from 50 to 200.
for epoch in range(start_epoch, 50):
    loss = train(epoch)
    print('Total loss: %.6f' % loss)
    start_epoch = epoch
if epoch < 0.5 * 50:
    lr = lr
elif epoch < 0.75 * 50:
    lr = lr / 2.0
else:
    lr = lr / 10.0
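Recomputing the lr variable alone does not change the optimizer; the new value has to be written back into its parameter groups. A minimal sketch of how that could be wrapped up (the helper name adjust_lr is an assumption, not part of the original code):

def adjust_lr(optimizer, epoch, base_lr=0.1, total_epochs=50):
    # Same piecewise schedule as above: full lr, then 1/2, then 1/10.
    if epoch < 0.5 * total_epochs:
        lr = base_lr
    elif epoch < 0.75 * total_epochs:
        lr = base_lr / 2.0
    else:
        lr = base_lr / 10.0
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr
    return lr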
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer,
                                                 threshold=0.1, patience=3, verbose=True)

ReduceLROnPlateau lowers the learning rate when the monitored metric stops improving.
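Unlike the other schedulers, ReduceLROnPlateau must be fed the monitored metric every epoch via scheduler.step(metric). A standalone toy example of the call pattern (the linear model and constant loss are placeholders only, there to make the snippet runnable):

import torch
import torch.optim as optim

model = torch.nn.Linear(10, 2)
optimizer = optim.SGD(model.parameters(), lr=0.1)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, threshold=0.1, patience=3)

for epoch in range(10):
    valid_loss = 1.0              # stand-in for the real validation loss
    scheduler.step(valid_loss)    # pass the metric; lr drops after `patience` flat epochs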
lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer,
                                                    milestones=decay_epoch, gamma=0.1)

MultiStepLR drops the learning rate by a factor of gamma at the specified milestone epochs.
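The value of decay_epoch is not shown in this log; purely as an illustration, a typical choice for a run of around 200 epochs might look like the following (the milestone values are an assumption, and the optimizer is the one defined below):

decay_epoch = [100, 150]   # example milestones, not taken from the original code
lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer,
                                                    milestones=decay_epoch, gamma=0.1)
# With base lr = 0.1: lr is 0.1 for epochs 0-99, 0.01 for 100-149, 0.001 afterwards.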
optimizer = optim.SGD(net.parameters(), lr=lr, momentum=momentum, weight_decay=weight_decay)

optimizer = optim.Adam(net.parameters(), lr=0.1)

Switched to the Adam optimizer, which is generally known to work well.
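One caveat: Adam is usually run with a much smaller learning rate than SGD, and lr=0.1 is unusually high for it. A hedged alternative configuration (1e-3 is torch.optim.Adam's default learning rate, not a value tried in this log):

# Typical Adam setup; reuses the weight_decay defined above.
optimizer = optim.Adam(net.parameters(), lr=1e-3, weight_decay=weight_decay)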
# Training
def train(epoch):
    print('\nEpoch: %d' % (epoch + 1))
    net.train()
    train_loss = 0
    correct = 0
    total = 0
    for batch_idx, (inputs, targets) in enumerate(train_loader):
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

        if batch_idx % 100 == 99:  # print every 100 mini-batches
            print('[%d, %5d] loss: %.5f | Acc: %.3f%% (%d/%d)' %
                  (epoch + 1, batch_idx + 1, train_loss / 100, 100. * correct / total, correct, total))
            train_loss = 0.0
            total = 0
            correct = 0
    # Return the loss accumulated since the last print, so the outer loop can report it.
    return train_loss
load_model = True
start_epoch = 0
print('start_epoch: %s' % start_epoch)
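load_model and start_epoch suggest resuming from a saved checkpoint, but the loading code itself is not shown in this log. A minimal sketch of what it might look like (the checkpoint path and the 'net'/'epoch' keys are assumptions):

if load_model:
    checkpoint = torch.load('./checkpoint/resnet34.pth')   # hypothetical path
    net.load_state_dict(checkpoint['net'])                 # assumed key names
    start_epoch = checkpoint['epoch'] + 1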
net.eval()  # switch BatchNorm to its running statistics before measuring accuracy
correct = 0
total = 0
with torch.no_grad():
    for data in test_loader:
        images, labels = data
        images, labels = images.to(device), labels.to(device)
        outputs = net(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print('Accuracy of the network on the 10000 test images: %d %%' % (
    100 * correct / total))

After training, accuracy is measured on the test data.
for epoch in range(num_epoch):
    print(f"====== { epoch + 1 } epoch of { num_epoch } ======")
    model.train()
    # scheduler2.step()
    lr_scheduler(optimizer, epoch)
    train_loss = 0
    valid_loss = 0
    correct = 0
    total_cnt = 0

    # Train Phase
    for step, batch in enumerate(train_loader):
        # input and target
        batch[0], batch[1] = batch[0].to(device), batch[1].to(device)
        optimizer.zero_grad()
        logits = model(batch[0])
        loss = loss_fn(logits, batch[1])
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
        _, predict = logits.max(1)
        total_cnt += batch[1].size(0)
        correct += predict.eq(batch[1]).sum().item()
        if step % 100 == 0 and step != 0:
            print(f"\n====== { step } Step of { len(train_loader) } ======")
            print(f"Train Acc : { correct / total_cnt }")
            print(f"Train Loss : { loss.item() / batch[1].size(0) }")
            correct = 0
            total_cnt = 0
    # Test Phase
    correct = 0    # reset the counters left over from the train-phase prints
    total_cnt = 0
    with torch.no_grad():
        model.eval()
        for step, batch in enumerate(test_loader):
            # input and target
            batch[0], batch[1] = batch[0].to(device), batch[1].to(device)
            total_cnt += batch[1].size(0)
            logits = model(batch[0])
            valid_loss += loss_fn(logits, batch[1]).item()
            _, predict = logits.max(1)
            correct += predict.eq(batch[1]).sum().item()
        valid_acc = correct / total_cnt
        print(f"\nValid Acc : { valid_acc }")
        print(f"Valid Loss : { valid_loss / total_cnt }")

        if valid_acc > best_acc:
            best_acc = valid_acc
            torch.save(model, model_name)
            print("Model Saved!")
    if epoch % 1 == 0:  # log to TensorBoard every epoch
        # writer.add_scalar('train_loss', loss.item() / batch[1].size(0), epoch)
        writer.add_scalar('test_loss', valid_loss / total_cnt, epoch)
        writer.add_scalar('train_acc', correct / total_cnt, epoch)
        writer.add_scalar('test_acc', valid_acc, epoch)

writer.close()

The model is saved whenever the validation accuracy reaches a new best.
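Since torch.save(model, model_name) above stores the whole module, reloading the best checkpoint for later evaluation is short; a brief sketch, assuming model_name is the same path used when saving and the model class is importable:

best_model = torch.load(model_name)   # loads the full module saved above
best_model.to(device)
best_model.eval()                     # inference mode for the BatchNorm layers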