[AI] 알파카 데이터 학습

Bora Kwon·2022년 6월 24일
0

data split 코드

import os
import glob
import numpy as np
import cv2
from sklearn.model_selection import train_test_split

data_path = "./DATASET/TRAIN"
data_dir = os.listdir(data_path)

print("data_path: ", data_dir)

for folder in data_dir:
    if folder not in ".DS_Store":

        if folder == "R":
            file_list_R = glob.glob(os.path.join(data_path, folder, "*.jpg"))

        elif folder == "O":
            file_list_O = glob.glob(os.path.join(data_path, folder, "*.jpg"))


r_data_size = len(file_list_R)
o_data_size = len(file_list_O)

print("r_data_size: ", r_data_size, "o_data_size: ", o_data_size)

r_indices = list(range(r_data_size))
o_indices = list(range(o_data_size))


r_data_split_number = 0.04
o_data_split_number = 0.032

r_split = int(np.floor(r_data_split_number * r_data_size))
o_split = int(np.floor(o_data_split_number * o_data_size))
print(r_split)
print(o_split)

r_data_indices, o_data_indices = r_indices[:r_split+1], o_indices[:o_split+1]

r_data = []
for i in r_data_indices:
    path = file_list_R[i]
    r_data.append(path)

o_data = []
for i in o_data_indices:
    path = file_list_O[i]
    o_data.append(path)

all_data = r_data + o_data
x_train, x_valid = train_test_split(
    all_data, test_size=0.2, shuffle=False, random_state=777)

print("x_train size >> ", len(x_train))  # train data
print("y_train size >> ", len(x_valid))  # val data

train 코드

from cProfile import label
import glob
import os
import torch
import torchvision.transforms as transforms
import torchvision.models as models
import torch.nn as nn
import numpy as np

from torch import optim, save
from PIL import Image
from torch.utils.data import Dataset, DataLoader
import data_split

CLASS_NAME = {"O": 0, "R": 1}
device = torch.device("mps")
"""
CPU 사용자 
device = torch.device("cpu")

CUDA 사용자 
device = torch.device("cuda")
"""

"""Customdataset 구성"""


class Customdataset(Dataset):
    def __init__(self, data_path, transform=None):
        """정의"""
        self.data_path = data_path
        self.transform = transform

    def __getitem__(self, index):
        """데이터 경로에서 하나씩 데이터 가져오기"""
        path = self.data_path[index]
        """데이터 경로에서 폴더 명 가지고와서 라벨 변경"""
        path_split = path.split("/")
        label_temp = path_split[3]
        label = CLASS_NAME[label_temp]
        """이미지 오픈"""
        img = Image.open(path).convert("RGB")
        """Augmentation"""
        if self.transform is not None:
            img = self.transform(img)

        """라벨 이미지 리턴"""
        return img, label

    def __len__(self):
        return len(self.data_path)


"""augmentation 구성"""
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(p=0.4),
    transforms.RandomVerticalFlip(p=0.2),
    transforms.ToTensor(),
    transforms.Normalize([0.5, 0.5, 0.5], [0.2, 0.2, 0.2])
])

val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.5, 0.5, 0.5], [0.2, 0.2, 0.2])
])

"""데이터 셋 구성"""
train_data = Customdataset(
    data_path=data_split.x_train, transform=train_transform)
val_data = Customdataset(data_path=data_split.x_valid, transform=val_transform)

"""라벨 별 이미지 갯수"""
# train_one_label_cnt = 0  # O
# train_two_label_cnt = 0  # R

# for i in train_data:
#     image, labels = i
#     if labels == 0:
#         train_one_label_cnt += 1
#     elif labels == 1:
#         train_two_label_cnt += 1

# print(f"Train 라벨갯수  >> [{train_one_label_cnt}/{train_two_label_cnt}]")
"""데이터 로더 구성"""
train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
val_loader = DataLoader(val_data, batch_size=32, shuffle=False)


def train(num_epoch, model, train_loader, val_loader, criterion,
          optimizer, save_dir, val_every, device):
    print("String train... !! ")
    best_loss = 9999
    for epoch in range(num_epoch):
        for i, (image, label) in enumerate(train_loader):
            image, label = image.to(device), label.to(device)
            """모델에 image & data 넣기 """
            output = model(image)
            """loss function"""
            loss = criterion(output, label)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            _, argmax = torch.max(output, 1)
            acc = (label == argmax).float().mean()
            lr = optimizer.param_groups[0]['lr']
            print(
                "Epoch [{}/{}], lr [{}], Step [{}/{}], Loss : {:.4f}, accuracy : {:.2f}%"
                .format(
                    epoch + 1, num_epoch, lr, i +
                    1, len(train_loader), loss.item(),  acc.item() * 100
                ))

            """val loop call"""
            if (epoch + 1) % val_every == 0:
                avg_loss = validation(
                    epoch + 1, model, val_loader, criterion, device)

                if avg_loss < best_loss:
                    print("Best prediction at epoch {}".format(epoch + 1))
                    print("save model in", save_dir)
                    best_loss = avg_loss
                    save_model(model, save_dir)

    save_model(model, save_dir, file_name="last.pt")


def validation(epoch, model, val_loader, criterion, device):
    print("Start validation at epoch {}".format(epoch))
    model.eval()
    with torch.no_grad():
        total = 0
        correct = 0
        total_loss = 0
        cnt = 0

        for i, (image, label) in enumerate(val_loader):
            image, label = image.to(device), label.to(device)
            """모델에 데이터를 넣어서 예측 값을 뽑기 """
            output = model(image)
            """loss function"""
            loss = criterion(output, label)
            total += image.size(0)
            _, argmax = torch.max(output, 1)
            correct += (label == argmax).sum().item()
            total_loss += loss
            cnt += 1

        """avg loss """
        avg_loss = total_loss / cnt
        print("Validation # {} Acc : {:.2f}% Average Loss : {:.4f}%".format(
            epoch, correct / total * 100, avg_loss
        ))

    model.train()

    return avg_loss


def save_model(model, save_dir, file_name="best.pt"):
    output = os.path.join(save_dir, file_name)
    torch.save(model.state_dict(), output)


def get_model(n_classes):
    model_ft = models.mobilenet_v2(pretrained=True)
    num_ft = model_ft.last_channel
    model_ft.classifier[1] = nn.Linear(num_ft, n_classes)

    return model_ft


""" 하이퍼 파라메타 지정 """
num_epoch = 20
val_every = 5
net = get_model(2)
net = net.to(device)


criterion = nn.CrossEntropyLoss().to(device)
optimizer = optim.Adam(
    filter(lambda p: p.requires_grad, net.parameters()), lr=0.0005)
lr_scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer, step_size=4, gamma=0.1)

save_weights_dir = "./weights"
os.makedirs(save_weights_dir, exist_ok=True)

if __name__ == "__main__":
    train(num_epoch, net, train_loader, val_loader,
          criterion, optimizer, save_weights_dir, val_every, device)
profile
Software Developer

0개의 댓글