[딥러닝-Autoencoder] 오토엔코더를 이용한 이상치(독버섯) 검출

1
post-thumbnail

04번째 게시물

안녕하세요. 이번 게시물에서는 01번째 게시물에서 classification을 실시한 버섯, 독버섯 데이터를 가지고 이상치 검출 모델을 생성하고 테스트 해보려고 합니다.

Auto encoder는 unspervised learning의 일종으로 라벨링 없이 학습을 진행할 수 있습니다.
학습을 진행할 때에는 정상데이터들만 input으로 넣습니다.

아키텍처는 인코더 - 디코더로 진행되며, 인코더에서 정보들을 압축하고 채널깊이를 쌓으면서 latent vector 형태로 만듭니다. 그리고 정보가 함축된 latent vector를 다시 디코딩하면서 정보들을 압축해제 하고 원래 shape으로 복구하게 됩니다.
그리고 이렇게 복구된 형태의 output데이터와 input데이터간의 차이를 loss함수로 설정하고 가중치를 업데이트 합니다.
최종적으로 모델이 도출 된 후에 모델이 만약 주로 학습한 정상데이터를 넣었을 때에는 복구된 데이터가 원본과 유사하기 때문에 loss가 적게 나오고_ 이상치가 입력 되었을 때는 학습하지 않은 유형이기 때문에 복구된 데이터가 원본 데이터와 큰 차이를 보여 loss가 정상치보다 비교적 높게 나오게 됩니다.
따라서 일정한 threshold값 이상되는 loss를 가지는 데이터를 이상치로 간주하는 것이 오토엔코더를 이용한 이상치 검출 방법입니다.

함축하면 아래와 같습니다.

1) 인코더에서 정보 압축, 디코더에서 정보 압축 해제 및 재구축
2) loss함수로 재구축된 데이터와 원본 데이터의 차이를 설정 후 back propagation 실시
3) 정상치 입력 -> 복원 잘되서 loss가 threshold보다 작음, 이상치 입력 -> 복원 잘안되서 loss가 threshold보다 큼

Mushroom Anomaly Detection by Autoencoder

데이터 : https://www.kaggle.com/datasets/uciml/mushroom-classification
먹을 수 있는 class를 정상데이터로 간주하고 독 버섯 class를 이상치로 간주하여 진행할 예정입니다.

0,1) 필요한 패키지 모듈 가져오기 및 데이터 로드

import torch
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import torch.nn.functional as F

if __name__ == "__main__":
    ## devide label and train_data
    data = pd.read_csv(
        "./mushrooms.csv"
    )
    data = pd.get_dummies(data)

    val_num = 60
    test_num = 60

    poision = data[data["class_p"]==1]
    edible = data[data["class_e"]==1]

    edible_train = edible.iloc[:-val_num-test_num]
    edible_val = edible_train.iloc[-val_num-test_num:-test_num]
    edible_test = edible.iloc[-test_num:]

    posion_val = poision.iloc[:val_num]
    poison_test = poision.iloc[val_num:val_num+test_num]

    train_class_num = len(edible_train.columns)

모듈을 import 합니다. 그리고 데이터를 가져온 다음 라벨클래스인 posion, edibile에 따라 데이터를 나누고 drop으로 label class는 제거 합니다. 그리고 각각 train, valid, test셋으로 나눕니다. 여기서 우리는 정상데이터만 학습할 것이기 때문에 edible만 train데이터로 가져갑니다. 그리고 train 데이터의 column 개수를 반환합니다.

2) 딥러닝을 위한 Dataset Batch, Dataloader 설정

# dataset class
class dataset(Dataset):
    def __init__(self, x):
        self.x = x

    def __len__(self):
        return len(self.x)

    def __getitem__(self, index):
        single_x = self.x.iloc[index]
        single_x = torch.tensor(
            single_x, dtype=torch.float32
        )  # covert dataframe to Tensor
        return single_x

    def normalization(self):
        pass
        
        
    ed_train_dataset = dataset(edible_train)
    ed_val_dataset = dataset(edible_val)
    ed_test_dataset = dataset(edible_test)

    po_val_dataset = dataset(posion_val)
    po_test_dataset = dataset(poison_test)

    train_batch_size = 32
    ed_train_dataloader = DataLoader(ed_train_dataset, batch_size=train_batch_size,shuffle=True)
    ed_val_dataloader = DataLoader(ed_val_dataset,batch_size=1)
    ed_test_dataloader = DataLoader(ed_test_dataset,batch_size=1)

    po_val_dataloader = DataLoader(po_val_dataset,batch_size=1)
    po_test_dataloader = DataLoader(po_test_dataset,batch_size=1)
    

