기본적인 데이터 전처리는 어려울게 없었지만, torch로 autoencoder를
class 형태로 구현하고 적용하는 건 아직 노력이 많이 필요한 것 같다.

01. 데이터 확인, 전처리

  • 데이터 출처는 kaggle에서의 신용카드 사기 데이터였다.
  • 이 데이터는 PCA로 차원축소가 이뤄진 데이터이고, v1~v28처럼
    의미를 추측하기 힘든 컬럼으로 구성되어 있다.
    • PCA로 차원을 축소할 때, 각 PC(Pricipal Component, 주성분)는
      원래 있던 데이터의 design-matrix(y를 빼고 x값으로 이뤄진 행렬)
      각 컬럼의 선형결합으로 표현이 된다. 그래서 각 PC의 의미를 해석하는 것이 어렵다.
    • 또한 금융권 데이터의 특성상 보안이나 개인정보 보호 등의 목적에서
      컬럼의 의미를 추측할 수 없게, 컬럼의 선형결합으로 PC를 구성했을 수도 있다.
  • 전체 31개 컬럼은 v1~v28, Time, Amount, class로 이뤄져 있는데
    class는 0(정상)과 1(사기)로 이뤄져 있고, Time은 중복 제거하니
    11만개여서 사실 의미가 없다.
  • 당시 프로젝트 수행 시 Amount는 별도로 변수변환을 했고, 일단
    v1~v28을 시각화해서 분포를 확인해 보았다.
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# 앞에서 불러온, creditcard_df 데이터 사용
# v1~v28 컬럼 선택
features = creditcard_df.iloc[:, 1:29] 

# 그래프 그리기
plt.figure(figsize = (20,20))
plt.subtitle('distribution of v1~v28', fontsize = 18)

for i, feature in enumerate(features):
    plt.subplot(7,4, i+1)
    # kde는 비모수적인 커널을 그려주는 것
    sns.histplot(v_features[feature],bins=50, kde=True)
    plt.title(feature)
    plt.xlabel('')
    plt.ylabel('')
plt.tight_layout(rect = [0, 0.03, 1, 0.95] )
plt.show()

  • Amount column이 변수변환 필요한지 확인
# 1. 원본 Amount 컬럼의 분포 시각화
plt.figure(figsize=(18, 5))
plt.subplot(1, 3, 1)
sns.histplot(creditcard_df['Amount'], bins=50, kde=True)
plt.title('Original Amount Distribution')
plt.xlabel('Amount')
plt.ylabel('Count')
plt.xlim(0, 2000) # x축 범위 제한으로 밀집된 구간 확인

# 2. 로그 변환만 적용한 Amount 컬럼의 분포 시각화
amount_log_transformed = np.log1p(creditcard_df['Amount'])
plt.subplot(1, 3, 2)
sns.histplot(amount_log_transformed, bins=50, kde=True)
plt.title('Log-transformed Amount Distribution')
plt.xlabel('Log(1 + Amount)')
plt.ylabel('Count')

# 3. 로그 변환 후 스케일링까지 적용한 Amount 컬럼의 분포 시각화
scaler = StandardScaler()
# reshape(-1, 1)을 사용하여 1D 데이터를 2D 배열로 변환
amount_scaled = scaler.fit_transform(amount_log_transformed.values.reshape(-1, 1))
amount_scaled_series = pd.Series(amount_scaled.flatten(), name='Scaled Amount')
plt.subplot(1, 3, 3)
sns.histplot(amount_scaled_series, bins=50, kde=True)
plt.title('Log-transformed & Scaled Amount Distribution')
plt.xlabel('Scaled Amount')
plt.ylabel('Count')

plt.tight_layout() # 서브플롯 간 간격 조절
plt.show()

# 스케일링된 데이터의 평균과 표준편차 확인
print("스케일링된 데이터의 평균:", amount_scaled_series.mean())
print("스케일링된 데이터의 표준편차:", amount_scaled_series.std())

02. Auto-encoder 적용해보기

01) 데이터 불러오기

  • 데이터를 불러오고 스케일링하기(딥러닝 모델은 scale에 민감)
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler

# 1. Time column 제거
data_df = creditcard_df.drop(['Time'], axis=1)

# 2. Amount column 로그 변환 후 표준화
# np.log 사용해서 0인 경우도 안전하게 처리

data_df['Amount'] = np.log(data_df['Amount'])

# StandardScaler 적용
scaler = StandardScaler()
data_scaled_features = scaler.fit_transform(data_df.drop('Class'), axis=1)

02) 데이터 분리

  • Autoencoder는 정상만 학습한 후, decoder로 복원하는 과정에서
    오차를 최소화하여 학습함
# 원본 데이터에서 정상/사기 분리
normal_df = data_df[data_df['Class']==0]
fraud_df = data_df[data_df['Class']==1]

train_normal, val_normal = train_test_split(normal_df, test_size=0.2, random_state=42)
test_data_all = pd.concat([val_normal, fraud_df])
  • DataFrame → numpy 배열로 바꾸는 이유는 torch tensor로 바꾸기 위함
train_X = train_normal.drop('Class', axis=1).values
train_y = train_normal['Class'].values

val_X = val_normal.drop('Class', axis=1).values
val_y = val_normal['Class'].values

test_X = test_data_all.drop('Class', axis=1).values
test_y = test_data_all['Class'].values
  • PyTorch는 학습 데이터를 tensor로 받기 때문에 numpy에서 torch로 바꿔야함
