AWS MSK as an event source for AWS Lambda 실습

임쿠쿠·2023년 4월 30일
0

kafka

목록 보기
2/6
post-thumbnail

0. 실습 목표

1) private 서브넷 내 인스턴스 구축 후, MSK 적용
2) Topic별 MSK 람다 트리거 테스트


1. 네트워크 구축

  • MSK를 생성 전, Kafka 클러스터를 적용할 VPC를 생성합니다.

1) VPC 생성

2) Subnet 생성

  • private 서브넷 2개와 public 서브넷 2개를 생성합니다.
  • 각 서브넷은 각기 다른 가용영역으로 설정합니다.
ex)
Public-Subnet-A-lambda--11.0.0.0/24--ap-northeast-1a
Public-Subnet-B-lambda--11.0.1.0/24--ap-northeast-1b
Private-Subnet-A-lambda--11.0.2.0/24--ap-northeast-1a
Private-Subnet-B-lambda--11.0.3.0/24--ap-northeast-1b

3) 외부 접속을 위한 Internet GateWay 생성 및 적용

  • IGW 생성 후 VPC 연결

4) Route Table 설정

(1) Public, Private Route Table 각 1개 씩 생성

(2) Public Route Table에 IGW 연결

(3) Public Route Table에 Public 서브넷 연결

5) NAT 게이트웨이 생성

  • Public 서브넷에 NAT 게이트웨이 생성

  • Private Route Table에 NAT 게이트웨이 연결


2. MSK 클러스터 생성 및 활용

1) MSK 클러스터 및 브로커 생성

  • private 서브넷에 클러스터 및 브로커 생성

  • 보안 설정

2) EC2 client 생성 및 보안그룹 설정

(1) MSK 및 Private EC2 보안 그룹 수정

  • private, public 서브넷에 각 1개의 AWS Linux 인스턴스 생성

  • MSK 인바운드 트래픽에 Private 인스턴스 보안그룹 적용

  • Private 인스턴스 인바운드 트래픽에 MSK 보안그룹 적용

(2) Private 인스턴스 MSK 테스트

  • public 인스턴스 연결 후, private 인스턴스 접속
참고) 
- https://medium.com/srcecde/copy-file-directory-to-from-ec2-using-scp-secure-copy-685c46636399
- scp -i msk.pem msk.pem ec2-user@3.35.133.131:/home/ec2-user/test
  • Private 인스턴스에 java, kafka 설치
sudo yum install java-1.8.0-openjdk
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
tar -xvf kafka_2.12-2.8.1.tgz
cd kafka_2.12-2.8.1
  • 토픽 생성
bin/kafka-topics.sh --create --topic demo_testing2 --bootstrap-server
{bootstrapSever URLS} --replication-factor 1 --partitions 2
  • Producer, Consumer 실행 후 테스트
bin/kafka-console-producer.sh --topic demo_testing2 --bootstrap-server {bootstrapSever URLS} 
bin/kafka-console-consumer.sh --topic demo_testing2 --bootstrap-server {bootstrapSever URLS}  


3. Lambda MSK 트리거 구성

1) Lambda 생성

  • 코드 Deploy
import base64
import boto3
import json

def lambda_handler(event, context):
    # TODO implement
    print(event)
    for partition_key in event['records']:
        partition_value=event['records'][partition_key]
        for record_value in partition_value:
             print((base64.b64decode(record_value['value'])).decode())

2) Lambda 권한 설정

  • VPC, MSK 정책 연결

3) Lambda Topic 별 MSK 트리거 구성

  • 위 EC2에서 생성한 Topic 반영

4) MSK 트리거 테스트

참고) https://aws.amazon.com/ko/blogs/compute/using-amazon-msk-as-an-event-source-for-aws-lambda/

profile
Pay it forward

0개의 댓글