딥러닝 전용 커스터마이징 데이터셋으로 클래스(텐서화 포함)를 구성한 뒤 인스턴스를 생성합니다. 그리고 batch size만큼 분할해서 네트워크에 넣기 위해서 dataloader로 만들어 줍니다.

3) Network 설정

class autoencoder(nn.Module):
    def __init__(self,input_class):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_class, 128),
            nn.ReLU(True),
            nn.Linear(128, 64),
            nn.ReLU(True), nn.Linear(64, 12), nn.ReLU(True), nn.Linear(12, 3))
        self.decoder = nn.Sequential(
            nn.Linear(3, 12),
            nn.ReLU(True),
            nn.Linear(12, 64),
            nn.ReLU(True),
            nn.Linear(64, 128),
            nn.ReLU(True), nn.Linear(128, input_class), nn.Tanh())

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x
        
    Network = autoencoder(input_class=train_class_num)
    summary_model = summary(Network)
    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
    Network = Network.to(device)
    print(f"\n### The device is --{device}-- that we use ###")
    print(f"### model is ready!!!###\n\n{Network}")
    

오토엔코더에 적용할 network구조를 class에 입력합니다. self.encoder 속성에서 정보를 압축하는 layer들을 저장하고 self.decoder에서 정보를 압축해제하는 layer를 쌓습니다.
후에 네트워크 인스턴스를 생성하고 네트워크를 설정한 device로 보냅니다.(저는 cpu로 설정했습니다!)

4) Train, Validation 실시

    lr = 0.0001
    total_epoch = 100
    step_size = 30
    gamma = 0.1
    save_path = "./model/mushroom_autoencoder_epoch100.pt"

    criterion = nn.MSELoss()
    train_signal = input(str("Do you want to train ? (y or n) : "))


    if train_signal == "y":
        # 옵티마이저, 스케쥴러, 손실함수(이진분류라서 binary선택) 선택
        optimizer = optim.Adam(Network.parameters(), lr=lr)
        scheduler = optim.lr_scheduler.StepLR(
            optimizer=optimizer, step_size=step_size, gamma=gamma
        )

        
        total_loss = {"po_val": [],"ed_val": [], "train": []}

        for epoch in range(total_epoch):
            Network.train()
            epoch_train_loss = 0.0
            for i, data in enumerate(ed_train_dataloader,0):
                inputs = data
                inputs = inputs.to(device)
                optimizer.zero_grad()  # 초기화
                outputs = Network(inputs)
                loss = criterion(outputs, inputs)
                batch_train_loss = loss.item()  # loss만 가져오기
                loss.backward()  # 역전파 기록 저장
                optimizer.step()  # 업데이트
                epoch_train_loss += batch_train_loss

            avg_train_loss = epoch_train_loss / len(ed_train_dataloader) 

            Network.eval()
            ed_val_epoch_loss = 0
            for i, data in enumerate(ed_val_dataloader, 0):
                inputs = data
                inputs = inputs.to(device)

                with torch.no_grad():
                    outputs = Network(inputs)
                    loss = criterion(outputs, inputs)
                    val_batch_loss = loss.item()
                    ed_val_epoch_loss += val_batch_loss
            ed_avg_val_loss = ed_val_epoch_loss / len(ed_val_dataloader)

            po_val_epoch_loss = 0.0
            for i, data in enumerate(po_val_dataloader, 0):
                inputs = data
                inputs = inputs.to(device)
                with torch.no_grad():
                    outputs = Network(inputs)
                    loss = criterion(outputs, inputs)
                    val_batch_loss = loss.item()
                    po_val_epoch_loss += val_batch_loss

            po_avg_val_loss = po_val_epoch_loss / len(po_val_dataloader)

            total_loss["train"].append(avg_train_loss)
            total_loss["po_val"].append(po_avg_val_loss)
            total_loss["ed_val"].append(ed_avg_val_loss)

            print(f"epoch : {epoch} The train loss is {avg_train_loss}, po_valid loss is {po_avg_val_loss}, ed_valid loss is{ed_avg_val_loss}")
            
            scheduler.step()

        x = np.arange(1, total_epoch + 1)
        plt.plot(x, total_loss["train"], "r", label="train")
        plt.plot(x, total_loss["po_val"], "b", label="po_val")
        plt.plot(x, total_loss["ed_val"],"g", label="ed_val")
        plt.legend()
        plt.title("Mushroom Autoencoder Deeplearning")
        plt.yscale("logit")
        plt.xlabel("epoch")
        plt.ylabel("loss")
        plt.show()

        torch.save(Network.state_dict(),save_path)

