
2024년 초, 한 온라인 쇼핑몰이 새해 대규모 할인 행사 중 DDoS 공격을 받았습니다.
초당 수십만 건의 요청이 발생하여 서비스가 불안정해졌고, 정상적인 고객들의 접속이 어려워졌습니다.
import boto3
import json
def create_waf_rate_rule():
waf = boto3.client('wafv2')
# Rate-based rule 생성
rate_rule = {
'Name': 'IPRateLimit',
'Priority': 1,
'Statement': {
'RateBasedStatement': {
'Limit': 2000, # 5분당 IP당 최대 요청 수
'AggregateKeyType': 'IP'
}
},
'Action': {
'Block': {}
},
'VisibilityConfig': {
'SampledRequestsEnabled': True,
'CloudWatchMetricsEnabled': True,
'MetricName': 'IPRateLimitMetric'
}
}
return waf.create_rule_group(
Name='DDoSProtection',
Scope='REGIONAL',
Capacity=1000,
Rules=[rate_rule]
)
def create_ddos_alarm():
cloudwatch = boto3.client('cloudwatch')
response = cloudwatch.put_metric_alarm(
AlarmName='HighRequestRate',
ComparisonOperator='GreaterThanThreshold',
EvaluationPeriods=2,
MetricName='RequestCount',
Namespace='AWS/ApplicationELB',
Period=300,
Statistic='Sum',
Threshold=10000,
AlarmDescription='DDoS 의심 트래픽 감지',
AlarmActions=['arn:aws:sns:region:account-id:alert-topic']
)
return response
def create_sql_injection_protection():
waf = boto3.client('wafv2')
sql_rule = {
'Name': 'SQLInjectionProtection',
'Priority': 2,
'Statement': {
'SqliMatchStatement': {
'FieldToMatch': {
'QueryString': {}
},
'TextTransformations': [{
'Priority': 1,
'Type': 'URL_DECODE'
}]
}
},
'Action': {
'Block': {}
},
'VisibilityConfig': {
'SampledRequestsEnabled': True,
'CloudWatchMetricsEnabled': True,
'MetricName': 'SQLInjectionAttempts'
}
}
return waf.create_rule_group(
Name='SQLInjectionProtection',
Scope='REGIONAL',
Capacity=1000,
Rules=[sql_rule]
)
def block_malicious_user_agents():
waf = boto3.client('wafv2')
bad_bots = [
'curl*', 'python-requests*', 'wget*',
'scanbot*', 'sqlmap*', 'nikto*'
]
user_agent_rule = {
'Name': 'BlockMaliciousUserAgents',
'Priority': 3,
'Statement': {
'ByteMatchStatement': {
'SearchString': json.dumps(bad_bots),
'FieldToMatch': {
'SingleHeader': {
'Name': 'user-agent'
}
},
'TextTransformations': [{
'Priority': 1,
'Type': 'NONE'
}],
'PositionalConstraint': 'CONTAINS'
}
},
'Action': {
'Block': {}
},
'VisibilityConfig': {
'SampledRequestsEnabled': True,
'CloudWatchMetricsEnabled': True,
'MetricName': 'MaliciousUserAgents'
}
}
return user_agent_rule
import boto3
import json
import time
def lambda_handler(event, context):
# CloudWatch 로그에서 의심스러운 IP 추출
suspicious_ips = analyze_logs(event)
# NACL 업데이트
update_nacl(suspicious_ips)
# SNS 알림 발송
send_notification(suspicious_ips)
def analyze_logs(event):
logs = boto3.client('logs')
# 최근 5분간의 로그 분석
response = logs.filter_log_events(
logGroupName='/aws/applicationelb/app-lb',
filterPattern='[timestamp, sourceIP, requestMethod, statusCode]',
startTime=int((time.time() - 300) * 1000),
endTime=int(time.time() * 1000)
)
suspicious_ips = []
ip_count = {}
for event in response['events']:
ip = event['message'].split()[1]
ip_count[ip] = ip_count.get(ip, 0) + 1
# 임계값 초과 IP 탐지
if ip_count[ip] > 1000: # 5분간 1000회 이상 요청
suspicious_ips.append(ip)
return suspicious_ips
def update_nacl(suspicious_ips):
ec2 = boto3.client('ec2')
for idx, ip in enumerate(suspicious_ips, start=1):
response = ec2.create_network_acl_entry(
NetworkAclId='acl-12345',
RuleNumber=100 + idx,
Protocol='-1',
RuleAction='deny',
Egress=False,
CidrBlock=f'{ip}/32'
)
import pandas as pd
import boto3
from datetime import datetime, timedelta
def analyze_vpc_flow_logs():
# VPC Flow Logs 데이터 가져오기
client = boto3.client('logs')
response = client.filter_log_events(
logGroupName='/aws/vpc/flow-logs',
startTime=int((datetime.now() - timedelta(hours=1)).timestamp() * 1000),
endTime=int(datetime.now().timestamp() * 1000)
)
# 데이터 프레임으로 변환
records = []
for event in response['events']:
fields = event['message'].split()
if len(fields) >= 13:
records.append({
'timestamp': fields[0],
'source_ip': fields[3],
'dest_ip': fields[4],
'source_port': fields[5],
'dest_port': fields[6],
'protocol': fields[7],
'packets': int(fields[8]),
'bytes': int(fields[9])
})
df = pd.DataFrame(records)
# 통계 분석
traffic_by_ip = df.groupby('source_ip').agg({
'bytes': 'sum',
'packets': 'count'
}).sort_values('bytes', ascending=False)
return traffic_by_ip.head(10) # Top 10 트래픽 소스
def configure_shield_advanced():
shield = boto3.client('shield')
# DDoS 방어 활성화
response = shield.create_protection(
Name='WebApplicationProtection',
ResourceArn='arn:aws:elasticloadbalancing:region:account-id:loadbalancer/app/my-alb/1234567890'
)
# 보호 그룹 생성
protection_group = shield.create_protection_group(
ProtectionGroupId='WebAppGroup',
Pattern='ALL',
ResourceType='APPLICATION_LOAD_BALANCER',
Members=['arn:aws:elasticloadbalancing:region:account-id:loadbalancer/app/my-alb/1234567890']
)
return response, protection_group
def create_security_dashboard():
cloudwatch = boto3.client('cloudwatch')
dashboard_body = {
"widgets": [
{
"type": "metric",
"properties": {
"metrics": [
["AWS/ApplicationELB", "RequestCount", "LoadBalancer", "app/my-alb/1234567890"],
["AWS/WAF", "BlockedRequests", "WebACL", "my-web-acl"],
["AWS/Shield", "DDoSDetected", "ResourceArn", "arn:aws:elasticloadbalancing:region:account-id:loadbalancer/app/my-alb/1234567890"]
],
"period": 300,
"stat": "Sum",
"region": "ap-northeast-2",
"title": "보안 모니터링 대시보드"
}
}
]
}
response = cloudwatch.put_dashboard(
DashboardName='SecurityMonitoring',
DashboardBody=json.dumps(dashboard_body)
)
return response
이러한 심화된 보안 아키텍처를 구현함으로써 다음과 같은 이점을 얻을 수 있습니다
운영 환경에서는 이러한 보안 조치들을 단계적으로 구현하고, 지속적으로 모니터링하며 개선해 나가는 것이 중요합니다.
또한, 정기적인 보안 감사와 취약점 분석을 통해 새로운 위협에 대비해야 합니다.