[졸프] 해외 안심 거래 서비스를 만들어보자!

두더지·2024년 11월 26일
0

24-2학기, 이화여자대학교에서 졸업프로젝트를 시작하게 되었다.
졸업프로젝트로 만들게 된 것은 해외 안심 거래 서비스!
간단하게 두 개의 서비스를 제공하는 어플을 만드는 것이다.

  1. ATM 기기의 스키밍 장치 탐지
  2. 실시간 거래 내역 중 이상거래 탐지

이 중, 내가 맡은 부분은 ATM 기기의 스키밍 장치를 탐지하는 것.

ATM 기기의 사진을 학습시켜, 이상탐지 알고리즘을 통해 스키밍 장치를 탐지하는 것이다.

좀 더 심화된 모델링은 다음학기 때 구현하는 것이기에,
이번 학기에는 어느정도 스키밍 장치를 탐지할 수 있는지에 대한 여부를 확인해보았다.

step1, 데이터셋 구하기

ATM 기기에 대한 데이터셋을 찾아보았지만 인터넷 상에서 구하기가 어려웠고, ATM기기마다 카드 투입구 부분이 달랐기에 먼저, 하나의 ATM기기만을 학습시켰을 때 스키밍 장치를 잘 탐지할 수 있는지에 대해 확인해보았다.
학교 근처 ATM기기를 정해 약 200개의 사진을 직접 찍었고, 각 이미지당 5종류의 Augmentation을 통해 약 1400장의 데이터셋을 확보하였다.

step2, AI 모델 정하기

여러 이상탐지 알고리즘 중, 먼저 쉽게 구현할 수 있는 Autoencoder를 먼저 시도해보았다. 메모리 기반의 MemSeg 모델을 사용해볼까도 고민해보았지만, 먼저 Autoencoder로 탐지를 해본 후, 필요하다면 시도해보기로 하였다.
Autoencoder 모델은 정상 데이터만을 학습을 한 후, 복원 오차를 기준으로 이상 데이터를 탐지할 수 있기에 스키밍 장치의 사진을 구하기 어려운 우리에겐 너무 좋은 모델이였다.

Autoencoder

Autoencoder는 encoder 부분과 decoder부분으로 나눠진다.
encoder 부분에서는 입력 이미지를 저차원으로 압축하여, 특정 feature들을 추출하는 것이고
decoder 부분에서는 추출한 feature를 통해 원래 이미지를 복원하는 것이다.

step3, 코드 구현

colab에서 코드를 구현하였기에, 1400장의 데이터셋을 가져올 방법이 필요했다.
이를 위해 kaggle API를 사용하였다.
kaggle에 데이터셋을 올리고 API를 가져와 colab에서도 우리가 준비한 데이터셋을 사용할 수 있었다.

전처리 과정

import os
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import matplotlib.pyplot as plt

# 데이터셋 정의
class ATMDataset(Dataset):
    def __init__(self, folder_path, transform=None):
        self.folder_path = folder_path
        self.image_paths = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith(('.png', '.jpg', '.jpeg'))]
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert("L")  # 흑백 변환 (1채널)
        if self.transform:
            image = self.transform(image)
        return image

# 데이터 경로
data_dir = "/root/.cache/kagglehub/datasets/j22s00/atm-cardslot/versions/1/ATM"  # Kaggle 데이터셋이 저장된 폴더 경로

# 전처리 파이프라인
transform = transforms.Compose([
    transforms.Resize((128, 128)),  # 이미지 크기 조정
    transforms.ToTensor(),          # Tensor로 변환
    transforms.Normalize(mean=[0.5], std=[0.5])  # 정규화
])

# 데이터셋 및 데이터로더 생성
dataset = ATMDataset(data_dir, transform=transform)
data_loader = DataLoader(dataset, batch_size=32, shuffle=True)

# 데이터 시각화
def visualize_sample(dataset, num_samples=5):
    plt.figure(figsize=(12, 6))
    for i in range(num_samples):
        image = dataset[i]
        plt.subplot(1, num_samples, i + 1)
        plt.imshow(image.squeeze(0), cmap="gray")
        plt.axis("off")
    plt.show()

visualize_sample(dataset)

모델 정의

import torch
import torch.nn as nn

