[AWS] 보안 아키텍처 다층 방어와 자동화 해결하기

궁금하면 500원·2024년 12월 16일

데브옵스

목록 보기
21/37

AWS 보안 아키텍처 실전 시나리오와 해결 방안

1. DDoS 공격 대응을 위한 다층 방어 전략

1.1 시나리오: 대규모 DDoS 공격 발생

2024년 초, 한 온라인 쇼핑몰이 새해 대규모 할인 행사 중 DDoS 공격을 받았습니다.

초당 수십만 건의 요청이 발생하여 서비스가 불안정해졌고, 정상적인 고객들의 접속이 어려워졌습니다.

1.2 아키텍처 구성

import boto3
import json

def create_waf_rate_rule():
    waf = boto3.client('wafv2')
    
    # Rate-based rule 생성
    rate_rule = {
        'Name': 'IPRateLimit',
        'Priority': 1,
        'Statement': {
            'RateBasedStatement': {
                'Limit': 2000,  # 5분당 IP당 최대 요청 수
                'AggregateKeyType': 'IP'
            }
        },
        'Action': {
            'Block': {}
        },
        'VisibilityConfig': {
            'SampledRequestsEnabled': True,
            'CloudWatchMetricsEnabled': True,
            'MetricName': 'IPRateLimitMetric'
        }
    }
    
    return waf.create_rule_group(
        Name='DDoSProtection',
        Scope='REGIONAL',
        Capacity=1000,
        Rules=[rate_rule]
    )

1.3 CloudWatch 경보 설정

def create_ddos_alarm():
    cloudwatch = boto3.client('cloudwatch')
    
    response = cloudwatch.put_metric_alarm(
        AlarmName='HighRequestRate',
        ComparisonOperator='GreaterThanThreshold',
        EvaluationPeriods=2,
        MetricName='RequestCount',
        Namespace='AWS/ApplicationELB',
        Period=300,
        Statistic='Sum',
        Threshold=10000,
        AlarmDescription='DDoS 의심 트래픽 감지',
        AlarmActions=['arn:aws:sns:region:account-id:alert-topic']
    )
    return response

2. WAF를 활용한 지능형 위협 탐지 및 차단

2.1 SQL Injection 방어 규칙

def create_sql_injection_protection():
    waf = boto3.client('wafv2')
    
    sql_rule = {
        'Name': 'SQLInjectionProtection',
        'Priority': 2,
        'Statement': {
            'SqliMatchStatement': {
                'FieldToMatch': {
                    'QueryString': {}
                },
                'TextTransformations': [{
                    'Priority': 1,
                    'Type': 'URL_DECODE'
                }]
            }
        },
        'Action': {
            'Block': {}
        },
        'VisibilityConfig': {
            'SampledRequestsEnabled': True,
            'CloudWatchMetricsEnabled': True,
            'MetricName': 'SQLInjectionAttempts'
        }
    }
    
    return waf.create_rule_group(
        Name='SQLInjectionProtection',
        Scope='REGIONAL',
        Capacity=1000,
        Rules=[sql_rule]
    )

2.2 악성 User-Agent 차단

def block_malicious_user_agents():
    waf = boto3.client('wafv2')
    
    bad_bots = [
        'curl*', 'python-requests*', 'wget*',
        'scanbot*', 'sqlmap*', 'nikto*'
    ]
    
    user_agent_rule = {
        'Name': 'BlockMaliciousUserAgents',
        'Priority': 3,
        'Statement': {
            'ByteMatchStatement': {
                'SearchString': json.dumps(bad_bots),
                'FieldToMatch': {
                    'SingleHeader': {
                        'Name': 'user-agent'
                    }
                },
                'TextTransformations': [{
                    'Priority': 1,
                    'Type': 'NONE'
                }],
                'PositionalConstraint': 'CONTAINS'
            }
        },
        'Action': {
            'Block': {}
        },
        'VisibilityConfig': {
            'SampledRequestsEnabled': True,
            'CloudWatchMetricsEnabled': True,
            'MetricName': 'MaliciousUserAgents'
        }
    }
    
    return user_agent_rule

3. CloudWatch와 Lambda를 활용한 자동화된 보안 대응

3.1 자동 IP 차단 Lambda 함수

import boto3
import json
import time

def lambda_handler(event, context):
    # CloudWatch 로그에서 의심스러운 IP 추출
    suspicious_ips = analyze_logs(event)
    
    # NACL 업데이트
    update_nacl(suspicious_ips)
    
    # SNS 알림 발송
    send_notification(suspicious_ips)

