1) private 서브넷 내 인스턴스 구축 후, MSK 적용
2) Topic별 MSK 람다 트리거 테스트
ex)
Public-Subnet-A-lambda--11.0.0.0/24--ap-northeast-1a
Public-Subnet-B-lambda--11.0.1.0/24--ap-northeast-1b
Private-Subnet-A-lambda--11.0.2.0/24--ap-northeast-1a
Private-Subnet-B-lambda--11.0.3.0/24--ap-northeast-1b
(1) Public, Private Route Table 각 1개 씩 생성
(2) Public Route Table에 IGW 연결
(3) Public Route Table에 Public 서브넷 연결
(1) MSK 및 Private EC2 보안 그룹 수정
private, public 서브넷에 각 1개의 AWS Linux 인스턴스 생성
MSK 인바운드 트래픽에 Private 인스턴스 보안그룹 적용
Private 인스턴스 인바운드 트래픽에 MSK 보안그룹 적용
(2) Private 인스턴스 MSK 테스트
참고)
- https://medium.com/srcecde/copy-file-directory-to-from-ec2-using-scp-secure-copy-685c46636399
- scp -i msk.pem msk.pem ec2-user@3.35.133.131:/home/ec2-user/test
sudo yum install java-1.8.0-openjdk
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
tar -xvf kafka_2.12-2.8.1.tgz
cd kafka_2.12-2.8.1
bin/kafka-topics.sh --create --topic demo_testing2 --bootstrap-server
{bootstrapSever URLS} --replication-factor 1 --partitions 2
bin/kafka-console-producer.sh --topic demo_testing2 --bootstrap-server {bootstrapSever URLS}
bin/kafka-console-consumer.sh --topic demo_testing2 --bootstrap-server {bootstrapSever URLS}
import base64
import boto3
import json
def lambda_handler(event, context):
# TODO implement
print(event)
for partition_key in event['records']:
partition_value=event['records'][partition_key]
for record_value in partition_value:
print((base64.b64decode(record_value['value'])).decode())
참고) https://aws.amazon.com/ko/blogs/compute/using-amazon-msk-as-an-event-source-for-aws-lambda/