[Lambda] SQS를 사용한 재시도 및 DLQ 구현 학습하기

궁금하면 500원·2024년 5월 31일
0

데브옵스

목록 보기
4/36

1. 요구 사항 정의

SQS와 Lambda를 연동하여 다음과 같은 시나리오를 처리하는 아키텍처를 설계합니다.

  1. 메시지를 처리하는 Lambda 함수가 실행되며, 성공적으로 처리되지 않은 메시지는 재시도를 수행합니다.
  2. 특정 횟수 이상 재시도에 실패한 메시지는 DLQ로 이동합니다.
  3. DLQ로 이동된 메시지는 모니터링하며, 복구 또는 알림 처리 로직을 구현합니다.
  4. 타임아웃과 예외 처리를 통한 안정적인 메시지 처리 환경을 구축합니다.

2. 아키텍처 다이어그램

아래는 설계할 아키텍처입니다.

  1. SQS 표준 큐에 메시지가 들어옵니다.
  2. Lambda 함수가 SQS를 폴링하여 메시지를 처리합니다.
  3. 실패 시 메시지는 자동으로 다시 큐로 반환됩니다.
  4. 재시도 횟수가 초과된 메시지는 DLQ로 이동합니다.
  5. DLQ에 저장된 메시지는 모니터링 및 복구 처리됩니다.

3. AWS 구성

3.1 SQS 설정

  • 표준 큐와 DLQ 생성
  • DLQ를 표준 큐와 연결
# 표준 큐 생성
aws sqs create-queue --queue-name standard-queue

# DLQ 생성
aws sqs create-queue --queue-name dlq-queue

# 표준 큐에 DLQ 연결
aws sqs set-queue-attributes \
    --queue-url <standard-queue-url> \
    --attributes '{"RedrivePolicy":"{\"maxReceiveCount\": \"5\", \"deadLetterTargetArn\": \"<dlq-queue-arn>\"}"}'

3.2 Lambda 함수 생성

  • Lambda 함수는 메시지를 처리하고 성공 여부를 반환합니다.
  • 실패 시 예외를 발생시켜 메시지가 다시 큐로 반환되도록 설정합니다.
import json
import boto3
from botocore.exceptions import ClientError

def lambda_handler(event, context):
    try:
        # SQS 메시지 처리 로직
        for record in event['Records']:
            body = json.loads(record['body'])
            process_message(body)
        return {"statusCode": 200, "body": "Success"}
    except Exception as e:
        # 예외 발생 시 메시지 처리 실패로 간주
        print(f"Error processing message: {e}")
        raise e

def process_message(body):
    # 메시지 처리 로직
    print(f"Processing message: {body}")
    if body.get("error"):
        raise ValueError("Simulated processing error")

3.3 IAM 역할 및 권한 설정

Lambda 함수가 SQS 큐에 접근할 수 있도록 필요한 권한을 부여합니다.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "sqs:*",
      "Resource": "<standard-queue-arn>"
    }
  ]
}

4. DLQ 모니터링 및 복구 처리

DLQ로 이동된 메시지를 모니터링하고 알림 또는 복구 로직을 구현합니다.

4.1 CloudWatch 알림 설정

DLQ에 메시지가 쌓이면 알림을 받을 수 있도록 CloudWatch 경보를 설정합니다.

aws cloudwatch put-metric-alarm \
    --alarm-name DLQMessageAlarm \
    --metric-name ApproximateNumberOfMessagesVisible \
    --namespace AWS/SQS \
    --dimensions Name=QueueName,Value=dlq-queue \
    --threshold 1 \
    --comparison-operator GreaterThanOrEqualToThreshold \
    --evaluation-periods 1 \
    --alarm-actions <sns-topic-arn>

4.2 DLQ 복구 처리 Lambda

DLQ 메시지를 복구하는 별도의 Lambda 함수를 생성합니다.

import boto3
import json

def lambda_handler(event, context):
    sqs = boto3.client('sqs')
    dlq_url = "<dlq-queue-url>"
    
    try:
        # DLQ에서 메시지 수신
        response = sqs.receive_message(
            QueueUrl=dlq_url,
            MaxNumberOfMessages=10
        )
        
        if 'Messages' in response:
            for message in response['Messages']:
                body = json.loads(message['Body'])
                process_dlq_message(body)

                # 메시지 삭제
                sqs.delete_message(
                    QueueUrl=dlq_url,
                    ReceiptHandle=message['ReceiptHandle']
                )
        return {"statusCode": 200, "body": "DLQ messages processed successfully"}
    except Exception as e:
        print(f"Error processing DLQ messages: {e}")
        raise e

def process_dlq_message(body):
    print(f"Recovering DLQ message: {body}")
    # 복구 처리 로직

5. 예상 시나리오

  • 정상 처리: 메시지가 성공적으로 처리되고 SQS에서 삭제됩니다.
  • 처리 실패: 재시도가 이루어지며, 지정된 횟수 초과 시 DLQ로 이동됩니다.
  • DLQ 메시지: 복구 Lambda가 실행되어 복구를 시도합니다.

6. 확장 계획

  • SQS FIFO 큐로 변경해 메시지 순서를 유지하는 방식으로 확장
  • EventBridge와 연계하여 복잡한 이벤트 처리 추가
  • 모니터링 대시보드 구현

참조자료

profile
꾸준히, 의미있는 사이드 프로젝트 경험과 문제해결 과정을 기록하기 위한 공간입니다.

0개의 댓글