Lambda로 SQS에 message send

파워소동·2022년 12월 4일
0
post-thumbnail

몇만건의 메시지를 빠르고 오류없이 발송하는 서비스를 개발하고 있습니다.

API Gateway를 통해 발송할 list data를 받아 각 데이터를 SQS에 send하고 목적에 따라 분리된 lambda를 통해 분산처리됩니다. 결과적으로 문자를 발송할 agent table에 데이터를 저장합니다.

데이터를 반복문을 통해 1건씩 SQS에 넣는 코드는 아래와 같습니다.

import json
import logging
import boto3
import os
import datetime

logger = logging.getLogger()
logger.setLevel(logging.INFO)

sqs_client = boto3.client(service_name='sqs', region_name=os.environ['SQS_REGION'])

def lambda_handler(event, context):
    body = json.loads(event['body'])
    for i in range(len(body)):
        message_body = body[i]
        insert_sqs(message_body)
    
    return {
        'statusCode': 200,
        'body': json.dumps({
          'resultCode' : 000,
          'resultMessage' : "successfully inserted"
        })
    }

def insert_sqs(insert_sqs_datas):
    try:
        sqs_client.send_message(QueueUrl=os.environ['SQS_URL'], MessageBody=json.dumps(insert_sqs_datas))
    except Exception as e:
        logging.error(e)

os.environ[ ] 는 환경변수 값입니다!

반복문을 이용해 1건 씩 SQS에 넣다 보니 너무 오래 걸리더라구요.

그래서 속도 개선을 위해 두가지 방법을 생각해보았습니다.

  1. 비동기적으로 sqs에 메시지를 넣으면 더 빨리지지 않을까?

하지만 boto3 모듈이 비동기를 지원하지 않는다고 합니다..!!
Amazon boto 비동기를 지원하는 모듈인 aiobotocore가 있다고는 하는데

aiobotocore는 람다에서 기본으로 지원하는 모듈이 아니기 때문에 가상환경에서 모듈을 생성하고 계층을 추가해 줘야 하는 번거로움이 있었습니다.

  1. 한 번에 메시지를 bulk send 하면 되지 않을까?

최대 10건의 메시지를 한 번에 send할 수 있는 send_messages() 메서드를 찾을 수 있었습니다!

send_messages()의 자세한 설명은 링크 참고!

수정한 코드는 아래와 같습니다.

import json
import logging
import boto3
import os
import datetime

logger = logging.getLogger()
logger.setLevel(logging.INFO)

sqs_resource = boto3.resource('sqs')
queue = sqs_resource.get_queue_by_name(QueueName=os.environ['SQS_NAME'])

def lambda_handler(event, context):
    body = json.loads(event['body'])
    entries = []
    for i in range(len(body)):
        message_body = body[i]
        entry = {
                    'Id': str(i), 
                    'MessageBody': json.dumps(message_body)
                }
        entries.append(entry)
        if (i+1)%10==0:
            insert_sqs(entries)
            entries=[]
    insert_sqs(entries)
        
    
    return {
        'statusCode': 200,
        'body': json.dumps({
          'resultCode' : 000,
          'resultMessage' : "successfully inserted"
        })
    }

def insert_sqs(insert_sqs_datas):
    if not insert_sqs_datas:
        return
    try:
        queue.send_messages(Entries=insert_sqs_datas)
    except Exception as e:
        logging.error(e)

반복문을 통해 10건마다 SQS에 send 하도록 수정했습니다.
이것만 해도 속도가 훨씬 빨라졌어요!

추후 비동기적으로 send 해보는 방법을 시도해 보고 또 글 쓰도록 하겠습니다!!

0개의 댓글