
TranAD (Transformer-based Anomaly Detection)는 다변량 시계열 데이터(Multivariate Time Series Data)의 이상 탐지 성능을 혁신적으로 개선한 모델
트랜스포머의 강력한 어텐션(Attention) 메커니즘을 오토인코더(AutoEncoder) 구조에 통합 -> 적대적 학습 및 셀프-컨디셔닝 기법을 추가
TranAD는 입력 시계열 데이터를 효율적으로 압축하고 복원하는 오토인코더의 기본 틀을 따름
인코더 (Encoder):
디코더 (Decoder):
모델 학습이 완료되면, 이상 탐지는 재구성 오류(Reconstruction Error)를 기반으로 수행
TranAD는 이러한 독특한 구조와 학습 방식을 통해 복잡한 다변량 시계열 환경에서 빠른 속도와 높은 정확도를 모두 달성한 최신 이상 탐지 모델로 평가받고 있음
pip install deepod pandas numpy torch
import pandas as pd
import numpy as np
import torch
from deepod.models.time_series import TranAD
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset
# --- 설정 (Config) ---
FILE_PATH = 'your_sensor_data.csv' # 실제 파일 경로로 변경하세요
OUTPUT_PATH = 'tranad_anomaly_scores.csv'
N_FEATURES = 10 # 센서 데이터의 개수
WINDOW_SIZE = 10 # TranAD가 고려할 시간 윈도우 크기
EPOCHS = 100
BATCH_SIZE = 128
TRAIN_RATIO = 0.8 # 학습 데이터 비율 (정상 데이터만으로 학습 가정)
# --- 1. 데이터 로드 및 전처리 함수 ---
def preprocess_data(df, window_size, train_ratio):
# DTTM 열 분리
dttm_col = df['DTTM']
data = df.drop(columns=['DTTM']).values.astype(np.float32)
# 정규화 (StandardScaler 사용)
scaler = StandardScaler()
data_scaled = scaler.fit_transform(data)
# 학습 및 테스트 데이터 분할 (시간 순서대로)
train_size = int(len(data_scaled) * train_ratio)
train_data = data_scaled[:train_size]
test_data = data_scaled[train_size:]
test_dttm = dttm_col[train_size:]
# 시퀀스(윈도우) 분할 함수
def create_sequences(data, window):
sequences = []
for i in range(len(data) - window + 1):
sequences.append(data[i:i + window])
return np.array(sequences)
# 데이터셋 생성
train_sequences = create_sequences(train_data, window_size)
test_sequences = create_sequences(test_data, window_size)
# 윈도우 생성 후 손실로 인해 길이가 줄어든 DTTM 인덱스 맞추기
# TranAD는 윈도우의 마지막 시점(t)에 대한 이상 점수를 출력합니다.
test_dttm_aligned = test_dttm[window_size - 1:].reset_index(drop=True)
return train_sequences, test_sequences, test_dttm_aligned
# --- 2. 모델 학습 및 이상치 스코어 계산 함수 ---
def run_tranad(train_seqs, test_seqs, n_features, window_size, epochs, batch_size):
# PyTorch 장치 설정
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# 파라미터 튜닝: TranAD는 'seq_len'과 'n_features'가 중요합니다.
# 그 외 'epochs', 'lr', 'batch_size' 등을 튜닝할 수 있습니다.
model = TranAD(
seq_len=window_size,
n_features=n_features,
epochs=epochs,
batch_size=batch_size,
lr=0.001, # 학습률 (Learning Rate) 튜닝 가능
device=device
)
print(f"\n--- TranAD 학습 시작 (Device: {device}) ---")
# 학습: TranAD는 비지도 학습이므로 X_train만 사용
model.fit(train_seqs)
print("--- TranAD 테스트 시작 ---")
# 예측: 테스트 데이터에 대한 이상치 스코어 계산
# 'score_t'는 시점(time point)별 이상치 스코어입니다.
# scores_t의 길이는 test_seqs의 길이와 같습니다.
scores = model.predict(test_seqs, return_score=True)
anomaly_scores_t = scores['score_t']
return anomaly_scores_t
# --- 메인 실행 ---
if __name__ == '__main__':
# 1. 데이터 로드 (DTTM, Sensor1, Sensor2, ..., Sensor10)
try:
df = pd.read_csv(FILE_PATH)
if 'DTTM' not in df.columns:
raise ValueError("CSV 파일에 'DTTM' 열이 포함되어 있어야 합니다.")
# 데이터의 형태를 확인합니다.
print(f"원본 데이터 크기: {df.shape}")
except FileNotFoundError:
print(f"오류: 파일을 찾을 수 없습니다. 경로를 확인하세요: {FILE_PATH}")
# 예시 데이터프레임 생성 (테스트용)
# 실제 데이터를 사용하는 경우 이 부분을 주석 처리하고 위의 FILE_PATH를 수정하세요.
time_steps = 1000
df = pd.DataFrame({
'DTTM': pd.to_datetime(pd.Series(range(time_steps)).apply(lambda x: f'2024-01-01 00:00:{x:02}')),
**{f'Sensor{i}': np.sin(np.linspace(0, 10, time_steps)) + np.random.normal(0, 0.1, time_steps) + i for i in range(1, N_FEATURES + 1)}
})
# 인위적인 이상치 주입 (테스트용)
df.loc[800:810, 'Sensor1'] += 10
print("예시 데이터를 생성하여 사용합니다.")
# 2. 데이터 전처리
train_sequences, test_sequences, test_dttm_aligned = preprocess_data(df, WINDOW_SIZE, TRAIN_RATIO)
print(f"학습 시퀀스 크기: {train_sequences.shape}")
print(f"테스트 시퀀스 크기: {test_sequences.shape}")
print(f"정렬된 DTTM 크기: {test_dttm_aligned.shape}")
# 3. TranAD 모델 학습 및 스코어 계산
anomaly_scores_t = run_tranad(
train_sequences,
test_sequences,
N_FEATURES,
WINDOW_SIZE,
EPOCHS,
BATCH_SIZE
)
# 4. 결과물 생성 및 저장
# 초당(Row 단위) 스코어를 DataFrame으로 변환
results_df = pd.DataFrame({
'DTTM': test_dttm_aligned,
'Anomaly_Score': anomaly_scores_t.flatten() # 스코어는 1차원 벡터
})
# 스코어가 높을수록 이상치일 확률이 높습니다.
results_df.to_csv(OUTPUT_PATH, index=False)
print(f"\n--- 완료 ---")
print(f"이상치 스코어가 '{OUTPUT_PATH}' 파일에 저장되었습니다.")
print(f"저장된 결과 크기: {results_df.shape}")
| 변수 | 설명 | 중요도 |
|---|---|---|
| FILE_PATH | 분석할 센서 데이터 CSV 파일 경로 | 높음 |
| OUTPUT_PATH | 이상치 스코어를 저장할 CSV 파일 경로 | 중간 |
| N_FEATURES | 센서 데이터(DTTM 제외)의 개수 (열 개수) | 높음 |
| WINDOW_SIZE | TranAD가 이상 판단을 위해 참고할 시간 길이 (W) | 매우 높음 (튜닝 요소) |
| TRAIN_RATIO | 학습 데이터로 사용할 데이터의 비율 (0.8 = 80%) | 중간 |