[AWS] DDoS 방어와 보안 모니터링 아키텍처 학습하기

궁금하면 500원·2024년 12월 13일
0

데브옵스

목록 보기
18/36

1. DDoS 공격 대응을 위한 다계층 방어 전략

1.1 Shield Advanced와 WAF 연동 구성

실제 재직했었던 회사와 현장에서 DDoS 공격 사례를 바탕으로,
효과적인 방어 아키텍처를 구성해보겠습니다.

1. 사례 분석: 게임 서비스 DDoS 공격 대응

1.1 2023년 12월, 某 게임 서비스에서 발생한 DDoS 공격 사례

  • 공격 유형: UDP Flood + HTTP GET Flood 복합 공격
  • 공격 규모: 초당 100만 요청
  • 피해 상황: 서비스 2시간 중단, 매출 손실 약 5천만원

1.2 2016년 11월, 대*지역 공공기관 콜센터 업무지원 및 서비스에서 발생한 DDoS 공격 사례

  • 공격 유형: HTTP 플러딩(대량의 HTTP 요청을 통해 서버를 과부하 상태로 만드는 방식).
  • 공격 규모: 초당 5만 건 이상의 HTTP 요청 발생, 네트워크 대역폭 90% 이상 점유.
  • 피해 상황: 홈페이지 게시글 작성 불가, 고객 민원 접수 지연, 하루 10만 건의 게시글 작성 서비스 완전 마비.

대응 아키텍처 구성

# AWS WAF 규칙 설정 예시 (AWS SDK for Python - Boto3 사용)
import boto3

def create_rate_based_rule(web_acl_id):
    waf = boto3.client('wafv2')
    
    # Rate limit 규칙 생성
    try:
        response = waf.create_rule_group(
            Name='GameServiceRateLimit',
            Scope='REGIONAL',
            Capacity=50,
            Rules=[
                {
                    'Name': 'IPRateLimit',
                    'Priority': 1,
                    'Statement': {
                        'RateBasedStatement': {
                            'Limit': 2000,  # IP당 5분에 2000개 요청으로 제한
                            'AggregateKeyType': 'IP'
                        }
                    },
                    'Action': {
                        'Block': {}
                    },
                    'VisibilityConfig': {
                        'SampledRequestsEnabled': True,
                        'CloudWatchMetricsEnabled': True,
                        'MetricName': 'IPRateLimitMetric'
                    }
                }
            ],
            VisibilityConfig={
                'SampledRequestsEnabled': True,
                'CloudWatchMetricsEnabled': True,
                'MetricName': 'GameServiceRateLimit'
            }
        )
        print("Rule group created successfully")
        return response
    except Exception as e:
        print(f"Error creating rule group: {e}")
        return None

1.2 CloudWatch와 Lambda를 활용한 자동 차단 시스템

# Lambda 함수: 비정상 트래픽 감지 및 자동 차단
import boto3
import json
from datetime import datetime, timedelta

def lambda_handler(event, context):
    cloudwatch = boto3.client('cloudwatch')
    waf = boto3.client('wafv2')
    
    # CloudWatch에서 요청 수 메트릭 조회
    response = cloudwatch.get_metric_data(
        MetricDataQueries=[
            {
                'Id': 'requests',
                'MetricStat': {
                    'Metric': {
                        'Namespace': 'AWS/ApplicationELB',
                        'MetricName': 'RequestCount',
                        'Dimensions': [
                            {
                                'Name': 'LoadBalancer',
                                'Value': 'app/game-service/1234567890'
                            }
                        ]
                    },
                    'Period': 60,
                    'Stat': 'Sum'
                }
            }
        ],
        StartTime=datetime.utcnow() - timedelta(minutes=5),
        EndTime=datetime.utcnow()
    )
    
    # 임계값 초과시 WAF 규칙 업데이트
    if max(response['MetricDataResults'][0]['Values']) > 50000:  # 임계값: 분당 5만 요청
        update_waf_rule(waf, "신규 차단 규칙")

def update_waf_rule(waf, rule_name):
    # WAF 규칙 업데이트 로직
    pass

2. 보안 감사 및 로그 분석 시스템

2.1 CloudTrail과 Elasticsearch를 활용한 보안 로그 분석

# Elasticsearch에 CloudTrail 로그 적재 Lambda 함수
from elasticsearch import Elasticsearch
import boto3
import json

def lambda_handler(event, context):
    es = Elasticsearch(['your-elasticsearch-endpoint'])
    
    for record in event['Records']:
        # CloudTrail 로그 파싱
        cloudtrail_event = json.loads(record['Sns']['Message'])
        
        # 의심스러운 패턴 검사
        if is_suspicious_activity(cloudtrail_event):
            # Elasticsearch에 로그 저장
            es.index(
                index='security-alerts',
                body={
                    'timestamp': cloudtrail_event['eventTime'],
                    'eventName': cloudtrail_event['eventName'],
                    'sourceIPAddress': cloudtrail_event['sourceIPAddress'],
                    'userIdentity': cloudtrail_event['userIdentity'],
                    'severity': 'HIGH'
                }
            )
            
            # 관리자에게 알림 전송
            send_alert(cloudtrail_event)

