마이크로 서비스와 분산 시스템, 서버리스 애플리케이션을 쉽게 분리하고 확장할 수 있는 완전관리형 메시지 대기열 서비스
쉽게 비동기처리라고 생각하면 된다!
대용량 데이터를 주고
리턴으로 API Gateway에 첫번째 람다가 잘 받았다는 연락을 먼저 해주고
두번째 람다가 받은 대용량 데이터를 worker들이 동등하게 일처리 할 수 있게 로직을 짜는 것!
람다 함수 한 개로 한 번에 데이터를 우르르 보내는 게 아니라, SQS를 통해 한 줄서기로 보내주면 worker들이 동등하게 일처리를 하고 제한 시간내에 안정적으로 메시지 처리를 할 수 있다!
팀원과,, 함께 그림으로 그려가면서 로직을 파악했다!
import json
import boto3
def lambda_handler(event, context):
client = boto3.client("sqs")
ip = event['requestContext']['identity']['sourceIp']
request_time = event['requestContext']['requestTime']
response = client.send_message(
QueueUrl = "https://sqs.ap-northeast-2.amazonaws.com/703474273090/load-test",
MessageBody = json.dumps((ip, request_time))
)
return {
'statusCode': response["ResponseMetadata"]["HTTPStatusCode"],
'body': json.dumps(response['ResponseMetadata']['HTTPStatusCode'])
}
import psycopg2
import sys
import boto3
import os
import json
conn = psycopg2.connect(
host = os.environ['HOST'],
dbname = os.environ['DBNAME'],
user = os.environ['USER'],
password = os.environ['PASSWORD'],
port = os.environ['PORT'],
)
def lambda_handler(event, context):
val = []
records = event['Records']
for record in records:
val.append(json.loads(record['body']))
print(val)
cursor = conn.cursor()
psl = "INSERT INTO lambda_logs (ip_address, access_time) VALUES (%s, %s)"
cursor.executemany(psl, val)
conn.commit()
return{
'statusCode':200,
'body':json.dumps('success')
}
환경변수 설정하기!
Lambda 함수에서 import 지원하지 않는 모듈 설정
저번에 했던 거 추가해주기!
{
"Records": [
{
"messageId": "6d0cf2d8-348c-49e5-8537-a81ca2af0846",
"receiptHandle": "AQEBnrZStxAoC6BV0iHyrP624JPso87YpjQgSYJ8i8ul98mbw7dEXtfbha9cQZ21kCnfrv6bt333hK/t9SZgRnwXXuo8dkg4732vn2xNMKM8xv7DCWzXDLam55XLJzq3UiQFIkZdANJhg/Yf+x3sLkRtnz4d7k9ZUj98i3+hHRCD5+WDaoLUzEoxzgASul7ccyOp75iskDNoi5oLvmTeeTHoPXtuw4QBsQ7dxeiIAv3OusJDk3fcmxSRQdKK5DqZxnzD4rKfZl8vU4241JGMzKNVmR3u9Tq15xZ6HSCPDaufCK0idgHJnZnh90Om/3HsIyBSXV1+cQZ9yBMTdyh6q9QpR5P2SQBgPJyBFmwdoQStvOpcJgmAPHi2bZf+OlmcU3OCvXjV+NEo5oUqhbgME8COrw==",
"body": "{\"client_ip\": \"127.0.0.1\", \"request_time\": \"09/Apr/2015:12:34:56 +0000\"}",
"attributes": {
"ApproximateReceiveCount": "6",
"SentTimestamp": "1655103586901",
"SenderId": "AROA2HSSVRNBCIQSBSLDZ:logs-3",
"ApproximateFirstReceiveTimestamp": "1655103586901"
},
"messageAttributes": {},
"md5OfBody": "dfabdcc0018f9979c421515d718bbe6d",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:ap-northeast-2:703474273090:logs",
"awsRegion": "ap-northeast-2"
}
]
}
import requests
from random import random
from concurrent.futures import ThreadPoolExecutor
def post_url(args):
return requests.post(args[0], data=args[1])
for i in range(10):
form_data = {"ikria":random()}
list_of_urls = [("https://ghcrol2iwf.execute-api.ap-northeast-2.amazonaws.com/default/logs-3",form_data)]*1000
with ThreadPoolExecutor(max_workers=20) as pool:
response_list = list(pool.map(post_url,list_of_urls))
for response in response_list:
print(response)
print(f"{(i+1)*1000}번 성공")
for문이 아닌 한 번에 대량으로 메시지가 들어갈 수 있게 코드를 짜야한다!
우리는 한 번에 10,000개의 메세지가 들어가도록 코드를 작성했다!
https://www.youtube.com/watch?v=nhEFJgIhvuk
http://labs.brandi.co.kr/2018/02/16/leesg.html