1) Api GateWay를 통해 데이터를 전송
2) Api GateWay 통합 요청을 통해, SQS에 데이터 적재
3) Producer Lambda SQS 트리거 후, 데이터 MSK에 전달
4) Consumer Lambda MSK 트리거 후, kinesis FireHose에 데이터 전달
5) Kinesis FireHose에서 S3 lake로 데이터 적재
*MSK 생성 및 네트워크 구축은 이전 실습을 참고해주세요.
sudo apt-get update
sudo apt install python3-virtualenv
virtualenv kafka_yt
source kafka_yt/bin/activate
python3 --version
sudo apt install python3-pip
python3 -m pip install --upgrade pip
mkdir -p lambda_layers/python/lib/python3.8/site-packages
cd lambda_layers/python/lib/python3.8/site-packages
pip install kafka-python -t .
cd /mnt/c/Users/USER/lambda_layers
sudo apt install zip
zip -r kafka_yt_demo.zip *
from time import sleep
from json import dumps
from kafka import KafkaProducer
import json
// 토픽반영
topic_name='{Provide the topic name}'
// MSK 부트스랩서버 주소 반영
producer = KafkaProducer(bootstrap_servers=['{Put the broker URLs}'
,'{Put the broker URLs}'],value_serializer=lambda x: dumps(x).encode('utf-8'))
def lambda_handler(event, context):
print(event)
for i in event['Records']:
sqs_message =json.loads((i['body']))
print(sqs_message)
producer.send(topic_name, value=sqs_message)
producer.flush()
<Kinesis FireHose 사용 이유>
로그 데이터를 처리하는 Lambda 함수가 있다고 가정해 봅시다. 이 함수는 파일을 생성하고 S3에 직접 업로드하는 방법으로 구현될 수 있습니다. 그러나, 로그 데이터가 대량으로 생성되는 경우 이 함수는 자주 호출되며 S3에 불필요한 I/O 작업이 발생할 수 있습니다.
대신에, Kinesis Firehose를 사용하여 로그 데이터를 전송하고 일괄 처리 할 수 있습니다. Lambda 함수에서 생성된 로그 데이터는 Kinesis Firehose로 전송되고, Kinesis Firehose는 일정한 시간이나 파일 크기에 도달하면 S3로 배송합니다. 이렇게하면 Lambda 함수에서 S3에 직접 액세스하지 않고도 대량의 데이터를 처리하고 S3 요금을 줄일 수 있습니다.
import base64
import boto3
import json
client = boto3.client('firehose')
def lambda_handler(event, context):
print(event)
for partition_key in event['records']:
partition_value=event['records'][partition_key]
for record_value in partition_value:
actual_message=json.loads((base64.b64decode(record_value['value'])).decode('utf-8'))
print(actual_message)
newImage = (json.dumps(actual_message)+'\n').encode('utf-8')
print(newImage)
// 위 생성한 FireHose Name 적용
response = client.put_record(
DeliveryStreamName='{Kinesis Delivery Stream Name}',
Record={
'Data': newImage
})
(생략)