[DL] 식물잎의 사진으로 질병 분류

박미영·2023년 6월 29일
0

📌 식물잎의 사진으로 질병 분류


📍 파일 정리

  • 압축파일 해제
!unzip -qq '/content/drive/MyDrive/ds_study/ds_study/dl_study/Plant_diseases/dataset.zip' -d './dataset'



import os

original_dataset_dir = './dataset'
classes_list = os.listdir(original_dataset_dir)

base_dir = './splitted'
os.mkdir(base_dir)

  • train, val, test 디렉토리 생성
# 데이터 정리를 위한 목록 및 폴더 생성
import shutil

train_dir = os.path.join(base_dir, 'train')
os.mkdir(train_dir)
validation_dir = os.path.join(base_dir, 'val')
os.mkdir(validation_dir)
test_dir = os.path.join(base_dir, 'test')
os.mkdir(test_dir)

for cls in classes_list:
    os.mkdir(os.path.join(train_dir, cls))
    os.mkdir(os.path.join(validation_dir, cls))
    os.mkdir(os.path.join(test_dir, cls))



  • 각자 디렉토리에 이미지 데이터 분류
# 데이터 현황 확인
import math

for cls in classes_list:
    path = os.path.join(original_dataset_dir, cls)
    fnames = os.listdir(path)

    train_size = math.floor(len(fnames) * 0.6)
    validation_size = math.floor(len(fnames) * 0.2)
    test_size = math.floor(len(fnames) * 0.2)

    train_fnames = fnames[:train_size]
    for fname in train_fnames:  # 새로 생성된 train dir에 파일 저장
        src = os.path.join(path, fname)
        dst = os.path.join(os.path.join(train_dir, cls), fname)
        shutil.copyfile(src, dst)

    validation_fnames = fnames[train_size:(validation_size + train_size)]
    print("Validation size((",cls,") :", len(validation_fnames))
    for fname in validation_fnames:  # 새로 생성된 validation dir에 파일 저장
        src = os.path.join(path, fname)
        dst = os.path.join(os.path.join(validation_dir, cls), fname)
        shutil.copyfile(src, dst)

    test_fnames = fnames[(train_size + validation_size):(validation_size + train_size + test_size)]
    print("Test size((",cls,") :", len(test_fnames))
    for fname in test_fnames:  # 새로 생성된 test dir에 파일 저장
        src = os.path.join(path, fname)
        dst = os.path.join(os.path.join(test_dir, cls), fname)
        shutil.copyfile(src, dst)



💡 코랩에서 비어있는 디렉토리 삭제
코랩에서 디렉토리를 우클릭 버튼으로 삭제하려고 하니 비어있지 않는 디렉토리는 삭제 할 수 없다고 했다.

# ## 비어있지 않은 디렉토리 삭제
shutil.rmtree('./splitted', ignore_errors=True)
shutil.rmtree('./dataset', ignore_errors=True)

위와 같이 코드를 이용하여 디렉토리 삭제하면 가능하다.



📍 학습 준비

# 학습 준비
import torch 
import os

USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device("cuda" if USE_CUDA else "cpu")
BATCH_SIZE = 256    # 너무 크면 속도가 빨라지지만 gpu 메모리 한계에 걸림
EPOCH = 30
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder

transform_base = transforms.Compose([transforms.Resize((64, 64 )), transforms.ToTensor()])
# ImageFolder: 폴더 하나를 통으로 읽겠다 -> 폴더명을 라벨로 보겠다.
train_dataset = ImageFolder(root='./splitted/train', transform=transform_base)
val_dataset = ImageFolder(root='./splitted/val', transform=transform_base)
from torch.utils.data import DataLoader

train_loader = torch.utils.data.DataLoader(train_dataset,
                                           batch_size=BATCH_SIZE,
                                           shuffle=True,
                                           num_workers=4)

