TranAD

이수진·2025년 10월 26일
0

TranAD: 시계열 이상 탐지를 위한 딥 트랜스포머 네트워크 분석

TranAD (Transformer-based Anomaly Detection)는 다변량 시계열 데이터(Multivariate Time Series Data)의 이상 탐지 성능을 혁신적으로 개선한 모델
트랜스포머의 강력한 어텐션(Attention) 메커니즘을 오토인코더(AutoEncoder) 구조에 통합 -> 적대적 학습셀프-컨디셔닝 기법을 추가

1. Transformer 기반 오토인코더 구조: 특징 추출과 재구성의 핵심

TranAD는 입력 시계열 데이터를 효율적으로 압축하고 복원하는 오토인코더의 기본 틀을 따름

  • 인코더 (Encoder):

    • 역할: 입력된 시계열 데이터 윈도우(Window)에서 장기적인 시간적 패턴다변수 간의 복잡한 상호 관계를 추출하여 잠재 공간(Latent Space) 벡터로 압축
    • 핵심: 트랜스포머의 셀프-어텐션을 사용하여, 시퀀스 내 모든 시간 단계와 변수 간의 연관성을 동시에 파악
  • 디코더 (Decoder):

    • 역할: 인코더의 잠재 벡터를 다시 원래의 시계열 형태로 재구성(Reconstruct)합니다.
    • 특징: TranAD는 후술할 적대적 학습을 위해 특별히 두 개의 디코더(Decoder 1, Decoder 2)를 사용

2. 핵심 메커니즘: 모델 안정성과 이상 증폭

🌟 핵심 1: 셀프-컨디셔닝 (Self-Conditioning)

  • 목적: 모델이 데이터의 다양한 잠재적 특징(Multi-modal Features)을 포착하고 특징 추출 능력을 강화
  • 원리: 인코더 입력에 포커스 스코어(Focus Score)라는 보조 정보를 도입
    • 이 스코어는 모델이 학습 과정에서 스스로 중요한 시간 단계나 특징에 더 많은 주의를 기울이도록 유도
    • 즉, 모델이 자체적으로 가장 정보가 풍부한 부분에 집중하도록 '컨디셔닝'을 가하는 것입니다.

⚔️ 핵심 2: 적대적 학습 (Adversarial Training)

  • 목적: 정상 데이터는 정확하게 재구성하고, 이상 데이터는 재구성에 실패하도록(오류가 커지도록) 유도하여 이상 신호를 증폭
  • 원리: GAN(Generative Adversarial Network) 아이디어를 차용하여 두 디코더가 경쟁적으로 학습
    • Decoder 1 (Reconstruction Decoder): 실제 입력 데이터와 재구성 결과가 유사해지도록(정상 패턴을 잘 복원하도록) 학습
    • Decoder 2 (Prediction Decoder): 디코더 1의 출력과 다른 방향으로 재구성하도록 유도
      • 결과적으로, 정상 데이터에 대해서는 두 디코더의 결과가 모두 정확하지만 이상 데이터가 들어오면 이 둘의 재구성 결과가 크게 벌어지면서 이상 점수가 극대화

3. 이상 탐지 (Anomaly Detection) 과정

모델 학습이 완료되면, 이상 탐지는 재구성 오류(Reconstruction Error)를 기반으로 수행

  • 정상 데이터: 모델은 정상 패턴만으로 학습되었기 때문에 정상 시퀀스가 입력되면 재구성된 데이터 X^\hat{X}와 원본 데이터 XX 간의 차이(오류)가 작음
  • 이상 데이터: 이상 징후를 포함한 시퀀스가 입력되면 모델이 이를 [정상]으로 재구성하지 못해 XXX^\hat{X} 간의 차이가 매우 커짐
  • 이상 점수 (Anomaly Score): 이 재구성 오류(예: L1L_1 또는 L2L_2 거리)가 바로 이상 점수가 되며, 이 점수가 미리 정한 임계값(Threshold)을 초과하면 해당 시점을 최종적으로 이상(Anomaly)으로 판별

Anomaly Score=XX^\text{Anomaly Score} = || X - \hat{X} ||

TranAD는 이러한 독특한 구조와 학습 방식을 통해 복잡한 다변량 시계열 환경에서 빠른 속도높은 정확도를 모두 달성한 최신 이상 탐지 모델로 평가받고 있음

pip install deepod pandas numpy torch

import pandas as pd
import numpy as np
import torch
from deepod.models.time_series import TranAD
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset

# --- 설정 (Config) ---
FILE_PATH = 'your_sensor_data.csv'  # 실제 파일 경로로 변경하세요
OUTPUT_PATH = 'tranad_anomaly_scores.csv'
N_FEATURES = 10  # 센서 데이터의 개수
WINDOW_SIZE = 10  # TranAD가 고려할 시간 윈도우 크기
EPOCHS = 100
BATCH_SIZE = 128
TRAIN_RATIO = 0.8  # 학습 데이터 비율 (정상 데이터만으로 학습 가정)

# --- 1. 데이터 로드 및 전처리 함수 ---