# Autoencoder 모델 정의
class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()
        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),
            nn.ReLU()
        )
        # Decoder
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 16, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(16, 1, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.Sigmoid()
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

모델 학습

import torch
import torch.nn as nn

# Autoencoder 모델 정의
class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()
        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),
            nn.ReLU()
        )
        # Decoder
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 16, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(16, 1, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.Sigmoid()
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

의 코드를 통해 모델을 학습시키고 포토샵으로 스키밍 이미지를 만들어 테스트를 진행해보았다.

테스트 결과

하지만 결과는 처참하였다. 스키밍 사진은 탐지하지 못하였고, 오히려 각도가 반대되었던 사진만을 이상탐지한 것이다.

이러한 문제의 원인은
카트 투입구 부분을 attention하지 않았기에 주변 배경까지도 가중치를 두고 학습이 되었기 때문이라고 분석하였다.

step4, 이상탐지를 잘 하기 위한 수정

이를 해결하기 위해 카드 투입구 부분을 YOLO 모델을 사용하여 탐지하여 crop하려고 하였지만, YOLO 모델에 카드 투입구 부분을 또한 학습 시켜야했기 때문에 현재로썬 쉽지 않다고 생각이 되었고, 이는 다음 학기의 그로쓰 때 구현하기로 결정하였다...

새로운 데이터셋 구축

그렇기에 '다시' 데이터셋을 구하기 위해 근처 ATM기기에서 이번엔 카드 투입구 부분만을 확대하여 200장 정도의 사진을 찍어서 준비하였다.

새로운 모델

현재 사용하고 있는 autoencoder 모델이 이상탐지를 잘 못하는 것 같아, 데이터셋의 수도 적어진 것을 감안하여 모델과 학습 방법을 바꾸었다.

데이터 전처리

import os
import numpy as np
import cv2
import tensorflow as tf
from tensorflow.keras.preprocessing import image
from sklearn.model_selection import train_test_split

# 데이터셋 경로 설정
dataset_path = '/content/drive/MyDrive/atm'

# 이미지 읽어오기
images = []
for filename in os.listdir(dataset_path):
    if filename.endswith('.jpg') or filename.endswith('.png'):
        img = cv2.imread(os.path.join(dataset_path, filename))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # 색상 채널 변환
        img = cv2.resize(img, (128, 128))  # 이미지 크기 조정
        images.append(img)

# 이미지 데이터를 numpy 배열로 변환
images = np.array(images) / 255.0  # 0-1 범위로 정규화

# 훈련 및 검증 데이터셋 나누기
X_train, X_test = train_test_split(images, test_size=0.2, random_state=42)

모델 정의

from tensorflow.keras import layers, models

# 모델 정의
input_img = layers.Input(shape=(128, 128, 3))

# 인코더 부분
x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(input_img)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x)
encoded = layers.MaxPooling2D((2, 2), padding='same')(x)

# 디코더 부분
x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(encoded)
x = layers.UpSampling2D((2, 2))(x)
x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
x = layers.UpSampling2D((2, 2))(x)
decoded = layers.Conv2D(3, (3, 3), activation='sigmoid', padding='same')(x)

# 모델 생성
autoencoder = models.Model(input_img, decoded)

# 모델 컴파일
autoencoder.compile(optimizer='adam', loss='mean_squared_error')

# 모델 구조 요약
autoencoder.summary()

모델 학습

# Autoencoder 학습
autoencoder.fit(X_train, X_train, epochs=50, batch_size=16, validation_data=(X_test, X_test))

최종 test 결과

이렇게 학습한 모델에 총 3개의 test 이미지를 넣어보았다.
이 이미지들은 스키밍 장치를 포토샵으로 가져온 이미지이다.

테스트 이미지 전처리 및 테스트

import os
import numpy as np
import cv2
from tensorflow.keras.preprocessing import image
import matplotlib.pyplot as plt

# 테스트 데이터 경로
test_dataset_path = '/content/test_dir'

# 이미지 파일 리스트 가져오기
test_images = [f for f in os.listdir(test_dataset_path) if f.endswith(('.jpg', '.jpeg', '.png', '.bmp', '.tiff'))]
print(f"Found {len(test_images)} images for testing.")

# 이미지 로드 및 전처리 함수
def load_and_preprocess_image(image_path):
    img = cv2.imread(image_path)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # 색상 채널 변환
    img = cv2.resize(img, (128, 128))  # 모델에 맞는 크기로 조정
    img = np.array(img) / 255.0  # 0-1 범위로 정규화
    return img