train_data_tensor = torch.tensor(train_X, dtype=torch.float32)
train_labels_tesnor = torch.tensor(train_y, dtype=torch.float32)

val_data_tensor = torch.tensor(val_X, dtype=torch.float32)
val_labels_tensor = torch.tensor(val_y, dtype=torch.floaat32)

test_data_tensor = torch.tensor(test_X, dtype=torch.float32)
test_labels_tensor = torch.tensor(test_y, dtype=torch.float32)
  • PyTorch 학습을 하려면 Dataset → DataLoader 구조가 필요
from torch.utils.data import Dataset, DataLoader

class CreditCardDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels
    def __len__(self):
        return len(self.data)
    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

# Dataset 만들기
train_dataset = CreditCardDataset(train_data_tensor, train_labels_tensor)
val_dataset = CreditCardDataset(val_data_tensor, val_labels_tensor)
test_dataset = CreditCardDataset(test_data_tensor, test_labels_tensor)

# DataLoader 만들기
train_loader = DataLoader(train_dataset, batch_size = 1024, shuffle = True)
val_loader = DataLoader(val_dataset, batch_size = 1024, shuffle = True)
test_loader = DataLoader(test_dataset, batch_size = 1024, shuffle = True)

03) 오토인코더 모델 클래스 정의

class Autoencoder(nn.Module):
    def __init__(self, input_size, hidden_sizes):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_size, hidden_sizes[0]),
            nn.ReLU(),
            nn.Linear(hidden_sizes[0], hidden_sizes[1]),
            nn.ReLU()
        )
        self.decoder = nn.Sequential(
            nn.Linear(hidden_sizes[1], hidden_sizes[0]),
            nn.ReLU(),
            nn.Linear(hidden_sizes[0], input_size)
        )

    def forward(self, x):
        return self.decoder(self.encoder(x))

04) 모델 학습하기

class Trainer:
    def __init__(self, model, train_loader, val_loader, optimizer, criterion, epochs, device):
        self.model = model
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.optimizer = optimizer
        self.criterion = criterion
        self.epochs = epochs
        self.device = device
        self.history = {
            'train_loss': [], 'val_loss': []
        }

    def train(self, patience=3, min_delta=1e-4):
        best_val_loss = float('inf')
        patience_counter = 0

        for epoch in range(self.epochs):
            self.model.train()
            train_loss = 0

            for x, _ in self.train_loader:
                x = x.to(self.device)
                self.optimizer.zero_grad()
                output = self.model(x)
                loss = self.criterion(output, x)
                loss.backward()
                self.optimizer.step()
                train_loss += loss.item()

            avg_train_loss = train_loss / len(self.train_loader)

            # validation
            self.model.eval()
            val_loss = 0
            with torch.no_grad():
                for x, _ in self.val_loader:
                    x = x.to(self.device)
                    output = self.model(x)
                    loss = self.criterion(output, x)
                    val_loss += loss.item()

            avg_val_loss = val_loss / len(self.val_loader)

            print(f"[{epoch+1}/{self.epochs}] Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")

            self.history['train_loss'].append(avg_train_loss)
            self.history['val_loss'].append(avg_val_loss)

            # early stopping
            if avg_val_loss < best_val_loss - min_delta:
                best_val_loss = avg_val_loss
                patience_counter = 0
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    print(f"Early stopping at epoch {epoch+1}")
                    break
  • 모델 학습 시작
epochs = 100
patience = 5
min_delta = 0.0003

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Autoencoder(input_size=train_X.shape[1], hidden_sizes=[20,10, 5]).to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

trainer = Trainer(model, train_loader, val_loader, optimizer, criterion, epochs=epochs, device=device)
trainer.train(patience=patience, min_delta=min_delta)

05) 시각화하기

import matplotlib.pyplot as plt

plt.figure(figsize=(10, 6))
plt.plot(trainer.history['train_loss'], label='Train Loss')
plt.plot(trainer.history['val_loss'], label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('AutoEncoder Training & Validation Loss')
plt.legend()
plt.grid(True)
plt.show()

06) 모델 테스트

from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix, classification_report

class Tester:
    def __init__(self, model, data_loader, device):
        self.model = model
        self.data_loader = data_loader
        self.device = device
        self.reconstruction_errors = []
        self.y_true = []

    def calculate_errors_and_labels(self):
        self.model.eval()
        with torch.no_grad():
            for x, y in self.data_loader:
                x = x.to(self.device)
                output = self.model(x)
                mse = torch.mean((x - output) ** 2, dim=1)
                self.reconstruction_errors.extend(mse.cpu().numpy())
                self.y_true.extend(y.cpu().numpy())

        self.reconstruction_errors = np.array(self.reconstruction_errors)
        self.y_true = np.array(self.y_true)

    def evaluate(self, threshold):
        y_pred = (self.reconstruction_errors > threshold).astype(int)

        print("\n[평가 결과]")
        print(f"Threshold: {threshold:.4f}")
        print(f"Precision: {precision_score(self.y_true, y_pred):.4f}")
        print(f"Recall:    {recall_score(self.y_true, y_pred):.4f}")
        print(f"F1 Score:  {f1_score(self.y_true, y_pred):.4f}")
        print("\n[Confusion Matrix]\n", confusion_matrix(self.y_true, y_pred))
        print("\n[Classification Report]\n", classification_report(self.y_true, y_pred))
profile
2025화이팅!

0개의 댓글