Developing a Deep Learning Model Trainer Using PyTorch

최혜주·2024년 6월 7일

1. Introduction

딥러닝 모델을 활용한 문제 해결을 위해서는 해당하는 기능을 수행할 수 있도록 구성된 학습기가 필요하다. 따라서 지도학습과 딥러닝의 관계를 정리하며 모델 학습에 대한 분류를 해봤고, 지도학습을 위한 학습기가 동작하기 위해 필요한 구성 요소들을 살펴봤다. 또, 일부 필수 구성 요소들에 대해서는 프로젝트 때 사용할 PyTorch 프레임워크를 활용한 코드도 작성하였다.

2. 모델 학습 분류

2-1. 지도학습 (Supervised Learning)

지도학습은 간단히 말해 정답을 알려주며 학습시키는 것으로 분류 (classification: kNN, Navie Bayes, Support Vector, Machine Decision)회귀 (regression: Linear Regression, Locally Weighted Linear, Ridge, Lasso)가 이에 해당한다.

2-2. 비지도학습

비지도학습은 label이 없으며, 라벨링 되어있지 않은 데이터로부터 패턴이나 형태를 찾아 비슷한 데이터들을 군집화하는 것으로 Clustering, Dimensionality Reduction, Hidden Markov Model등이 있다.

2-3. 강화학습

강화학습은 분류할 수 있는 데이터가 있는 것도 아니고, 데이터가 있어도 정답이 따로 정해져 있지 않으며, 자신이 한 행동에 대해 보상(reward)을 받으며 학습하는 것이다. 알파고 또한 이 강화학습 모델로 만들어졌다.

2-4. 딥러닝과의 관계 정리

딥러닝은 기계를 학습시켜 AI를 만드는 것이라는 점에서는 머신러닝과 동일하지만, 보다 더 세부적이고 깊은 심층학습이 이루어진다. 이때 신경망(Neural Network)을 Deep하게 쌓아서 인공지능(인공신경망)을 만든다.

따라서, 딥러닝은 머신러닝의 한 부분으로 볼 수 있으며, 머신러닝이 포함하는 여러 방법 중 하나가 딥러닝인 것이다.

딥러닝도 머신러닝의 지도학습을 사용할 수 있다. 즉, 딥러닝 모델도 입력 데이터와 해당하는 정답을 이용해 학습할 수 있다. 이 경우, 딥러닝 모델은 매우 복잡한 패턴을 학습할 수 있어서(feature들을 사람이 직접 설계해야했던 전통적인 머신러닝 기법들과 달리 자동으로 특징을 학습할 수 있으므로) 이미지 인식, 음성 인식, 자연어 처리 등에서 뛰어난 성능을 보이게 된다.

3. Model Trainer - 필수 구성 요소들

진행하는 프로젝트와 관련(MIT-BIH, PTB-DB database)하여 전체 코드를 블로그에 기재할 수는 없어서 필수 구성 요소들 중 일부에 대한 코드들만 아래 항목에 작성했다.

3-1. DataSet, data_loader

: 학습 & 평가를 위한 (입력, 정답) 쌍으로 구성된 학습용 dataset

# DATASET

import os
import csv
import pywt
import numpy as np
import pandas as pd
from scipy import stats
import torch
from torch.utils.data import Dataset
import matplotlib.pyplot as plt
from sklearn.utils import resample

def denoise(data, wavelet='sym4', threshold=0.04):
    w = pywt.Wavelet(wavelet)
    maxlev = pywt.dwt_max_level(len(data), w.dec_len)
    coeffs = pywt.wavedec(data, wavelet, level=maxlev)
    for i in range(1, len(coeffs)):
        coeffs[i] = pywt.threshold(coeffs[i], threshold * max(coeffs[i]))
    return pywt.waverec(coeffs, wavelet)