필요한 parameter인 lr, 총학습 횟수(epoch)와 스텝러닝레이트를 위한 step_lr, gamma 그리고 모델을 저장할 위치+이름을 지정합니다.
원본데이터와 복원된 데이터의 차이를 계산하는 loss함수로 MSE함수를 선택합니다.

1) Train
학습을 실시할 경우(train_signal =="y") 네트워크를 train모드로 설정한 뒤 train을 시작합니다. 가중치를 초기화하고 설정한 네트워크에 데이터를 넣어 예측 값을 도출합니다. 그 뒤 loss 함수에서 초기 input(원본) 값과 model에 input값을 입력시킨 output(복원) 값의 차이를 계산합니다. 그리고 이 loss값으로 가중치를 업데이트 합니다.

2) Valid
학습이 1epoch 완료된 후 Valid 과정을 거칩니다. Val과정 같은 경우는 정상치(ed)를 입력했을경우와 이상치(po)를 입력했을 경우 두 가지 경우를 입력 시켰습니다. train과 같지만 torch.no_grad구문으로 자동 미분을 멈추고 loss를 계산합니다.

3) 시각화
계산된 loss를 시각화합니다. train_loss와 po_val, ed_val 3가지를 동시에 그립니다.

결과를 보니 train의 경우 잘 감소하였습니다. 결과에서 po_val(이상치)과 ed_val(정상) loss 값이 확연히 차이나는 것을 확인 할 수 있습니다. 따라서 추후 test에서 threshold 값으로 5*10e-2 값을 입력한다면 현재 그래프 상에서도 po 와 ed loss 값을 구분할 수 있기 때문에 test에서도 이상치와 정상을 구분할 수 있을 것이라고 예상됩니다.

모든과정을 마친 뒤 도출된 가중치 값들을 저장합니다.

5) Test 실시

    elif train_signal == "n":
        thresholds = np.arange(1,10)*0.01
        Network.load_state_dict(torch.load(save_path))
        Network.eval()

        total_ed_accuracy = []
        total_po_accuracy = []

        for threshold in thresholds:
            ed_result = []
            for i, data in enumerate(ed_test_dataloader, 0):
                inputs = data
                inputs = inputs.to(device)
                with torch.no_grad():
                    outputs = Network(inputs)
                    loss = criterion(outputs, inputs)
                    ed_test_loss = loss.item()

                if ed_test_loss > threshold:
                    ed_result.append(False)

                elif ed_test_loss < threshold:
                    ed_result.append(True)
        
            po_result = []

            for i, data in enumerate(po_test_dataloader, 0):
                inputs = data
                inputs = inputs.to(device)

                with torch.no_grad():
                    outputs = Network(inputs)
                    loss = criterion(outputs, inputs)
                    po_test_loss = loss.item()

                if po_test_loss > threshold:
                    po_result.append(True)

                elif po_test_loss < threshold:
                    po_result.append(False)

            ed_accuracy = (ed_result.count(True) / len(ed_result))*100
            po_accuracy = (1 - po_result.count(True) / len(po_result))*100
            total_ed_accuracy.append(ed_accuracy)
            total_po_accuracy.append(po_accuracy)

            num_ed_result = len(ed_result)
            num_po_result = len(po_result)

            print(f"Threshold 설정 값은 {threshold} 입니다!!")
            print(f"ed 판별 횟수 : {num_ed_result}, 정확도 = {ed_accuracy}%, po 판별횟수 : {num_po_result}, 정확도 = {po_accuracy}%")

        plt.plot(thresholds, total_ed_accuracy, "r", label="ed_acuuracy")
        plt.plot(thresholds, total_po_accuracy, "b", label="po_acuuracy")
        plt.legend()
        plt.title("Mushroom Autoencoder Deeplearning threshold - accuracy")
        plt.xlabel("threshold")
        plt.ylabel("accuracy")
        plt.show()

