!unzip -qq '/content/drive/MyDrive/ds_study/ds_study/dl_study/Plant_diseases/dataset.zip' -d './dataset'
import os
original_dataset_dir = './dataset'
classes_list = os.listdir(original_dataset_dir)
base_dir = './splitted'
os.mkdir(base_dir)
# 데이터 정리를 위한 목록 및 폴더 생성
import shutil
train_dir = os.path.join(base_dir, 'train')
os.mkdir(train_dir)
validation_dir = os.path.join(base_dir, 'val')
os.mkdir(validation_dir)
test_dir = os.path.join(base_dir, 'test')
os.mkdir(test_dir)
for cls in classes_list:
os.mkdir(os.path.join(train_dir, cls))
os.mkdir(os.path.join(validation_dir, cls))
os.mkdir(os.path.join(test_dir, cls))
# 데이터 현황 확인
import math
for cls in classes_list:
path = os.path.join(original_dataset_dir, cls)
fnames = os.listdir(path)
train_size = math.floor(len(fnames) * 0.6)
validation_size = math.floor(len(fnames) * 0.2)
test_size = math.floor(len(fnames) * 0.2)
train_fnames = fnames[:train_size]
for fname in train_fnames: # 새로 생성된 train dir에 파일 저장
src = os.path.join(path, fname)
dst = os.path.join(os.path.join(train_dir, cls), fname)
shutil.copyfile(src, dst)
validation_fnames = fnames[train_size:(validation_size + train_size)]
print("Validation size((",cls,") :", len(validation_fnames))
for fname in validation_fnames: # 새로 생성된 validation dir에 파일 저장
src = os.path.join(path, fname)
dst = os.path.join(os.path.join(validation_dir, cls), fname)
shutil.copyfile(src, dst)
test_fnames = fnames[(train_size + validation_size):(validation_size + train_size + test_size)]
print("Test size((",cls,") :", len(test_fnames))
for fname in test_fnames: # 새로 생성된 test dir에 파일 저장
src = os.path.join(path, fname)
dst = os.path.join(os.path.join(test_dir, cls), fname)
shutil.copyfile(src, dst)
💡 코랩에서 비어있는 디렉토리 삭제
코랩에서 디렉토리를 우클릭 버튼으로 삭제하려고 하니 비어있지 않는 디렉토리는 삭제 할 수 없다고 했다.
# ## 비어있지 않은 디렉토리 삭제
shutil.rmtree('./splitted', ignore_errors=True)
shutil.rmtree('./dataset', ignore_errors=True)
위와 같이 코드를 이용하여 디렉토리 삭제하면 가능하다.
# 학습 준비
import torch
import os
USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device("cuda" if USE_CUDA else "cpu")
BATCH_SIZE = 256 # 너무 크면 속도가 빨라지지만 gpu 메모리 한계에 걸림
EPOCH = 30
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
transform_base = transforms.Compose([transforms.Resize((64, 64 )), transforms.ToTensor()])
# ImageFolder: 폴더 하나를 통으로 읽겠다 -> 폴더명을 라벨로 보겠다.
train_dataset = ImageFolder(root='./splitted/train', transform=transform_base)
val_dataset = ImageFolder(root='./splitted/val', transform=transform_base)
from torch.utils.data import DataLoader
train_loader = torch.utils.data.DataLoader(train_dataset,
batch_size=BATCH_SIZE,
shuffle=True,
num_workers=4)
val_loader = torch.utils.data.DataLoader(val_dataset,
batch_size=BATCH_SIZE,
shuffle=True,
num_workers=4)
# 구조 잡기
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
self.conv3 = nn.Conv2d(64, 64, 3, padding=1)
self.fc1 = nn.Linear(4096, 512)
self.fc2 = nn.Linear(512, 33)
def forward(self, x):
x = self.conv1(x)
x = F.relu(x)
x = self.pool(x)
x = F.dropout(x, p=0.25, training=self.training)
x = self.conv2(x)
x = F.relu(x)
x = self.pool(x)
x = F.dropout(x, p=0.25, training=self.training)
x = self.conv3(x)
x = F.relu(x)
x = self.pool(x)
x = F.dropout(x, p=0.25, training=self.training)
x = x.view(-1, 4096)
x = self.fc1(x)
x = F.relu(x)
x = F.dropout(x, p=0.5, training = self.training)
x = self.fc2(x)
return F.log_softmax(x, dim=1)
model_base = Net().to(DEVICE)
optimizer = optim.Adam(model_base.parameters(), lr=0.001)
def train(model, train_loader, optimizer):
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(DEVICE), target.to(DEVICE)
optimizer.zero_grad()
output = model(data)
loss = F.cross_entropy(output, target)
loss.backward()
optimizer.step() # weight updata
def evaluate(model, test_loader):
model.eval()
test_loss = 0
correct = 0
# with 자원, 자원을 열면 닫지 않아도 with 구문이 끝나면 알아서 닫아줌(error대응 잘함)
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(DEVICE), target.to(DEVICE)
output = model(data)
test_loss += F.cross_entropy(output, target, reduction='sum').item()
pred = output.max(1, keepdim=True)[1]
correct += pred.eq(target.view_as(pred)).sum().item()
test_loss /= len(test_loader.dataset)
test_accuracy = 100. * correct / len(test_loader.dataset) # 맞는 개수
return test_loss, test_accuracy
import time
import copy
def train_baseline(model, train_loader, val_loader, optimizer, num_epochs = 30):
best_acc = 0.0
best_model_wts = copy.deepcopy(model.state_dict()) # 정확도 가장 높은 모델의 weight 저장
for epoch in range(1, num_epochs + 1):
since = time.time()
train(model, train_loader, optimizer)
train_loss, train_acc = evaluate(model, train_loader)
val_loss, val_acc = evaluate(model, val_loader)
if val_acc > best_acc: # 30번의 epoch 중 가장 val_accuracy가 좋은 weight를 저장하겠다.
best_model_wts = copy.deepcopy(model.state_dict())
time_elapsed = time.time() - since
print('----------------- epoch {} -----------------'.format(epoch))
print('train Loss: {:.4f}, Accuracy: {:.2f}%'.format(train_loss, train_acc))
print('val Loss: {:.4f}, Accuracy: {:.2f}%'.format(val_loss, val_acc))
print('Complieted in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
model.load_state_dict(best_model_wts)
return model
base = train_baseline(model_base, train_loader, val_loader, optimizer, EPOCH)
torch.save(base, 'baseline.pt')
epoch가 끝으로 갈 수록 accuracy가 좋아지고 있다.
과적합 방지를 위해 이미지를 돌리고, 상하좌우 반전, 이미지 자르기, 색상 변경 등을 한다. 하지만 가끔 상하 좌우 반전 등을 하면 안되는 경우도 있다.
ex) 의학, 숫자(2, 5) 등등
data_transforms = { # 과적합 방지용 -> 돌리고, 상하좌우 반전, 이미지 자르기, 색상
'train' : transforms.Compose([transforms.Resize([64, 64]),
transforms.RandomHorizontalFlip(), transforms.RandomVerticalFlip(),
transforms.RandomCrop(52), transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ]), # RGB 색상값? 색상에 대한 학습 효과
'val' : transforms.Compose([transforms.Resize([64, 64]),
transforms.RandomHorizontalFlip(), transforms.RandomVerticalFlip(),
transforms.RandomCrop(52), transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ]),
}
data_dir = './splitted'
image_datasets = {x: ImageFolder(root=os.path.join(data_dir, x),
transform=data_transforms[x]) for x in ['train', 'val']}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x],
batch_size = BATCH_SIZE,
shuffle=True,
num_workers=4) for x in ['train', 'val']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}
class_names = image_datasets['train'].classes
from torchvision import models
resnet = models.resnet50(pretrained=True) # True - 학습이 완료된 weight 가져옴, False - 구조만 가져옴
num_ftrs = resnet.fc.in_features # in_features - 마지막 레이어 채널 숫자에 해당하는 것
resnet.fc = nn.Linear(num_ftrs, 33) # 33개로 수정
resnet = resnet.to(DEVICE)
criterion = nn.CrossEntropyLoss()
optimizer_ft = optim.Adam(filter(lambda p: p.requires_grad, resnet.parameters()), lr=0.001)
from torch.optim import lr_scheduler # epoch에 따라 running rate를 바꾸는 작업을 해줌
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1) # 7 epoch마다 running rate를 0.1씩 감소 시킴
입력에 가까운 0-5번 레이어는 학습하지 않도록 고정하고 6-9 레이어는 학습을 하도록 한다.
ct = 0
for child in resnet.children():
ct += 1
if ct < 6: # 입력에 가까운 0-5번 레이어는 학습하지 않도록 고정, 6-9 레이어는 학습을 하도록
for param in child.parameters():
param.requires_grad = False
def train_resnet(model, criterion, optimizer, scheduler, num_epochs=25):
best_model_wts = copy.deepcopy(model.state_dict())
best_acc = 0.0
for epoch in range(num_epochs):
print('----------------- epoch {} -----------------'.format(epoch+1))
since = time.time()
for phase in ['train', 'val']:
if phase == 'train':
model.train()
else:
model.eval()
running_loss = 0.0
runnung_corrects = 0
for inputs, labels in dataloaders[phase]:
inputs = inputs.to(DEVICE)
labels = labels.to(DEVICE)
optimizer.zero_grad() # 그래디언트 zero 세팅
with torch.set_grad_enabled(phase == 'train'):
outputs = model(inputs)
_, preds = torch.max(outputs, 1)
loss = criterion(outputs, labels)
if phase == 'train':
loss.backward()
optimizer.step()
running_loss += loss.item() * inputs.size(0)
runnung_corrects += torch.sum(preds == labels.data)
if phase == 'train':
scheduler.step()
epoch_loss = running_loss/dataset_sizes[phase]
epoch_acc = runnung_corrects.double()/dataset_sizes[phase]
print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))
if phase == 'val' and epoch_acc > best_acc:
best_acc = epoch_acc
best_model_wts = copy.deepcopy(model.state_dict())
time_elapsed = time.time() - since
print('Complieted in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
print('Best val Acc: {.4f}'.format(best_acc))
model.load_state_dict(best_model_wts)
return model
model_resnet50 = train_resnet(resnet, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=EPOCH)
torch.save(model_resnet50, 'resnet50.pt')
test를 사용하여 Transfer Learning 모델 성능 평가하기
# 과적합 방지용 -> 돌리고, 상하좌우 반전, 이미지 자르기, 색상
transform_resnet = transforms.Compose([
transforms.Resize([64, 64]),
transforms.RandomCrop(52),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ])
test_resNet = ImageFolder(root='./splitted/test', transform=transform_resnet)
test_loader_resNet = torch.utils.data.DataLoader(test_resNet,
batch_size = BATCH_SIZE,
shuffle=True,
num_workers=4)
resnet50 = torch.load('resnet50.pt')
resnet50.eval()
test_loss, test_accuracy = evaluate(resnet50, test_loader_resNet)
print('ResNet test acc: ', test_accuracy)
"이 글은 제로베이스 데이터 취업 스쿨 강의를 듣고 작성한 내용으로 제로베이스 데이터 취업 스쿨 강의 자료 일부를 발췌한 내용이 포함되어 있습니다."