이전 포스팅에서 lambda와 SQS를 연결하는 작업을 했다. 이번에는 Lambda - SQS - dynamoDB
관계를 만들어 lambda에서 사용자 정보를 SQS로 보내면, DB에서 그 데이터를 polling 해가는 작업을 해 볼것이다.
API 엔드포인트로 접근하면 SQS로 메세지를 보내는 lambda 함수와 API Gateway를 추가한다.
import json
import os
import boto3
import random
import socket
from datetime import datetime, timedelta, timezone
from botocore.exceptions import ClientError
def lambda_handler(event, context):
dynamo = boto3.resource('dynamodb').Table('ikaria_db')
KST = timezone(timedelta(hours=+9), 'KST')
ip = socket.gethostbyname(socket.gethostname())
lst = []
for i in range(1,101):
values = {
"id": len(dynamo.scan(TableName='ikaria_db')["Items"]) + i,
"date_of_visit" : datetime.now(KST).strftime("%Y-%m-%d %H:%M:%S.%f"),
"ip_address" : ip,
"user_number" : random.randrange(1000000)}
lst.append(values)
msg_body = json.dumps(lst)
msg = send_sqs_message(os.environ['ikaria_sqs'], msg_body)
return msg
def send_sqs_message(sqs_queue_url, msg_body):
sqs_client = boto3.client('sqs')
msg = sqs_client.send_message(QueueUrl=sqs_queue_url, MessageBody=msg_body, MessageGroupId="ikaria")
return msg
SQS 메시지 대기열을 만든다. 표준
대기열로 만들었다.
SQS메세지를 dynamoDB로 polling해가는 lambda 함수를 작성한다. 2번에서 만든 SQS를 트리거 추가
로 연결하는걸 잊지 말자.
import json
import boto3
def lambda_handler(event, context):
dynamo = boto3.resource('dynamodb').Table('ikaria_db')
records = event['Records']
for record in records:
payload = json.loads(record["body"])
for i in payload:
dynamo.put_item(Item={
"id": i["id"],
"date_of_visit" : i["date_of_visit"],
"ip_address" : i["ip_address"],
"user_number" : i["user_number"]
})
트리거 추가로 API를 연결한 lambda함수를 실행해보면 SQS로 보낸 데이터를 DB가 받은 것을 확인할 수 있다.
[참고 사이트]