[AWS] AWS Lambda를 사용하여 DLQ에서 SQS로 메시지 재전송하기

김강욱·2024년 5월 18일
0

Project-Evertrip

목록 보기
14/19
post-thumbnail

이번 포스팅에서는 AWS Lambda를 사용하여 DLQ(Dead Letter Queue)에 반환되었던 메시지들을 다시 Amazon SQS에 전달하는 작업을 진행해보도록 하겠습니다.

👌 Why AWS Lambda 사용?

현재 진행중인 Evertrip 프로젝트에서는 게시글에 대한 사용자의 이벤트를 Amazon SQS에 저장하고 서버에서 일정 주기에 따라 메시지 수신 요청을 하여 해당 이벤트 메시지들을 배치 프로세스를 거쳐 DB에 저장하게 됩니다.

서버 측에서 예상치 못한 에러가 발생했을 때 해당 메시지들을 처리하지 못하는 경우가 발생하고 Amazon SQS에서는 해당 메시지들을 반복적으로 정상적인 처리를 못할 경우(최대 재시도 횟수를 초과할 시) DLQ에 메시지를 전송하게 되어있습니다.

DLQ에 전송된 메시지들을 서버가 정상 동작할 때 처리할 수있도록 다시 SQS에 재전송할 필요가 있고 AWS Lambda는 서버를 관리할 필요 없이 간단하게 메시지 재전송 로직을 처리할 수 있기 때문입니다.



👌AWS Lambda 설정

AWS Lambda를 설정하기에 앞서 기본적인 Amazon SQS 개념과 DLQ 설정, 그리고 AWS Lambda 사용에 대한 설정은 아래 글을 참고하시면 좋을 것 같습니다.

[AWS] Amazon SQS란?
[AWS] Amazon SQS 적용해보기 - SQS 생성편
[AWS] AWS Cloud Watch로 Amazon SQS 모니터링하기

그럼 AWS Lambda를 사용하여 DLQ에 저장된 메시지들을 SQS에 재전송하도록 설정하겠습니다.

참고로 주기적인 폴링 방식을 사용하여 AWS Lamda 함수를 실행하기 위해 AWS CloudWatch Events를 사용해볼 예정입니다.

우선 AWS Lambda 함수부터 생성하고 환경 변수 세팅 및 실행 코드를 작성해보도록 하겠습니다.

1. AWS Lambda 생성 및 권한 설정

우선 AWS Lambda 함수를 생성해줍니다. 함수 생성은 과정을 생략하도록 하겠습니다. 필자는 python 환경의 DlqToSqs라는 이름으로 함수를 생성해주었습니다.

이후 IAM 메뉴에 들어가셔서 역할에 대한 권한을 설정해줘야 합니다. 방금 생성한 함수를 클릭해줍니다.

권한 추가에 정책 연결을 클릭하시고 AmazonSQSFullAccessCloudWatchEventsFullAccess를 추가해줍니다. 해당 권한들은 Lambda 함수가 SQSCloudWatchEvents에 대해 접근 권한을 허가해줍니다.

2. 환경 변수 세팅

AWS Lambda 함수 내에서 사용하는 변수를 환경 변수로 세팅해주는 과정입니다.

AWS Lambda 함수가 SQSDLQ에 접근해야하기 때문에 SQSDLQ의 주소가 필요합니다. 해당 주소는 대기열 세부정보에서 확인하실 수 있습니다.

위의 사진처럼 SQSDLQ의 URL 주소를 복사하여 AWS Lambda 환경 변수 편집에서 세팅해주시면 됩니다.

MAX_AGE_SECONDS 변수는 메시지가 전송된 시간(클라이언트에서 SQS로)을 기준으로 특정 시간을 초과했을 시 DLQ에서 SQS로 재전송하기 위해 필요한 변수입니다. 실제 프로젝트에서는 3600을 입력해서 SQS가 처음 메시지를 받은 시점으로부터 1시간 이상 차이가 나는 메시지들을 SQS로 다시 재전송하도록 설정할 예정입니다.

3. 함수 코드 작성

import json
import boto3
import os
import time

# 환경 변수에서 Queue URL 및 설정 가져오기
DLQ_URL = os.environ['DLQ_URL']
ORIGINAL_QUEUE_URL = os.environ['SQS_URL']
MAX_AGE_SECONDS = int(os.environ['MAX_AGE_SECONDS'])  # 메시지가 DLQ에 머무를 수 있는 최대 시간(초)

sqs = boto3.client('sqs')

def lambda_handler(event, context):
    try:
        # DLQ에서 메시지 가져오기
        response = sqs.receive_message(
            QueueUrl=DLQ_URL,
            MaxNumberOfMessages=10,  # 한 번에 최대 10개의 메시지
            WaitTimeSeconds=10,  # Long polling 대기 시간
            AttributeNames=['All'],  # 모든 메시지 속성 읽기
            MessageAttributeNames=['All']
        )
        
        if 'Messages' not in response:
            print("No messages to process.")
            return
        
        for message in response['Messages']:
            message_body = message['Body']
            attributes = message.get('MessageAttributes', {})
            
            # InitialTimestamp 추출
            initial_timestamp_attr = attributes.get('InitialTimestamp')
            initial_timestamp = int(initial_timestamp_attr['StringValue'])

            current_time = int(time.time())
            print(f"current_time is : {current_time}")
            message_age = current_time - initial_timestamp
            
            if message_age > MAX_AGE_SECONDS:
                # 원래 큐로 메시지 다시 보내기
                sqs.send_message(
                    QueueUrl=ORIGINAL_QUEUE_URL,
                    MessageBody=message_body,
                    MessageAttributes=attributes
                )
                print(f"Message sent back to original queue: {message_body}")
                
                # DLQ에서 메시지 삭제
                sqs.delete_message(
                    QueueUrl=DLQ_URL,
                    ReceiptHandle=message['ReceiptHandle']
                )
                print(f"Message deleted from DLQ: {message_body}")
            else:
                print(f"Message not old enough to reprocess: {message_body} (Age: {message_age} seconds)")
    
    except Exception as e:
        print(f"Error processing messages: {e}")


해당 함수는 DLQ의 메시지를 모두 읽어와서 해당 메시지의 속성에 InitialTimestamp 값을 추출해서 현재 시간과 비교해서 MAX_AGE_SECONDS 변수 값보다 클 시 SQS에 메시지를 돌려보내고 해당 메시지를 DLQ에서 삭제하라는 명령을 내립니다.


이제 Lambda 함수 생성 및 설정을 해줬으니 CloudWatch Events를 사용한 주기적으로 Lambda 함수를 실행할 수 있도록 이벤트 규칙을 생성해보도록 하겠습니다.



👌 Cloud WatchEvents 설정

CloudWatch 서비스로 들어가셔서 좌측 메뉴바에 이벤트 > 규칙을 클릭해줍니다.

규칙 생성을 클릭해줍니다.

다음 규칙 세부 정보에서 규칙 이름을 설정해주시고 규칙 유형은 일정으로 선택하고 규칙 생성으로 이동 버튼을 클릭해줍니다.

다음 일정 정의에서 원하시는 일정 패턴을 선택해주시면 됩니다. 필자는 이후 테스트를 진행하기 위해 짧게 설정하도록 하겠습니다.

다음 대상 선택에서 Lambda 함수를 선택하시고 이전에 만들었던 함수를 선택해주시면 됩니다. 추가 설정에는 이벤트 오류 시 재시도 정책에 대해서 정의할 수 있는데 해당사항이 없으므로 생략하였습니다. 다음 버튼을 클릭해주시고 넘어가시면 됩니다.

태그 구성도 생략하겠습니다. 이후 검토 및 생성에서 규칙 생성 버튼을 클릭하시면 규칙이 생성되는 것을 확인하실 수 있습니다.



👌 테스트 해보기!

설정은 다됐으니 테스트를 한번 진행해보도록 하겠습니다. 이벤트 규칙 주기는 3분으로 설정했으며, Lambda 함수에서 환경 변수로 설정해준MAX_AGE_SECONDS의 값은 60초로 입력하였습니다.

우선 테스트를 위해 SQS에 메시지를 전송해보도록 하겠습니다.

현재 시간에 대한 TimeStamp 값을 가져오기 위해 간단하게 개발자 도구의 Console 창을 이용했습니다.

SQS에 메시지 본문과 메시지 속성에 값을 지정해준 후 메시지를 전송해줍니다.

그리고 메시지 폴링을 하여 정보를 확인합니다.

해당 메시지가 맞네요. 이제 메시지 폴링을 최대 재시도 횟수를 초과하도록 반복하여 DLQ에 메시지를 전송하고 DLQ에 메시지가 전달됐는지 확인해보도록 하겠습니다.

SQS 대기열이 비었는지 확인

SQS 대기열이 비었다는 걸 확인할 수 있습니다. 아마 DLQ로 정상적으로 메시지가 전달된 것 같네요.

DLQ 대기열 확인

제대로 들어와있는 걸 확인할 수 있습니다.

이제 Lambda 함수가 실행되고 DLQ에 있는 메시지의 TimeStamp 속성을 추출하여 함수 실행 시점의 TimeStamp와의 차이를 구하고 설정해둔 MAX_AGE_SECONDS 변수를 초과할 시 해당 메시지를 다시 SQS로 전송하게 됩니다.

시간을 짧게 설정했기 때문에 바로 확인해보도록 하겠습니다.

바로 DLQ 대기열을 확인해보도록 하겠습니다.

예상대로 비었군요. 이제 SQS 대기열을 확인해보겠습니다.

SQS 대기열 확인

메시지 폴링을 하고 새로운 메시지가 들어온 것을 확인할 수 있습니다.

메시지 본문을 보니 DLQ에서 전송받은 메시지가 맞네요.

CloudWatchEvents Log 확인

이제 CloudWatch에서 기록된 로그를 확인해보도록 하겠습니다.

CloudWatch 서비스에서 좌측 메뉴에 로그 그룹을 클릭하시고 Lambda 함수 이름에 맞는 로그 그룹을 클릭해서 들어갑니다.

해당 로그 스트림에 클릭해서 들어가줍니다.

Lambda 함수에서 작성해둔 print 코드가 제대로 실행되었음을 확인할 수 있습니다. DLQ에 있는 메시지를 추출하여 SQS에 메시지를 새로 전송해주고 DLQ에는 해당 메시지를 삭제해주는 명령이 정상 호출됨을 확인할 수 있습니다.

이렇게 테스트까지 무사히 마쳤습니다 ㅎㅎ

profile
TO BE DEVELOPER

0개의 댓글

관련 채용 정보