val_loader = torch.utils.data.DataLoader(val_dataset,
                                           batch_size=BATCH_SIZE,
                                           shuffle=True,
                                           num_workers=4)



- 구조 잡기

# 구조 잡기
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()

        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.conv3 = nn.Conv2d(64, 64, 3, padding=1)

        self.fc1 = nn.Linear(4096, 512)
        self.fc2 = nn.Linear(512, 33)

    def forward(self, x):

        x = self.conv1(x)
        x = F.relu(x)
        x = self.pool(x)
        x = F.dropout(x, p=0.25, training=self.training)

        x = self.conv2(x)
        x = F.relu(x)
        x = self.pool(x)
        x = F.dropout(x, p=0.25, training=self.training)

        x = self.conv3(x)
        x = F.relu(x)
        x = self.pool(x)
        x = F.dropout(x, p=0.25, training=self.training)

        x = x.view(-1, 4096)
        x = self.fc1(x)
        x = F.relu(x)
        x = F.dropout(x, p=0.5, training = self.training)
        x = self.fc2(x)

        return F.log_softmax(x, dim=1)
    
model_base = Net().to(DEVICE)
optimizer = optim.Adam(model_base.parameters(), lr=0.001)

def train(model, train_loader, optimizer):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(DEVICE), target.to(DEVICE)
        optimizer.zero_grad()
        output = model(data)
        loss = F.cross_entropy(output, target)
        loss.backward()
        optimizer.step()    # weight updata

def evaluate(model, test_loader):
    model.eval()
    test_loss = 0
    correct = 0

    # with 자원, 자원을 열면 닫지 않아도 with 구문이 끝나면 알아서 닫아줌(error대응 잘함)
    with torch.no_grad():   
        for data, target in test_loader:
            data, target = data.to(DEVICE), target.to(DEVICE)
            output = model(data)

            test_loss += F.cross_entropy(output, target, reduction='sum').item()

            pred = output.max(1, keepdim=True)[1]
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    test_accuracy = 100. * correct / len(test_loader.dataset)   # 맞는 개수
    return test_loss, test_accuracy



import time
import copy 

def train_baseline(model, train_loader, val_loader, optimizer, num_epochs = 30):
    best_acc = 0.0
    best_model_wts = copy.deepcopy(model.state_dict()) # 정확도 가장 높은 모델의 weight 저장

    for epoch in range(1, num_epochs + 1):
        since = time.time()
        train(model, train_loader, optimizer)
        train_loss, train_acc = evaluate(model, train_loader)
        val_loss, val_acc = evaluate(model, val_loader)

        if val_acc > best_acc:  # 30번의 epoch 중 가장 val_accuracy가 좋은 weight를 저장하겠다. 
            best_model_wts = copy.deepcopy(model.state_dict())
        
        time_elapsed = time.time() - since
        print('----------------- epoch {} -----------------'.format(epoch))
        print('train Loss: {:.4f}, Accuracy: {:.2f}%'.format(train_loss, train_acc))
        print('val Loss: {:.4f}, Accuracy: {:.2f}%'.format(val_loss, val_acc))
        print('Complieted in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    model.load_state_dict(best_model_wts)
    return model

base = train_baseline(model_base, train_loader, val_loader, optimizer, EPOCH)
torch.save(base, 'baseline.pt')


epoch가 끝으로 갈 수록 accuracy가 좋아지고 있다.



📍 전이학습

과적합 방지를 위해 이미지를 돌리고, 상하좌우 반전, 이미지 자르기, 색상 변경 등을 한다. 하지만 가끔 상하 좌우 반전 등을 하면 안되는 경우도 있다.
ex) 의학, 숫자(2, 5) 등등

