이전에 Lambda - SQS 메시지 보내기에 대해서 알아보았다. 이번 포스팅에서는 Lambda - SQS - Lambda - DB로 연결하는 방법과 한 번에 많은 데이터(Bulk data)를 DB에 삽입하는 방법에 대해서 알아보려고 한다.
이전에는 Lambda > SQS 방향으로 연결했다면, 이번에는 SQS > Lambda(Lambda Consumer) 방향으로 연결하는 하는 것에 대해서 알아보자.
그리고 Bulk Data를 Lambda Consumer에서 처리하는 방법에 대해서도 알아보자.
위와 같이 SQS 트리거로 Lambda가 연결되었을 때 코드의 lambda_handler(event, context) 함수에서 event로 SQS의 Record가 들어온다.
Record의 양식은 아래 url에서 확인가능하다.
SQS 큐의 Record를 받는 Lambda를 Lambda Consumer라고 한다. 이는 위에서 SQS의 Trigger로 연결된 Lambda이다. 해당 Lambda에서는 Q에서 전달된 Bulk Data를 DB에 보내는 방법에 대해서 알아보고자 한다.
cursor.executemany는 쿼리문 중 value에 들어가는 값들을 시퀀스 데이터 형태로 받아서 한 번에 DB로 전달하는 메서드이다. 해당 메서드를 사용하여 SQS에서 받은 Bulk Data를 DB에 전달하고자 한다.
Syntax
cursor.executemany(쿼리문, 시퀀스형 파라미터)
예제
data = [
('Jane', date(2005, 2, 12)),
('Joe', date(2006, 5, 23)),
('John', date(2010, 10, 3)),
]
stmt = "INSERT INTO employees (first_name, hire_date) VALUES (%s, %s)"
cursor.executemany(stmt, data)
-->
예제의 결과
INSERT INTO employees (first_name, hire_date)
VALUES ('Jane', '2005-02-12'), ('Joe', '2006-05-23'), ('John', '2010-10-03')
import json
import pymysql
import dbinfo
import pymysql
def lambda_handler(event, context):
# 1
connection = pymysql.connect(
host = dbinfo.db_host,
user = dbinfo.db_username,
passwd = dbinfo.db_password,
db = dbinfo.db_name,
port = dbinfo.db_port
)
cursor = connection.cursor()
# 2
data = []
i = 0
# 3
for record in event['Records']:
i += 1
payload = json.loads(record['body'])
values = [
str(payload["ip"]),
str(payload["now"]),
int(payload["random_number"])
]
data.append(values)
if i % 1000 == 0:
query = "INSERT INTO ips(ip, created_at, random_number) values(%s,%s,%d)"
cursor.executemany(query, data)
connection.commit()
data = []
# 4
query = "INSERT INTO ips(ip, created_at, random_number) values(%s,%s,%s)"
cursor.executemany(query, data)
connection.commit()
connection.close()
# 1
connection = pymysql.connect(
host = dbinfo.db_host,
user = dbinfo.db_username,
passwd = dbinfo.db_password,
db = dbinfo.db_name,
port = dbinfo.db_port
)
cursor = connection.cursor()
- pymysql을 사용해서 DB와 연동하는 코드이다.
# 2
data = []
i = 0
- data : 이후 bulk로 value 값을 만들 떄 필요한 리스트를 미리 선언한 것이다.
- i : for문을 돌려서 i가 N일 때 Bulk Data를 DB에 넣을 것이다.
그 때 필요한 i 값을 미리 선언한 것이다.
# 3
for record in event['Records']:
i += 1
payload = json.loads(record['body'])
values = [
str(payload["ip"]),
str(payload["now"]),
int(payload["random_number"])
]
data.append(values)
if i % 1000 == 0:
query = "INSERT INTO ips(ip, created_at, random_number) values(%s,%s,%d)"
cursor.executemany(query, data)
connection.commit()
data = []
- event['Records']: SQS에서 오는 값에서 필요한 값을 뽑아 내기 위한 인덱싱 작업이다.
만약 Q에 10개의 데이터가 있었을 때 event['Records'] 밑에 10개의 record가 있다.
즉, 해당 for문은 10개 record에서 하나씩 뽑아서 데이터를 뽑는 코드이다.
- payload: json 값으로 되어 있는 record['body']를 dict로 변환
- values: 거기서 post로 날린 body 값을 추출하여 values 안에 넣음
- data.append(values): for문 바깥에서 선언한 data에 values를 append.
즉, 데이터들을 하나의 리스트에 모으는 작업이다.
- if i % 1000 = 0: i가 1000의 배수일 때마다 아래 로직이 실행
- cursor.executemany(query, data): for문을 돌면서 만들어진
시퀀스 데이터가 query에 넣어져 쿼리문이 실행됨.
- connection.commit(), data = []: 쿼리 실행을 저장하고, data를 초기화 해줌
# 4
query = "INSERT INTO ips(ip, created_at, random_number) values(%s,%s,%s)"
cursor.executemany(query, data)
connection.commit()
connection.close()
- 만약 i가 1000의 배수가 아니라서 for문이 끝났을 때
나머지 data를 DB에 넣어주는 코드이다.