이번 포스팅에서는 AWS Lambda
를 사용하여 DLQ(Dead Letter Queue)
에 반환되었던 메시지들을 다시 Amazon SQS
에 전달하는 작업을 진행해보도록 하겠습니다.
현재 진행중인 Evertrip
프로젝트에서는 게시글에 대한 사용자의 이벤트를 Amazon SQS
에 저장하고 서버에서 일정 주기에 따라 메시지 수신 요청을 하여 해당 이벤트 메시지들을 배치 프로세스를 거쳐 DB에 저장하게 됩니다.
서버 측에서 예상치 못한 에러가 발생했을 때 해당 메시지들을 처리하지 못하는 경우가 발생하고 Amazon SQS
에서는 해당 메시지들을 반복적으로 정상적인 처리를 못할 경우(최대 재시도 횟수를 초과할 시) DLQ
에 메시지를 전송하게 되어있습니다.
DLQ
에 전송된 메시지들을 서버가 정상 동작할 때 처리할 수있도록 다시 SQS
에 재전송할 필요가 있고 AWS Lambda
는 서버를 관리할 필요 없이 간단하게 메시지 재전송 로직을 처리할 수 있기 때문입니다.
AWS Lambda
를 설정하기에 앞서 기본적인 Amazon SQS
개념과 DLQ
설정, 그리고 AWS Lambda
사용에 대한 설정은 아래 글을 참고하시면 좋을 것 같습니다.
[AWS] Amazon SQS란?
[AWS] Amazon SQS 적용해보기 - SQS 생성편
[AWS] AWS Cloud Watch로 Amazon SQS 모니터링하기
그럼 AWS Lambda
를 사용하여 DLQ
에 저장된 메시지들을 SQS
에 재전송하도록 설정하겠습니다.
참고로 주기적인 폴링 방식을 사용하여 AWS Lamda
함수를 실행하기 위해 AWS CloudWatch Events
를 사용해볼 예정입니다.
우선 AWS Lambda
함수부터 생성하고 환경 변수 세팅 및 실행 코드를 작성해보도록 하겠습니다.
우선 AWS Lambda
함수를 생성해줍니다. 함수 생성은 과정을 생략하도록 하겠습니다. 필자는 python
환경의 DlqToSqs
라는 이름으로 함수를 생성해주었습니다.
이후 IAM 메뉴에 들어가셔서 역할에 대한 권한을 설정해줘야 합니다. 방금 생성한 함수를 클릭해줍니다.
권한 추가에 정책 연결을 클릭하시고 AmazonSQSFullAccess
와 CloudWatchEventsFullAccess
를 추가해줍니다. 해당 권한들은 Lambda
함수가 SQS
와 CloudWatchEvents
에 대해 접근 권한을 허가해줍니다.
AWS Lambda
함수 내에서 사용하는 변수를 환경 변수로 세팅해주는 과정입니다.
AWS Lambda
함수가 SQS
와 DLQ
에 접근해야하기 때문에 SQS
와 DLQ
의 주소가 필요합니다. 해당 주소는 대기열 세부정보에서 확인하실 수 있습니다.
위의 사진처럼 SQS
와 DLQ
의 URL 주소를 복사하여 AWS Lambda
환경 변수 편집에서 세팅해주시면 됩니다.
MAX_AGE_SECONDS
변수는 메시지가 전송된 시간(클라이언트에서 SQS로)을 기준으로 특정 시간을 초과했을 시 DLQ
에서 SQS
로 재전송하기 위해 필요한 변수입니다. 실제 프로젝트에서는 3600을 입력해서 SQS
가 처음 메시지를 받은 시점으로부터 1시간 이상 차이가 나는 메시지들을 SQS
로 다시 재전송하도록 설정할 예정입니다.
import json
import boto3
import os
import time
# 환경 변수에서 Queue URL 및 설정 가져오기
DLQ_URL = os.environ['DLQ_URL']
ORIGINAL_QUEUE_URL = os.environ['SQS_URL']
MAX_AGE_SECONDS = int(os.environ['MAX_AGE_SECONDS']) # 메시지가 DLQ에 머무를 수 있는 최대 시간(초)
sqs = boto3.client('sqs')
def lambda_handler(event, context):
try:
# DLQ에서 메시지 가져오기
response = sqs.receive_message(
QueueUrl=DLQ_URL,
MaxNumberOfMessages=10, # 한 번에 최대 10개의 메시지
WaitTimeSeconds=10, # Long polling 대기 시간
AttributeNames=['All'], # 모든 메시지 속성 읽기
MessageAttributeNames=['All']
)
if 'Messages' not in response:
print("No messages to process.")
return
for message in response['Messages']:
message_body = message['Body']
attributes = message.get('MessageAttributes', {})
# InitialTimestamp 추출
initial_timestamp_attr = attributes.get('InitialTimestamp')
initial_timestamp = int(initial_timestamp_attr['StringValue'])
current_time = int(time.time())
print(f"current_time is : {current_time}")
message_age = current_time - initial_timestamp
if message_age > MAX_AGE_SECONDS:
# 원래 큐로 메시지 다시 보내기
sqs.send_message(
QueueUrl=ORIGINAL_QUEUE_URL,
MessageBody=message_body,
MessageAttributes=attributes
)
print(f"Message sent back to original queue: {message_body}")
# DLQ에서 메시지 삭제
sqs.delete_message(
QueueUrl=DLQ_URL,
ReceiptHandle=message['ReceiptHandle']
)
print(f"Message deleted from DLQ: {message_body}")
else:
print(f"Message not old enough to reprocess: {message_body} (Age: {message_age} seconds)")
except Exception as e:
print(f"Error processing messages: {e}")
해당 함수는 DLQ
의 메시지를 모두 읽어와서 해당 메시지의 속성에 InitialTimestamp
값을 추출해서 현재 시간과 비교해서 MAX_AGE_SECONDS
변수 값보다 클 시 SQS
에 메시지를 돌려보내고 해당 메시지를 DLQ
에서 삭제하라는 명령을 내립니다.
이제 Lambda
함수 생성 및 설정을 해줬으니 CloudWatch Events
를 사용한 주기적으로 Lambda
함수를 실행할 수 있도록 이벤트 규칙을 생성해보도록 하겠습니다.
CloudWatch
서비스로 들어가셔서 좌측 메뉴바에 이벤트 > 규칙을 클릭해줍니다.
규칙 생성을 클릭해줍니다.
다음 규칙 세부 정보에서 규칙 이름을 설정해주시고 규칙 유형은 일정
으로 선택하고 규칙 생성으로 이동
버튼을 클릭해줍니다.
다음 일정 정의에서 원하시는 일정 패턴을 선택해주시면 됩니다. 필자는 이후 테스트를 진행하기 위해 짧게 설정하도록 하겠습니다.
다음 대상 선택에서 Lambda 함수
를 선택하시고 이전에 만들었던 함수를 선택해주시면 됩니다. 추가 설정에는 이벤트 오류 시 재시도 정책에 대해서 정의할 수 있는데 해당사항이 없으므로 생략하였습니다. 다음 버튼을 클릭해주시고 넘어가시면 됩니다.
태그 구성도 생략하겠습니다. 이후 검토 및 생성에서 규칙 생성
버튼을 클릭하시면 규칙이 생성되는 것을 확인하실 수 있습니다.
설정은 다됐으니 테스트를 한번 진행해보도록 하겠습니다. 이벤트 규칙 주기는 3분으로 설정했으며, Lambda
함수에서 환경 변수로 설정해준MAX_AGE_SECONDS
의 값은 60초로 입력하였습니다.
우선 테스트를 위해 SQS
에 메시지를 전송해보도록 하겠습니다.
현재 시간에 대한 TimeStamp
값을 가져오기 위해 간단하게 개발자 도구의 Console
창을 이용했습니다.
SQS
에 메시지 본문과 메시지 속성에 값을 지정해준 후 메시지를 전송해줍니다.
그리고 메시지 폴링을 하여 정보를 확인합니다.
해당 메시지가 맞네요. 이제 메시지 폴링을 최대 재시도 횟수를 초과하도록 반복하여 DLQ
에 메시지를 전송하고 DLQ
에 메시지가 전달됐는지 확인해보도록 하겠습니다.
SQS
대기열이 비었다는 걸 확인할 수 있습니다. 아마 DLQ
로 정상적으로 메시지가 전달된 것 같네요.
제대로 들어와있는 걸 확인할 수 있습니다.
이제 Lambda
함수가 실행되고 DLQ
에 있는 메시지의 TimeStamp
속성을 추출하여 함수 실행 시점의 TimeStamp
와의 차이를 구하고 설정해둔 MAX_AGE_SECONDS
변수를 초과할 시 해당 메시지를 다시 SQS
로 전송하게 됩니다.
시간을 짧게 설정했기 때문에 바로 확인해보도록 하겠습니다.
바로 DLQ
대기열을 확인해보도록 하겠습니다.
예상대로 비었군요. 이제 SQS
대기열을 확인해보겠습니다.
메시지 폴링을 하고 새로운 메시지가 들어온 것을 확인할 수 있습니다.
메시지 본문을 보니 DLQ
에서 전송받은 메시지가 맞네요.
이제 CloudWatch
에서 기록된 로그를 확인해보도록 하겠습니다.
CloudWatch
서비스에서 좌측 메뉴에 로그 그룹을 클릭하시고 Lambda
함수 이름에 맞는 로그 그룹을 클릭해서 들어갑니다.
해당 로그 스트림에 클릭해서 들어가줍니다.
Lambda
함수에서 작성해둔 print
코드가 제대로 실행되었음을 확인할 수 있습니다. DLQ
에 있는 메시지를 추출하여 SQS
에 메시지를 새로 전송해주고 DLQ
에는 해당 메시지를 삭제해주는 명령이 정상 호출됨을 확인할 수 있습니다.
이렇게 테스트까지 무사히 마쳤습니다 ㅎㅎ