
실제 재직했었던 회사와 현장에서 DDoS 공격 사례를 바탕으로,
효과적인 방어 아키텍처를 구성해보겠습니다.
1.1 2023년 12월, 某 게임 서비스에서 발생한 DDoS 공격 사례
1.2 2016년 11월, 대*지역 공공기관 콜센터 업무지원 및 서비스에서 발생한 DDoS 공격 사례
# AWS WAF 규칙 설정 예시 (AWS SDK for Python - Boto3 사용)
import boto3
def create_rate_based_rule(web_acl_id):
waf = boto3.client('wafv2')
# Rate limit 규칙 생성
try:
response = waf.create_rule_group(
Name='GameServiceRateLimit',
Scope='REGIONAL',
Capacity=50,
Rules=[
{
'Name': 'IPRateLimit',
'Priority': 1,
'Statement': {
'RateBasedStatement': {
'Limit': 2000, # IP당 5분에 2000개 요청으로 제한
'AggregateKeyType': 'IP'
}
},
'Action': {
'Block': {}
},
'VisibilityConfig': {
'SampledRequestsEnabled': True,
'CloudWatchMetricsEnabled': True,
'MetricName': 'IPRateLimitMetric'
}
}
],
VisibilityConfig={
'SampledRequestsEnabled': True,
'CloudWatchMetricsEnabled': True,
'MetricName': 'GameServiceRateLimit'
}
)
print("Rule group created successfully")
return response
except Exception as e:
print(f"Error creating rule group: {e}")
return None
# Lambda 함수: 비정상 트래픽 감지 및 자동 차단
import boto3
import json
from datetime import datetime, timedelta
def lambda_handler(event, context):
cloudwatch = boto3.client('cloudwatch')
waf = boto3.client('wafv2')
# CloudWatch에서 요청 수 메트릭 조회
response = cloudwatch.get_metric_data(
MetricDataQueries=[
{
'Id': 'requests',
'MetricStat': {
'Metric': {
'Namespace': 'AWS/ApplicationELB',
'MetricName': 'RequestCount',
'Dimensions': [
{
'Name': 'LoadBalancer',
'Value': 'app/game-service/1234567890'
}
]
},
'Period': 60,
'Stat': 'Sum'
}
}
],
StartTime=datetime.utcnow() - timedelta(minutes=5),
EndTime=datetime.utcnow()
)
# 임계값 초과시 WAF 규칙 업데이트
if max(response['MetricDataResults'][0]['Values']) > 50000: # 임계값: 분당 5만 요청
update_waf_rule(waf, "신규 차단 규칙")
def update_waf_rule(waf, rule_name):
# WAF 규칙 업데이트 로직
pass
# Elasticsearch에 CloudTrail 로그 적재 Lambda 함수
from elasticsearch import Elasticsearch
import boto3
import json
def lambda_handler(event, context):
es = Elasticsearch(['your-elasticsearch-endpoint'])
for record in event['Records']:
# CloudTrail 로그 파싱
cloudtrail_event = json.loads(record['Sns']['Message'])
# 의심스러운 패턴 검사
if is_suspicious_activity(cloudtrail_event):
# Elasticsearch에 로그 저장
es.index(
index='security-alerts',
body={
'timestamp': cloudtrail_event['eventTime'],
'eventName': cloudtrail_event['eventName'],
'sourceIPAddress': cloudtrail_event['sourceIPAddress'],
'userIdentity': cloudtrail_event['userIdentity'],
'severity': 'HIGH'
}
)
# 관리자에게 알림 전송
send_alert(cloudtrail_event)
def is_suspicious_activity(event):
suspicious_patterns = [
'DeleteSecurityGroup',
'UpdateFunctionCode',
'PutBucketPolicy'
]
return event['eventName'] in suspicious_patterns
# Kibana 대시보드용 데이터 가공 Lambda 함수
def process_security_metrics(event, context):
es = Elasticsearch(['your-elasticsearch-endpoint'])
# 최근 24시간 보안 이벤트 집계
response = es.search(
index='security-alerts',
body={
'query': {
'range': {
'timestamp': {
'gte': 'now-24h'
}
}
},
'aggs': {
'security_events': {
'terms': {
'field': 'eventName.keyword'
}
}
}
}
)
# 대시보드 데이터 가공
dashboard_data = process_dashboard_data(response)
# S3에 결과 저장
save_to_s3(dashboard_data)
# Auto Scaling Group 상태 모니터링 및 복구 Lambda 함수
def monitor_asg_health(event, context):
asg = boto3.client('autoscaling')
ec2 = boto3.client('ec2')
response = asg.describe_auto_scaling_groups(
AutoScalingGroupNames=['game-service-asg']
)
unhealthy_instances = []
for instance in response['AutoScalingGroups'][0]['Instances']:
if instance['HealthStatus'] != 'Healthy':
unhealthy_instances.append(instance['InstanceId'])
if unhealthy_instances:
# 비정상 인스턴스 종료 및 교체
asg.terminate_instance_in_auto_scaling_group(
InstanceId=unhealthy_instances[0],
ShouldDecrementDesiredCapacity=False
)
# SNS를 활용한 비상 연락망 시스템
def alert_emergency_contacts(event, context):
sns = boto3.client('sns')
# 심각도에 따른 알림 대상 결정
severity = event['severity']
if severity == 'CRITICAL':
topic_arn = 'arn:aws:sns:region:account:emergency-team'
else:
topic_arn = 'arn:aws:sns:region:account:support-team'
# 알림 메시지 생성
message = create_alert_message(event)
# SNS 토픽 발행
sns.publish(
TopicArn=topic_arn,
Message=message,
Subject=f'[{severity}] Security Alert'
)
# 보안 성능 측정 Lambda 함수
def collect_security_metrics(event, context):
cloudwatch = boto3.client('cloudwatch')
# WAF 차단 비율 계산
blocked_requests = get_waf_blocked_requests()
total_requests = get_total_requests()
block_rate = (blocked_requests / total_requests) * 100
# CloudWatch에 메트릭 게시
cloudwatch.put_metric_data(
Namespace='CustomSecurityMetrics',
MetricData=[
{
'MetricName': 'WAFBlockRate',
'Value': block_rate,
'Unit': 'Percent'
}
]
)
1. 성능 고려사항
2. 비용 최적화
3. 규정 준수
이러한 심화된 보안 아키텍처를 통해 다음과 같은 효과를 얻을 수 있습니다:
향후 발전 방향으로는 AI/ML을 활용한 이상 탐지, 컨테이너 보안 강화, Zero Trust 아키텍처 도입 등을 고려할 수 있습니다.
감사합니다 ㅠㅠ