# 테스트 이미지 처리
for test_image in test_images:
    image_path = os.path.join(test_dataset_path, test_image)
    
    # 이미지를 로드하고 전처리
    img = load_and_preprocess_image(image_path)
    img_batch = np.expand_dims(img, axis=0)  # 배치 차원 추가
    
    # 모델 예측 (복원된 이미지 계산)
    reconstructed_img = autoencoder.predict(img_batch)
    
    # 원본 이미지와 복원된 이미지 출력
    fig, axes = plt.subplots(1, 2, figsize=(10, 5))
    
    # 원본 이미지 출력
    axes[0].imshow(img)
    axes[0].set_title(f"Original: {test_image}")
    axes[0].axis('off')
    
    # 복원된 이미지 출력
    axes[1].imshow(reconstructed_img[0])  # 배치 차원 제거
    axes[1].set_title(f"Reconstructed: {test_image}")
    axes[1].axis('off')
    
    plt.show()

임계값 설정

import numpy as np

# 훈련 데이터의 평균 재구성 오차와 표준편차를 구해서 임계값 설정
train_reconstruction_errors = []  # 훈련 중 재구성 오차 리스트

for img in X_train:
    reconstructed_img = autoencoder.predict(np.expand_dims(img, axis=0))
    error = np.mean(np.square(img - reconstructed_img[0]))  # MSE 계산
    train_reconstruction_errors.append(error)

# 평균과 표준편차 구하기
mean_error = np.mean(train_reconstruction_errors)
std_error = np.std(train_reconstruction_errors)

# 임계값을 평균 + 2*표준편차로 설정 (재구성 오차가 크게 나올 확률이 낮은 값)
threshold = mean_error + 2 * std_error

# 이상 여부 판단
for test_image in test_images:
    image_path = os.path.join(test_dataset_path, test_image)
    
    # 이미지를 로드하고 전처리
    img = load_and_preprocess_image(image_path)
    img_batch = np.expand_dims(img, axis=0)  # 배치 차원 추가
    
    # 모델 예측 (복원된 이미지 계산)
    reconstructed_img = autoencoder.predict(img_batch)
    
    # 재구성 오차 계산
    reconstruction_error = calculate_reconstruction_error(img, reconstructed_img[0])
    
    # 재구성 오차 출력
    print(f"Image: {test_image}, Reconstruction Error: {reconstruction_error:.4f}")
    
    # 이상 여부 판단
    if reconstruction_error > threshold:
        print(f"Image: {test_image} is Anomalous.")
    else:
        print(f"Image: {test_image} is Normal.")
    
    # 원본 이미지와 복원된 이미지 출력
    fig, axes = plt.subplots(1, 2, figsize=(10, 5))
    
    # 원본 이미지 출력
    axes[0].imshow(img)
    axes[0].set_title(f"Original: {test_image}")
    axes[0].axis('off')
    
    # 복원된 이미지 출력
    axes[1].imshow(reconstructed_img[0])  # 배치 차원 제거
    axes[1].set_title(f"Reconstructed: {test_image}")
    axes[1].axis('off')
    
    plt.show()

이 결과로 3개의 테스트 이미지 중, 하나의 이미지를 이상데이터로 잘 탐지했다는 것을 알 수 있었다.

아직은, 성능이 부족하지만 이후로 다양한 하이퍼파라미터와 코드 수정 등을 통해 스키밍 장치를 잘 탐지하는 모델을 만들어보려고 한다.

step5, 이상데이터 모두를 잘 이상데이터로 탐지하기 위한 수정

추가 성능 향상을 위해
1. 모델 복잡도를 줄이고
2. 학습 데이터에 노이즈를 포함한 이미지를 입력하였다.

그 이유는 Autoencoder가 너무 복잡하다면 데이터의 세부적인 노이즈까지 복원할 수 있기에 이를 방지하고 노이즈를 포함한 이미지를 입력으로 주었을 때, 깨끗한 정상 데이터를 복원하도록 훈련한다면 이상 데이터가 학습되지 않도록 할 수 있기 때문이다.

따라서,

모델 정의

from tensorflow.keras import layers, models

# 간단한 Autoencoder 모델 정의
input_img = layers.Input(shape=(128, 128, 3))

# 인코더
x = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(input_img)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
encoded = layers.MaxPooling2D((2, 2), padding='same')(x)

# 디코더
x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(encoded)
x = layers.UpSampling2D((2, 2))(x)
x = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(x)
x = layers.UpSampling2D((2, 2))(x)
decoded = layers.Conv2D(3, (3, 3), activation='sigmoid', padding='same')(x)

# 모델 생성
autoencoder = models.Model(input_img, decoded)
autoencoder.compile(optimizer='adam', loss='mean_squared_error')

모델 학습

# 훈련 데이터에 랜덤 노이즈 추가
noisy_X_train = X_train + np.random.normal(loc=0.0, scale=0.1, size=X_train.shape)
noisy_X_train = np.clip(noisy_X_train, 0., 1.)  # 픽셀 값을 [0, 1]로 클리핑

# Autoencoder 학습
autoencoder.fit(noisy_X_train, X_train, epochs=50, batch_size=16, validation_split=0.2)

이렇게 수정한 후, 테스트 부분에선 임계값을 수정하는 부분을 빼고 테스트를 한 결과

3개의 이상데이터 모두 이상 데이터로 잘 탐지한 결과를 얻을 수 있었다!!

이젠 정상데이터도 테스트를 하였을 때, 정상 데이터로 잘 탐지하는지를 추가로 테스트 시작하였다.
테스트셋엔 이젠, 이상데이터 3개와 정상데이터 6개를 넣었을 때, 테스트를 해보았지만
이상데이터 3개는 이상데이터로 하였지만
정상데이터에선 3개는 정상으로, 3개는 이상으로 탐지하였다.

이후 정상데이터 또한, 정상 데이터라고 잘 탐지할 수 있도록 성능을 향상시켜보려고 한다.

step6, 정상데이터 또한 정상데이터로 탐지하기 위한 수정

단순히 복원 오차(MSE)만으로는 정상과 이상 데이터를 완벽히 구분하기에는 어려울 수 있기에 추가적인 분석 방법인 SSIM을 계산하였다. 이는 복원된 이미지와 입력 이미지 간의 구조적 유사성을 계산하는 것으로

먼저 모델을 학습시킨 후, 학습한 데이터를 기반으로 SSIM 임계값을 설정해준다.

훈련 데이터 기반 SSIM 임계값 설정

# 훈련 데이터에서 SSIM 계산
train_ssim_scores = []
for img in X_train:
    reconstructed_img = autoencoder.predict(np.expand_dims(img, axis=0))
    ssim_score = calculate_ssim(img, reconstructed_img[0])
    train_ssim_scores.append(ssim_score)

# 평균과 표준편차 계산
mean_ssim = np.mean(train_ssim_scores)
std_ssim = np.std(train_ssim_scores)

# SSIM 임계값 설정 (평균 - 2*표준편차)
ssim_threshold = mean_ssim - 2 * std_ssim
print(f"SSIM Threshold: {ssim_threshold}")

이상탐지 테스트

for test_image in test_images:
    image_path = os.path.join(test_dataset_path, test_image)
    
    # 이미지 로드 및 전처리
    img = load_and_preprocess_image(image_path)
    img_batch = np.expand_dims(img, axis=0)  # 배치 차원 추가
    
    # 복원된 이미지 계산
    reconstructed_img = autoencoder.predict(img_batch)
    
    # 재구성 오차 (MSE) 계산
    reconstruction_error = calculate_reconstruction_error(img, reconstructed_img[0])
    
    # SSIM 계산
    ssim_score = calculate_ssim(img, reconstructed_img[0])
    
    # 이상 여부 판단
    if reconstruction_error > threshold or ssim_score < ssim_threshold:
        print(f"Image {test_image} is Anomalous.")
    else:
        print(f"Image {test_image} is Normal.")

이후 이를 고려하여 이상탐지를 진행하였고
그 결과

3개의 이상데이터는 이상데이터로,
6개의 정상데이터는 정상데이터로 잘 탐지하였다.

이로써 ATM기기의 스키밍 장치를 잘 탐지할 수 있는 모델을 완성하였고
이후엔 더욱 많은 데이터셋을 훈련 및 테스트해보며 더욱 성능을 향상 시켜보려 한다.

작성자 : (Team 28 HUK) 2171008 김지수

profile
컴공생의 밍기적

0개의 댓글