def analyze_logs(event):
    logs = boto3.client('logs')
    
    # 최근 5분간의 로그 분석
    response = logs.filter_log_events(
        logGroupName='/aws/applicationelb/app-lb',
        filterPattern='[timestamp, sourceIP, requestMethod, statusCode]',
        startTime=int((time.time() - 300) * 1000),
        endTime=int(time.time() * 1000)
    )
    
    suspicious_ips = []
    ip_count = {}
    
    for event in response['events']:
        ip = event['message'].split()[1]
        ip_count[ip] = ip_count.get(ip, 0) + 1
        
        # 임계값 초과 IP 탐지
        if ip_count[ip] > 1000:  # 5분간 1000회 이상 요청
            suspicious_ips.append(ip)
    
    return suspicious_ips

def update_nacl(suspicious_ips):
    ec2 = boto3.client('ec2')
    
    for idx, ip in enumerate(suspicious_ips, start=1):
        response = ec2.create_network_acl_entry(
            NetworkAclId='acl-12345',
            RuleNumber=100 + idx,
            Protocol='-1',
            RuleAction='deny',
            Egress=False,
            CidrBlock=f'{ip}/32'
        )

4. VPC Flow Logs를 활용한 네트워크 트래픽 분석

4.1 트래픽 분석 스크립트

import pandas as pd
import boto3
from datetime import datetime, timedelta

def analyze_vpc_flow_logs():
    # VPC Flow Logs 데이터 가져오기
    client = boto3.client('logs')
    
    response = client.filter_log_events(
        logGroupName='/aws/vpc/flow-logs',
        startTime=int((datetime.now() - timedelta(hours=1)).timestamp() * 1000),
        endTime=int(datetime.now().timestamp() * 1000)
    )
    
    # 데이터 프레임으로 변환
    records = []
    for event in response['events']:
        fields = event['message'].split()
        if len(fields) >= 13:
            records.append({
                'timestamp': fields[0],
                'source_ip': fields[3],
                'dest_ip': fields[4],
                'source_port': fields[5],
                'dest_port': fields[6],
                'protocol': fields[7],
                'packets': int(fields[8]),
                'bytes': int(fields[9])
            })
    
    df = pd.DataFrame(records)
    
    # 통계 분석
    traffic_by_ip = df.groupby('source_ip').agg({
        'bytes': 'sum',
        'packets': 'count'
    }).sort_values('bytes', ascending=False)
    
    return traffic_by_ip.head(10)  # Top 10 트래픽 소스

5. AWS Shield Advanced를 통한 대규모 공격 방어

5.1 Shield Advanced 설정

def configure_shield_advanced():
    shield = boto3.client('shield')
    
    # DDoS 방어 활성화
    response = shield.create_protection(
        Name='WebApplicationProtection',
        ResourceArn='arn:aws:elasticloadbalancing:region:account-id:loadbalancer/app/my-alb/1234567890'
    )
    
    # 보호 그룹 생성
    protection_group = shield.create_protection_group(
        ProtectionGroupId='WebAppGroup',
        Pattern='ALL',
        ResourceType='APPLICATION_LOAD_BALANCER',
        Members=['arn:aws:elasticloadbalancing:region:account-id:loadbalancer/app/my-alb/1234567890']
    )
    
    return response, protection_group

6. 실전 구현 예제 및 베스트 프랙티스

6.1 통합 보안 모니터링 대시보드

def create_security_dashboard():
    cloudwatch = boto3.client('cloudwatch')
    
    dashboard_body = {
        "widgets": [
            {
                "type": "metric",
                "properties": {
                    "metrics": [
                        ["AWS/ApplicationELB", "RequestCount", "LoadBalancer", "app/my-alb/1234567890"],
                        ["AWS/WAF", "BlockedRequests", "WebACL", "my-web-acl"],
                        ["AWS/Shield", "DDoSDetected", "ResourceArn", "arn:aws:elasticloadbalancing:region:account-id:loadbalancer/app/my-alb/1234567890"]
                    ],
                    "period": 300,
                    "stat": "Sum",
                    "region": "ap-northeast-2",
                    "title": "보안 모니터링 대시보드"
                }
            }
        ]
    }
    
    response = cloudwatch.put_dashboard(
        DashboardName='SecurityMonitoring',
        DashboardBody=json.dumps(dashboard_body)
    )
    
    return response

결론

이러한 심화된 보안 아키텍처를 구현함으로써 다음과 같은 이점을 얻을 수 있습니다

  1. 실시간 위협 탐지 및 대응
  2. 자동화된 보안 조치
  3. 상세한 보안 분석 및 모니터링
  4. 다층적 방어 체계 구축

운영 환경에서는 이러한 보안 조치들을 단계적으로 구현하고, 지속적으로 모니터링하며 개선해 나가는 것이 중요합니다.

또한, 정기적인 보안 감사와 취약점 분석을 통해 새로운 위협에 대비해야 합니다.

profile
에러가 나도 괜찮아 — 그건 내가 배우고 있다는 증거야.

0개의 댓글