모델이 학습이 완료되어 저장된 경우(train_signal=="n") 테스트를 실시합니다.threshold 값을 0.01~0.09로 iterable 하게 설정하였습니다.
테스트를 실시 할 때에 threshold에 따른 po와 ed accuracy를 보기 위해서 po, ed를 따로 입력합니다.
ed(정상)일 경우에는 threshold 값을 넘지 않는 경우(원본과 거의 유사) True로 반환했고
po(이상치)일 경우에는 threshold 값을 넘는 경우(원본과 다름)을 True로 반환하였습니다.

결과는 위와 같이 threshold가 0.01 에서 0.09 까지 변함에 따라서 두 곡선의 교차점인 0.5로 threshold를 설정할 경우 가장 높은 정확도로 posion, edible독버섯을 구별할 수 있음을 알 수 있습니다.

이상치 검출 autoencoder 프로젝트 진행할 때 도움이 되었으면 좋겠습니다.

전체코드

import torch
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import torch.nn.functional as F

from torchinfo import summary
from torch.utils.data import Dataset, DataLoader

# dataset class
class dataset(Dataset):
    def __init__(self, x):
        self.x = x

    def __len__(self):
        return len(self.x)

    def __getitem__(self, index):
        single_x = self.x.iloc[index]
        single_x = torch.tensor(
            single_x, dtype=torch.float32
        )  # covert dataframe to Tensor
        return single_x

    def normalization(self):
        pass

 
class autoencoder(nn.Module):
    def __init__(self,input_class):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_class, 128),
            nn.ReLU(True),
            nn.Linear(128, 64),
            nn.ReLU(True), nn.Linear(64, 12), nn.ReLU(True), nn.Linear(12, 3))
        self.decoder = nn.Sequential(
            nn.Linear(3, 12),
            nn.ReLU(True),
            nn.Linear(12, 64),
            nn.ReLU(True),
            nn.Linear(64, 128),
            nn.ReLU(True), nn.Linear(128, input_class), nn.Tanh())

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

