
import boto3
def setup_cloudwatch_alarm():
cloudwatch = boto3.client('cloudwatch')
# EC2 상태 체크 경보 생성
response = cloudwatch.put_metric_alarm(
AlarmName='EC2HealthCheck',
AlarmDescription='EC2 인스턴스 상태 체크',
ActionsEnabled=True,
AlarmActions=[
'arn:aws:sns:ap-northeast-2:123456789012:EC2HealthCheckTopic' # SNS 주제 ARN
],
MetricName='StatusCheckFailed',
Namespace='AWS/EC2',
Statistic='Maximum',
Dimensions=[
{
'Name': 'InstanceId',
'Value': 'i-1234567890abcdef0' # 모니터링할 EC2 인스턴스 ID
}
],
Period=60, # 60초마다 체크
EvaluationPeriods=3, # 3회 연속으로 실패하면 경보
Threshold=1,
ComparisonOperator='GreaterThanThreshold'
)
return response
import boto3
import time
from datetime import datetime
def lambda_handler(event, context):
ec2 = boto3.client('ec2')
# 설정값
MAIN_INSTANCE_ID = 'i-1234567890abcdef0'
STANDBY_INSTANCE_ID = 'i-0987654321fedcba0'
EIP_ALLOCATION_ID = 'eipalloc-1234567890abcdef0'
try:
# 1. EIP 현재 상태 확인
eip_response = ec2.describe_addresses(
AllocationIds=[EIP_ALLOCATION_ID]
)
# 2. 기존 EC2에서 EIP 연결 해제
if 'InstanceId' in eip_response['Addresses'][0]:
ec2.disassociate_address(
AssociationId=eip_response['Addresses'][0]['AssociationId']
)
# 3. 대기 중인 EC2에 EIP 연결
ec2.associate_address(
AllocationId=EIP_ALLOCATION_ID,
InstanceId=STANDBY_INSTANCE_ID
)
# 4. 장애 발생한 EC2 종료
ec2.terminate_instances(
InstanceIds=[MAIN_INSTANCE_ID]
)
# 5. 새로운 대기 EC2 생성
new_instance_response = ec2.run_instances(
ImageId='ami-1234567890abcdef0', # AMI ID
InstanceType='t3.micro',
MinCount=1,
MaxCount=1,
SecurityGroupIds=['sg-1234567890abcdef0'],
SubnetId='subnet-1234567890abcdef0',
TagSpecifications=[
{
'ResourceType': 'instance',
'Tags': [
{
'Key': 'Name',
'Value': f'Standby-{datetime.now().strftime("%Y%m%d-%H%M%S")}'
}
]
}
]
)
return {
'statusCode': 200,
'body': 'Recovery process completed successfully'
}
except Exception as e:
print(f'Error during recovery process: {str(e)}')
return {
'statusCode': 500,
'body': f'Error during recovery: {str(e)}'
}
import boto3
from datetime import datetime
def send_recovery_notification(event_details):
sns = boto3.client('sns')
message = f"""
EC2 자동 복구 프로세스 알림
시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
장애 인스턴스: {event_details['failed_instance_id']}
복구 인스턴스: {event_details['recovery_instance_id']}
복구 소요 시간: {event_details['recovery_duration']}초
복구 단계:
1. EIP 마이그레이션: {'성공' if event_details['eip_migration_success'] else '실패'}
2. 신규 인스턴스 생성: {'성공' if event_details['new_instance_creation_success'] else '실패'}
추가 조치 필요 항목:
- 애플리케이션 로그 확인
- 데이터 정합성 검증
- 보안 그룹 설정 확인
"""
sns.publish(
TopicArn='arn:aws:sns:ap-northeast-2:123456789012:EC2RecoveryNotification',
Subject=f"[중요] EC2 자동 복구 완료 - {event_details['status']}",
Message=message
)
import boto3
def setup_asg_with_lifecycle_hooks():
asg = boto3.client('autoscaling')
# ASG 생성
asg.create_auto_scaling_group(
AutoScalingGroupName='HighAvailabilityASG',
LaunchTemplate={
'LaunchTemplateId': 'lt-1234567890abcdef0',
'Version': '$Latest'
},
MinSize=1,
MaxSize=1,
DesiredCapacity=1,
VPCZoneIdentifier='subnet-1234567890abcdef0,subnet-0987654321fedcba0',
HealthCheckType='ELB',
HealthCheckGracePeriod=300,
Tags=[
{
'Key': 'Environment',
'Value': 'Production',
'PropagateAtLaunch': True
}
]
)
# 라이프사이클 훅 생성
asg.put_lifecycle_hook(
AutoScalingGroupName='HighAvailabilityASG',
LifecycleHookName='TerminatingHook',
LifecycleTransition='autoscaling:EC2_INSTANCE_TERMINATING',
HeartbeatTimeout=300,
DefaultResult='CONTINUE',
NotificationTargetARN='arn:aws:sns:ap-northeast-2:123456789012:ASGLifecycleEvents',
RoleARN='arn:aws:iam::123456789012:role/ASGLifecycleHookRole'
)
def handle_eip_attachment(event, context):
ec2 = boto3.client('ec2')
asg = boto3.client('autoscaling')
# ASG 이벤트에서 인스턴스 ID 추출
instance_id = event['detail']['EC2InstanceId']
try:
# EIP 할당
allocation_id = 'eipalloc-1234567890abcdef0' # EIP 할당 ID
ec2.associate_address(
AllocationId=allocation_id,
InstanceId=instance_id
)
# ASG 라이프사이클 완료 알림
asg.complete_lifecycle_action(
AutoScalingGroupName=event['detail']['AutoScalingGroupName'],
LifecycleHookName=event['detail']['LifecycleHookName'],
InstanceId=instance_id,
LifecycleActionResult='CONTINUE'
)
except Exception as e:
print(f"Error handling EIP attachment: {str(e)}")
# 실패 시 인스턴스 종료
asg.complete_lifecycle_action(
AutoScalingGroupName=event['detail']['AutoScalingGroupName'],
LifecycleHookName=event['detail']['LifecycleHookName'],
InstanceId=instance_id,
LifecycleActionResult='ABANDON'
)
def handle_ebs_snapshot(event, context):
ec2 = boto3.client('ec2')
instance_id = event['detail']['EC2InstanceId']
try:
# 인스턴스에 연결된 EBS 볼륨 조회
volumes = ec2.describe_volumes(
Filters=[
{
'Name': 'attachment.instance-id',
'Values': [instance_id]
}
]
)
for volume in volumes['Volumes']:
# 스냅샷 생성
snapshot_response = ec2.create_snapshot(
VolumeId=volume['VolumeId'],
Description=f'Automated snapshot for instance {instance_id}',
TagSpecifications=[
{
'ResourceType': 'snapshot',
'Tags': [
{
'Key': 'AutoRecovery',
'Value': 'true'
},
{
'Key': 'OriginalInstanceId',
'Value': instance_id
},
{
'Key': 'OriginalVolumeId',
'Value': volume['VolumeId']
}
]
}
]
)
# 스냅샷 생성 완료 대기
waiter = ec2.get_waiter('snapshot_completed')
waiter.wait(
SnapshotIds=[snapshot_response['SnapshotId']],
WaiterConfig={'Delay': 15, 'MaxAttempts': 40}
)
print(f"Snapshot {snapshot_response['SnapshotId']} created successfully")
except Exception as e:
print(f"Error handling EBS snapshot: {str(e)}")
raise e
def attach_new_volume(event, context):
ec2 = boto3.client('ec2')
instance_id = event['detail']['EC2InstanceId']
az = event['detail']['Placement']['AvailabilityZone']
try:
# 최신 스냅샷 조회
snapshots = ec2.describe_snapshots(
Filters=[
{
'Name': 'tag:AutoRecovery',
'Values': ['true']
}
],
OwnerIds=['self']
)
if not snapshots['Snapshots']:
raise Exception("No recovery snapshot found")
latest_snapshot = sorted(
snapshots['Snapshots'],
key=lambda x: x['StartTime'],
reverse=True
)[0]
# 새 볼륨 생성
new_volume = ec2.create_volume(
SnapshotId=latest_snapshot['SnapshotId'],
AvailabilityZone=az,
VolumeType='gp3',
TagSpecifications=[
{
'ResourceType': 'volume',
'Tags': [
{
'Key': 'Name',
'Value': f'Recovered-{instance_id}'
}
]
}
]
)
# 볼륨 생성 완료 대기
waiter = ec2.get_waiter('volume_available')
waiter.wait(
VolumeIds=[new_volume['VolumeId']],
WaiterConfig={'Delay': 15, 'MaxAttempts': 40}
)
# 새 인스턴스에 볼륨 연결
ec2.attach_volume(
Device='/dev/xvdf', # 디바이스 이름
InstanceId=instance_id,
VolumeId=new_volume['VolumeId']
)
print(f"Volume {new_volume['VolumeId']} attached successfully")
except Exception as e:
print(f"Error attaching new volume: {str(e)}")
raise e
이러한 고가용성 아키텍처를 통해 다음과 같은 효과를 얻을 수 있습니다
1. 서비스 다운타임 최소화 (목표 RTO 5분 이내)
2. 데이터 손실 방지 (RPO 0에 근접)
3. 운영 자동화를 통한 인적 오류 감소
4. 안정적인 서비스 운영 보장
최근 구현한 시스템에서 다음과 같은 성과를 얻었습니다:
월간 가용성 99.99% 달성
이러한 고가용성 아키텍처는 지속적인 개선과 모니터링이 필요한 살아있는 시스템입니다.
새로운 AWS 서비스와 기능이 출시될 때마다 아키텍처를 검토하고 개선하는 것이 중요합니다.
특히 비용 대비 효율성을 고려하여, 각 기업의 상황에 맞는 최적의 구성을 찾는 것이 핵심입니다.
본 포스팅에서 제시한 세 가지 모델(CloudWatch+Lambda, ASG, ASG+EBS)은 각각의 장단점이 있으며, 서비스의 특성과 요구사항에 따라 적절한 모델을 선택하거나 조합하여 사용할 수 있습니다.
마지막으로, 이러한 자동화된 시스템을 운영할 때는 항상 "제어된 실패"를 고려해야 합니다.
자동 복구 시스템이 오동작할 경우를 대비한 수동 개입 절차도 반드시 마련해야 하며, 정기적인 재해 복구 훈련을 통해 시스템의 신뢰성을 검증해야 합니다.