DPReLU(Dynamically Parameterized ReLU)를 GAN(Generative Adversarial Network)에서 반드시 사용하지 않아야 하는 것은 아닙니다. DPReLU는 특정 상황에서 ReLU나 PReLU보다 더 나은 성능을 보일 수 있는 잠재력을 가진 활성화 함수입니다. 하지만 GAN에서 DPReLU가 널리 사용되지 않는 데에는 몇 가지 이유가 있습니다.
GAN에서 DPReLU를 덜 사용하는 이유 (추정):
GAN 훈련의 불안정성:
LeakyReLU로 충분한 경우가 많음:
연구 부족:
구현 및 지원 부족:
tensorflow-addons)를 사용해야 합니다.DPReLU를 GAN에서 사용할 수 없는가?
결론:
DPReLU는 잠재력이 있는 활성화 함수이지만, GAN 훈련의 불안정성, LeakyReLU의 충분한 성능, 연구 부족, 구현 및 지원 부족 등의 이유로 널리 사용되지 않고 있습니다. 하지만, GAN에서 DPReLU를 사용할 수 없다는 것은 아니며, 실험적으로 시도해 볼 가치는 있습니다.
만약 GAN에서 DPReLU를 사용해 보고 싶다면, PReLU를 먼저 시도해 보고, shared_axes 파라미터를 조정하여 채널별 기울기를 학습하도록 하는 것이 더 쉬운 방법일 수 있습니다.
GAN(Generative Adversarial Network)에서 어떤 활성화 함수와 구성(architecture)이 "제일 좋다"라고 단정 짓기는 어렵습니다. GAN 훈련 자체가 까다롭고 불안정하기로 악명 높아서, 문제, 데이터, 그리고 세부적인 구현에 따라 최적의 조합이 달라질 수 있기 때문입니다.
하지만, 일반적으로 권장되는 사항들과 경험적으로 좋은 결과를 보였던 조합들을 바탕으로, GAN을 구성할 때 고려해야 할 요소들과 함께 설명해 드리겠습니다.
1. 활성화 함수 (Activation Functions):
생성자(Generator):
출력 레이어:
tanh: 생성하는 데이터의 범위가 [-1, 1]인 경우에 적합합니다. (데이터를 -1과 1 사이로 정규화(normalize)한 경우). 이 랩에서는 MinMaxScaler를 사용해 0과 1사이로 정규화했지만, 생성자의 출력층에 tanh를 쓰고 있으므로, 생성된 가짜 데이터도 -1과 1사이로 스케일링된다고 봐야 합니다.sigmoid: 생성하는 데이터의 범위가 [0, 1]인 경우에 적합합니다. (이미지 등).None (선형): 특정 범위로 제한되지 않은 데이터를 생성하는 경우 (예: 음수 값이 포함될 수 있는 데이터).은닉 레이어(Hidden Layers):
LeakyReLU: ReLU의 "Dead ReLU" 문제를 완화하면서도 계산 효율성이 높아 GAN에서 널리 사용됩니다.
from tensorflow.keras.layers import LeakyReLU
generator.add(Dense(128))
generator.add(LeakyReLU(alpha=0.2)) # alpha: 음수 부분의 기울기 (보통 0.2)
ReLU: 여전히 사용될 수 있지만, LeakyReLU보다 덜 선호됩니다.
GELU: Transformer 기반 모델에서 좋은 성능을 보였지만, GAN에서의 일반적인 사용은 아직 연구 중입니다. 실험적으로 시도해 볼 가치는 있습니다.
ELU, SELU: 시도해 볼 수 있지만, LeakyReLU만큼 널리 쓰이지는 않습니다.
판별자(Discriminator):
sigmoid: 입력이 진짜(1)인지 가짜(0)인지에 대한 확률을 출력해야 하므로, sigmoid가 표준입니다.None (선형): WGAN(Wasserstein GAN) 계열에서는 sigmoid를 사용하지 않고, critic(판별자와 유사)의 출력이 실수 값을 가지도록 합니다.LeakyReLU: GAN에서 판별자의 은닉 레이어 활성화 함수로 가장 널리 사용됩니다.ReLU: 사용될 수 있지만, LeakyReLU가 더 일반적.GELU, ELU, SELU: 실험적으로 시도.2. GAN 구성 (Architecture):
DCGAN (Deep Convolutional GAN): 이미지 생성에 널리 사용되는 안정적인 GAN 구조.
WGAN (Wasserstein GAN), WGAN-GP (Gradient Penalty):
StyleGAN, StyleGAN2:
BigGAN:
Self-Attention GAN (SAGAN):
이 랩의 GAN (Dense Layers):
3. 기타 고려 사항:
Adam이 가장 널리 사용됩니다.RMSprop도 사용될 수 있습니다.1e-4 ~ 1e-3 정도에서 시작하여 실험적으로 조정합니다.binary_crossentropyHe initialization (ReLU 계열), Xavier initialization (Tanh, Sigmoid) 등이 사용됩니다.요약 및 추천:
binary_crossentropy loss (기본 GAN)GAN은 훈련이 매우 까다롭기 때문에, 여러 가지 조합을 실험해 보면서 자신의 문제와 데이터에 가장 적합한 설정을 찾는 것이 중요합니다. 위에서 제시된 추천은 일반적인 가이드라인이며, 반드시 최상의 결과를 보장하는 것은 아닙니다
네, GELU(Gaussian Error Linear Unit)에 대해 알고 있습니다! GELU는 최근 자연어 처리(NLP) 분야, 특히 Transformer 기반 모델(BERT, GPT 등)에서 많이 사용되는 활성화 함수입니다. ReLU, Leaky ReLU 등과 비교하여 더 부드러운(smooth) 형태를 가지며, 특정 상황에서 더 나은 성능을 보이는 것으로 알려져 있습니다.
GELU (Gaussian Error Linear Unit) 기본 개념:
아이디어:
수식:
GELU(x) = x * Φ(x)
x: 입력값Φ(x): 표준 정규 분포(standard normal distribution, 평균 0, 표준편차 1)의 누적 분포 함수(cumulative distribution function, CDF).Φ(x)는 x보다 작거나 같은 값을 가질 확률.Φ(x) = 0.5 * [1 + erf(x / sqrt(2))]erf(x): 오차 함수 (error function)근사식 (Approximation):
Φ(x) (또는 erf(x))는 계산 비용이 높을 수 있으므로, 실제로는 다음과 같은 근사식을 사용하는 경우가 많습니다.GELU(x) ≈ 0.5 * x * [1 + tanh(sqrt(2/π) * (x + 0.044715 * x^3))]
또는
GELU(x) ≈ x * sigmoid(1.702 * x)
GELU의 특징 및 장점:
부드러움 (Smoothness):
비선형성 (Non-linearity):
확률적 조절 (Stochastic Regularization):
x가 클수록 Φ(x)는 1에 가까워지므로, GELU(x)는 x에 가까워집니다. (ReLU와 유사)x가 작을수록 Φ(x)는 0에 가까워지므로, GELU(x)는 0에 가까워집니다.성능:
GELU의 단점:
계산 복잡도:
Φ(x)를 계산할 경우)직관적이지 않음:
TensorFlow/Keras에서 GELU 사용:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Dense
# 방법 1: activation='gelu' (TensorFlow 2.3 이상)
dense_layer = Dense(units=64, activation='gelu')
# 방법 2: tf.nn.gelu
dense_layer = Dense(units=64, activation=tf.nn.gelu)
# 방법 3: 사용자 정의 함수 (근사식)
def gelu_approx(x):
return 0.5 * x * (1 + tf.tanh(tf.sqrt(2 / np.pi) * (x + 0.044715 * tf.pow(x, 3))))
dense_layer = Dense(units=64, activation=gelu_approx)
# 사용 예
x = dense_layer(input_tensor)
결론:
GELU는 ReLU의 부드러운 버전으로, 확률적 조절을 통해 정규화 효과를 제공하며, 특정 작업(특히 NLP)에서 ReLU보다 더 나은 성능을 보일 수 있습니다. 하지만 계산 복잡도가 더 높다는 단점이 있습니다.
GELU에 대해 더 궁금한 점이나 다른 활성화 함수와의 비교 등에 대한 질문이 있다면 언제든지 물어보세요!
DPReLU(Dynamically Parameterized ReLU)는 ReLU의 장점을 유지하면서 Dead ReLU 문제를 완화하고 채널별 적응성을 높인 활성화 함수이지만, 몇 가지 단점을 가지고 있습니다.
DPReLU의 단점:
파라미터 수 증가:
계산 복잡도 증가 (미미):
구현의 복잡성:
tensorflow-addons)를 사용해야 합니다.shared_axes를 조절.모든 경우에 대한 성능 향상 보장 없음:
정리:
DPReLU는 채널별 적응성과 Dead ReLU 문제 완화라는 장점을 가지지만, 파라미터 수 증가로 인한 과적합 위험, (미미한) 계산 복잡도 증가, 구현의 복잡성, 그리고 모든 경우에 대한 성능 향상 보장 없음이라는 단점을 가지고 있습니다. 따라서 DPReLU를 사용할 때는 이러한 단점들을 고려하여 신중하게 결정해야 합니다.
더 궁금한 점이나 다른 활성화 함수와의 비교 등에 대한 질문이 있으면 언제든지 물어보세요!
네, DPReLU(Dynamically Parameterized ReLU)에 대해 알고 있습니다. DPReLU는 ReLU(Rectified Linear Unit) 활성화 함수의 변형 중 하나로, 특정 문제나 데이터셋에서 ReLU보다 더 나은 성능을 보일 수 있도록 설계되었습니다.
DPReLU (Dynamically Parameterized ReLU) 기본 개념:
DPReLU 수식:
f(x) = max(0, x)f(x) = max(0, x) + α * min(0, x) (α는 학습 가능한 파라미터)f(x_i) = max(0, x_i) + α_i * min(0, x_i) (α_i는 i번째 채널에 대한 학습 가능한 파라미터)DPReLU 장점:
DPReLU 단점:
TensorFlow/Keras에서 DPReLU 사용:
TensorFlow/Keras에서 DPReLU를 직접 제공하지는 않지만, 사용자 정의 레이어(custom layer)를 만들거나, 다른 라이브러리(예: tensorflow-addons)를 사용하여 구현할 수 있습니다. 또는, PReLU를 사용하고, shared_axes 파라미터를 적절히 설정하여 채널별로 다른 기울기를 학습하도록 할 수도 있습니다.
# Keras에서 PReLU를 사용하여 채널별 기울기 학습 (DPReLU와 유사)
from tensorflow.keras.layers import PReLU, Conv2D, Dense
# Convolutional Layer
conv_layer = Conv2D(filters=64, kernel_size=(3, 3))
prelu_layer = PReLU(shared_axes=[1, 2]) # 채널 축을 제외한 나머지 축(height, width) 공유
# shared_axes=[1,2]는 height, width 축에 대해 같은 파라미터를 공유. 채널별로 다른 파라미터를 학습.
# Dense Layer (Fully Connected Layer)
dense_layer = Dense(units=128)
prelu_layer = PReLU() # Dense 레이어에서는 shared_axes 설정 불필요 (자동으로 채널별 학습)
# Dense는 채널이 유닛.
# 사용 예:
x = conv_layer(input_tensor)
x = prelu_layer(x)
x = dense_layer(input_tensor)
x = prelu_layer(x)
결론:
DPReLU는 ReLU의 변형 중 하나로, 각 채널별로 음수 부분의 기울기를 학습하여 Dead ReLU 문제를 완화하고, 더 유연한 활성화 함수를 제공합니다. 하지만 파라미터 수가 증가하므로 과적합에 주의해야 합니다.
DPReLU에 대해 더 궁금한 점이나 다른 활성화 함수에 대한 질문이 있으면 언제든지 물어보세요!
훌륭합니다! 질문에 대한 답변들이 핵심을 잘 짚고 있습니다. 각 질문에 대한 좀 더 자세한 답변과 함께, 추가적인 설명(Dense 레이어, ReLU, Tanh, Sigmoid)을 덧붙여 보겠습니다.
1. 훈련 데이터에 정상 데이터만 사용하는 이유:
2. 생성자(Generator)의 입력은 왜 랜덤 노이즈일까요?
3. 판별자(Discriminator)의 출력 해석:
4. 생성자를 훈련할 때 판별자의 가중치를 고정하는 이유:
5. gan.train_on_batch(noise, y_gen)에서 y_gen이 모두 1인 이유:
y_gen은 생성자 훈련에 사용되는 레이블입니다.y_gen = np.ones(batch_size): 모든 레이블을 1 ("진짜")로 설정합니다.6. 이상 징후 판별 임계값:
7. X = np.vstack((generated_images, image_batch))에서 진짜 데이터와 가짜데이터 순서를 바꾸면 어떻게 될까요?
y_dis[:batch_size] = 1 이 되고, y_dis[batch_size:] = 0이 되어야, X와 y_dis의 짝이 맞게 됩니다.8. X = np.vstack((generated_images, image_batch))에서 np.hstack을 쓰면 어떻게 될까요?
generated_images와 image_batch는 모두 (batch_size, 114)의 형태를 가지는데, np.hstack은 두 배열을 가로로 (두 번째 차원 방향으로) 붙입니다. 이렇게 되면 X의 형태는 (batch_size, 228)이 됩니다. 판별자(Discriminator)는 114개의 feature를 입력으로 예상하는데, 228개가 들어오기 때문에 차원 불일치 에러가 발생합니다.9. np.random.normal과 np.random.uniform의 차이는 무엇일까요?
np.random.normal(loc=0.0, scale=1.0, size=None): 정규 분포(normal distribution, 가우시안 분포)에서 무작위 표본을 추출합니다.loc: 평균 (기본값 0.0)scale: 표준편차 (기본값 1.0)size: 출력 형태np.random.uniform(low=0.0, high=1.0, size=None): 균등 분포(uniform distribution)에서 무작위 표본을 추출합니다.low: 하한값 (기본값 0.0)high: 상한값 (기본값 1.0)size: 출력 형태10. GAN 훈련의 궁극적인 목표는 무엇일까요? 생성자와 판별자 중 어느 것이 더 중요할까요?
GAN 훈련의 목표는 실제 데이터의 분포를 따르는 가짜 데이터를 생성하는 생성자를 얻는 것.
생성자와 판별자는 상호보완적인 역할.
이상탐지 문제에서는 판별자를 활용하지만, 궁극적으로 GAN의 가치*는 생성자에 있음.
11. y_dis = np.ones(2 batch_size)를 y_dis = np.zeros(2 batch_size) 로 하고, y_dis[:batch_size] = 0 대신에, y_dis[batch_size:] = 1로 하면 결과가 어떻게 달라질까요?
np.ones로 초기화하고 앞부분을 0으로 바꾸는 것과, np.zeros로 초기화하고 뒷부분을 1로 바꾸는 것은 결국 같은 레이블 배열(y_dis)을 만들기 때문입니다.Dense, ReLU, Tanh, Sigmoid 추가 설명:
Dense 레이어 (Fully Connected Layer):
output = activation(dot(input, kernel) + bias)input: 입력kernel: 가중치 행렬bias: 편향(바이어스) 벡터activation: 활성화 함수 (ReLU, Tanh, Sigmoid 등)dot: 행렬 곱셈ReLU (Rectified Linear Unit):
f(x) = max(0, x): 입력이 0보다 작으면 0, 0보다 크면 입력값 그대로 출력.Tanh (Hyperbolic Tangent):
f(x) = (e^x - e^-x) / (e^x + e^-x): -1과 1 사이의 값을 출력.Sigmoid:
f(x) = 1 / (1 + e^-x): 0과 1 사이의 값을 출력 (확률로 해석 가능).쓴 이유
이제 GAN, 이상 탐지, 그리고 신경망의 구성 요소들에 대한 이해가 훨씬 더 깊어졌을 것으로 기대합니다. 추가 질문이나 다른 궁금한 점이 있다면 언제든지 말씀해주세요!
좋습니다! 지금까지 설명한 GAN을 이용한 이상 탐지 랩 전체 내용을 다시 한번 정리하고, 이해를 돕기 위한 질문들을 제시해 드리겠습니다.
GAN을 이용한 이상 탐지 랩 전체 요약:
데이터 준비:
MinMaxScaler를 사용하여 데이터를 0과 1 사이로 정규화(스케일링)합니다.GAN 모델 구성:
GAN 모델 훈련:
이상 탐지:
모델 평가:
핵심 아이디어:
질문:
gan.train_on_batch(noise, y_gen)에서 y_gen은 모두 1로 설정되어 있습니다. 왜 0이 아니라 1일까요? 이것이 생성자 훈련에 어떤 영향을 미칠까요?이 질문들에 대해 스스로 답해보면서 GAN을 이용한 이상 탐지 과정을 다시 한번 복습하고, 더 깊이 이해할 수 있기를 바랍니다. 답변하기 어려운 질문이 있다면 언제든지 다시 질문해주세요!
네, 맞습니다! 정확하게 이해하셨어요. 이 코드는 판별자(Discriminator)가 출력한 점수(확률)를 기반으로 이상 징후(anomaly)를 판별하는 부분입니다. 낮은 점수를 받은 샘플들을 이상 징후로 간주하는 방식이며, np.percentile을 사용하여 임계값(threshold)을 설정하고, 그 임계값을 기준으로 예측값을 0 (정상) 또는 1 (이상)로 변환합니다.
코드 상세 설명:
# Obtaining the lowest 1% score (가장 낮은 1% 점수 얻기)
per = np.percentile(results, 1)
# - np.percentile(results, 1): results 배열에서 1번째 백분위수(percentile) 값을 계산.
# - results: 판별자(Discriminator)가 테스트 데이터에 대해 예측한 점수(확률)들의 배열.
# - 1번째 백분위수: results 배열을 정렬했을 때, 가장 작은 값부터 시작하여 1% 위치에 있는 값.
# (예: results가 100개의 값을 가지고 있다면, 가장 작은 값에서 두 번째 값이 1번째 백분위수)
# - per: 계산된 1번째 백분위수 값이 저장됨. 이 값이 이상 징후를 판별하기 위한 임계값(threshold)으로 사용.
y_pred = results.copy()
# - y_pred: results 배열을 복사. 원본 results 배열을 변경하지 않기 위해 복사본을 사용.
y_pred = np.array(y_pred)
# - y_pred를 NumPy 배열로 명시적으로 변환 (NumPy 배열 연산을 사용하기 위함). results가 이미 NumPy 배열이면 이 줄은 불필요할 수 있지만, 명확성을 위해 추가.
# Thresholding based on the score (점수 기반으로 임계값 처리)
inds = (y_pred > per)
# - inds: y_pred 배열의 각 값이 임계값(per)보다 큰지 여부를 나타내는 불리언(Boolean) 배열 (True/False).
# - y_pred > per: y_pred의 각 원소에 대해 per보다 크면 True, 작거나 같으면 False.
# (예: y_pred가 [0.9, 0.2, 0.8, 0.1]이고 per가 0.15라면, inds는 [True, True, True, False]가 됨)
inds_comp = (y_pred <= per)
# - inds_comp: y_pred 배열의 각 값이 임계값(per)보다 작거나 같은지 여부를 나타내는 불리언 배열.
# - y_pred <= per: y_pred의 각 원소에 대해 per보다 작거나 같으면 True, 크면 False.
# (위의 예시에서 inds_comp는 [False, False, False, True]가 됨)
y_pred[inds] = 0
# - y_pred 배열에서 inds가 True인 위치의 값을 0 (정상)으로 변경.
# - 즉, 판별자의 점수가 임계값(per)보다 큰 샘플들을 정상(0)으로 분류.
y_pred[inds_comp] = 1
# - y_pred 배열에서 inds_comp가 True인 위치의 값을 1 (이상)으로 변경.
# - 즉, 판별자의 점수가 임계값(per)보다 작거나 같은 샘플들을 이상(1)으로 분류.
핵심: 임계값(Threshold) 설정
np.percentile(results, 1): 판별자가 출력한 점수(results) 중에서 하위 1%에 해당하는 값을 임계값(per)으로 설정합니다.y_pred > per vs. y_pred <= per:y_pred > per: 판별자의 점수가 임계값보다 큰 샘플들은 "정상"일 가능성이 높다고 판단합니다. (판별자가 "진짜"라고 판단할 확률이 높음)y_pred <= per: 판별자의 점수가 임계값보다 작거나 같은 샘플들은 "이상"일 가능성이 높다고 판단합니다. (판별자가 "가짜"라고 판단할 확률이 높음)최대치가 아닌, 최소값을 기준으로 하는 이유:
정리:
이 코드는 판별자의 출력 점수 중 하위 1%에 해당하는 값을 임계값으로 설정하고, 그 임계값을 기준으로 점수가 높은 샘플은 정상(0), 낮은 샘플은 이상(1)으로 분류하여 이상 징후를 탐지합니다. 이상 징후는 정상 데이터와 다른 패턴을 가지므로 판별자가 낮은 점수를 부여할 것이라는 가정에 기반한 방법입니다.
네, gan.train_on_batch(noise, y_gen)에서 "GAN 전체를 훈련"하는 것처럼 보이는 부분에 대해 더 자세히 설명해 드리겠습니다. 이 부분이 GAN 훈련을 처음 접할 때 혼란스러울 수 있는 지점입니다.
gan.train_on_batch(noise, y_gen)의 작동 방식 (상세):
gan 모델의 구조:
gan은 get_gan_network 함수에서 정의된 대로, 생성자(Generator)와 판별자(Discriminator)가 연결된 모델입니다.gan의 입력은 생성자의 입력(noise)이고, gan의 출력은 판별자의 출력(진짜/가짜 확률)입니다.gan_input = Input(shape=(input_dim,)) # GAN의 입력 (생성자의 입력)
x = generator(gan_input) # 생성자를 통과 (가짜 데이터 생성)
gan_output = discriminator(x) # 판별자를 통과 (진짜/가짜 확률)
gan = Model(inputs=gan_input, outputs=gan_output) # GAN 모델 (입력: noise, 출력: 확률)
gan.train_on_batch(noise, y_gen) 호출:
noise: 생성자에 입력될 랜덤 노이즈입니다. gan 모델의 입력으로 들어갑니다.y_gen: 생성자 훈련에 사용될 레이블입니다. 모두 1 (진짜)로 설정되어 있습니다. 이 레이블은 판별자의 출력이 1에 가까워지도록 생성자를 유도하는 역할을 합니다.Forward Pass (순방향 전파):
noise가 gan 모델에 입력됩니다.noise는 먼저 생성자(generator)를 통과합니다. 생성자는 noise를 바탕으로 가짜 데이터(generated_images)를 생성합니다.discriminator)에 입력됩니다.gan 모델의 최종 출력이 됩니다.Loss Calculation (손실 계산):
gan 모델의 출력(판별자의 출력, 확률)과 y_gen (모두 1) 사이의 손실(loss)을 계산합니다.binary_crossentropy 손실 함수가 사용됩니다. 이 손실 함수는 판별자의 출력이 1에서 멀어질수록 (즉, 생성자가 판별자를 잘 속이지 못할수록) 큰 값을 가집니다.Backward Pass (역방향 전파) - 핵심:
discriminator.trainable = False로 설정했기 때문에, 역전파 과정에서 판별자의 가중치는 업데이트되지 않습니다."GAN 전체를 훈련"하는 것처럼 보이는 이유:
gan.train_on_batch() 함수는 gan 모델 전체(생성자 + 판별자)에 대해 입력을 전달하고(forward pass), 손실을 계산하고, 역전파를 수행합니다.하지만 실제로는 생성자만 훈련됩니다:
discriminator.trainable = False 설정 때문에, 역전파 단계에서 판별자의 가중치는 절대 업데이트되지 않습니다.비유를 통한 설명:
gan.train_on_batch(noise, y_gen) 상황:이제 gan.train_on_batch()가 GAN 모델 전체에 작용하는 것처럼 보이지만, 실제로는 discriminator.trainable = False 때문에 생성자만 훈련된다는 점이 완전히 이해되셨기를 바랍니다!
네, 맞습니다! 정확하게 이해하셨습니다. 코드의 흐름과 각 부분의 역할을 단계별로 다시 한번 정리하면서, 특히 discriminator.trainable = False 부분과 생성자(Generator) 훈련에 초점을 맞춰 설명하겠습니다.
코드 흐름 (Step-by-Step):
# 1. 판별자 훈련 (Discriminator Training)
# ... (이전 단계에서 판별자 훈련 데이터 준비: X, y_dis) ...
discriminator.trainable = True # 판별자 가중치 업데이트 허용
d_loss = discriminator.train_on_batch(X, y_dis)
# - discriminator.train_on_batch(X, y_dis): 판별자를 한 배치(batch)만큼 훈련.
# - X: 입력 데이터 (가짜 데이터 + 진짜 데이터)
# - y_dis: 레이블 (0: 가짜, 1: 진짜)
# - d_loss: 판별자의 손실(loss) 값. 낮을수록 판별자가 진짜/가짜를 잘 구별.
# 2. 생성자 훈련 (Generator Training)
noise = np.random.uniform(0, 1, size=[batch_size, 114])
# - noise: 생성자의 입력으로 사용될 랜덤 노이즈. 균등 분포(uniform distribution)에서 추출.
# - size=[batch_size, 114]: [batch_size, 114] 크기의 배열 (예: [512, 114]).
y_gen = np.ones(batch_size)
# - y_gen: 생성자 훈련에 사용될 레이블. 모두 1 (진짜)로 설정. *생성자가 판별자를 속이도록 유도*.
# - 크기: [batch_size] (예: [512])
discriminator.trainable = False # *핵심*: 판별자 가중치 업데이트 *금지* (고정)
# - 생성자를 훈련하는 동안에는 판별자의 가중치를 고정.
# - 이렇게 하면 생성자는 오직 "판별자를 속이는 방향"으로만 학습하게 됨.
# - 판별자가 고정되어 있어야 생성자가 현재 판별자의 판단 능력을 기준으로 더 좋은 가짜 데이터를 만들도록 유도할 수 있음.
g_loss = gan.train_on_batch(noise, y_gen)
# - gan.train_on_batch(noise, y_gen): GAN 모델(생성자 + 판별자)을 훈련. *하지만 판별자는 훈련되지 않음*.
# - noise: 생성자의 입력 (랜덤 노이즈)
# - y_gen: 레이블 (모두 1, "진짜처럼 보이게 만들어라")
# - g_loss: 생성자의 손실(loss) 값. 낮을수록 생성자가 판별자를 잘 속임 (진짜 같은 가짜 데이터를 생성).
# - *중요*: 이 손실은 고정된 판별자를 기준으로 계산됨.
# 손실 기록
discriminator_loss.append(d_loss) # 판별자 loss 기록
gan_loss.append(g_loss) # 생성자 loss 기록
# 에포크 및 배치 정보, 손실 값 출력
print("Epoch %d Batch %d/%d [D loss: %f] [G loss:%f]" % (epoch, index, batch_count, d_loss, g_loss))
핵심 포인트:
discriminator.trainable = False: 생성자 훈련 단계에서 반드시 판별자의 가중치를 고정해야 합니다.gan.train_on_batch(noise, y_gen): GAN 모델 전체를 훈련시키는 것처럼 보이지만, 실제로는 discriminator.trainable = False 때문에 생성자만 훈련됩니다.gan은 생성자와 판별자로 구성.noise가 gan에 들어가면 generator를 거쳐 가짜 데이터가 만들어지고, 이 가짜 데이터는 discriminator로 들어가서 "진짜인지 가짜인지"에 대한 확률값을 출력.y_gen (모두 1)과의 차이를 이용하여 생성자의 가중치를 업데이트하는 방향으로 역전파(backpropagation)가 수행됩니다.d_loss와 g_loss:d_loss: 판별자가 얼마나 잘 진짜/가짜를 구분하는지를 나타냄 (낮을수록 좋음)g_loss: 생성자가 얼마나 잘 판별자를 속이는지를 나타냄 (낮을수록 좋음). 고정된 판별자를 기준으로 계산.GAN 훈련의 핵심 아이디어 (정리):
이제 GAN 훈련 과정, 특히 생성자 훈련 시 판별자를 고정하는 이유와 그 메커니즘이 완벽하게 이해되셨기를 바랍니다!
네, 알겠습니다. 해당 코드 부분을 아주 자세하게 설명해 드리겠습니다. 이 부분은 판별자(Discriminator)를 훈련시키기 위한 데이터를 준비하는 핵심적인 부분입니다.
# 1. 판별자 훈련 (Discriminator Training)을 위한 데이터 준비
# 가짜 데이터와 진짜 데이터 합치기 (판별자 입력)
X = np.vstack((generated_images, image_batch))
# - generated_images: 생성자(Generator)가 생성한 가짜 데이터 배치. 크기: [batch_size, 114] (예: [512, 114])
# - image_batch: 훈련 데이터(x_train)에서 가져온 진짜 데이터(정상 데이터) 배치. 크기: [batch_size, 114] (예: [512, 114])
# - np.vstack(): 두 배열을 세로로 쌓는 함수 (vertical stack).
# - X: 판별자(Discriminator)의 입력으로 사용될 데이터. 크기: [2 * batch_size, 114] (예: [1024, 114]). 가짜 데이터와 진짜 데이터가 순서대로 쌓임.
y_dis = np.ones(2 * batch_size)
# - y_dis: 판별자(Discriminator)의 훈련에 사용될 레이블(정답). 크기: [2 * batch_size] (예: [1024])
# - np.ones(): 모든 원소가 1인 배열을 생성. 초기에는 모든 데이터가 "진짜(real)"라고 가정.
y_dis[:batch_size] = 0
# - y_dis 배열의 앞부분(처음부터 batch_size 개수만큼)을 0으로 변경.
# - [:batch_size]: 슬라이싱(slicing). 처음부터 batch_size - 1 인덱스까지 선택.
# - 가짜 데이터(generated_images)에 해당하는 레이블을 "가짜(fake)" (0)로 설정.
# - 결과적으로 y_dis는 [0, 0, ..., 0, 1, 1, ..., 1] 형태 (0이 batch_size개, 1이 batch_size개).
# Train discriminator (판별자 훈련)
discriminator.trainable = True
# - 판별자(Discriminator)의 가중치가 훈련(업데이트)될 수 있도록 설정. 생성자(Generator) 훈련 시에는 판별자를 고정해야 하므로, 여기서는 다시 훈련 가능하게 변경.
데이터 준비 과정 (상세 설명):
X = np.vstack((generated_images, image_batch)):
generated_images: 생성자가 이전 단계에서 생성한 가짜 데이터 배치입니다. batch_size (예: 512)개의 가짜 데이터 샘플을 포함하며, 각 샘플은 114개의 특징(feature)을 가집니다.image_batch: 훈련 데이터(x_train, 정상 데이터만 포함)에서 현재 배치 인덱스(index)에 해당하는 batch_size (예: 512)개의 진짜 데이터 샘플을 가져옵니다. 각 샘플은 114개의 특징을 가집니다.np.vstack(): generated_images와 image_batch를 세로 방향으로 쌓습니다. 마치 두 개의 블록을 위아래로 쌓는 것과 같습니다.X는 2 * batch_size (예: 1024)개의 샘플을 가지게 됩니다. 앞쪽 절반(batch_size개)은 가짜 데이터, 뒤쪽 절반(batch_size개)은 진짜 데이터입니다.y_dis = np.ones(2 * batch_size):
y_dis는 판별자의 훈련에 사용될 레이블(정답)입니다. 판별자는 입력된 데이터가 진짜(real)인지 가짜(fake)인지를 판별해야 하므로, 각 데이터 샘플에 대해 "진짜" 또는 "가짜" 레이블이 필요합니다.np.ones(2 * batch_size): 초기에는 모든 레이블을 1 (진짜)로 설정합니다. X에 있는 데이터 샘플의 개수(2 * batch_size)만큼 1로 채워진 배열을 생성합니다.y_dis[:batch_size] = 0:
y_dis 배열의 앞부분(가짜 데이터에 해당하는 부분)을 0 (가짜)으로 변경합니다.[:batch_size]: 파이썬 슬라이싱 문법입니다. y_dis 배열의 처음부터 batch_size - 1 인덱스까지의 요소를 선택합니다.y_dis[:batch_size] = 0: 선택된 요소(앞쪽 절반)의 값을 모두 0으로 변경합니다.왜 이렇게 데이터를 구성할까요? (핵심)
X는 판별자에게 "이 데이터는 진짜일까, 가짜일까?"라는 질문을 던지는 것과 같습니다.y_dis는 그 질문에 대한 정답을 제공합니다. "앞쪽 절반은 가짜(0)이고, 뒤쪽 절반은 진짜(1)이야"라고 알려주는 것입니다.X와 y_dis를 통해 진짜 데이터와 가짜 데이터의 특징을 학습하고, 점점 더 정확하게 둘을 구별할 수 있게 됩니다.discriminator.trainable = True의 중요성:
discriminator.trainable = False)하여 생성자가 판별자를 속이는 방향으로만 학습하도록 합니다.discriminator.trainable = True로 설정해야 합니다.이제 판별자 훈련을 위한 데이터 준비 과정과 그 의미가 완벽하게 이해되셨기를 바랍니다!
네, 맞습니다. batch_count가 1425가 되는 이유를 다시 한번 자세히 설명드리겠습니다.
batch_count 계산:
batch_count = x_train.shape[0] // batch_size # 배치 수 계산
x_train.shape[0]: 훈련 데이터(x_train)의 샘플 개수입니다. 앞서 print(x_train.shape)의 결과가 (729620, 114)였으므로, x_train.shape[0]은 729620입니다. (정상 데이터만 남긴 후의 개수)batch_size: 배치 사이즈로, 코드에서 512로 설정했습니다.//: 정수 나눗셈 연산자입니다. 소수점 이하를 버리고 몫만 취합니다.따라서 batch_count는 다음과 같이 계산됩니다:
batch_count = 729620 // 512 = 1425.039... => 1425
batch_count의 의미:
batch_count는 전체 훈련 데이터(x_train)를 batch_size (512) 크기의 배치로 나누었을 때 생성되는 배치의 개수입니다.batch_count가 1425라는 것은 훈련 데이터가 1425개의 배치로 나뉘어 모델에 입력된다는 의미입니다.코드에서의 활용:
for epoch in range(epochs): # 에포크 반복
for index in range(batch_count): # 배치 반복
# ... (배치 단위 훈련 코드) ...
epochs (전체 데이터셋 반복 횟수)만큼 반복됩니다.batch_count (1425)만큼 반복하면서 각 배치에 대해 훈련을 수행합니다.index 변수는 현재 배치의 인덱스를 나타냅니다 (0부터 1424까지).정리:
batch_count는 훈련 데이터를 배치 사이즈로 나누었을 때 생성되는 배치의 개수를 나타내며, 훈련 루프에서 각 배치에 대한 훈련을 반복하는 데 사용됩니다. 이 경우, 훈련 데이터는 1425개의 배치로 나뉘어 훈련됩니다.
이제 batch_count의 의미와 계산 방법, 코드 내에서의 역할이 완전히 이해되셨기를 바랍니다!
# Import system packages (시스템 패키지 임포트)
import os # 운영체제와 상호작용 (파일 경로 등)
import sys # 파이썬 인터프리터와 상호작용 (시스템 경로 등)
import time # 시간 관련 함수 (훈련 시간 측정 등)
import logging # 로깅 (훈련 과정 기록)
import importlib # 모듈 다시 로드 (개발 중 유용)
# Import data manipulation libraries (데이터 조작 라이브러리 임포트)
import numpy as np # 수치 연산 (배열, 행렬)
import pandas as pd # 데이터 분석 (DataFrame)
import collections # 데이터 구조 (Counter)
from tqdm import tqdm # 진행률 표시줄
# Import visualization libraries (시각화 라이브러리 임포트)
import matplotlib.pyplot as plt # 그래프 그리기
# Importing ML/DL libraries (머신러닝/딥러닝 라이브러리 임포트)
from sklearn.preprocessing import MinMaxScaler, label_binarize # 데이터 정규화, 레이블 이진화
from sklearn.model_selection import train_test_split # 데이터 분할 (훈련/테스트)
from sklearn.metrics import roc_curve, auc, precision_recall_fscore_support, average_precision_score # 평가 지표
from sklearn.metrics import precision_recall_curve, auc, confusion_matrix, accuracy_score # 평가 지표
import tensorflow as tf # 텐서플로우
from tensorflow import keras # 케라스 (텐서플로우 기반 딥러닝 API)
from tensorflow.keras import initializers # 가중치 초기화
from tensorflow.keras.models import Model, Sequential # 모델 (함수형, 순차형)
from tensorflow.keras.optimizers import Adam, RMSprop # 옵티마이저
from tensorflow.keras.callbacks import ModelCheckpoint # 콜백 (모델 저장 등)
from tensorflow.keras.layers import Dense, Dropout # 레이어 (밀집, 드롭아웃)
from tensorflow.keras.layers import LeakyReLU # 고급 활성화 함수 (Leaky ReLU)
from tensorflow.keras.layers import Input, BatchNormalization, ReLU, Dense, Reshape, Flatten, Activation # 다양한 레이어
from tensorflow.keras.layers import Dropout, multiply, GaussianNoise, MaxPooling2D, concatenate # 다양한 레이어
import pickle # 파이썬 객체 저장/로드
import random # 난수 생성
random.seed(123) # 난수 시드 고정 (재현성)
data_path = './data/' # 데이터 경로 설정 (필요에 따라 수정)
# 1. 데이터셋 (Dataset)
# loading the pickled file (pickle 파일 로드)
filename = data_path + 'preprocessed_data_full.pkl' # pickle 파일 경로
input_file = open(filename, 'rb') # 바이너리 읽기 모드('rb')로 파일 열기
preprocessed_data = pickle.load(input_file) # pickle 파일에서 데이터 로드
input_file.close() # 파일 닫기
# preprocessed_data 딕셔너리에서 데이터 추출
le = preprocessed_data['le'] # 레이블 인코더 (문자열 레이블을 숫자로 변환)
x_train = preprocessed_data['x_train'] # 훈련 데이터 (특징, feature)
y_train = preprocessed_data['y_train'] # 훈련 데이터 (레이블, label)
x_test = preprocessed_data['x_test'] # 테스트 데이터 (특징)
y_test = preprocessed_data['y_test'] # 테스트 데이터 (레이블)
# Converting labels to Binary (레이블을 이진 형식으로 변환, 0과 1)
y_test[y_test != 11] = 1 # 'normal'(11)이 아닌 레이블을 모두 1 (이상, anomaly)로 변경
y_test[y_test == 11] = 0 # 'normal' 레이블을 0 (정상, normal)으로 변경
y_train[y_train != 11] = 1 # 훈련 데이터도 동일하게 처리
y_train[y_train == 11] = 0
# Subsetting only Normal Network packets in our training set (훈련 세트에 정상 패킷만 남김)
temp_df = x_train.copy() # 훈련 데이터 복사 (원본 데이터 보존)
temp_df['label'] = y_train # 훈련 데이터에 레이블 열 추가
temp_df = temp_df.loc[temp_df['label'] == 0] # 레이블이 0 (정상)인 데이터만 선택
temp_df = temp_df.drop('label', axis=1) # 레이블 열 제거
x_train = temp_df.copy() # 정상 데이터만 남은 훈련 데이터
# Normalize the above splits using the MinMaxScaler (MinMaxScaler를 사용하여 데이터 정규화)
scaler = MinMaxScaler() # MinMaxScaler 객체 생성 (0과 1 사이로 값 변환)
# Make sure to only fit the scaler on the training data (훈련 데이터에만 scaler를 fit)
x_train = scaler.fit_transform(x_train) # 훈련 데이터 변환 (fit: 훈련 데이터의 최소/최대값 계산, transform: 변환)
x_test = scaler.transform(x_test) # 테스트 데이터 변환 (훈련 데이터에서 계산된 최소/최대값 사용)
# Creating dataset dictionary (데이터셋 딕셔너리 생성)
dataset = {}
dataset['x_train'] = x_train.astype(np.float32) # 훈련 데이터 (float32로 타입 변환, 메모리 절약)
dataset['y_train'] = y_train.astype(np.float32) # 훈련 레이블
dataset['x_test'] = x_test.astype(np.float32) # 테스트 데이터
dataset['y_test'] = y_test.astype(np.float32) # 테스트 레이블
# check how many anomalies are in our Testing set (테스트 세트의 이상 징후 개수 확인, 데이터 불균형 확인)
print('Number of Normal Network packets in the Training set:', x_train.shape[0]) # 훈련 세트 정상 패킷 수, x_train.shape[0]는 훈련 데이터의 샘플 개수를 나타냄
print('Number of Normal Network packets in the Testing set:', collections.Counter(y_test)[0]) # 테스트 세트 정상 패킷 수
print('Number of Anomalous Network packets in the Testing set:', collections.Counter(y_test)[1]) # 테스트 세트 이상 패킷 수
print("x_train shape:", x_train.shape) # (729620, 114) 729620개의 훈련 샘플, 각 샘플은 114개의 특성(feature)
# 2. Generative Adversarial Networks (GAN)
# 2.1 생성자 네트워크 (Generator Network)
def get_generator(optimizer): # 옵티마이저를 인자로 받는 생성자 함수
generator = Sequential() # 순차적으로 레이어를 쌓는 Sequential 모델 생성
# 첫 번째 Dense 레이어
generator.add(Dense(64, input_dim=114, kernel_initializer=initializers.glorot_normal(seed=42)))
# - 64개의 뉴런(유닛)
# - 입력 차원(input_dim): 114 (데이터의 특징 개수), x_train의 두 번째 차원(114)과 일치
# - kernel_initializer: 가중치 초기화 방법 (glorot_normal, Xavier initialization)
generator.add(Activation('tanh')) # tanh 활성화 함수 적용
# - tanh: -1과 1 사이의 값을 출력 (sigmoid보다 gradient vanishing 문제 덜 발생)
# 두 번째 Dense 레이어
generator.add(Dense(128))
generator.add(Activation('tanh'))
# 세 번째 Dense 레이어
generator.add(Dense(256))
generator.add(Activation('tanh'))
# 네 번째 Dense 레이어
generator.add(Dense(256))
generator.add(Activation('tanh'))
# 다섯 번째 Dense 레이어
generator.add(Dense(512))
generator.add(Activation('tanh'))
# 출력 레이어
generator.add(Dense(114, activation='tanh'))
# - 114개의 뉴런 (입력 데이터와 동일한 차원)
# - tanh 활성화 함수 (생성된 데이터도 -1과 1 사이로 맞춤, MinMaxScaler와 호환)
generator.compile(loss='binary_crossentropy', optimizer=optimizer) # 모델 컴파일
# - loss: binary_crossentropy (이진 분류 손실 함수, 판별자를 속이는 데 사용)
# - optimizer: Adam, RMSprop 등 (가중치 업데이트 방법)
return generator # 생성자 모델 반환
# 2.2 판별자 네트워크 (Discriminator Network)
def get_discriminator(optimizer):
discriminator = Sequential() # 순차 모델
# 첫 번째 Dense 레이어
discriminator.add(Dense(256, input_dim=114, kernel_initializer=initializers.glorot_normal(seed=42)))
# - 256개의 뉴런
# - 입력 차원(input_dim): 114 (데이터의 특징 개수), x_train, x_test의 두 번째 차원과 일치
# - kernel_initializer: 가중치 초기화
discriminator.add(Activation('relu')) # ReLU 활성화 함수
# - ReLU (Rectified Linear Unit): 0보다 작으면 0, 0보다 크면 그대로 출력 (계산 효율, gradient vanishing 문제 완화)
discriminator.add(Dropout(0.2)) # 드롭아웃 (과적합 방지)
# - Dropout: 훈련 중에 무작위로 뉴런을 비활성화 (20% 비율)
# 두 번째 Dense 레이어
discriminator.add(Dense(128))
discriminator.add(Activation('relu')) # ReLU
discriminator.add(Dropout(0.2)) # 드롭아웃
# 세 번째 Dense 레이어
discriminator.add(Dense(128))
discriminator.add(Activation('relu')) # ReLU
discriminator.add(Dropout(0.2))# 드롭아웃
# 네 번째 Dense 레이어
discriminator.add(Dense(128))
discriminator.add(Activation('relu'))# ReLU
discriminator.add(Dropout(0.2))# 드롭아웃
# 다섯 번째 Dense 레이어
discriminator.add(Dense(128))
discriminator.add(Activation('relu')) # ReLU
discriminator.add(Dropout(0.2)) # 드롭아웃
# 출력 레이어
discriminator.add(Dense(1)) # 1개의 뉴런 (진짜/가짜 확률)
discriminator.add(Activation('sigmoid')) # sigmoid 활성화 함수
# - sigmoid: 0과 1 사이의 값을 출력 (확률)
discriminator.compile(loss='binary_crossentropy', optimizer=optimizer) # 모델 컴파일
# - loss: binary_crossentropy (진짜/가짜 이진 분류)
# - optimizer
return discriminator # 판별자 모델 반환
# 2.3 GAN 네트워크 구성
def get_gan_network(discriminator, generator, optimizer, input_dim=114):
discriminator.trainable = False # 판별자 가중치 고정 (생성자만 훈련)
gan_input = Input(shape=(input_dim,)) # GAN의 입력 (Input 레이어, 함수형 API), shape=(114,)
x = generator(gan_input) # 생성자를 통과 (가짜 데이터 생성)
gan_output = discriminator(x) # 판별자를 통과 (진짜/가짜 확률)
gan = Model(inputs=gan_input, outputs=gan_output) # GAN 모델 생성 (입력, 출력)
gan.compile(loss='binary_crossentropy', optimizer=optimizer) # 모델 컴파일
return gan # GAN 모델 반환
# 2.4 하이퍼파라미터 설정
learning_rate = 0.00001 # 학습률 (작은 값, 안정적인 학습)
batch_size = 512 # 배치 크기 (한 번에 처리하는 데이터 샘플 수), 512개의 샘플을 하나의 배치로 묶음
epochs = 10 # 에포크 수 (전체 데이터셋 반복 횟수)
adam = Adam(learning_rate=learning_rate, beta_1=0.5) # Adam 옵티마이저 (beta_1: 모멘텀 관련)
# 2.5 모델 훈련
x_train, y_train, x_test, y_test = dataset['x_train'], dataset['y_train'], dataset['x_test'], dataset['y_test'] # 데이터셋
batch_count = x_train.shape[0] // batch_size # 배치 수 계산, 전체 훈련 샘플 수를 배치 사이즈로 나눔 (729620 // 512)
pbar = tqdm(total=epochs * batch_count) # 진행률 표시줄
gan_loss = [] # GAN 손실 기록
discriminator_loss = [] # 판별자 손실 기록
generator = get_generator(adam) # 생성자 모델
discriminator = get_discriminator(adam) # 판별자 모델
gan = get_gan_network(discriminator, generator, adam, input_dim=114) # GAN 모델
for epoch in range(epochs): # 에포크 반복
for index in range(batch_count): # 배치 반복
pbar.update(1) # 진행률 표시
# 1. 판별자 훈련 (Discriminator Training)
# 랜덤 노이즈 생성 (생성자 입력)
noise = np.random.normal(0, 1, size=[batch_size, 114]) # 정규 분포 (평균 0, 표준편차 1), [512, 114] 크기의 노이즈 생성
# 배치 사이즈(512)만큼의 노이즈 샘플 생성, 각 노이즈 샘플은 114차원
# 가짜 데이터 생성
generated_images = generator.predict_on_batch(noise) # [512, 114] 크기의 가짜 데이터 생성
# 진짜 데이터 (정상 데이터) 배치 가져오기
image_batch = x_train[index * batch_size: (index + 1) * batch_size] # 슬라이싱을 사용하여 배치 사이즈만큼의 실제 데이터 선택
# x_train에서 현재 배치에 해당하는 데이터(512개 샘플)를 가져옴
# 진짜 데이터와 가짜 데이터 합치기 (판별자 입력)
X = np.vstack((generated_images, image_batch)) # [1024, 114] 크기, 가짜 데이터와 진짜 데이터를 세로로 쌓음 (총 1024개 샘플)
y_dis = np.ones(2 * batch_size) # 레이블 생성 (1: 진짜, 0: 가짜), [1024] 크기
y_dis[:batch_size] = 0 # 앞부분(가짜)은 0, [0, 0, ..., 1, 1] 형태
# 판별자 훈련
discriminator.trainable = True # 판별자 훈련 가능
d_loss = discriminator.train_on_batch(X, y_dis) # 판별자 훈련 (입력, 레이블), 배치 단위로 훈련
# X: [1024, 114], y_dis: [1024]
# 2. 생성자 훈련 (Generator Training)
# 랜덤 노이즈 생성 (생성자 입력)
noise = np.random.uniform(0, 1, size=[batch_size, 114]) # 균등 분포 (0과 1 사이), [512, 114] 크기의 노이즈
y_gen = np.ones(batch_size) # 생성자 훈련용 레이블 (모두 1, 판별자를 속이도록), [512] 크기
discriminator.trainable = False # 판별자 훈련 불가능 (고정)
g_loss = gan.train_on_batch(noise, y_gen) # GAN 훈련 (생성자만 훈련), 배치 단위로 훈련
# noise: [512, 114], y_gen: [512]
# 손실 기록
discriminator_loss.append(d_loss)
gan_loss.append(g_loss)
# 에포크별 손실 출력
print("Epoch %d Batch %d/%d [D loss: %f] [G loss:%f]" % (epoch, index, batch_count, d_loss, g_loss))
# 2.6 훈련 손실 시각화
fig, ax = plt.subplots()
plt.plot(discriminator_loss, label='Discriminator') # 판별자 손실
plt.plot(gan_loss, label='Generator') # 생성자 손실
plt.title("Training Losses")
plt.legend()
plt.show()
# 2.7 이상 탐지
nr_batches_test = np.ceil(x_test.shape[0] // batch_size).astype(np.int32) # 테스트 배치 수
results = [] # 판별자 예측 결과 (확률)
# 테스트 데이터에 대한 판별자 예측
for t in range(nr_batches_test + 1):
ran_from = t * batch_size
ran_to = (t + 1) * batch_size
image_batch = x_test[ran_from:ran_to] # 테스트 데이터에서 배치만큼 가져오기
tmp_rslt = discriminator.predict(x=image_batch, batch_size=128, verbose=0) # 예측, 예측 시에는 배치 사이즈를 128로 사용 (메모리 문제 등 고려)
results = np.append(results, tmp_rslt) # 예측 결과를 results 리스트에 추가
# 정상/이상 데이터 평균 점수 계산 (확률)
pd.options.display.float_format = '{:20,.7f}'.format # 소수점 형식
results_df = pd.concat([pd.DataFrame(results), pd.DataFrame(y_test)], axis=1) # 결과, 레이블
results_df.columns = ['results', 'y_test'] # 열 이름
print('Mean score for normal packets :', results_df.loc[results_df['y_test'] == 0, 'results'].mean()) # 정상
print('Mean score for anomalous packets :', results_df.loc[results_df['y_test'] == 1, 'results'].mean()) # 이상
# 이상 징후 판별 (임계값 기반)
per = np.percentile(results, 1) # 가장 낮은 1% 점수 (임계값)
y_pred = results.copy()
y_pred = np.array(y_pred)
inds = (y_pred > per) # 임계값보다 크면 정상 (0)
inds_comp = (y_pred <= per) # 임계값 이하면 이상 (1)
y_pred[inds] = 0
y_pred[inds_comp] = 1
# 3. 평가
# 정확도, 정밀도, 재현율, F1-score
precision, recall, f1, _ = precision_recall_fscore_support(y_test, y_pred, average='binary')
print('Accuracy Score :', accuracy_score(y_test, y_pred))
print('Precision :', precision)
print('Recall :', recall)
print('F1 :', f1)
# 혼동 행렬
cm = confusion_matrix(y_test, y_pred) # 혼동 행렬
print('Confusion Matrix :')
def plot_confusion_matrix(cm, title='Confusion matrix', cmap=plt.cm.Greens):
plt.figure(figsize=(10, 10))
plt.imshow(cm, interpolation='nearest', cmap=cmap) # 이미지
plt.title(title)
plt.colorbar()
plt.tight_layout()
width, height = cm.shape
for x in range(width):
for y in range(height):
plt.annotate(str(cm[x][y]), xy=(y, x),
horizontalalignment='center',
verticalalignment='center')
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.show()
plot_confusion_matrix(cm)
# ROC 곡선
fpr_keras, tpr_keras, thresholds_keras = roc_curve(y_test, y_pred) # ROC
auc_keras = auc(fpr_keras, tpr_keras) # AUC
plt.figure(1)
plt.plot([0, 1], [0, 1], 'k--') # 대각선
plt.plot(fpr_keras, tpr_keras, label='Keras (area = {:.2f})'.format(auc_keras)) # ROC
plt.xlabel('False positive rate')
plt.ylabel('True positive rate')
plt.title('ROC curve')
plt.legend(loc='best')
plt.show()
이제 코드의 각 부분, 특히 배치 사이즈와 훈련 데이터 형태가 어떻게 사용되는지 훨씬 더 명확하게 이해되셨을 겁니다. 궁금한 점이 있다면 언제든지 다시 질문해주세요!
좋습니다! 배치 사이즈(Batch Size)와 훈련 데이터의 형태(Train Shape)에 대해 더 자세히 설명하고, 코드 내에서 어떻게 사용되는지 명확하게 짚어보겠습니다.
1. 배치 사이즈 (Batch Size)
batch_size = 512 # 배치 크기 설정
# ... (중략) ...
for epoch in range(epochs): # 에포크 반복
for index in range(batch_count): # 배치 반복
# ...
# 데이터 배치를 가져오는 부분
image_batch = x_train[index * batch_size: (index + 1) * batch_size]
# ...
# 모델 훈련 (배치 단위)
d_loss = discriminator.train_on_batch(X, y_dis) # 판별자 훈련
g_loss = gan.train_on_batch(noise, y_gen) # 생성자 훈련
batch_size = 512: 배치 사이즈를 512로 설정합니다. 이 값은 실험을 통해 조정할 수 있습니다.image_batch = x_train[index * batch_size: (index + 1) * batch_size]: 훈련 데이터(x_train)에서 현재 배치에 해당하는 데이터를 가져옵니다. 슬라이싱([start:end])을 사용하여 배치 사이즈만큼 데이터를 선택합니다.train_on_batch(): Keras의 train_on_batch() 함수는 주어진 배치 데이터(X, y_dis 또는 noise, y_gen)를 사용하여 모델(판별자 또는 GAN)을 한 번 훈련시킵니다. (가중치를 한 번 업데이트)2. 훈련 데이터 형태 (Train Shape)
x_train)의 차원(dimension)을 나타냅니다.# 훈련 데이터 형태 확인
print(x_train.shape)
# ... (중략) ...
# 생성자 네트워크 정의
def get_generator(optimizer):
generator = Sequential()
generator.add(Dense(64, input_dim=114, ...)) # 입력 차원: 114
# ...
# 판별자 네트워크 정의
def get_discriminator(optimizer):
discriminator = Sequential()
discriminator.add(Dense(256, input_dim=114, ...)) # 입력 차원: 114
# ...
# GAN 네트워크 구성
def get_gan_network(discriminator, generator, optimizer, input_dim=114):
gan_input = Input(shape=(input_dim,)) # 입력 형태: (114,)
# ...
print(x_train.shape): 훈련 데이터의 형태를 출력합니다. 예를 들어, (729620, 114)와 같이 출력될 수 있습니다.729620: 훈련 데이터 샘플의 개수 (정상 데이터만 남긴 후)114: 각 데이터 샘플의 특징(feature) 개수 (KDD-99 데이터셋의 특징 수)input_dim=114: 생성자와 판별자 모델의 첫 번째 Dense 레이어에서 input_dim을 114로 설정합니다. 이는 훈련 데이터의 특징 개수와 일치해야 합니다.gan_input = Input(shape=(input_dim,)) 에서 input_dim을 114로 설정하여, 훈련 데이터의 특징 개수와 일치합니다.요약:
이 설명이 배치 사이즈와 훈련 데이터 형태에 대한 이해를 돕고, 코드 내에서 어떻게 사용되는지 명확하게 보여주었기를 바랍니다! 추가 질문이 있으면 언제든지 물어보세요.
좋습니다! 생략 없이, ReLU와 TanH 활성화 함수까지 포함하여 아주 자세하게 코드 설명을 다시 작성해 보겠습니다.
전체 코드 (Full Code with Detailed Explanations):
# Import system packages (시스템 패키지 임포트)
import os # 운영체제와 상호작용 (파일 경로 등)
import sys # 파이썬 인터프리터와 상호작용 (시스템 경로 등)
import time # 시간 관련 함수 (훈련 시간 측정 등)
import logging # 로깅 (훈련 과정 기록)
import importlib # 모듈 다시 로드 (개발 중 유용)
# Import data manipulation libraries (데이터 조작 라이브러리 임포트)
import numpy as np # 수치 연산 (배열, 행렬)
import pandas as pd # 데이터 분석 (DataFrame)
import collections # 데이터 구조 (Counter)
from tqdm import tqdm # 진행률 표시줄
# Import visualization libraries (시각화 라이브러리 임포트)
import matplotlib.pyplot as plt # 그래프 그리기
# Importing ML/DL libraries (머신러닝/딥러닝 라이브러리 임포트)
from sklearn.preprocessing import MinMaxScaler, label_binarize # 데이터 정규화, 레이블 이진화
from sklearn.model_selection import train_test_split # 데이터 분할 (훈련/테스트)
from sklearn.metrics import roc_curve, auc, precision_recall_fscore_support, average_precision_score # 평가 지표
from sklearn.metrics import precision_recall_curve, auc, confusion_matrix, accuracy_score # 평가 지표
import tensorflow as tf # 텐서플로우
from tensorflow import keras # 케라스 (텐서플로우 기반 딥러닝 API)
from tensorflow.keras import initializers # 가중치 초기화
from tensorflow.keras.models import Model, Sequential # 모델 (함수형, 순차형)
from tensorflow.keras.optimizers import Adam, RMSprop # 옵티마이저
from tensorflow.keras.callbacks import ModelCheckpoint # 콜백 (모델 저장 등)
from tensorflow.keras.layers import Dense, Dropout # 레이어 (밀집, 드롭아웃)
from tensorflow.keras.layers import LeakyReLU # 고급 활성화 함수 (Leaky ReLU)
from tensorflow.keras.layers import Input, BatchNormalization, ReLU, Dense, Reshape, Flatten, Activation # 다양한 레이어
from tensorflow.keras.layers import Dropout, multiply, GaussianNoise, MaxPooling2D, concatenate # 다양한 레이어
import pickle # 파이썬 객체 저장/로드
import random # 난수 생성
random.seed(123) # 난수 시드 고정 (재현성)
data_path = './data/' # 데이터 경로 설정 (필요에 따라 수정)
# 1. 데이터셋 (Dataset)
# loading the pickled file (pickle 파일 로드)
filename = data_path + 'preprocessed_data_full.pkl' # pickle 파일 경로
input_file = open(filename, 'rb') # 바이너리 읽기 모드('rb')로 파일 열기
preprocessed_data = pickle.load(input_file) # pickle 파일에서 데이터 로드
input_file.close() # 파일 닫기
# preprocessed_data 딕셔너리에서 데이터 추출
le = preprocessed_data['le'] # 레이블 인코더 (문자열 레이블을 숫자로 변환)
x_train = preprocessed_data['x_train'] # 훈련 데이터 (특징, feature)
y_train = preprocessed_data['y_train'] # 훈련 데이터 (레이블, label)
x_test = preprocessed_data['x_test'] # 테스트 데이터 (특징)
y_test = preprocessed_data['y_test'] # 테스트 데이터 (레이블)
# Converting labels to Binary (레이블을 이진 형식으로 변환, 0과 1)
y_test[y_test != 11] = 1 # 'normal'(11)이 아닌 레이블을 모두 1 (이상, anomaly)로 변경
y_test[y_test == 11] = 0 # 'normal' 레이블을 0 (정상, normal)으로 변경
y_train[y_train != 11] = 1 # 훈련 데이터도 동일하게 처리
y_train[y_train == 11] = 0
# Subsetting only Normal Network packets in our training set (훈련 세트에 정상 패킷만 남김)
temp_df = x_train.copy() # 훈련 데이터 복사 (원본 데이터 보존)
temp_df['label'] = y_train # 훈련 데이터에 레이블 열 추가
temp_df = temp_df.loc[temp_df['label'] == 0] # 레이블이 0 (정상)인 데이터만 선택
temp_df = temp_df.drop('label', axis=1) # 레이블 열 제거
x_train = temp_df.copy() # 정상 데이터만 남은 훈련 데이터
# Normalize the above splits using the MinMaxScaler (MinMaxScaler를 사용하여 데이터 정규화)
scaler = MinMaxScaler() # MinMaxScaler 객체 생성 (0과 1 사이로 값 변환)
# Make sure to only fit the scaler on the training data (훈련 데이터에만 scaler를 fit)
x_train = scaler.fit_transform(x_train) # 훈련 데이터 변환 (fit: 훈련 데이터의 최소/최대값 계산, transform: 변환)
x_test = scaler.transform(x_test) # 테스트 데이터 변환 (훈련 데이터에서 계산된 최소/최대값 사용)
# Creating dataset dictionary (데이터셋 딕셔너리 생성)
dataset = {}
dataset['x_train'] = x_train.astype(np.float32) # 훈련 데이터 (float32로 타입 변환, 메모리 절약)
dataset['y_train'] = y_train.astype(np.float32) # 훈련 레이블
dataset['x_test'] = x_test.astype(np.float32) # 테스트 데이터
dataset['y_test'] = y_test.astype(np.float32) # 테스트 레이블
# check how many anomalies are in our Testing set (테스트 세트의 이상 징후 개수 확인, 데이터 불균형 확인)
print('Number of Normal Network packets in the Training set:', x_train.shape[0]) # 훈련 세트 정상 패킷 수
print('Number of Normal Network packets in the Testing set:', collections.Counter(y_test)[0]) # 테스트 세트 정상 패킷 수
print('Number of Anomalous Network packets in the Testing set:', collections.Counter(y_test)[1]) # 테스트 세트 이상 패킷 수
# 2. Generative Adversarial Networks (GAN)
# 2.1 생성자 네트워크 (Generator Network)
def get_generator(optimizer): # 옵티마이저를 인자로 받는 생성자 함수
generator = Sequential() # 순차적으로 레이어를 쌓는 Sequential 모델 생성
# 첫 번째 Dense 레이어
generator.add(Dense(64, input_dim=114, kernel_initializer=initializers.glorot_normal(seed=42)))
# - 64개의 뉴런(유닛)
# - 입력 차원(input_dim): 114 (데이터의 특징 개수)
# - kernel_initializer: 가중치 초기화 방법 (glorot_normal, Xavier initialization)
generator.add(Activation('tanh')) # tanh 활성화 함수 적용
# - tanh: -1과 1 사이의 값을 출력 (sigmoid보다 gradient vanishing 문제 덜 발생)
# 두 번째 Dense 레이어
generator.add(Dense(128))
generator.add(Activation('tanh'))
# 세 번째 Dense 레이어
generator.add(Dense(256))
generator.add(Activation('tanh'))
# 네 번째 Dense 레이어
generator.add(Dense(256))
generator.add(Activation('tanh'))
# 다섯 번째 Dense 레이어
generator.add(Dense(512))
generator.add(Activation('tanh'))
# 출력 레이어
generator.add(Dense(114, activation='tanh'))
# - 114개의 뉴런 (입력 데이터와 동일한 차원)
# - tanh 활성화 함수 (생성된 데이터도 -1과 1 사이로 맞춤, MinMaxScaler와 호환)
generator.compile(loss='binary_crossentropy', optimizer=optimizer) # 모델 컴파일
# - loss: binary_crossentropy (이진 분류 손실 함수, 판별자를 속이는 데 사용)
# - optimizer: Adam, RMSprop 등 (가중치 업데이트 방법)
return generator # 생성자 모델 반환
# 2.2 판별자 네트워크 (Discriminator Network)
def get_discriminator(optimizer):
discriminator = Sequential()
# 첫 번째 Dense 레이어
discriminator.add(Dense(256, input_dim=114, kernel_initializer=initializers.glorot_normal(seed=42)))
discriminator.add(Activation('relu')) # ReLU 활성화 함수
# - ReLU (Rectified Linear Unit): 0보다 작으면 0, 0보다 크면 그대로 출력 (계산 효율, gradient vanishing 문제 완화)
discriminator.add(Dropout(0.2)) # 드롭아웃 (과적합 방지)
# - Dropout: 훈련 중에 무작위로 뉴런을 비활성화 (20% 비율)
# 두 번째 Dense 레이어
discriminator.add(Dense(128))
discriminator.add(Activation('relu')) # ReLU
discriminator.add(Dropout(0.2)) # 드롭아웃
# 세 번째 Dense 레이어
discriminator.add(Dense(128))
discriminator.add(Activation('relu')) # ReLU
discriminator.add(Dropout(0.2))# 드롭아웃
# 네 번째 Dense 레이어
discriminator.add(Dense(128))
discriminator.add(Activation('relu'))# ReLU
discriminator.add(Dropout(0.2))# 드롭아웃
# 다섯 번째 Dense 레이어
discriminator.add(Dense(128))
discriminator.add(Activation('relu')) # ReLU
discriminator.add(Dropout(0.2)) # 드롭아웃
# 출력 레이어
discriminator.add(Dense(1)) # 1개의 뉴런 (진짜/가짜 확률)
discriminator.add(Activation('sigmoid')) # sigmoid 활성화 함수
# - sigmoid: 0과 1 사이의 값을 출력 (확률)
discriminator.compile(loss='binary_crossentropy', optimizer=optimizer) # 모델 컴파일
# - loss: binary_crossentropy (진짜/가짜 이진 분류)
# - optimizer
return discriminator # 판별자 모델 반환
# 2.3 GAN 네트워크 구성
def get_gan_network(discriminator, generator, optimizer, input_dim=114):
discriminator.trainable = False # 판별자 가중치 고정 (생성자만 훈련)
gan_input = Input(shape=(input_dim,)) # GAN의 입력 (Input 레이어, 함수형 API)
x = generator(gan_input) # 생성자를 통과 (가짜 데이터 생성)
gan_output = discriminator(x) # 판별자를 통과 (진짜/가짜 확률)
gan = Model(inputs=gan_input, outputs=gan_output) # GAN 모델 생성 (입력, 출력)
gan.compile(loss='binary_crossentropy', optimizer=optimizer) # 모델 컴파일
return gan # GAN 모델 반환
# 2.4 하이퍼파라미터 설정
learning_rate = 0.00001 # 학습률 (작은 값, 안정적인 학습)
batch_size = 512 # 배치 크기 (한 번에 처리하는 데이터 샘플 수)
epochs = 10 # 에포크 수 (전체 데이터셋 반복 횟수)
adam = Adam(learning_rate=learning_rate, beta_1=0.5) # Adam 옵티마이저 (beta_1: 모멘텀 관련)
# 2.5 모델 훈련
x_train, y_train, x_test, y_test = dataset['x_train'], dataset['y_train'], dataset['x_test'], dataset['y_test'] # 데이터셋
batch_count = x_train.shape[0] // batch_size # 배치 수 계산
pbar = tqdm(total=epochs * batch_count) # 진행률 표시줄
gan_loss = [] # GAN 손실 기록
discriminator_loss = [] # 판별자 손실 기록
generator = get_generator(adam) # 생성자 모델
discriminator = get_discriminator(adam) # 판별자 모델
gan = get_gan_network(discriminator, generator, adam, input_dim=114) # GAN 모델
for epoch in range(epochs): # 에포크 반복
for index in range(batch_count): # 배치 반복
pbar.update(1) # 진행률 표시
# 1. 판별자 훈련 (Discriminator Training)
# 랜덤 노이즈 생성 (생성자 입력)
noise = np.random.normal(0, 1, size=[batch_size, 114]) # 정규 분포 (평균 0, 표준편차 1)
# 가짜 데이터 생성
generated_images = generator.predict_on_batch(noise)
# 진짜 데이터 (정상 데이터) 배치 가져오기
image_batch = x_train[index * batch_size: (index + 1) * batch_size]
# 진짜 데이터와 가짜 데이터 합치기 (판별자 입력)
X = np.vstack((generated_images, image_batch))
y_dis = np.ones(2 * batch_size) # 레이블 생성 (1: 진짜, 0: 가짜)
y_dis[:batch_size] = 0 # 앞부분(가짜)은 0
# 판별자 훈련
discriminator.trainable = True # 판별자 훈련 가능
d_loss = discriminator.train_on_batch(X, y_dis) # 판별자 훈련 (입력, 레이블)
# 2. 생성자 훈련 (Generator Training)
# 랜덤 노이즈 생성 (생성자 입력)
noise = np.random.uniform(0, 1, size=[batch_size, 114]) # 균등 분포 (0과 1 사이)
y_gen = np.ones(batch_size) # 생성자 훈련용 레이블 (모두 1, 판별자를 속이도록)
discriminator.trainable = False # 판별자 훈련 불가능 (고정)
g_loss = gan.train_on_batch(noise, y_gen) # GAN 훈련 (생성자만 훈련)
# 손실 기록
discriminator_loss.append(d_loss)
gan_loss.append(g_loss)
# 에포크별 손실 출력
print("Epoch %d Batch %d/%d [D loss: %f] [G loss:%f]" % (epoch, index, batch_count, d_loss, g_loss))
# 2.6 훈련 손실 시각화
fig, ax = plt.subplots()
plt.plot(discriminator_loss, label='Discriminator') # 판별자 손실
plt.plot(gan_loss, label='Generator') # 생성자 손실
plt.title("Training Losses")
plt.legend()
plt.show()
# 2.7 이상 탐지
nr_batches_test = np.ceil(x_test.shape[0] // batch_size).astype(np.int32) # 테스트 배치 수
results = [] # 판별자 예측 결과 (확률)
# 테스트 데이터에 대한 판별자 예측
for t in range(nr_batches_test + 1):
ran_from = t * batch_size
ran_to = (t + 1) * batch_size
image_batch = x_test[ran_from:ran_to]
tmp_rslt = discriminator.predict(x=image_batch, batch_size=128, verbose=0) # 예측
results = np.append(results, tmp_rslt)
# 정상/이상 데이터 평균 점수 계산 (확률)
pd.options.display.float_format = '{:20,.7f}'.format # 소수점 형식
results_df = pd.concat([pd.DataFrame(results), pd.DataFrame(y_test)], axis=1) # 결과, 레이블
results_df.columns = ['results', 'y_test'] # 열 이름
print('Mean score for normal packets :', results_df.loc[results_df['y_test'] == 0, 'results'].mean()) # 정상
print('Mean score for anomalous packets :', results_df.loc[results_df['y_test'] == 1, 'results'].mean()) # 이상
# 이상 징후 판별 (임계값 기반)
per = np.percentile(results, 1) # 가장 낮은 1% 점수 (임계값)
y_pred = results.copy()
y_pred = np.array(y_pred)
inds = (y_pred > per) # 임계값보다 크면 정상 (0)
inds_comp = (y_pred <= per) # 임계값 이하면 이상 (1)
y_pred[inds] = 0
y_pred[inds_comp] = 1
# 3. 평가
# 정확도, 정밀도, 재현율, F1-score
precision, recall, f1, _ = precision_recall_fscore_support(y_test, y_pred, average='binary')
print('Accuracy Score :', accuracy_score(y_test, y_pred))
print('Precision :', precision)
print('Recall :', recall)
print('F1 :', f1)
# 혼동 행렬
cm = confusion_matrix(y_test, y_pred) # 혼동 행렬
print('Confusion Matrix :')
def plot_confusion_matrix(cm, title='Confusion matrix', cmap=plt.cm.Greens):
plt.figure(figsize=(10, 10))
plt.imshow(cm, interpolation='nearest', cmap=cmap) # 이미지
plt.title(title)
plt.colorbar()
plt.tight_layout()
width, height = cm.shape
for x in range(width):
for y in range(height):
plt.annotate(str(cm[x][y]), xy=(y, x),
horizontalalignment='center',
verticalalignment='center')
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.show()
plot_confusion_matrix(cm)
# ROC 곡선
fpr_keras, tpr_keras, thresholds_keras = roc_curve(y_test, y_pred) # ROC
auc_keras = auc(fpr_keras, tpr_keras) # AUC
plt.figure(1)
plt.plot([0, 1], [0, 1], 'k--') # 대각선
plt.plot(fpr_keras, tpr_keras, label='Keras (area = {:.2f})'.format(auc_keras)) # ROC
plt.xlabel('False positive rate')
plt.ylabel('True positive rate')
plt.title('ROC curve')
plt.legend(loc='best')
plt.show()
ReLU (Rectified Linear Unit) 활성화 함수:
f(x) = max(0, x)LeakyReLU 레이어를 사용할 수도 있습니다.Tanh (Hyperbolic Tangent) 활성화 함수:
f(x) = (e^x - e^-x) / (e^x + e^-x)이 정도면 아주 상세한 설명이 되었을 것 같습니다. 더 궁금한 부분이나 수정이 필요한 부분이 있다면 언제든지 말씀해주세요!
좋습니다! 생략 없이, ReLU와 TanH 활성화 함수까지 포함하여 아주 자세하게 코드 설명을 다시 작성해 보겠습니다.
전체 코드 (Full Code with Detailed Explanations):
# Import system packages (시스템 패키지 임포트)
import os # 운영체제와 상호작용 (파일 경로 등)
import sys # 파이썬 인터프리터와 상호작용 (시스템 경로 등)
import time # 시간 관련 함수 (훈련 시간 측정 등)
import logging # 로깅 (훈련 과정 기록)
import importlib # 모듈 다시 로드 (개발 중 유용)
# Import data manipulation libraries (데이터 조작 라이브러리 임포트)
import numpy as np # 수치 연산 (배열, 행렬)
import pandas as pd # 데이터 분석 (DataFrame)
import collections # 데이터 구조 (Counter)
from tqdm import tqdm # 진행률 표시줄
# Import visualization libraries (시각화 라이브러리 임포트)
import matplotlib.pyplot as plt # 그래프 그리기
# Importing ML/DL libraries (머신러닝/딥러닝 라이브러리 임포트)
from sklearn.preprocessing import MinMaxScaler, label_binarize # 데이터 정규화, 레이블 이진화
from sklearn.model_selection import train_test_split # 데이터 분할 (훈련/테스트)
from sklearn.metrics import roc_curve, auc, precision_recall_fscore_support, average_precision_score # 평가 지표
from sklearn.metrics import precision_recall_curve, auc, confusion_matrix, accuracy_score # 평가 지표
import tensorflow as tf # 텐서플로우
from tensorflow import keras # 케라스 (텐서플로우 기반 딥러닝 API)
from tensorflow.keras import initializers # 가중치 초기화
from tensorflow.keras.models import Model, Sequential # 모델 (함수형, 순차형)
from tensorflow.keras.optimizers import Adam, RMSprop # 옵티마이저
from tensorflow.keras.callbacks import ModelCheckpoint # 콜백 (모델 저장 등)
from tensorflow.keras.layers import Dense, Dropout # 레이어 (밀집, 드롭아웃)
from tensorflow.keras.layers import LeakyReLU # 고급 활성화 함수 (Leaky ReLU)
from tensorflow.keras.layers import Input, BatchNormalization, ReLU, Dense, Reshape, Flatten, Activation # 다양한 레이어
from tensorflow.keras.layers import Dropout, multiply, GaussianNoise, MaxPooling2D, concatenate # 다양한 레이어
import pickle # 파이썬 객체 저장/로드
import random # 난수 생성
random.seed(123) # 난수 시드 고정 (재현성)
data_path = './data/' # 데이터 경로 설정 (필요에 따라 수정)
# 1. 데이터셋 (Dataset)
# loading the pickled file (pickle 파일 로드)
filename = data_path + 'preprocessed_data_full.pkl' # pickle 파일 경로
input_file = open(filename, 'rb') # 바이너리 읽기 모드('rb')로 파일 열기
preprocessed_data = pickle.load(input_file) # pickle 파일에서 데이터 로드
input_file.close() # 파일 닫기
# preprocessed_data 딕셔너리에서 데이터 추출
le = preprocessed_data['le'] # 레이블 인코더 (문자열 레이블을 숫자로 변환)
x_train = preprocessed_data['x_train'] # 훈련 데이터 (특징, feature)
y_train = preprocessed_data['y_train'] # 훈련 데이터 (레이블, label)
x_test = preprocessed_data['x_test'] # 테스트 데이터 (특징)
y_test = preprocessed_data['y_test'] # 테스트 데이터 (레이블)
# Converting labels to Binary (레이블을 이진 형식으로 변환, 0과 1)
y_test[y_test != 11] = 1 # 'normal'(11)이 아닌 레이블을 모두 1 (이상, anomaly)로 변경
y_test[y_test == 11] = 0 # 'normal' 레이블을 0 (정상, normal)으로 변경
y_train[y_train != 11] = 1 # 훈련 데이터도 동일하게 처리
y_train[y_train == 11] = 0
# Subsetting only Normal Network packets in our training set (훈련 세트에 정상 패킷만 남김)
temp_df = x_train.copy() # 훈련 데이터 복사 (원본 데이터 보존)
temp_df['label'] = y_train # 훈련 데이터에 레이블 열 추가
temp_df = temp_df.loc[temp_df['label'] == 0] # 레이블이 0 (정상)인 데이터만 선택
temp_df = temp_df.drop('label', axis=1) # 레이블 열 제거
x_train = temp_df.copy() # 정상 데이터만 남은 훈련 데이터
# Normalize the above splits using the MinMaxScaler (MinMaxScaler를 사용하여 데이터 정규화)
scaler = MinMaxScaler() # MinMaxScaler 객체 생성 (0과 1 사이로 값 변환)
# Make sure to only fit the scaler on the training data (훈련 데이터에만 scaler를 fit)
x_train = scaler.fit_transform(x_train) # 훈련 데이터 변환 (fit: 훈련 데이터의 최소/최대값 계산, transform: 변환)
x_test = scaler.transform(x_test) # 테스트 데이터 변환 (훈련 데이터에서 계산된 최소/최대값 사용)
# Creating dataset dictionary (데이터셋 딕셔너리 생성)
dataset = {}
dataset['x_train'] = x_train.astype(np.float32) # 훈련 데이터 (float32로 타입 변환, 메모리 절약)
dataset['y_train'] = y_train.astype(np.float32) # 훈련 레이블
dataset['x_test'] = x_test.astype(np.float32) # 테스트 데이터
dataset['y_test'] = y_test.astype(np.float32) # 테스트 레이블
# check how many anomalies are in our Testing set (테스트 세트의 이상 징후 개수 확인, 데이터 불균형 확인)
print('Number of Normal Network packets in the Training set:', x_train.shape[0]) # 훈련 세트 정상 패킷 수
print('Number of Normal Network packets in the Testing set:', collections.Counter(y_test)[0]) # 테스트 세트 정상 패킷 수
print('Number of Anomalous Network packets in the Testing set:', collections.Counter(y_test)[1]) # 테스트 세트 이상 패킷 수
# 2. Generative Adversarial Networks (GAN)
# 2.1 생성자 네트워크 (Generator Network)
def get_generator(optimizer): # 옵티마이저를 인자로 받는 생성자 함수
generator = Sequential() # 순차적으로 레이어를 쌓는 Sequential 모델 생성
# 첫 번째 Dense 레이어
generator.add(Dense(64, input_dim=114, kernel_initializer=initializers.glorot_normal(seed=42)))
# - 64개의 뉴런(유닛)
# - 입력 차원(input_dim): 114 (데이터의 특징 개수)
# - kernel_initializer: 가중치 초기화 방법 (glorot_normal, Xavier initialization)
generator.add(Activation('tanh')) # tanh 활성화 함수 적용
# - tanh: -1과 1 사이의 값을 출력 (sigmoid보다 gradient vanishing 문제 덜 발생)
# 두 번째 Dense 레이어
generator.add(Dense(128))
generator.add(Activation('tanh'))
# 세 번째 Dense 레이어
generator.add(Dense(256))
generator.add(Activation('tanh'))
# 네 번째 Dense 레이어
generator.add(Dense(256))
generator.add(Activation('tanh'))
# 다섯 번째 Dense 레이어
generator.add(Dense(512))
generator.add(Activation('tanh'))
# 출력 레이어
generator.add(Dense(114, activation='tanh'))
# - 114개의 뉴런 (입력 데이터와 동일한 차원)
# - tanh 활성화 함수 (생성된 데이터도 -1과 1 사이로 맞춤, MinMaxScaler와 호환)
generator.compile(loss='binary_crossentropy', optimizer=optimizer) # 모델 컴파일
# - loss: binary_crossentropy (이진 분류 손실 함수, 판별자를 속이는 데 사용)
# - optimizer: Adam, RMSprop 등 (가중치 업데이트 방법)
return generator # 생성자 모델 반환
# 2.2 판별자 네트워크 (Discriminator Network)
def get_discriminator(optimizer):
discriminator = Sequential()
# 첫 번째 Dense 레이어
discriminator.add(Dense(256, input_dim=114, kernel_initializer=initializers.glorot_normal(seed=42)))
discriminator.add(Activation('relu')) # ReLU 활성화 함수
# - ReLU (Rectified Linear Unit): 0보다 작으면 0, 0보다 크면 그대로 출력 (계산 효율, gradient vanishing 문제 완화)
discriminator.add(Dropout(0.2)) # 드롭아웃 (과적합 방지)
# - Dropout: 훈련 중에 무작위로 뉴런을 비활성화 (20% 비율)
# 두 번째 Dense 레이어
discriminator.add(Dense(128))
discriminator.add(Activation('relu')) # ReLU
discriminator.add(Dropout(0.2)) # 드롭아웃
# 세 번째 Dense 레이어
discriminator.add(Dense(128))
discriminator.add(Activation('relu')) # ReLU
discriminator.add(Dropout(0.2))# 드롭아웃
# 네 번째 Dense 레이어
discriminator.add(Dense(128))
discriminator.add(Activation('relu'))# ReLU
discriminator.add(Dropout(0.2))# 드롭아웃
# 다섯 번째 Dense 레이어
discriminator.add(Dense(128))
discriminator.add(Activation('relu')) # ReLU
discriminator.add(Dropout(0.2)) # 드롭아웃
# 출력 레이어
discriminator.add(Dense(1)) # 1개의 뉴런 (진짜/가짜 확률)
discriminator.add(Activation('sigmoid')) # sigmoid 활성화 함수
# - sigmoid: 0과 1 사이의 값을 출력 (확률)
discriminator.compile(loss='binary_crossentropy', optimizer=optimizer) # 모델 컴파일
# - loss: binary_crossentropy (진짜/가짜 이진 분류)
# - optimizer
return discriminator # 판별자 모델 반환
# 2.3 GAN 네트워크 구성
def get_gan_network(discriminator, generator, optimizer, input_dim=114):
discriminator.trainable = False # 판별자 가중치 고정 (생성자만 훈련)
gan_input = Input(shape=(input_dim,)) # GAN의 입력 (Input 레이어, 함수형 API)
x = generator(gan_input) # 생성자를 통과 (가짜 데이터 생성)
gan_output = discriminator(x) # 판별자를 통과 (진짜/가짜 확률)
gan = Model(inputs=gan_input, outputs=gan_output) # GAN 모델 생성 (입력, 출력)
gan.compile(loss='binary_crossentropy', optimizer=optimizer) # 모델 컴파일
return gan # GAN 모델 반환
# 2.4 하이퍼파라미터 설정
learning_rate = 0.00001 # 학습률 (작은 값, 안정적인 학습)
batch_size = 512 # 배치 크기 (한 번에 처리하는 데이터 샘플 수)
epochs = 10 # 에포크 수 (전체 데이터셋 반복 횟수)
adam = Adam(learning_rate=learning_rate, beta_1=0.5) # Adam 옵티마이저 (beta_1: 모멘텀 관련)
# 2.5 모델 훈련
x_train, y_train, x_test, y_test = dataset['x_train'], dataset['y_train'], dataset['x_test'], dataset['y_test'] # 데이터셋
batch_count = x_train.shape[0] // batch_size # 배치 수 계산
pbar = tqdm(total=epochs * batch_count) # 진행률 표시줄
gan_loss = [] # GAN 손실 기록
discriminator_loss = [] # 판별자 손실 기록
generator = get_generator(adam) # 생성자 모델
discriminator = get_discriminator(adam) # 판별자 모델
gan = get_gan_network(discriminator, generator, adam, input_dim=114) # GAN 모델
for epoch in range(epochs): # 에포크 반복
for index in range(batch_count): # 배치 반복
pbar.update(1) # 진행률 표시
# 1. 판별자 훈련 (Discriminator Training)
# 랜덤 노이즈 생성 (생성자 입력)
noise = np.random.normal(0, 1, size=[batch_size, 114]) # 정규 분포 (평균 0, 표준편차 1)
# 가짜 데이터 생성
generated_images = generator.predict_on_batch(noise)
# 진짜 데이터 (정상 데이터) 배치 가져오기
image_batch = x_train[index * batch_size: (index + 1) * batch_size]
# 진짜 데이터와 가짜 데이터 합치기 (판별자 입력)
X = np.vstack((generated_images, image_batch))
y_dis = np.ones(2 * batch_size) # 레이블 생성 (1: 진짜, 0: 가짜)
y_dis[:batch_size] = 0 # 앞부분(가짜)은 0
# 판별자 훈련
discriminator.trainable = True # 판별자 훈련 가능
d_loss = discriminator.train_on_batch(X, y_dis) # 판별자 훈련 (입력, 레이블)
# 2. 생성자 훈련 (Generator Training)
# 랜덤 노이즈 생성 (생성자 입력)
noise = np.random.uniform(0, 1, size=[batch_size, 114]) # 균등 분포 (0과 1 사이)
y_gen = np.ones(batch_size) # 생성자 훈련용 레이블 (모두 1, 판별자를 속이도록)
discriminator.trainable = False # 판별자 훈련 불가능 (고정)
g_loss = gan.train_on_batch(noise, y_gen) # GAN 훈련 (생성자만 훈련)
# 손실 기록
discriminator_loss.append(d_loss)
gan_loss.append(g_loss)
# 에포크별 손실 출력
print("Epoch %d Batch %d/%d [D loss: %f] [G loss:%f]" % (epoch, index, batch_count, d_loss, g_loss))
# 2.6 훈련 손실 시각화
fig, ax = plt.subplots()
plt.plot(discriminator_loss, label='Discriminator') # 판별자 손실
plt.plot(gan_loss, label='Generator') # 생성자 손실
plt.title("Training Losses")
plt.legend()
plt.show()
# 2.7 이상 탐지
nr_batches_test = np.ceil(x_test.shape[0] // batch_size).astype(np.int32) # 테스트 배치 수
results = [] # 판별자 예측 결과 (확률)
# 테스트 데이터에 대한 판별자 예측
for t in range(nr_batches_test + 1):
ran_from = t * batch_size
ran_to = (t + 1) * batch_size
image_batch = x_test[ran_from:ran_to]
tmp_rslt = discriminator.predict(x=image_batch, batch_size=128, verbose=0) # 예측
results = np.append(results, tmp_rslt)
# 정상/이상 데이터 평균 점수 계산 (확률)
pd.options.display.float_format = '{:20,.7f}'.format # 소수점 형식
results_df = pd.concat([pd.DataFrame(results), pd.DataFrame(y_test)], axis=1) # 결과, 레이블
results_df.columns = ['results', 'y_test'] # 열 이름
print('Mean score for normal packets :', results_df.loc[results_df['y_test'] == 0, 'results'].mean()) # 정상
print('Mean score for anomalous packets :', results_df.loc[results_df['y_test'] == 1, 'results'].mean()) # 이상
# 이상 징후 판별 (임계값 기반)
per = np.percentile(results, 1) # 가장 낮은 1% 점수 (임계값)
y_pred = results.copy()
y_pred = np.array(y_pred)
inds = (y_pred > per) # 임계값보다 크면 정상 (0)
inds_comp = (y_pred <= per) # 임계값 이하면 이상 (1)
y_pred[inds] = 0
y_pred[inds_comp] = 1
# 3. 평가
# 정확도, 정밀도, 재현율, F1-score
precision, recall, f1, _ = precision_recall_fscore_support(y_test, y_pred, average='binary')
print('Accuracy Score :', accuracy_score(y_test, y_pred))
print('Precision :', precision)
print('Recall :', recall)
print('F1 :', f1)
# 혼동 행렬
cm = confusion_matrix(y_test, y_pred) # 혼동 행렬
print('Confusion Matrix :')
def plot_confusion_matrix(cm, title='Confusion matrix', cmap=plt.cm.Greens):
plt.figure(figsize=(10, 10))
plt.imshow(cm, interpolation='nearest', cmap=cmap) # 이미지
plt.title(title)
plt.colorbar()
plt.tight_layout()
width, height = cm.shape
for x in range(width):
for y in range(height):
plt.annotate(str(cm[x][y]), xy=(y, x),
horizontalalignment='center',
verticalalignment='center')
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.show()
plot_confusion_matrix(cm)
# ROC 곡선
fpr_keras, tpr_keras, thresholds_keras = roc_curve(y_test, y_pred) # ROC
auc_keras = auc(fpr_keras, tpr_keras) # AUC
plt.figure(1)
plt.plot([0, 1], [0, 1], 'k--') # 대각선
plt.plot(fpr_keras, tpr_keras, label='Keras (area = {:.2f})'.format(auc_keras)) # ROC
plt.xlabel('False positive rate')
plt.ylabel('True positive rate')
plt.title('ROC curve')
plt.legend(loc='best')
plt.show()
ReLU (Rectified Linear Unit) 활성화 함수:
f(x) = max(0, x)LeakyReLU 레이어를 사용할 수도 있습니다.Tanh (Hyperbolic Tangent) 활성화 함수:
f(x) = (e^x - e^-x) / (e^x + e^-x)이 정도면 아주 상세한 설명이 되었을 것 같습니다. 더 궁금한 부분이나 수정이 필요한 부분이 있다면 언제든지 말씀해주세요!
이 랩은 Generative Adversarial Networks (GAN)를 사용하여 네트워크 데이터에서 이상 징후를 탐지하는 방법을 다룹니다. 이전 랩에서 다룬 지도 학습 (XGBoost) 및 비지도 학습 (Deep Autoencoders) 방법과 비교하여 GAN의 장점과 특징을 살펴봅니다.
핵심 개념:
랩 진행 순서:
1. 데이터셋 (Dataset)
# Import system packages (시스템 패키지 임포트)
import os
import sys
import time
import logging
import importlib
# Import data manipulation libraries (데이터 조작 라이브러리 임포트)
import numpy as np
import pandas as pd
import collections
from tqdm import tqdm
# Import visualization libraries (시각화 라이브러리 임포트)
import matplotlib.pyplot as plt
# Importing ML/DL libraries (머신러닝/딥러닝 라이브러리 임포트)
from sklearn.preprocessing import MinMaxScaler, label_binarize
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_curve, auc, precision_recall_fscore_support, average_precision_score
from sklearn.metrics import precision_recall_curve, auc, confusion_matrix, accuracy_score
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import initializers
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.optimizers import Adam, RMSprop
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.layers import Input, BatchNormalization, LeakyReLU, Dense, Reshape, Flatten, Activation
from tensorflow.keras.layers import Dropout, multiply, GaussianNoise, MaxPooling2D, concatenate
import pickle
import random
random.seed(123)
data_path = './data/' # 데이터 경로 설정 (필요에 따라 수정)
# loading the pickled file (pickle 파일 로드)
filename = data_path + 'preprocessed_data_full.pkl' # 파일명 설정
input_file = open(filename, 'rb') # 파일 열기 (read binary)
preprocessed_data = pickle.load(input_file) # pickle 데이터 로드
input_file.close() # 파일 닫기
# preprocessed_data 딕셔너리에서 데이터 추출
le = preprocessed_data['le'] # 레이블 인코더
x_train = preprocessed_data['x_train'] # 훈련 데이터 (특징)
y_train = preprocessed_data['y_train'] # 훈련 데이터 (레이블)
x_test = preprocessed_data['x_test'] # 테스트 데이터 (특징)
y_test = preprocessed_data['y_test'] # 테스트 데이터 (레이블)
# Converting labels to Binary (레이블을 이진 형식으로 변환)
y_test[y_test != 11] = 1 # 'normal'(11)이 아닌 레이블을 1 (이상)로 변경
y_test[y_test == 11] = 0 # 'normal' 레이블을 0 (정상)으로 변경
y_train[y_train != 11] = 1 # 'normal'(11)이 아닌 레이블을 1 (이상)로 변경
y_train[y_train == 11] = 0 # 'normal' 레이블을 0 (정상)으로 변경
# Subsetting only Normal Network packets in our training set (훈련 세트에 정상 패킷만 남김)
temp_df = x_train.copy() # 훈련 데이터 복사
temp_df['label'] = y_train # 레이블 열 추가
temp_df = temp_df.loc[temp_df['label'] == 0] # 정상 데이터(레이블 0)만 선택
temp_df = temp_df.drop('label', axis=1) # 레이블 열 제거
x_train = temp_df.copy() # 훈련 데이터 업데이트
# Normalize the above splits using the MinMaxScaler (MinMaxScaler를 사용하여 데이터 정규화)
scaler = MinMaxScaler()
# Make sure to only fit the scaler on the training data (훈련 데이터에만 scaler를 fit)
x_train = scaler.fit_transform(x_train) # 훈련 데이터 변환 (fit and transform)
x_test = scaler.transform(x_test) # 테스트 데이터 변환 (transform only)
# Creating dataset dictionary (데이터셋 딕셔너리 생성)
dataset = {}
dataset['x_train'] = x_train.astype(np.float32) # 훈련 데이터 (float32로 타입 변환)
dataset['y_train'] = y_train.astype(np.float32) # 훈련 레이블 (float32)
dataset['x_test'] = x_test.astype(np.float32) # 테스트 데이터 (float32)
dataset['y_test'] = y_test.astype(np.float32) # 테스트 레이블 (float32)
# check how many anomalies are in our Testing set (테스트 세트의 이상 징후 개수 확인)
print('Number of Normal Network packets in the Training set:', x_train.shape[0]) # 훈련 세트 정상 패킷 수
print('Number of Normal Network packets in the Testing set:', collections.Counter(y_test)[0]) # 테스트 세트 정상 패킷 수
print('Number of Anomalous Network packets in the Testing set:', collections.Counter(y_test)[1]) # 테스트 세트 이상 패킷 수
2. Generative Adversarial Networks (GAN)
GAN 소개: (위 설명과 동일)
2.1 생성자 네트워크(Generator Network)
# Defining the Generator network (생성자 네트워크 정의)
def get_generator(optimizer): # 옵티마이저를 인자로 받음
generator = Sequential() # Sequential 모델 생성
generator.add(Dense(64, input_dim=114, kernel_initializer=initializers.glorot_normal(seed=42))) # 첫 번째 Dense 레이어 (64개 유닛, 입력 차원 114, 가중치 초기화: glorot_normal)
generator.add(Activation('tanh')) # tanh 활성화 함수
generator.add(Dense(128)) # 두 번째 Dense 레이어 (128개 유닛)
generator.add(Activation('tanh')) # tanh 활성화 함수
generator.add(Dense(256)) # 세 번째 Dense 레이어 (256개 유닛)
generator.add(Activation('tanh')) # tanh 활성화 함수
generator.add(Dense(256)) # 네번째 Dense 레이어
generator.add(Activation('tanh'))
generator.add(Dense(512)) # 다섯 번째 Dense 레이어 (512개 유닛)
generator.add(Activation('tanh')) # tanh 활성화 함수
generator.add(Dense(114, activation='tanh')) # 출력 레이어 (114개 유닛, tanh 활성화 함수)
generator.compile(loss='binary_crossentropy', optimizer=optimizer) # 모델 컴파일 (손실 함수: binary_crossentropy, 옵티마이저)
return generator # 생성자 모델 반환
# Defining the Discriminator network (판별자 네트워크 정의)
def get_discriminator(optimizer): # 옵티마이저를 인자로 받음
discriminator = Sequential() # Sequential 모델 생성
discriminator.add(Dense(256, input_dim=114, kernel_initializer=initializers.glorot_normal(seed=42))) # 첫 번째 Dense 레이어 (256개 유닛, 입력 차원 114, 가중치 초기화)
discriminator.add(Activation('relu')) # ReLU 활성화 함수
discriminator.add(Dropout(0.2)) # 드롭아웃 (20%)
discriminator.add(Dense(128)) # 두 번째 Dense 레이어 (128개 유닛)
discriminator.add(Activation('relu'))# ReLU 활성화 함수
discriminator.add(Dropout(0.2))# 드롭아웃 (20%)
discriminator.add(Dense(128)) # 세 번째 Dense 층
discriminator.add(Activation('relu'))
discriminator.add(Dropout(0.2))
discriminator.add(Dense(128)) # 네 번째 Dense 층
discriminator.add(Activation('relu'))
discriminator.add(Dropout(0.2))
discriminator.add(Dense(128)) # 다섯 번째 Dense 레이어 (128개 유닛)
discriminator.add(Activation('relu')) # ReLU 활성화 함수
discriminator.add(Dropout(0.2)) # 드롭아웃 (20%)
discriminator.add(Dense(1)) # 출력 레이어 (1개 유닛)
discriminator.add(Activation('sigmoid')) # sigmoid 활성화 함수
discriminator.compile(loss='binary_crossentropy', optimizer=optimizer) # 모델 컴파일 (손실 함수, 옵티마이저)
return discriminator # 판별자 모델 반환
# Function to build the GAN network (GAN 네트워크를 구성하는 함수)
def get_gan_network(discriminator, generator, optimizer, input_dim=114):
discriminator.trainable = False # 판별자의 가중치를 고정 (훈련되지 않도록)
gan_input = Input(shape=(input_dim,)) # GAN의 입력 (Input 레이어)
x = generator(gan_input) # 생성자를 통과
gan_output = discriminator(x) # 판별자를 통과 (최종 출력)
gan = Model(inputs=gan_input, outputs=gan_output) # GAN 모델 생성 (입력, 출력)
gan.compile(loss='binary_crossentropy', optimizer=optimizer) # 모델 컴파일 (손실 함수, 옵티마이저)
return gan # GAN 모델 반환
# Setting hyperparameters (하이퍼파라미터 설정)
learning_rate = 0.00001 # 학습률
batch_size = 512 # 배치 크기
epochs = 10 # 에포크 수
adam = Adam(learning_rate=learning_rate, beta_1=0.5) # Adam 옵티마이저 (학습률, beta_1 설정)
# Training the GAN (GAN 훈련)
x_train, y_train, x_test, y_test = dataset['x_train'], dataset['y_train'], dataset['x_test'], dataset['y_test'] # 데이터셋에서 데이터 추출
# Calculating the number of batches based on the batch size (배치 크기 기반으로 배치 수 계산)
batch_count = x_train.shape[0] // batch_size
pbar = tqdm(total=epochs * batch_count) # 진행률 표시줄 (tqdm)
gan_loss = [] # GAN 손실 기록
discriminator_loss = [] # 판별자 손실 기록
# Inititalizing the network (네트워크 초기화)
generator = get_generator(adam) # 생성자 모델 생성
discriminator = get_discriminator(adam) # 판별자 모델 생성
gan = get_gan_network(discriminator, generator, adam, input_dim=114) # GAN 모델 생성
for epoch in range(epochs): # 에포크 반복
for index in range(batch_count): # 배치 반복
pbar.update(1) # 진행률 표시줄 업데이트
# Creating a random set of input noise and images (랜덤 노이즈 생성)
noise = np.random.normal(0, 1, size=[batch_size, 114])
# Generate fake samples (가짜 샘플 생성)
generated_images = generator.predict_on_batch(noise) # 생성자를 사용하여 예측 (가짜 데이터 생성)
# Obtain a batch of normal network packets (정상 네트워크 패킷 배치 가져오기)
image_batch = x_train[index * batch_size: (index + 1) * batch_size]
# Stack real and fake data (진짜 데이터와 가짜 데이터 합치기)
X = np.vstack((generated_images, image_batch))
y_dis = np.ones(2 * batch_size) # 판별자 레이블 생성 (모두 1로 초기화)
y_dis[:batch_size] = 0 # 앞부분(가짜 데이터)은 0으로 설정
# Train discriminator (판별자 훈련)
discriminator.trainable = True # 판별자 훈련 가능하도록 설정
d_loss = discriminator.train_on_batch(X, y_dis) # 판별자 훈련 (입력, 레이블)
# Train generator (생성자 훈련)
noise = np.random.uniform(0, 1, size=[batch_size, 114]) # 균등 분포에서 노이즈 생성
y_gen = np.ones(batch_size) # 생성자 레이블 (모두 1)
discriminator.trainable = False # 판별자 훈련 불가능하도록 설정
g_loss = gan.train_on_batch(noise, y_gen) # GAN 훈련 (노이즈, 레이블)
# Record the losses (손실 기록)
discriminator_loss.append(d_loss)
gan_loss.append(g_loss)
# Print epoch loss (에포크 손실 출력)
print("Epoch %d Batch %d/%d [D loss: %f] [G loss:%f]" % (epoch, index, batch_count, d_loss, g_loss))
# Plot training losses (훈련 손실 그래프 그리기)
fig, ax = plt.subplots()
plt.plot(discriminator_loss, label='Discriminator') # 판별자 손실
plt.plot(gan_loss, label='Generator') # 생성자 손실
plt.title("Training Losses") # 제목
plt.legend() # 범례
plt.show()
# Predictions on the test set (테스트 세트에 대한 예측)
nr_batches_test = np.ceil(x_test.shape[0] // batch_size).astype(np.int32) # 테스트 배치 수 계산
results = [] # 예측 결과 저장
for t in range(nr_batches_test + 1): # 테스트 배치 반복
ran_from = t * batch_size
ran_to = (t + 1) * batch_size
image_batch = x_test[ran_from:ran_to]
tmp_rslt = discriminator.predict(x=image_batch, batch_size=128, verbose=0) # 판별자를 사용하여 예측
results = np.append(results, tmp_rslt) # 결과 추가
# Calculate mean scores (평균 점수 계산)
pd.options.display.float_format = '{:20,.7f}'.format
results_df = pd.concat([pd.DataFrame(results), pd.DataFrame(y_test)], axis=1) # 결과와 레이블 합치기
results_df.columns = ['results', 'y_test'] # 열 이름 설정
print('Mean score for normal packets :', results_df.loc[results_df['y_test'] == 0, 'results'].mean()) # 정상 패킷 평균 점수
print('Mean score for anomalous packets :', results_df.loc[results_df['y_test'] == 1, 'results'].mean()) # 이상 패킷 평균 점수
# Obtaining the lowest 1% score (가장 낮은 1% 점수 얻기)
per = np.percentile(results, 1) # 1% 백분위수 계산
y_pred = results.copy() # 예측 결과 복사
y_pred = np.array(y_pred) # numpy 배열로 변환
# Thresholding based on the score (점수 기반으로 임계값 처리)
inds = (y_pred > per) # 임계값보다 큰 값 (정상)
inds_comp = (y_pred <= per) # 임계값 이하 (이상)
y_pred[inds] = 0 # 정상은 0으로 설정
y_pred[inds_comp] = 1 # 이상은 1로 설정
3. 평가 (Evaluation)
# Calculate evaluation metrics (평가 지표 계산)
precision, recall, f1, _ = precision_recall_fscore_support(y_test, y_pred, average='binary') # 정밀도, 재현율, F1-score
print('Accuracy Score :', accuracy_score(y_test, y_pred)) # 정확도
print('Precision :', precision)
print('Recall :', recall)
print('F1 :', f1)
# Confusion Matrix (혼동 행렬)
cm = confusion_matrix(y_test, y_pred) # 혼동 행렬 계산
print('Confusion Matrix :')
def plot_confusion_matrix(cm, title='Confusion matrix', cmap=plt.cm.Greens): # 혼동 행렬 그리는 함수
plt.figure(figsize=(10, 10))
plt.imshow(cm, interpolation='nearest', cmap=cmap) # 이미지로 표시
plt.title(title) # 제목
plt.colorbar() # 컬러바
plt.tight_layout()
width, height = cm.shape
for x in range(width): # 행 반복
for y in range(height): # 열 반복
plt.annotate(str(cm[x][y]), xy=(y, x), # 값 표시
horizontalalignment='center',
verticalalignment='center')
plt.ylabel('True label') # y축 레이블
plt.xlabel('Predicted label')# x축 레이블
plt.show()
plot_confusion_matrix(cm) # 혼동 행렬 그리기
# ROC Curve (ROC 곡선)
fpr_keras, tpr_keras, thresholds_keras = roc_curve(y_test, y_pred) # ROC 곡선 계산
auc_keras = auc(fpr_keras, tpr_keras) # AUC 계산
plt.figure(1)
plt.plot([0, 1], [0, 1], 'k--') # 대각선
plt.plot(fpr_keras, tpr_keras, label='Keras (area = {:.2f})'.format(auc_keras)) # ROC 곡선
plt.xlabel('False positive rate') # x축 레이블
plt.ylabel('True positive rate') # y축 레이블
plt.title('ROC curve') # 제목
plt.legend(loc='best') # 범례
plt.show()
결론: (위 설명과 동일)
세 가지 방법 (XGBoost, Deep Autoencoders, GAN) 비교: (위 설명과 동일)
이상 징후의 희소성 처리: (위 설명과 동일)
이 코드 설명이 랩 내용을 이해하는 데 도움이 되었기를 바랍니다. 궁금한 점이 있으면 언제든지 질문해주세요.
이 랩은 Generative Adversarial Networks (GAN)를 사용하여 네트워크 데이터에서 이상 징후를 탐지하는 방법을 다룹니다. 이전 랩에서 다룬 지도 학습 (XGBoost) 및 비지도 학습 (Deep Autoencoders) 방법과 비교하여 GAN의 장점과 특징을 살펴봅니다.
핵심 개념:
랩 진행 순서:
1. 데이터셋 (Dataset)
MinMaxScaler를 사용하여 훈련 데이터를 0과 1 사이로 정규화(스케일링)합니다.MinMaxScaler로 테스트 데이터도 스케일링합니다.2. Generative Adversarial Networks (GAN)
GAN 소개: GAN은 두 개의 네트워크(생성자, 판별자)가 서로 경쟁하며 학습하는 모델입니다.
학습 목표:
최적 상태: 생성자가 만든 가짜 데이터가 진짜 데이터와 구별이 어려워 판별자가 더 이상 진짜/가짜를 구분할 수 없을때. (두 네트워크 모두 개선 불가 상태)
손실 함수(Loss Function): 이 랩에서는 이항 교차 엔트로피(binary cross-entropy)를 사용.
2.1 생성자 네트워크(Generator Network)
2.2 판별자 네트워크(Discriminator Network)
2.3 GAN 네트워크 구성
discriminator.trainable = False)2.4 하이퍼파라미터 설정
2.5 모델 훈련
2.6 훈련 손실 (Training Loss)
2.7 이상 탐지 (Anomaly Detection)
3. 평가 (Evaluation)
결론:
세 가지 방법 (XGBoost, Deep Autoencoders, GAN) 비교:
이상 징후의 희소성 처리:
