
SQS와 Lambda를 연동하여 다음과 같은 시나리오를 처리하는 아키텍처를 설계합니다.
아래는 설계할 아키텍처입니다.

# 표준 큐 생성
aws sqs create-queue --queue-name standard-queue
# DLQ 생성
aws sqs create-queue --queue-name dlq-queue
# 표준 큐에 DLQ 연결
aws sqs set-queue-attributes \
--queue-url <standard-queue-url> \
--attributes '{"RedrivePolicy":"{\"maxReceiveCount\": \"5\", \"deadLetterTargetArn\": \"<dlq-queue-arn>\"}"}'
import json
import boto3
from botocore.exceptions import ClientError
def lambda_handler(event, context):
try:
# SQS 메시지 처리 로직
for record in event['Records']:
body = json.loads(record['body'])
process_message(body)
return {"statusCode": 200, "body": "Success"}
except Exception as e:
# 예외 발생 시 메시지 처리 실패로 간주
print(f"Error processing message: {e}")
raise e
def process_message(body):
# 메시지 처리 로직
print(f"Processing message: {body}")
if body.get("error"):
raise ValueError("Simulated processing error")
Lambda 함수가 SQS 큐에 접근할 수 있도록 필요한 권한을 부여합니다.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "sqs:*",
"Resource": "<standard-queue-arn>"
}
]
}
DLQ로 이동된 메시지를 모니터링하고 알림 또는 복구 로직을 구현합니다.
DLQ에 메시지가 쌓이면 알림을 받을 수 있도록 CloudWatch 경보를 설정합니다.
aws cloudwatch put-metric-alarm \
--alarm-name DLQMessageAlarm \
--metric-name ApproximateNumberOfMessagesVisible \
--namespace AWS/SQS \
--dimensions Name=QueueName,Value=dlq-queue \
--threshold 1 \
--comparison-operator GreaterThanOrEqualToThreshold \
--evaluation-periods 1 \
--alarm-actions <sns-topic-arn>
DLQ 메시지를 복구하는 별도의 Lambda 함수를 생성합니다.
import boto3
import json
def lambda_handler(event, context):
sqs = boto3.client('sqs')
dlq_url = "<dlq-queue-url>"
try:
# DLQ에서 메시지 수신
response = sqs.receive_message(
QueueUrl=dlq_url,
MaxNumberOfMessages=10
)
if 'Messages' in response:
for message in response['Messages']:
body = json.loads(message['Body'])
process_dlq_message(body)
# 메시지 삭제
sqs.delete_message(
QueueUrl=dlq_url,
ReceiptHandle=message['ReceiptHandle']
)
return {"statusCode": 200, "body": "DLQ messages processed successfully"}
except Exception as e:
print(f"Error processing DLQ messages: {e}")
raise e
def process_dlq_message(body):
print(f"Recovering DLQ message: {body}")
# 복구 처리 로직