이번 시간에는 Pytorch를 CNN 데이터에 적용하는 방법에 대한 코드를 알아보도록 하겠습니다.
Sckit-learn에서 제공하는 MNIST 데이터를 불러와 코딩을 진행하겠습니다.
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.data import TensorDataset, DataLoader
from torchvision.datasets import CIFAR100,MNIST
USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device('cuda:0'if USE_CUDA else 'cpu')
print('torch' , torch.__version__, '사용DEVICE : ', DEVICE)
import torchvision.transforms as tr #torchvision은 vision 쪽에서 많이들 사용함
transf = tr.Compose([tr.Resize(56), tr.ToTensor()]) #56, 56, 1로 리사이즈 후 torch tensor 형태로 변환
#1. 데이터
path = './study/torch/_data/'
train_dataset = MNIST(path, train=True, download=False, transform=transf)
test_dataset = MNIST(path, train=False, download=False, transform=transf)
print(type(train_dataset))
print(train_dataset[0][0].shape) #torch.Size([1, 56, 56]) torch에서는 batchsize, chanel, size, size
print(train_dataset[0][1]) #5
# x_train, y_train = train_dataset.data/255. , train_dataset.targets
# x_test, y_test = test_dataset.data/255. , test_dataset.targets
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
bbb = iter(train_loader)
aaa = next(bbb)
print(aaa[0].shape) #torch.Size([32, 1, 56, 56]) #x의 사이즈
print(aaa[1].shape) #torch.Size([32]) #y의 size
print(len(train_loader)) #1875
#2. 모델
class CNN(nn.Module):
def __init__(self, num_features):
super(CNN, self).__init__()
self.hidden_layer1 = nn.Sequential(
nn.Conv2d(num_features, 64, kernel_size=(3,3), stride=1), # (n, 64, 54, 54)
# model.Conv2d(64, (3,3), stride=1, input_shape(56, 56, 1))
nn.ReLU(),
nn.MaxPool2d(kernel_size=(2,2)), # (n, 64, 27, 27)
nn.Dropout(0.2)
)
self.hidden_layer2 = nn.Sequential(
nn.Conv2d(64, 32, kernel_size=(3,3), stride=1), # (n, 32, 25, 25)
nn.ReLU(),
nn.MaxPool2d(kernel_size=(2,2)), # (n, 32, 12, 12)
nn.Dropout(0.2)
)
self.hidden_layer3 = nn.Sequential(
nn.Conv2d(32, 16, kernel_size=(3,3), stride=1), # (n, 16, 10, 10)
nn.ReLU(),
nn.MaxPool2d(kernel_size=(2,2)), # (n, 16, 5, 5)
nn.Dropout(0.2)
)
self.hidden_layer4 = nn.Linear(16*5*5, 16)
self.output_layer = nn.Linear(in_features=16, out_features=10)
def forward(self, x):
x = self.hidden_layer1(x)
x = self.hidden_layer2(x)
x = self.hidden_layer3(x)
x = x.view(x.shape[0], -1) # x = flatten() #케라스 버전
x = self.hidden_layer4(x)
x = self.output_layer(x)
return x
model = CNN(1).to(DEVICE) # w x h가 input이 안되었음.. 알아서 사이즈 맞춰줌
#3. 컴파일, 훈련
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)
def train(model, criterion, optimizer, loader):
epoch_loss = 0
epoch_acc = 0
for x_batch, y_batch in loader:
x_batch, y_batch = x_batch.to(DEVICE), y_batch.to(DEVICE)
optimizer.zero_grad()
hypothesis = model(x_batch)
loss = criterion(hypothesis, y_batch)
loss.backward()
optimizer.step()
epoch_loss += loss.item()
y_predict = torch.argmax(hypothesis, 1)
acc = (y_predict == y_batch).float().mean()
epoch_acc += acc.item()
return epoch_loss / len(loader), epoch_acc / len(loader)
def evaluate(model, criterion, loader):
model.eval()
epoch_loss = 0
epoch_acc = 0
with torch.no_grad():
for x_batch, y_batch in loader:
x_batch, y_batch = x_batch.to(DEVICE), y_batch.to(DEVICE)
hypothesis = model(x_batch)
loss = criterion(hypothesis, y_batch)
epoch_loss += loss.item()
y_predict = torch.argmax(hypothesis,1)
acc = (y_predict == y_batch).float().mean()
epoch_acc += acc.item()
return epoch_loss / len(loader), epoch_acc / len(loader)
# loss, acc = model.evaluate(x_test, y_test)
epochs = 10
for epoch in range(1, epochs + 1):
loss, acc = train(model, criterion, optimizer, train_loader)
val_loss, val_acc = evaluate(model, criterion, test_loader)
print('epoch: {}, loss : {:.4f}, acc:{:.3f}, val_loss:{:.4f}, val_acc{:.3f}'.format(
epoch, loss, acc, val_loss, val_acc))
#4. 평가 예측
loss, acc = evaluate(model, criterion, test_loader)
print('loss : {:.4f}, acc:{:.3f}'.format(loss, acc))
# loss : 0.0462, acc:0.985
x, y 데이터를 split 해준 뒤 tensor 형태로 변환해준 뒤 아래의 단계를 거칩니다.
import torchvision.transforms as tr trans = tr.Compose([tr.Resize(56), tr.toTensor()]) # 56,56, 1 로 리사이즈 후 토치 형태로 변환
torchvision.transforms를 tr로 불러옵니다.
trans라는 변수는 tr에 있는 Compose함수를 사용합니다. tr.Resize를 통해 56, 56, 1로 리사이즈를 해주며 이후 toTensor를 통해 torch의 Tensor형태로 변환해줍니다.
path = './study/torch/_data' train_dataset = MNIST(path, train=True, download=False, transform=transf) test_dataset = MNIST(path, train=False, download=False, transform=transf)
이후 transform의 파라미터를 transf 변수로 주어 train 데이터와 test 데이터를 56,56,1로 리사이즈 후 Tensor형태로 변환합니다.
또한 train=True라는 값은 train이 필요한 train_dataset에만 적용합니다.
class CNN(nn.Module): def __init__(self, num_features): super(CNN, self).__init__() ### self.hidden_layer1 = nn.Sequential( nn.Conv2d(num_features, 64, kernel_size=(3,3), stride=1), # (n, 64, 54, 54) # model.Conv2d(64, (3,3), stride=1, input_shape(56, 56, 1)) nn.ReLU(), nn.MaxPool2d(kernel_size=(2,2)), # (n, 64, 27, 27) nn.Dropout(0.2) )
이전에 class명을 CNN으로 정의합니다. CNN에 nn.Module을 불러옵니다.
init함수는 self와 num_features를 매개변수로 사용합니다.
self.hidden_layer1는 Sequential(nn.Conv2d(num_features, 64, kernel_size=(3,3), stride=1)은 기존 keras의 model.Conv2d(64, (3,3), stride=1, input_shape(56, 56, 1)과 같습니다.
nn.ReLU(), nn.MaxPool2d(kernel_size=(2,2)), nn.Dropout(0.2)로 줍니다.
이와 같은 layer를 구성하고자하는 수만큼 구성한다.
self.hidden_layer4 = nn.Linear(16*5*5, 16) self.output_layer = nn.Linear(in_features=16, out_features=10)
layer의 말미에 input되는 shape를 모두 곱하여 Linear layer에 사용할 수 있도록 shape를 변환시킵니다.
최종 out_features는 MNIST의 class개수인 10개로 설정합니다.
def forward(self, x): x = self.hidden_layer1(x) x = self.hidden_layer2(x) x = self.hidden_layer3(x) x = x.view(x.shape[0], -1) # x = flatten() #케라스 버전 x = self.hidden_layer4(x) x = self.output_layer(x) return x
forward 함수를 통하여 미리 선언했던 layer들을 실행시켜줍니다.
기존의 형식과 다른 점은 x = x.view(x.shape[0], -1)이 keras에서 flatten()과 같은 역할을 하는 것입니다.
def evaluate(model, criterion, loader): model.eval() #평가모드 // 역전파 x, 가중치 갱신 x, 기울기 계산 할 수도 안 할 수도 # 드롭아웃, 배치노말 x total_loss = 0 for x_batch, y_batch in loader: with torch.no_grad(): y_predict = model(x_batch) loss2 = criterion(y_batch, y_predict) total_loss += loss2.item() return total_loss / len(loader) last_loss = evaluate(model, criterion, test_loader) print('최종 loss : ', last_loss)
x_test = [] y_test = [] for i, a in test_loader2: x_test.extend(i.detach().cpu().numpy()) y_test.extend(a.detach().cpu().numpy()) x_test = np.array(x_test) y_test = np.array(y_test) print(x_test.shape) x_test = torch.FloatTensor(x_test).to(DEVICE) y_test = torch.FloatTensor(y_test).to(DEVICE) y_pred = model(x_test)