data_transforms = {     # 과적합 방지용 -> 돌리고, 상하좌우 반전, 이미지 자르기, 색상
    'train' : transforms.Compose([transforms.Resize([64, 64]),  
        transforms.RandomHorizontalFlip(), transforms.RandomVerticalFlip(), 
        transforms.RandomCrop(52), transforms.ToTensor(),    
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ]),  # RGB 색상값? 색상에 대한 학습 효과
    
    'val' : transforms.Compose([transforms.Resize([64, 64]),
        transforms.RandomHorizontalFlip(), transforms.RandomVerticalFlip(),
        transforms.RandomCrop(52), transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ]),
}

data_dir = './splitted'
image_datasets = {x: ImageFolder(root=os.path.join(data_dir, x), 
                                 transform=data_transforms[x]) for x in ['train', 'val']}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], 
                                              batch_size = BATCH_SIZE,
                                              shuffle=True,
                                              num_workers=4) for x in ['train', 'val']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}

class_names = image_datasets['train'].classes

from torchvision import models

resnet = models.resnet50(pretrained=True)   # True - 학습이 완료된 weight 가져옴, False - 구조만 가져옴
num_ftrs = resnet.fc.in_features # in_features - 마지막 레이어 채널 숫자에 해당하는 것 
resnet.fc = nn.Linear(num_ftrs, 33) # 33개로 수정 
resnet = resnet.to(DEVICE)

criterion = nn.CrossEntropyLoss()
optimizer_ft = optim.Adam(filter(lambda p: p.requires_grad, resnet.parameters()), lr=0.001)

from torch.optim import lr_scheduler    # epoch에 따라 running rate를 바꾸는 작업을 해줌
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)    # 7 epoch마다 running rate를 0.1씩 감소 시킴



입력에 가까운 0-5번 레이어는 학습하지 않도록 고정하고 6-9 레이어는 학습을 하도록 한다.

ct = 0
for child in resnet.children():
    ct += 1
    if ct < 6:  # 입력에 가까운 0-5번 레이어는 학습하지 않도록 고정, 6-9 레이어는 학습을 하도록
        for param in child.parameters():
            param.requires_grad = False



def train_resnet(model, criterion, optimizer, scheduler, num_epochs=25):
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('----------------- epoch {} -----------------'.format(epoch+1))
        since = time.time()
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            runnung_corrects = 0


            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(DEVICE)
                labels = labels.to(DEVICE)

                optimizer.zero_grad()   # 그래디언트 zero 세팅

                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                
                running_loss += loss.item() * inputs.size(0)
                runnung_corrects += torch.sum(preds == labels.data)
            if phase == 'train':
                scheduler.step()
            
            epoch_loss = running_loss/dataset_sizes[phase]
            epoch_acc = runnung_corrects.double()/dataset_sizes[phase]

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
        
        time_elapsed = time.time() - since
        print('Complieted in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {.4f}'.format(best_acc))

    model.load_state_dict(best_model_wts)

    return model
model_resnet50 = train_resnet(resnet, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=EPOCH)

torch.save(model_resnet50, 'resnet50.pt')




- 전이학습 평가

test를 사용하여 Transfer Learning 모델 성능 평가하기

# 과적합 방지용 -> 돌리고, 상하좌우 반전, 이미지 자르기, 색상
transform_resnet = transforms.Compose([
        transforms.Resize([64, 64]),  
        transforms.RandomCrop(52), 
        transforms.ToTensor(),    
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ])

test_resNet = ImageFolder(root='./splitted/test', transform=transform_resnet)
test_loader_resNet = torch.utils.data.DataLoader(test_resNet, 
                                              batch_size = BATCH_SIZE,
                                              shuffle=True,
                                              num_workers=4)
resnet50 = torch.load('resnet50.pt')
resnet50.eval()
test_loss, test_accuracy = evaluate(resnet50, test_loader_resNet)

print('ResNet test acc:  ', test_accuracy)




"이 글은 제로베이스 데이터 취업 스쿨 강의를 듣고 작성한 내용으로 제로베이스 데이터 취업 스쿨 강의 자료 일부를 발췌한 내용이 포함되어 있습니다."

0개의 댓글

관련 채용 정보