The basic data preprocessing wasn't hard, but implementing an autoencoder as a torch class and actually applying it still seems to take a lot of effort.
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
# Use the creditcard_df data loaded earlier
# Select the V1~V28 columns
features = creditcard_df.iloc[:, 1:29]
# Plot the distribution of each feature
plt.figure(figsize=(20, 20))
plt.suptitle('distribution of v1~v28', fontsize=18)
for i, feature in enumerate(features):
    plt.subplot(7, 4, i + 1)
    # kde=True overlays a non-parametric kernel density estimate
    sns.histplot(features[feature], bins=50, kde=True)
    plt.title(feature)
    plt.xlabel('')
    plt.ylabel('')
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.show()
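In this dataset the V columns are anonymized PCA components, so it is also worth checking whether any single feature separates the two classes. A minimal sketch of one such overlay; V14 is just an arbitrary example column, not a choice made above:

# Overlay normal vs. fraud for a single example feature
plt.figure(figsize=(8, 4))
sns.kdeplot(creditcard_df.loc[creditcard_df['Class'] == 0, 'V14'], label='normal')
sns.kdeplot(creditcard_df.loc[creditcard_df['Class'] == 1, 'V14'], label='fraud')
plt.title('V14 by class')
plt.legend()
plt.show()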
import numpy as np
from sklearn.preprocessing import StandardScaler

# 1. Distribution of the original Amount column
plt.figure(figsize=(18, 5))
plt.subplot(1, 3, 1)
sns.histplot(creditcard_df['Amount'], bins=50, kde=True)
plt.title('Original Amount Distribution')
plt.xlabel('Amount')
plt.ylabel('Count')
plt.xlim(0, 2000)  # limit the x-axis to inspect the dense region
# 2. Distribution after only a log transform
amount_log_transformed = np.log1p(creditcard_df['Amount'])
plt.subplot(1, 3, 2)
sns.histplot(amount_log_transformed, bins=50, kde=True)
plt.title('Log-transformed Amount Distribution')
plt.xlabel('Log(1 + Amount)')
plt.ylabel('Count')
# 3. Distribution after log transform plus standardization
scaler = StandardScaler()
# reshape(-1, 1) turns the 1D series into the 2D array sklearn expects
amount_scaled = scaler.fit_transform(amount_log_transformed.values.reshape(-1, 1))
amount_scaled_series = pd.Series(amount_scaled.flatten(), name='Scaled Amount')
plt.subplot(1, 3, 3)
sns.histplot(amount_scaled_series, bins=50, kde=True)
plt.title('Log-transformed & Scaled Amount Distribution')
plt.xlabel('Scaled Amount')
plt.ylabel('Count')
plt.tight_layout()  # adjust spacing between subplots
plt.show()
# Check the mean and standard deviation of the scaled data
print("Mean of scaled data:", amount_scaled_series.mean())
print("Std of scaled data:", amount_scaled_series.std())
01) Loading and preprocessing the data
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
# 1. Drop the Time column
data_df = creditcard_df.drop(['Time'], axis=1)
# 2. Log-transform the Amount column, then standardize all features
# np.log1p (log(1 + x)) handles zero amounts safely, unlike np.log
data_df['Amount'] = np.log1p(data_df['Amount'])
# Apply StandardScaler to every feature column (Class is the label, so it is excluded)
scaler = StandardScaler()
feature_cols = data_df.columns.drop('Class')
data_df[feature_cols] = scaler.fit_transform(data_df[feature_cols])
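A quick sanity check that the scaler behaved: every standardized column should now have mean near 0 and standard deviation near 1.

# Means should be ~0 and stds ~1 after StandardScaler
print(data_df[feature_cols].mean().round(3).head())
print(data_df[feature_cols].std().round(3).head())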
02) Splitting the data
from sklearn.model_selection import train_test_split

# Split the original data into normal / fraud transactions
normal_df = data_df[data_df['Class'] == 0]
fraud_df = data_df[data_df['Class'] == 1]
# Train only on normal transactions; hold out 20% of them for validation
train_normal, val_normal = train_test_split(normal_df, test_size=0.2, random_state=42)
test_data_all = pd.concat([val_normal, fraud_df])
train_X = train_normal.drop('Class', axis=1).values
train_y = train_normal['Class'].values
val_X = val_normal.drop('Class', axis=1).values
val_y = val_normal['Class'].values
test_X = test_data_all.drop('Class', axis=1).values
test_y = test_data_all['Class'].values
import torch

train_data_tensor = torch.tensor(train_X, dtype=torch.float32)
train_labels_tensor = torch.tensor(train_y, dtype=torch.float32)
val_data_tensor = torch.tensor(val_X, dtype=torch.float32)
val_labels_tensor = torch.tensor(val_y, dtype=torch.float32)
test_data_tensor = torch.tensor(test_X, dtype=torch.float32)
test_labels_tensor = torch.tensor(test_y, dtype=torch.float32)
from torch.utils.data import Dataset, DataLoader
class CreditCardDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

# Build the Datasets
train_dataset = CreditCardDataset(train_data_tensor, train_labels_tensor)
val_dataset = CreditCardDataset(val_data_tensor, val_labels_tensor)
test_dataset = CreditCardDataset(test_data_tensor, test_labels_tensor)
# Build the DataLoaders; only the training set needs shuffling
train_loader = DataLoader(train_dataset, batch_size=1024, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=1024, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=1024, shuffle=False)
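Before training, pulling a single batch confirms the shapes the autoencoder will see: (batch_size, n_features) for the inputs and (batch_size,) for the labels.

# Sanity check on one batch; with 29 feature columns this prints
# torch.Size([1024, 29]) torch.Size([1024])
xb, yb = next(iter(train_loader))
print(xb.shape, yb.shape)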
03) Defining the autoencoder model class
import torch.nn as nn

class Autoencoder(nn.Module):
    def __init__(self, input_size, hidden_sizes):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_size, hidden_sizes[0]),
            nn.ReLU(),
            nn.Linear(hidden_sizes[0], hidden_sizes[1]),
            nn.ReLU()
        )
        self.decoder = nn.Sequential(
            nn.Linear(hidden_sizes[1], hidden_sizes[0]),
            nn.ReLU(),
            nn.Linear(hidden_sizes[0], input_size)
        )

    def forward(self, x):
        return self.decoder(self.encoder(x))
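A quick smoke test before wiring up the Trainer: a random batch must be reconstructed to exactly the input shape, since the loss compares output against input. The sizes below are illustrative:

import torch

dummy = torch.randn(4, 29)  # 4 samples, 29 features (example values)
ae = Autoencoder(input_size=29, hidden_sizes=[20, 10])
assert ae(dummy).shape == dummy.shape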
04) Training the model
class Trainer:
    def __init__(self, model, train_loader, val_loader, optimizer, criterion, epochs, device):
        self.model = model
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.optimizer = optimizer
        self.criterion = criterion
        self.epochs = epochs
        self.device = device
        self.history = {
            'train_loss': [], 'val_loss': []
        }

    def train(self, patience=3, min_delta=1e-4):
        best_val_loss = float('inf')
        patience_counter = 0
        for epoch in range(self.epochs):
            self.model.train()
            train_loss = 0
            for x, _ in self.train_loader:
                x = x.to(self.device)
                self.optimizer.zero_grad()
                output = self.model(x)
                loss = self.criterion(output, x)
                loss.backward()
                self.optimizer.step()
                train_loss += loss.item()
            avg_train_loss = train_loss / len(self.train_loader)
            # validation
            self.model.eval()
            val_loss = 0
            with torch.no_grad():
                for x, _ in self.val_loader:
                    x = x.to(self.device)
                    output = self.model(x)
                    loss = self.criterion(output, x)
                    val_loss += loss.item()
            avg_val_loss = val_loss / len(self.val_loader)
            print(f"[{epoch+1}/{self.epochs}] Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")
            self.history['train_loss'].append(avg_train_loss)
            self.history['val_loss'].append(avg_val_loss)
            # early stopping
            if avg_val_loss < best_val_loss - min_delta:
                best_val_loss = avg_val_loss
                patience_counter = 0
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    print(f"Early stopping at epoch {epoch+1}")
                    break
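One thing the Trainer above does not do: early stopping halts training, but the weights left in the model are from the last epoch, not the best one. A common fix is to snapshot the weights whenever validation loss improves and restore them afterwards; a comment-level sketch of where those two lines would go:

import copy

# where best_val_loss is updated inside train():
#     best_state = copy.deepcopy(self.model.state_dict())
# after the epoch loop exits (normally or via early stopping):
#     self.model.load_state_dict(best_state)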
epochs = 100
patience = 5
min_delta = 0.0003
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# the model defined above uses two hidden sizes (encoder: input -> 20 -> 10)
model = Autoencoder(input_size=train_X.shape[1], hidden_sizes=[20, 10]).to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
trainer = Trainer(model, train_loader, val_loader, optimizer, criterion, epochs=epochs, device=device)
trainer.train(patience=patience, min_delta=min_delta)
05) Visualizing the loss curves
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 6))
plt.plot(trainer.history['train_loss'], label='Train Loss')
plt.plot(trainer.history['val_loss'], label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('AutoEncoder Training & Validation Loss')
plt.legend()
plt.grid(True)
plt.show()
06) Testing the model
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix, classification_report
class Tester:
    def __init__(self, model, data_loader, device):
        self.model = model
        self.data_loader = data_loader
        self.device = device
        self.reconstruction_errors = []
        self.y_true = []

    def calculate_errors_and_labels(self):
        self.model.eval()
        with torch.no_grad():
            for x, y in self.data_loader:
                x = x.to(self.device)
                output = self.model(x)
                # per-sample reconstruction error (MSE over the feature axis)
                mse = torch.mean((x - output) ** 2, dim=1)
                self.reconstruction_errors.extend(mse.cpu().numpy())
                self.y_true.extend(y.cpu().numpy())
        self.reconstruction_errors = np.array(self.reconstruction_errors)
        self.y_true = np.array(self.y_true)

    def evaluate(self, threshold):
        # anything reconstructed worse than the threshold is flagged as fraud
        y_pred = (self.reconstruction_errors > threshold).astype(int)
        print("\n[Evaluation Results]")
        print(f"Threshold: {threshold:.4f}")
        print(f"Precision: {precision_score(self.y_true, y_pred):.4f}")
        print(f"Recall: {recall_score(self.y_true, y_pred):.4f}")
        print(f"F1 Score: {f1_score(self.y_true, y_pred):.4f}")
        print("\n[Confusion Matrix]\n", confusion_matrix(self.y_true, y_pred))
        print("\n[Classification Report]\n", classification_report(self.y_true, y_pred))