def is_suspicious_activity(event):
    suspicious_patterns = [
        'DeleteSecurityGroup',
        'UpdateFunctionCode',
        'PutBucketPolicy'
    ]
    return event['eventName'] in suspicious_patterns

2.2 실시간 보안 대시보드 구현

# Kibana 대시보드용 데이터 가공 Lambda 함수
def process_security_metrics(event, context):
    es = Elasticsearch(['your-elasticsearch-endpoint'])
    
    # 최근 24시간 보안 이벤트 집계
    response = es.search(
        index='security-alerts',
        body={
            'query': {
                'range': {
                    'timestamp': {
                        'gte': 'now-24h'
                    }
                }
            },
            'aggs': {
                'security_events': {
                    'terms': {
                        'field': 'eventName.keyword'
                    }
                }
            }
        }
    )
    
    # 대시보드 데이터 가공
    dashboard_data = process_dashboard_data(response)
    
    # S3에 결과 저장
    save_to_s3(dashboard_data)

3. 장애 복구 및 비상 대응 계획

3.1 자동 복구 시스템 구현

# Auto Scaling Group 상태 모니터링 및 복구 Lambda 함수
def monitor_asg_health(event, context):
    asg = boto3.client('autoscaling')
    ec2 = boto3.client('ec2')
    
    response = asg.describe_auto_scaling_groups(
        AutoScalingGroupNames=['game-service-asg']
    )
    
    unhealthy_instances = []
    for instance in response['AutoScalingGroups'][0]['Instances']:
        if instance['HealthStatus'] != 'Healthy':
            unhealthy_instances.append(instance['InstanceId'])
    
    if unhealthy_instances:
        # 비정상 인스턴스 종료 및 교체
        asg.terminate_instance_in_auto_scaling_group(
            InstanceId=unhealthy_instances[0],
            ShouldDecrementDesiredCapacity=False
        )

3.2 비상 연락망 및 대응 프로세스 자동화

# SNS를 활용한 비상 연락망 시스템
def alert_emergency_contacts(event, context):
    sns = boto3.client('sns')
    
    # 심각도에 따른 알림 대상 결정
    severity = event['severity']
    if severity == 'CRITICAL':
        topic_arn = 'arn:aws:sns:region:account:emergency-team'
    else:
        topic_arn = 'arn:aws:sns:region:account:support-team'
    
    # 알림 메시지 생성
    message = create_alert_message(event)
    
    # SNS 토픽 발행
    sns.publish(
        TopicArn=topic_arn,
        Message=message,
        Subject=f'[{severity}] Security Alert'
    )

4. 보안 성능 측정 및 최적화

4.1 보안 메트릭 수집 및 분석

# 보안 성능 측정 Lambda 함수
def collect_security_metrics(event, context):
    cloudwatch = boto3.client('cloudwatch')
    
    # WAF 차단 비율 계산
    blocked_requests = get_waf_blocked_requests()
    total_requests = get_total_requests()
    block_rate = (blocked_requests / total_requests) * 100
    
    # CloudWatch에 메트릭 게시
    cloudwatch.put_metric_data(
        Namespace='CustomSecurityMetrics',
        MetricData=[
            {
                'MetricName': 'WAFBlockRate',
                'Value': block_rate,
                'Unit': 'Percent'
            }
        ]
    )

구현 시 주의사항

1. 성능 고려사항

  • WAF 규칙은 최소한으로 유지 (처리 지연 발생 가능)
  • CloudWatch 메트릭 수집 주기 최적화 (비용 고려)
  • Elasticsearch 인덱스 관리 및 보관 주기 설정

2. 비용 최적화

  • Shield Advanced 사용 시 월 3,000 USD 비용 발생
  • WAF 규칙 수에 따른 추가 비용 고려
  • CloudWatch 로그 보관 기간 최적화

3. 규정 준수

  • GDPR, HIPAA 등 규정에 따른 로그 저장 기간 준수
  • 개인정보 처리 관련 암호화 적용
  • 정기적인 보안 감사 및 리포트 생성

결론

이러한 심화된 보안 아키텍처를 통해 다음과 같은 효과를 얻을 수 있습니다:

  • 실시간 위협 감지 및 대응
  • 자동화된 보안 운영
  • 비용 효율적인 보안 관리
  • 규정 준수 및 감사 대응

향후 발전 방향으로는 AI/ML을 활용한 이상 탐지, 컨테이너 보안 강화, Zero Trust 아키텍처 도입 등을 고려할 수 있습니다.

profile
꾸준히, 의미있는 사이드 프로젝트 경험과 문제해결 과정을 기록하기 위한 공간입니다.

1개의 댓글

comment-user-thumbnail
2025년 6월 26일

감사합니다 ㅠㅠ

답글 달기