if __name__ == "__main__":
    ## devide label and train_data
    data = pd.read_csv(
        "./mushrooms.csv"
    )
    data = pd.get_dummies(data)

    poision = data[data["class_p"]==1]
    edible = data[data["class_e"]==1]

    val_num = 60
    test_num = 60



    poision.drop(columns=["class_e","class_p"],inplace=True)
    edible.drop(columns=["class_e","class_p"],inplace=True)

    edible_train = edible.iloc[:-val_num-test_num]
    edible_val = edible_train.iloc[-val_num-test_num:-test_num]
    edible_test = edible.iloc[-test_num:]

    posion_val = poision.iloc[:val_num]
    poison_test = poision.iloc[val_num:val_num+test_num]
    
    train_class_num = len(edible_train.columns)

    ed_train_dataset = dataset(edible_train)
    ed_val_dataset = dataset(edible_val)
    ed_test_dataset = dataset(edible_test)

    po_val_dataset = dataset(posion_val)
    po_test_dataset = dataset(poison_test)

    train_batch_size = 32
    ed_train_dataloader = DataLoader(ed_train_dataset, batch_size=train_batch_size,shuffle=True)
    ed_val_dataloader = DataLoader(ed_val_dataset,batch_size=1)
    ed_test_dataloader = DataLoader(ed_test_dataset,batch_size=1)

    po_val_dataloader = DataLoader(po_val_dataset,batch_size=1)
    po_test_dataloader = DataLoader(po_test_dataset,batch_size=1)

    Network = autoencoder(input_class=train_class_num)
    summary_model = summary(Network)
    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
    Network = Network.to(device)

    print(f"\n### The device is --{device}-- that we use ###")
    print(f"### model is ready!!!###\n\n{Network}")

    lr = 0.0001
    total_epoch = 100
    step_size = 30
    gamma = 0.1

    save_path = "./model/mushroom_autoencoder_epoch100.pt"

    criterion = nn.MSELoss()
    train_signal = input(str("Do you want to train ? (y or n) : "))


    if train_signal == "y":
        # 옵티마이저, 스케쥴러, 손실함수(이진분류라서 binary선택) 선택
        optimizer = optim.Adam(Network.parameters(), lr=lr)
        scheduler = optim.lr_scheduler.StepLR(
            optimizer=optimizer, step_size=step_size, gamma=gamma
        )

        
        total_loss = {"po_val": [],"ed_val": [], "train": []}

        for epoch in range(total_epoch):
            Network.train()
            epoch_train_loss = 0.0
            for i, data in enumerate(ed_train_dataloader,0):
                inputs = data
                inputs = inputs.to(device)
                optimizer.zero_grad()  # 초기화
                outputs = Network(inputs)
                loss = criterion(outputs, inputs)
                batch_train_loss = loss.item()  # loss만 가져오기
                loss.backward()  # 역전파 기록 저장
                optimizer.step()  # 업데이트
                epoch_train_loss += batch_train_loss

            avg_train_loss = epoch_train_loss / len(ed_train_dataloader) 

            Network.eval()

            ed_val_epoch_loss = 0
            for i, data in enumerate(ed_val_dataloader, 0):
                inputs = data
                inputs = inputs.to(device)

                with torch.no_grad():
                    outputs = Network(inputs)
                    loss = criterion(outputs, inputs)
                    val_batch_loss = loss.item()
                    ed_val_epoch_loss += val_batch_loss
            ed_avg_val_loss = ed_val_epoch_loss / len(ed_val_dataloader)

            po_val_epoch_loss = 0.0
            for i, data in enumerate(po_val_dataloader, 0):
                inputs = data
                inputs = inputs.to(device)
                with torch.no_grad():
                    outputs = Network(inputs)
                    loss = criterion(outputs, inputs)
                    val_batch_loss = loss.item()
                    po_val_epoch_loss += val_batch_loss

            po_avg_val_loss = po_val_epoch_loss / len(po_val_dataloader)

            total_loss["train"].append(avg_train_loss)
            total_loss["po_val"].append(po_avg_val_loss)
            total_loss["ed_val"].append(ed_avg_val_loss)

            print(f"epoch : {epoch} The train loss is {avg_train_loss}, po_valid loss is {po_avg_val_loss}, ed_valid loss is{ed_avg_val_loss}")
            scheduler.step()

        x = np.arange(1, total_epoch + 1)
        plt.plot(x, total_loss["train"], "r", label="train")
        plt.plot(x, total_loss["po_val"], "b", label="po_val")
        plt.plot(x, total_loss["ed_val"],"g", label="ed_val")
        plt.legend()
        plt.title("Mushroom Autoencoder Deeplearning")
        plt.yscale("logit")
        plt.xlabel("epoch")
        plt.ylabel("loss")
        plt.show()

        torch.save(Network.state_dict(),save_path)

 

    elif train_signal == "n":
        thresholds = np.arange(1,10)*0.01
        Network.load_state_dict(torch.load(save_path))
        Network.eval()

        total_ed_accuracy = []
        total_po_accuracy = []

        for threshold in thresholds:
            ed_result = []
            for i, data in enumerate(ed_test_dataloader, 0):
                inputs = data
                inputs = inputs.to(device)
                with torch.no_grad():
                    outputs = Network(inputs)
                    loss = criterion(outputs, inputs)
                    ed_test_loss = loss.item()

                if ed_test_loss > threshold:
                    ed_result.append(False)

                elif ed_test_loss < threshold:
                    ed_result.append(True)
        
            po_result = []
            for i, data in enumerate(po_test_dataloader, 0):
                inputs = data
                inputs = inputs.to(device)
                with torch.no_grad():
                    outputs = Network(inputs)
                    loss = criterion(outputs, inputs)
                    po_test_loss = loss.item()

                print(po_test_loss)
                if po_test_loss > threshold:
                    po_result.append(True)

                elif po_test_loss < threshold:
                    po_result.append(False)

            ed_accuracy = (ed_result.count(True) / len(ed_result))*100
            po_accuracy = (po_result.count(True) / len(po_result))*100
            total_ed_accuracy.append(ed_accuracy)
            total_po_accuracy.append(po_accuracy)

            num_ed_result = len(ed_result)
            num_po_result = len(po_result)

            print(f"Threshold 설정 값은 {threshold} 입니다!!")
            print(f"ed 판별 횟수 : {num_ed_result}, 정확도 = {ed_accuracy}%, po 판별횟수 : {num_po_result}, 정확도 = {po_accuracy}%")

        plt.figure(figsize=(10,10))
        plt.plot(thresholds, total_ed_accuracy, "r", label="ed_acuuracy")
        plt.plot(thresholds, total_po_accuracy, "b", label="po_acuuracy")
        plt.legend()
        plt.title("Mushroom Autoencoder Deeplearning threshold - accuracy")
        plt.xlabel("threshold")
        plt.ylabel("accuracy")
        plt.show()
profile
재미로 해보는 다양한 AI프로젝트 모음집

0개의 댓글