def preprocess_data(df, window_size, train_ratio):
    # DTTM 열 분리
    dttm_col = df['DTTM']
    data = df.drop(columns=['DTTM']).values.astype(np.float32)

    # 정규화 (StandardScaler 사용)
    scaler = StandardScaler()
    data_scaled = scaler.fit_transform(data)

    # 학습 및 테스트 데이터 분할 (시간 순서대로)
    train_size = int(len(data_scaled) * train_ratio)
    train_data = data_scaled[:train_size]
    test_data = data_scaled[train_size:]
    test_dttm = dttm_col[train_size:]

    # 시퀀스(윈도우) 분할 함수
    def create_sequences(data, window):
        sequences = []
        for i in range(len(data) - window + 1):
            sequences.append(data[i:i + window])
        return np.array(sequences)

    # 데이터셋 생성
    train_sequences = create_sequences(train_data, window_size)
    test_sequences = create_sequences(test_data, window_size)

    # 윈도우 생성 후 손실로 인해 길이가 줄어든 DTTM 인덱스 맞추기
    # TranAD는 윈도우의 마지막 시점(t)에 대한 이상 점수를 출력합니다.
    test_dttm_aligned = test_dttm[window_size - 1:].reset_index(drop=True)

    return train_sequences, test_sequences, test_dttm_aligned

# --- 2. 모델 학습 및 이상치 스코어 계산 함수 ---

def run_tranad(train_seqs, test_seqs, n_features, window_size, epochs, batch_size):
    # PyTorch 장치 설정
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    # 파라미터 튜닝: TranAD는 'seq_len'과 'n_features'가 중요합니다.
    # 그 외 'epochs', 'lr', 'batch_size' 등을 튜닝할 수 있습니다.
    model = TranAD(
        seq_len=window_size,
        n_features=n_features,
        epochs=epochs,
        batch_size=batch_size,
        lr=0.001,  # 학습률 (Learning Rate) 튜닝 가능
        device=device
    )

    print(f"\n--- TranAD 학습 시작 (Device: {device}) ---")
    # 학습: TranAD는 비지도 학습이므로 X_train만 사용
    model.fit(train_seqs)

    print("--- TranAD 테스트 시작 ---")
    # 예측: 테스트 데이터에 대한 이상치 스코어 계산
    # 'score_t'는 시점(time point)별 이상치 스코어입니다.
    # scores_t의 길이는 test_seqs의 길이와 같습니다.
    scores = model.predict(test_seqs, return_score=True)
    anomaly_scores_t = scores['score_t']

    return anomaly_scores_t

# --- 메인 실행 ---
if __name__ == '__main__':
    # 1. 데이터 로드 (DTTM, Sensor1, Sensor2, ..., Sensor10)
    try:
        df = pd.read_csv(FILE_PATH)
        if 'DTTM' not in df.columns:
            raise ValueError("CSV 파일에 'DTTM' 열이 포함되어 있어야 합니다.")
        
        # 데이터의 형태를 확인합니다.
        print(f"원본 데이터 크기: {df.shape}")
        
    except FileNotFoundError:
        print(f"오류: 파일을 찾을 수 없습니다. 경로를 확인하세요: {FILE_PATH}")
        # 예시 데이터프레임 생성 (테스트용)
        # 실제 데이터를 사용하는 경우 이 부분을 주석 처리하고 위의 FILE_PATH를 수정하세요.
        time_steps = 1000
        df = pd.DataFrame({
            'DTTM': pd.to_datetime(pd.Series(range(time_steps)).apply(lambda x: f'2024-01-01 00:00:{x:02}')),
            **{f'Sensor{i}': np.sin(np.linspace(0, 10, time_steps)) + np.random.normal(0, 0.1, time_steps) + i for i in range(1, N_FEATURES + 1)}
        })
        # 인위적인 이상치 주입 (테스트용)
        df.loc[800:810, 'Sensor1'] += 10
        print("예시 데이터를 생성하여 사용합니다.")
        

    # 2. 데이터 전처리
    train_sequences, test_sequences, test_dttm_aligned = preprocess_data(df, WINDOW_SIZE, TRAIN_RATIO)
    
    print(f"학습 시퀀스 크기: {train_sequences.shape}")
    print(f"테스트 시퀀스 크기: {test_sequences.shape}")
    print(f"정렬된 DTTM 크기: {test_dttm_aligned.shape}")


    # 3. TranAD 모델 학습 및 스코어 계산
    anomaly_scores_t = run_tranad(
        train_sequences, 
        test_sequences, 
        N_FEATURES, 
        WINDOW_SIZE, 
        EPOCHS, 
        BATCH_SIZE
    )

    # 4. 결과물 생성 및 저장
    
    # 초당(Row 단위) 스코어를 DataFrame으로 변환
    results_df = pd.DataFrame({
        'DTTM': test_dttm_aligned,
        'Anomaly_Score': anomaly_scores_t.flatten()  # 스코어는 1차원 벡터
    })

    # 스코어가 높을수록 이상치일 확률이 높습니다.
    results_df.to_csv(OUTPUT_PATH, index=False)
    
    print(f"\n--- 완료 ---")
    print(f"이상치 스코어가 '{OUTPUT_PATH}' 파일에 저장되었습니다.")
    print(f"저장된 결과 크기: {results_df.shape}")
   
변수설명중요도
FILE_PATH분석할 센서 데이터 CSV 파일 경로높음
OUTPUT_PATH이상치 스코어를 저장할 CSV 파일 경로중간
N_FEATURES센서 데이터(DTTM 제외)의 개수 (열 개수)높음
WINDOW_SIZETranAD가 이상 판단을 위해 참고할 시간 길이 (W)매우 높음 (튜닝 요소)
TRAIN_RATIO학습 데이터로 사용할 데이터의 비율 (0.8 = 80%)중간
profile
뇽안

0개의 댓글