def init_cnn(module): #@save
"""Initialize weights for CNNs."""
if type(module) == nn.Linear or type(module) == nn.Conv2d:
nn.init.xavier_uniform_(module.weight)
class LeNet(d2l.Classifier): #@save
def __init__(self, lr=0.1, num_classes=10):
super().__init__()
self.save_hyperparameters()
self.net = nn.Sequential(
nn.LazyConv2d(6, kernel_size=5, padding=2), nn.Sigmoid(),
nn.AvgPool2d(kernel_size=2, stride=2),
nn.LazyConv2d(16, kernel_size=5), nn.Sigmoid(),
nn.AvgPool2d(kernel_size=2, stride=2),
nn.Flatten(),
nn.LazyLinear(120), nn.Sigmoid(),
nn.LazyLinear(84), nn.Sigmoid(),
nn.LazyLinear(num_classes))
@d2l.add_to_class(d2l.Classifier) #@save
def layer_summary(self, X_shape):
X = torch.randn(*X_shape)
for layer in self.net:
X = layer(X)
print(layer.__class__.__name__, 'output shape:\t', X.shape)
model = LeNet()
model.layer_summary((1, 1, 28, 28))
"""result)
Conv2d output shape: torch.Size([1, 6, 28, 28])
Sigmoid output shape: torch.Size([1, 6, 28, 28])
AvgPool2d output shape: torch.Size([1, 6, 14, 14])
Conv2d output shape: torch.Size([1, 16, 10, 10])
Sigmoid output shape: torch.Size([1, 16, 10, 10])
AvgPool2d output shape: torch.Size([1, 16, 5, 5])
Flatten output shape: torch.Size([1, 400])
Linear output shape: torch.Size([1, 120])
Sigmoid output shape: torch.Size([1, 120])
Linear output shape: torch.Size([1, 84])
Sigmoid output shape: torch.Size([1, 84])
Linear output shape: torch.Size([1, 10])
"""
trainer = d2l.Trainer(max_epochs=10, num_gpus=1)
data = d2l.FashionMNIST(batch_size=128)
model = LeNet(lr=0.1)
model.apply_init([next(iter(data.get_dataloader(True)))[0]], init_cnn)
trainer.fit(model, data)
def vgg_block(num_convs, out_channels):
layers = []
for _ in range(num_convs):
layers.append(nn.LazyConv2d(out_channels, kernel_size=3, padding=1))
layers.append(nn.ReLU())
layers.append(nn.MaxPool2d(kernel_size=2,stride=2))
return nn.Sequential(*layers)
class VGG(d2l.Classifier):
def __init__(self, arch, lr=0.1, num_classes=10):
super().__init__()
self.save_hyperparameters()
conv_blks = []
"""
arch에서 받아온 convolution수,channel수 정보를 활용하여
Convolution Block의 구조가 설계된다.
Convolution 레이어는 kernel_size=3 padding=1으로,
MaxPool 레이어는 kernel_size=2, stride=2로 구성된다
"""
for (num_convs, out_channels) in arch:
conv_blks.append(vgg_block(num_convs, out_channels))
self.net = nn.Sequential(
*conv_blks, nn.Flatten(),
"""Fully-Connected Block 영역
마지막 아웃풋은 num_classes로 받는다"""
nn.LazyLinear(4096), nn.ReLU(), nn.Dropout(0.5),
nn.LazyLinear(4096), nn.ReLU(), nn.Dropout(0.5),
nn.LazyLinear(num_classes))
self.net.apply(d2l.init_cnn)
model = VGG(arch=((1, 16), (1, 32), (2, 64), (2, 128), (2, 128)), lr=0.01)
trainer = d2l.Trainer(max_epochs=10, num_gpus=1)
data = d2l.FashionMNIST(batch_size=128, resize=(224, 224))
model.apply_init([next(iter(data.get_dataloader(True)))[0]], d2l.init_cnn)
trainer.fit(model, data)
import torch
from torch import nn
from d2l import torch as d2l
def nin_block(out_channels, kernel_size, strides, padding):
return nn.Sequential(
nn.LazyConv2d(out_channels, kernel_size, strides, padding), nn.ReLU(),
nn.LazyConv2d(out_channels, kernel_size=1), nn.ReLU(),
nn.LazyConv2d(out_channels, kernel_size=1), nn.ReLU())
class NiN(d2l.Classifier):
def __init__(self, lr=0.1, num_classes=10):
super().__init__()
self.save_hyperparameters()
self.net = nn.Sequential(
nin_block(96, kernel_size=11, strides=4, padding=0),
nn.MaxPool2d(3, stride=2),
nin_block(256, kernel_size=5, strides=1, padding=2),
nn.MaxPool2d(3, stride=2),
nin_block(384, kernel_size=3, strides=1, padding=1),
nn.MaxPool2d(3, stride=2),
nn.Dropout(0.5),
nin_block(num_classes, kernel_size=3, strides=1, padding=1),
nn.AdaptiveAvgPool2d((1, 1)),
nn.Flatten())
self.net.apply(d2l.init_cnn)
NiN().layer_summary((1, 1, 224, 224))
"""result)
Sequential output shape: torch.Size([1, 96, 54, 54])
MaxPool2d output shape: torch.Size([1, 96, 26, 26])
Sequential output shape: torch.Size([1, 256, 26, 26])
MaxPool2d output shape: torch.Size([1, 256, 12, 12])
Sequential output shape: torch.Size([1, 384, 12, 12])
MaxPool2d output shape: torch.Size([1, 384, 5, 5])
Dropout output shape: torch.Size([1, 384, 5, 5])
Sequential output shape: torch.Size([1, 10, 5, 5])
AdaptiveAvgPool2d output shape: torch.Size([1, 10, 1, 1])
Flatten output shape: torch.Size([1, 10])
"""
model = NiN(lr=0.05)
trainer = d2l.Trainer(max_epochs=10, num_gpus=1)
data = d2l.FashionMNIST(batch_size=128, resize=(224, 224))
model.apply_init([next(iter(data.get_dataloader(True)))[0]], d2l.init_cnn)
trainer.fit(model, data)
- ![](https://img1.daumcdn.net/thumb/R1280x0/?scode=mtistory2&fname=https%3A%2F%2Fblog.kakaocdn.net%2Fdn%2FBrb0r%2FbtrcM1G9gYV%2FKp0MqbXQ2HZ9haKx5LYIpk%2Fimg.png)
- 이렇게 보이듯이 $1 \times 1$ 합성곱은 연산량을 줄이는 효과를 갖고 있다. GoogLeNet에선 대략 절반 이상의 파라미터 수를 줄이는 연산상의 이득을 제공하였다
- 이렇게 $1 \times 1$ 합성곱을 통해 채널수를 줄인뒤, 다시 채널을 늘리는 형태를 Bottleneck Layer라고 부른다
- [ref.](https://bigdata-analyst.tistory.com/290?category=953758)
import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l
class Inception(nn.Module):
# `c1`--`c4` are the number of output channels for each branch
def __init__(self, c1, c2, c3, c4, **kwargs):
super(Inception, self).__init__(**kwargs)
# Branch 1
self.b1_1 = nn.LazyConv2d(c1, kernel_size=1)
# Branch 2
self.b2_1 = nn.LazyConv2d(c2[0], kernel_size=1)
self.b2_2 = nn.LazyConv2d(c2[1], kernel_size=3, padding=1)
# Branch 3
self.b3_1 = nn.LazyConv2d(c3[0], kernel_size=1)
self.b3_2 = nn.LazyConv2d(c3[1], kernel_size=5, padding=2)
# Branch 4
self.b4_1 = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
self.b4_2 = nn.LazyConv2d(c4, kernel_size=1)
def forward(self, x):
b1 = F.relu(self.b1_1(x))
b2 = F.relu(self.b2_2(F.relu(self.b2_1(x))))
b3 = F.relu(self.b3_2(F.relu(self.b3_1(x))))
b4 = F.relu(self.b4_2(self.b4_1(x)))
return torch.cat((b1, b2, b3, b4), dim=1)
class GoogleNet(d2l.Classifier):
"""1.channel=64, padding=3 stride=2 kernel_size=7 합성곱층
2.padding=1 stride=2 kernel_size=3 MaxPool층"""
def b1(self):
return nn.Sequential(
nn.LazyConv2d(64, kernel_size=7, stride=2, padding=3),
nn.ReLU(), nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
@d2l.add_to_class(GoogleNet)
def b2(self):
"""4.channel=64 kernel_size=1 합성곱층
5.channel=192 padding=1 kernel_size=3 합성곱층
6.padding=1 stride=2 kernel_size=3 MaxPool층"""
return nn.Sequential(
nn.LazyConv2d(64, kernel_size=1), nn.ReLU(),
nn.LazyConv2d(192, kernel_size=3, padding=1), nn.ReLU(),
nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
@d2l.add_to_class(GoogleNet)
def b3(self):
"""7.인셉션 레이어 2개
8.padding=1 stride=2 kernel_size=3 MaxPool층"""
return nn.Sequential(Inception(64, (96, 128), (16, 32), 32),
Inception(128, (128, 192), (32, 96), 64),
nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
@d2l.add_to_class(GoogleNet)
def b4(self):
"""9.인셉션 레이어 5개
10.padding=1 stride=2 kernel_size=3 MaxPool층"""
return nn.Sequential(Inception(192, (96, 208), (16, 48), 64),
Inception(160, (112, 224), (24, 64), 64),
Inception(128, (128, 256), (24, 64), 64),
Inception(112, (144, 288), (32, 64), 64),
Inception(256, (160, 320), (32, 128), 128),
nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
@d2l.add_to_class(GoogleNet)
def b5(self):
"""11.인셉션 레이어 2개
12.AdaptiveAvgPool 층 후 Flatten"""
return nn.Sequential(Inception(256, (160, 320), (32, 128), 128),
Inception(384, (192, 384), (48, 128), 128),
nn.AdaptiveAvgPool2d((1,1)), nn.Flatten())
@d2l.add_to_class(GoogleNet)
"""12.num_class 만큼의 출력노드를 갖는 Fully-Connected 층"""
def __init__(self, lr=0.1, num_classes=10):
super(GoogleNet, self).__init__()
self.save_hyperparameters()
self.net = nn.Sequential(self.b1(), self.b2(), self.b3(),self.b4(), self.b5(), nn.LazyLinear(num_classes))
self.net.apply(d2l.init_cnn)
후술될 layer normalization이 각 관측치 단위로 정규화를 하는데 반해 batch normalization은 각 채널 단위로 이루어지는 것이다 ref
벡터를 정규화하는 것은 함수의 복잡도를 제한하는 것과 더불어 파라미터를 비슷한 스케일로 만들어주는 효과를 갖는다
MLP 또는 CNN에서 중간층의 변수들은 다양한 범위의 값을 가질 수 있다. 배치 정규화는 이러한 배치의 분포하는 경향성이 네트워크 수렴에 방해가 될수 있다는 가정에서 만들어졌다
- 만약 이전 레이어에 비해 현재 레이어의 변수값이 100배정도 크다고 한다면, 학습률에 대하여 적절한 조정이 필요할 것이라고 직관적으로 알 수 있을 것이다.
- Adam이나 AdaGrad, Yogi등은 이러한 연유로 제작된 optimizer이다. 그러나 대안책으로 정규화를 통해서도 이러한 문제를 예방할 수 있을 것이다
네트워크가 더 복잡한 구조를 가질수록 오버피팅에 빠질 위험성은 커진다. 그렇기 때문에 정규화는 더욱 중요해진다
- 정규화를 위한 기본적인 테크닉은 노이즈 삽입이다.
- 또 다른 방법으론 dropout이 있다
배치 정규화는 각각의 레이어에 대하여 적용된다. 트레이닝의 반복iteration동안
1. 맨 처음 입력값에 대하여 정규화를 시행한다. 미니배치를 통해 평균과 표준편차를 추정하고, 각 입력값에 평균을 빼고 표준편차로 나눠주는 것이다.
2. 다음 scale coefficient와 offset을 적용하여 잃어버린 자유도를 회복한다
-
- : 표본 평균
- : 미니배치 의 표본 표준편차. 이때 division by zero 에러를 막기 위해 아주 작은 값 를 추가한 것이다.
배치 정규화는 미니배치의 사이즈가 1일 경우 계산이 불가능하다. 배치 정규화가 안정적이고 효과적이기 위해선 각 배치의 크기가 충분히 커야한다. 보통의 경우 50~100개의 미니배치가 좋은 성능을 보여주었다. 너무 작은 경우 유용한 시그널이 높은 분산값에 의해 손실되고, 너무 큰 배치에 경우 지나치게 안정적인 추정치로 인해 일반화가 잘 이루어지지 않았다
학습과정, 학습모드training mode에서는 샘플링하는 대상이 미니배치이므로 같은 입력값에 대하여 다르게 분류되는 현상이 발생할 수 있지만, 학습 종료후 예측모드prediction mode 에서는 샘플링 대상이 데이터셋 전체이므로 이러한 문제가 발생하지 않는다
Training set을 모집단으로 보자. 앞으로 prediction을 시행하고자 한다면 Training set의 모평균, 모표준편차를 필요로 할 것이다. 그러한 계산을 하기 위한 공식들은 존재한다. 하지만 연산을 간단히 하기 위하여 EMA(Exponential moving average/Exponentially weighted average)라는 방식을 활용한다
EMA는 다음과 같이 계산된다
-
- 이때 는 0과 1사이 하이퍼파라미터, 는 새로들어온 데이터, 현재 상태를 나타내는 변수로 볼 수 있다
- 인것을 고려하면
- 인데 이를 보면
- 의 계수는 로 현재상태 를 계산할 때 만큼의 이전상태 는 만큼 가중치를 갖는다는 것을 알 수 있다. 이는 오래된 데이터일수록 기하급수적으로 영향력이 감소하게 되는 결과를 낳는다. 그러한 이유로 Exponentially 라는 뜻이 붙게 되었다
- 활용 : 이 값은 근사적으로 단계의 데이터만을 활용하여 평균을 취한것과 같다고 알려져있다. 예컨데 이면 위 식의 값은 50으로, 대략 50일간의 데이터를 갖고 가중평균을 구한것과 유사해지는 것이다.
현재 알고리즘에선 다음과 같이 작성되었다
- moving_mean
:
- moving_var
:
import torch
from torch import nn
from d2l import torch as d2l
def batch_norm(X, gamma, beta, moving_mean, moving_var, eps, momentum):
"""
torch.is_grad_enabled()는 gradient 계산이 가능한 상태인지 T or F로
반환한다.만약 False라면 prediction mode로 간주한다
training 과정속에서 moving_mean과 moving_var를 학습시키는데 이 결과는
prediction에서 변수들을 정규화하는데 사용된다
"""
if not torch.is_grad_enabled():
X_hat = (X - moving_mean) / torch.sqrt(moving_var + eps)
else:
assert len(X.shape) in (2, 4)
"""
len(X.shape)==2라면 fully-connected layer로 간주하고
아닐경우 Convolutional layer로 간주한다
이때 FC layer의 경우 평균값과 분산을 feature 축(axis=0)에서 계산한다
convolutional layer일 경우 channel 축(axis=1)에서 계산한다
"""
if len(X.shape) == 2:
mean = X.mean(dim=0)
var = ((X - mean) ** 2).mean(dim=0)
else:
mean = X.mean(dim=(0, 2, 3), keepdim=True)
var = ((X - mean) ** 2).mean(dim=(0, 2, 3), keepdim=True)
"""training 모드라면, 현재 저장된 mean과 var를 통해 정규화를 진행한다
. 그리고 moving_average 와 moving_var를 업데이트하는데 활용한다"""
X_hat = (X - mean) / torch.sqrt(var + eps)
moving_mean = (1.0 - momentum) * moving_mean + momentum * mean
moving_var = (1.0 - momentum) * moving_var + momentum * var
"""각 피처or채널마다 배치 정규화가 된다.
이는 tensor-contradiction을 통해 구현된다"""
Y = gamma * X_hat + beta # Scale and shift
return Y, moving_mean.data, moving_var.data
class BatchNorm(nn.Module):
"""
num_features: FC-출력층의 출력 수 또는 Convolutional 층의 출력 채널 수
를 의미한다
num_dims : 2차원인경우 FC층, 4차원인경우 Convolutional층을 제작한다
"""
def __init__(self, num_features, num_dims):
super().__init__()
if num_dims == 2:
shape = (1, num_features)
else:
shape = (1, num_features, 1, 1)
"""
scale 파라미터와 shift 파라미터는 각각 1과 0 으로 초기화된다
moving_mean과 moving_var는 각각 0과 1로 초기화된다
"""
self.gamma = nn.Parameter(torch.ones(shape))
self.beta = nn.Parameter(torch.zeros(shape))
self.moving_mean = torch.zeros(shape)
self.moving_var = torch.ones(shape)
def forward(self, X):
"""
my_tensor=my_tensor.to(device)를 한다면
GPU에 my_tensor의 복사본이 저장되게 된다.
따라서 'X'란 텐서가 메인 메모리에 올라와있지 않을 경우, 메모리에
올리는 작업을 시행한다.
메모리에 올라왔다면, 앞에서 정의한 batch_norm 함수를 활용하여
해당 레이어에 대한 배치 정규화를 시행한다.
"""
if self.moving_mean.device != X.device:
self.moving_mean = self.moving_mean.to(X.device)
self.moving_var = self.moving_var.to(X.device)
# Save the updated `moving_mean` and `moving_var`
Y, self.moving_mean, self.moving_var = batch_norm(
X, self.gamma, self.beta, self.moving_mean,
self.moving_var, eps=1e-5, momentum=0.1)
return Y
class BNLeNetScratch(d2l.Classifier):
def __init__(self, lr=0.1, num_classes=10):
super().__init__()
self.save_hyperparameters()
self.net = nn.Sequential(
nn.LazyConv2d(6, kernel_size=5), BatchNorm(6, num_dims=4),
nn.Sigmoid(), nn.AvgPool2d(kernel_size=2, stride=2),
nn.LazyConv2d(16, kernel_size=5), BatchNorm(16,
num_dims=4),
nn.Sigmoid(), nn.AvgPool2d(kernel_size=2, stride=2),
nn.Flatten(),
nn.LazyLinear(120),BatchNorm(120, num_dims=2),
nn.Sigmoid(),
nn.LazyLinear(84),
BatchNorm(84, num_dims=2), nn.Sigmoid(),
nn.LazyLinear(num_classes))
- 이는 매우 깊은 컴퓨터 비전 모델을 설계할때 깊이 고려되었던 문제이다. ResNet의 아이디어는 모델에 새로운 레이어를 추가할때마다, 항등함수를 추가하는 것이다
- 이러한 고려는 residual block이란 아이디어로 구체화 되었다.
import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l
class Residual(nn.Module): #@save
"""Residual Block 구조
1. 출력 채널값=num_channels ,kerrnel_size=3, padding=1을 갖는
합성곱 레이어
2. 출력 채널값 동일 ,kerrnel_size=3, padding=1을 갖는
합성곱 레이어
(optional)3. 출력 채널값 동일, stride=strides 인 1x1 conv로 shape를
바꾸기 위해서 사용되는 레이어 """
def __init__(self, num_channels, use_1x1conv=False, strides=1):
super().__init__()
self.conv1 = nn.LazyConv2d(num_channels, kernel_size=3, padding=1,
stride=strides)
self.conv2 = nn.LazyConv2d(num_channels, kernel_size=3, padding=1)
if use_1x1conv:
self.conv3 = nn.LazyConv2d(num_channels, kernel_size=1,
stride=strides)
else:
self.conv3 = None
self.bn1 = nn.LazyBatchNorm2d()
self.bn2 = nn.LazyBatchNorm2d()
def forward(self, X):
""" 전방향 전파 학습과정
1. conv1 -> 배치정규화 -> relu 활성함수
2. conv2 -> 배치정규화 -> relu 활성함수
3. conv3가 있으면 conv3 수행
4. 2번째 레이어 activation map 값에 입력값 X를 더하여
(conv3가 있었다면 아래의 식으로 바뀜)반환함
"""
Y = F.relu(self.bn1(self.conv1(X)))
Y = self.bn2(self.conv2(Y))
if self.conv3:
X = self.conv3(X)
Y += X
return F.relu(Y)
class ResNet(d2l.Classifier):
"""
1. 맨 앞에서 2개의 레이어. 7x7 Conv-> BN -> 3x3 MaxPoo로 구성되어 있다
"""
def b1(self):
return nn.Sequential(
nn.LazyConv2d(64, kernel_size=7, stride=2, padding=3),
nn.LazyBatchNorm2d(), nn.ReLU(),
nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
@d2l.add_to_class(ResNet)
"""
Residual Block 제작 함수.
첫번째 레이어에서는 # of channels=num_channels 인데, max-pooling layer가
stride=2인 상태로 적용된 상태의 값을 받아 높이와 폭을 더 줄이지 않기 위해
1x1block을 사용하지 않는다. 이를 identity block이라 한다
그 다음 레이어에서 부터는 use_1x1conv=True, strides=2로서 크기가
floor((input+1)/2) 이 된다. 이를 convolution block이라 한다
"""
def block(self, num_residuals, num_channels, first_block=False):
blk = []
for i in range(num_residuals):
if i == 0 and not first_block:
blk.append(Residual(num_channels, use_1x1conv=True, strides=2))
else:
blk.append(Residual(num_channels))
return nn.Sequential(*blk)
@d2l.add_to_class(ResNet)
def __init__(self, arch, lr=0.1, num_classes=10):
super(ResNet, self).__init__()
self.save_hyperparameters()
self.net = nn.Sequential(self.b1())
for i, b in enumerate(arch):
"""
add_module(name, module) : 현재 모듈에 name이란 새로운 모듈을 추가한다
for i,b in enumerate(arch) 를 하면
0 (2, 64) 1 (2, 128) 2 (2, 256) 3 (2, 512) 로 출력되는데
이것이 각각 b2,b3,b4,b5 라 명명하기 위해 f'b{i+2}'를 쓴듯
여기서 튜플 첫번째 원소값은 residual layer수를, 두번째 원소값은 채널수를 의미한다
"""
self.net.add_module(f'b{i+2}', self.block(*b, first_block=(i==0)))
self.net.add_module('last', nn.Sequential(
nn.AdaptiveAvgPool2d((1, 1)), nn.Flatten(),
nn.LazyLinear(num_classes)))
self.net.apply(d2l.init_cnn)
class ResNet18(ResNet):
def __init__(self, lr=0.1, num_classes=10):
"""((2, 64), (2, 128), (2, 256), (2, 512))가 architecture
구조를 의미함 """
super().__init__(((2, 64), (2, 128), (2, 256), (2, 512)),
lr, num_classes)
ResNet18().layer_summary((1, 1, 96, 96))
""" result)
Sequential output shape: torch.Size([1, 64, 24, 24])
Sequential output shape: torch.Size([1, 64, 24, 24])
Sequential output shape: torch.Size([1, 128, 12, 12])
Sequential output shape: torch.Size([1, 256, 6, 6])
Sequential output shape: torch.Size([1, 512, 3, 3])
Sequential output shape: torch.Size([1, 10])
"""