class ECGDataset(Dataset):
    def __init__(self, data_dir, window_size=93, max_count=10000):
        self.data_dir = data_dir
        self.window_size = window_size
        self.max_count = max_count
        self.classes = ['N', 'L', 'R', 'A', 'V']
        self.X, self.y = self.load_data()
        self.X, self.y = self.balance_classes(self.X, self.y)

    def load_data(self):
        X, y = [], []
        records, annotations = self.get_files()
        for r in range(len(records)):
            signals = self.load_signals(records[r])
            annotations_data = self.load_annotations(annotations[r])
            X, y = self.extract_features(signals, annotations_data, X, y)
        return np.array(X), np.array(y)

    def get_files(self):
        records, annotations = [], []
        for root, _, files in os.walk(self.data_dir):
            records.extend([os.path.join(root, f) for f in files if f.endswith('.csv')])
            annotations.extend([os.path.join(root, f) for f in files if f.endswith('.txt')])
        records.sort(), annotations.sort()
        return records, annotations

    def load_signals(self, file_path):
        signals = []
        with open(file_path, 'rt') as csvfile:
            reader = csv.reader(csvfile)
            for row_index, row in enumerate(reader):
                if row_index > 0:
                    signals.append(int(row[1]))
        return stats.zscore(denoise(signals))

    def load_annotations(self, file_path):
        with open(file_path, 'r') as file:
            return file.readlines()

    def extract_features(self, signals, annotations, X, y):
        for d in range(1, len(annotations)):
            pos, arrhythmia_type = map(int, annotations[d].split()[1:3])
            if arrhythmia_type in self.classes:
                arrhythmia_index = self.classes.index(arrhythmia_type)
                if self.window_size <= pos < (len(signals) - self.window_size):
                    start_idx, end_idx = pos - self.window_size, pos + self.window_size + 1
                    X.append(signals[start_idx:end_idx])
                    y.append(arrhythmia_index)
        return X, y

    def balance_classes(self, X, y):
        df = pd.DataFrame(X)
        df['label'] = y
        balanced_dfs = [resample(df[df['label'] == i], n_samples=5000, random_state=42+i, replace=True)
                        for i in range(len(self.classes))]
        df_balanced = pd.concat(balanced_dfs)
        return df_balanced.drop('label', axis=1).values, df_balanced['label'].values

    def plot_class_distribution(self, y, title="Class Distribution"):
        plt.figure(figsize=(8, 6))
        values, counts = np.unique(y, return_counts=True)
        labels = [self.classes[v] for v in values]
        plt.pie(counts, labels=labels, autopct='%1.1f%%', startangle=90)
        plt.title(title)
        plt.axis('equal')
        plt.show()

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return torch.tensor(self.X[idx], dtype=torch.float32).unsqueeze(0), torch.tensor(self.y[idx], dtype=torch.long)
# DATA LOADER

import torch

from base import BaseDataLoader
from dataset.dataset import ECGDataset


class ECGDataLoader(BaseDataLoader):
    def __init__(self, data_dir, batch_size=8, shuffle=True, validation_split=0.2, num_workers=1, training=True):
        self.data_dir = data_dir
        dataset = ECGDataset(data_dir)
        print(len(dataset))
        super().__init__(dataset, batch_size, shuffle, validation_split, num_workers)

3-2. Model

: 추론을 위한 딥러닝 모델

import torch
import torch.nn as nn
import torch.nn.functional as F
from base import BaseModel

class ECGModel(BaseModel):
    def __init__(self, num_classes=5):
        super().__init__()
        self.conv1 = nn.Conv1d(1, 32, kernel_size=5)
        self.conv_layers = nn.ModuleList([nn.Conv1d(32, 32, kernel_size=5, padding=2) for _ in range(10)])
        self.pool = nn.MaxPool1d(kernel_size=5, stride=2)
        self.flatten = nn.Flatten()
        self.fc_layers = nn.Sequential(
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 32),
            nn.ReLU(),
            nn.Linear(32, num_classes),
            nn.Softmax(dim=1)
        )
        self.leaky_relu = nn.LeakyReLU()

    def forward(self, x):
        x = self.conv1(x)
        for i in range(0, len(self.conv_layers), 2):
            x1 = self.leaky_relu(self.conv_layers[i](x))
            x1 = self.conv_layers[i + 1](x1)
            x = F.relu(x1 + x)
            x = self.pool(x)
        x = self.flatten(x)
        return self.fc_layers(x)

class ECGModel2(BaseModel):
    def __init__(self, num_classes=5):
        super().__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv1d(1, 16, kernel_size=5),
            nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Conv1d(16, 32, kernel_size=5),
            nn.ReLU(),
            nn.MaxPool1d(2)
        )
        self.fc_layers = nn.Sequential(
            nn.Linear(1376, 120),
            nn.ReLU(),
            nn.Dropout(),
            nn.Linear(120, num_classes),
            nn.LogSoftmax(dim=1)
        )

    def forward(self, x):
        x = self.conv_layers(x)
        x = x.view(x.size(0), -1)
        return self.fc_layers(x)

3-3. Criterion

: loss 연산을 위한 함수

import torch.nn as nn


def ce_loss(output, target):
    criterion = nn.CrossEntropyLoss()
    return criterion(output, target)

3-4. Other elements & Learning process using the elements

: 이외에도, 코드를 이곳에 작성하지는 않겠지만, Optimizer(학습을 위함), Train Iterator(학습을 위한 train step), Evaluation Logic(모델의 학습정도를 확인하기 위한 평가) 등의 요소들도 있다.

<학습과정>

지도학습에서는 입력된 데이터를 바탕으로 모델이 정답을 추론하도록 하며, 모델이 추론한 결과와 실제 값을 비교하여 손실(loss)을 계산하고, 이를 역전파하여 모델을 학습시킨다. 위 요소들(3-1 ~ 3-4)을 통해 이러한 과정이 이루어지며, 이 과정을 반복함으로써 모델이 점차적으로 정답을 정확하게 추론할 수 있게 된다.

0개의 댓글