[AWS] AWS ec2로 Kafka 구성 (2)

sunnyjjang·2024년 10월 9일

AWS

목록 보기
18/21
post-thumbnail

LAB. Kafka Server setting

워크플로우
1. kafka 인스턴스 생성
2. Kafka-server 구성
3. Kafka-producer 구성
4. kafka- Consumer 구성

업로드중..

1. 인스턴스 생성

  • name : kafka
  • AMI : Amazon Linux 2023
  • 인스턴스 유형 : t2.medium
  • key : lab-key
  • 네트워크
    • VPC : lab-vpc
    • 서브넷 : public-ap-northeast-2c (10.0.1.0/24)
      • 퍼블릭 IP 자동 할당 : 활성화 (실제 업무에서는 비활성화 권장)
    • SG : kafka-sg
  • 스토리지 : 20G, gp3
  • 인스턴스 개수 : 3개
  • 생성 후, Name tag 변경
    • kafka-server
    • kafka-producer
    • kafka-consumer

2. Kafka-server 구성

  1. 웹 서버 설치
# ssh 서버창에서 보여지는 이름 변경
[ec2-user@ip-10-0-0-87 ~]$ sudo hostnamectl hostname kafka-server
[ec2-user@ip-10-0-0-87 ~]$ exit
# 다시 접속하면 ip 주소 대신 설정한 이름으로 노출
[ec2-user@kafka-server ~]$ 

# root 계정 변경
[ec2-user@kafka-server ~]$ sudo su -

# 아파치 웹서버 설치
[root@kafka-server ~]# yum update -y 
[root@kafka-server ~]# yum install httpd -y 
[root@kafka-server ~]# systemctl enable --now  httpd
[root@kafka-server ~]# systemctl status httpd

# index.html 입력
[root@kafka-server ~]# echo  "<html><h1><br><br><center> AWS 데이터 파이프라인 구축 </center> </h1></html>" > /var/www/html/index.html 
exit
  • public ip 로 index.html 확인
  1. java 설치
[ec2-user@kafka-server ~]$ sudo yum search java-11
[ec2-user@kafka-server ~]$ sudo yum install -y java-11-amazon-corretto
[ec2-user@kafka-server ~]$ java --version
  1. kafka 설치
# kafka 다운로드
[ec2-user@kafka-server ~]$ wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz
# zip 파일 압축 해제
[ec2-user@kafka-server ~]$ tar zxvf kafka_2.13-3.8.0.tgz
# 해제된 파일 리스트 확인
[ec2-user@kafka-server ~]$ ls
kafka  kafka_2.13-3.8.0  kafka_2.13-3.8.0.tgz
# 디렉토리 이동
[ec2-user@kafka-server ~]$ cd kafka
# ls 폴더 리스트 업 - ls 폴더들에 카프카, 주키퍼 전부 존재
[ec2-user@kafka-server kafka]$ ls
LICENSE  NOTICE  bin  config  libs  licenses  site-docs
  1. jookeeper 실행
# zookeeper.properties의 구성 정보 보기 - Zookeeper의 설정을 정의한 파일
# kafka 서버에 주키퍼 운영 구성이 되어있는지 확인
[ec2-user@kafka-server kafka]$ cat config/zookeeper.properties 

# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080

#----> 주키서 포트 넘버 2181임을 확인

# 주키퍼 백그라운드 실행
[ec2-user@kafka-server kafka]$ ./bin/zookeeper-server-start.sh config/zookeeper.properties &
  1. kafka 실행
# server.properties의 구성 정보 보기 - Apache Kafka 브로커의 구성 파일
# Kafka 서버의 동작을 제어하는 다양한 설정이 포함
[ec2-user@kafka-server kafka]$ cat config/server.properties | grep ^[a-z]
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

# 카프카 서버(브로커) 백그라운드 실행
[ec2-user@kafka-server kafka]$ ./bin/kafka-server-start.sh config/server.properties & 
# 실행 확인
[ec2-user@kafka-server kafka]$ sudo netstat -napt
tcp6       0      0 :::2181                 :::*                    LISTEN      26719/java
tcp6       0      0 :::9092                 :::*                    LISTEN      27207/java
...
# 주키퍼 클라이언트 2181
# 카프카 서버(브로커) 9092
  1. kafka topic 생성
# 토픽 apartinfo 생성: 파티션 1개, 복제 파티션 없음(1), 운영 서버 localhost:9092 
[ec2-user@kafka-server kafka]$ bin/kafka-topics.sh --create --topic apartinfo --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092  &
...
[2024-09-02 03:55:20,205] INFO [Partition apartinfo-0 broker=0] Log loaded for partition apartinfo-0 with initial high watermark 0 (kafka.cluster.Partition)
Created topic apartinfo.

# 생성된 Topic 확인 - localhost:9092 로 서비스 하고 있는 토픽 확인
[ec2-user@kafka-server kafka]# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
apartinfo

# kafka 실행을 용이하게 하기 위해 PATH 에 등록
[ec2-user@kafka-server kafka]$ vi ~/.bash_profile
# 파일에 해당 패스 추가
PATH=$PATH:~/kafka/bin 

# path 추가한 파일 다시 로드하여 설정 반영
[ec2-user@kafka-server kafka]$ source ~/.bash_profile

# 파일에 등록되어 있는 패스 목록
[ec2-user@kafka-server kafka]$ echo $PATH
/home/ec2-user/.local/bin:/home/ec2-user/bin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/home/ec2-user/kafka/bin

3. Kafka-producer 구성

  1. Logstash 설치
[root@kafka-producer ~]# hostname
kafka-producer

# logstash 리포지토리 등록 후 설치
# aws 레포지토리에 logstash는 없음 (대부분의 프로그램은 있음)
# logstash 설치할 레포지토리의 url의 인증키를 등록
[root@kafka-producer ~]# 
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
cat <<END > /etc/yum.repos.d/logstash.repo
[logstash-8.x]
name=Elastic repository for 8.x packages
baseurl=https://artifacts.elastic.co/packages/8.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
END

[root@kafka-producer ~]# yum install logstash -y

# PATH에 등록 (kafka 패스 등록한 것과 같은 이유)
[ec2-user@kafka-producer ~]$ vi ~/.bash_profile
# 파일에 하단 두줄 추가
export LS_HOME=/usr/share/logstash
PATH=$PATH:$LS_HOME/bin

# 파일 다시 로드해서 path 설정 즉시 적용
[ec2-user@kafka-producer ~]$ source ~/.bash_profile

# 설치 확인
[ec2-user@kafka-producer ~]$ logstash --version
Using bundled JDK: /usr/share/logstash/jdk
logstash 8.15.2
  1. logstash 파일 생성
# 엑세스 키 정보 입력 
[ec2-user@kafka-producer ~]$ vi ~/apartinfo_test.conf
# 아래 내용 입력
input {
      s3 {
        sincedb_path => "/dev/null"
        access_key_id => "AKIARWPFIQ*****"
        secret_access_key => "wQruFZaMCNr8Bhar2****"
        region => "ap-northeast-2"
        prefix => "ods/danji_master.json/" #bucket 하위 폴더를 지정해준다. 
        bucket => "es-kafka-test-bucket"          #bucket 명을 지정해준다.
        additional_settings => {
          force_path_style => true
          follow_redirects => false
        }
      }
    }

output {
  stdout { }
}

# 권한 부여
[ec2-user@kafka-producer ~]$ sudo chmod 777 /usr/share/logstash/data

# 내용 확인
# 명령 실행후 아래의 결과처럼 
# S3 버킷에 저장된 데이터를 가져와서 STDOUT이 되면 성공
# Ctrl+C로 중지
[ec2-user@kafka-producer ~]$ logstash -f /home/ec2-user/apartinfo_test.conf
Using bundled JDK: /usr/share/logstash/jdk
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
[INFO ] 2024-09-02 06:42:31.459 [main] runner - Starting Logstash {"logstash.version"=>"8.15.0", "jruby.version"=>"jruby 9.4.8.0 (3.1.4) 2024-07-02 4d41e55a67 OpenJDK 64-Bit Server VM 21.0.4+7-LTS on 21.0.4+7-LTS +indy +jit [x86_64-linux]"}
[INFO ] 2024-09-02 06:42:31.466 [main] runner - JVM bootstrap flags: [-Xms1g, -Xmx1g, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djruby.compile.invokedynamic=true, -XX:+HeapDumpOnOutOfMemoryError, -Djava.security.egd=file:/dev/urandom, -Dlog4j2.isThreadContextMapInheritable=true, -Dlogstash.jackson.stream-read-constraints.max-string-length=200000000, -Dlogstash.jackson.stream-read-constraints.max-number-length=10000, -Djruby.regexp.interruptible=true, -Djdk.io.File.enableADS=true, --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED, --add-opens=java.base/java.security=ALL-UNNAMED, --add-opens=java.base/java.io=ALL-UNNAMED, --add-opens=java.base/java.nio.channels=ALL-UNNAMED, --add-opens=java.base/sun.nio.ch=ALL-UNNAMED, --add-opens=java.management/sun.management=ALL-UNNAMED, -Dio.netty.allocator.maxOrder=11]
...
[INFO ] 2024-09-02 06:42:35.467 [[main]-pipeline-manager] javapipeline - Pipeline started {"pipeline.id"=>"main"}
[INFO ] 2024-09-02 06:42:35.477 [[main]<s3] s3 - Using the provided sincedb_path {:sincedb_path=>"/dev/null"}
[INFO ] 2024-09-02 06:42:35.500 [Agent thread] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
{
         "event" => {
        "original" => "{\"id\":\"4\",\"danji_name\":\"강변우성\",\"sido\":\"서울특별시\",\"sigungu\":\"광진구\",\"dong\":\"구의동\",\"address2\":\"546-7\",\"new_address\":\"서울 광진구 구의강변로 42\",\"house_count\":\"354세대\",\"dong_count\":\"3개동\",\"build_ym\":\"1992년 12월\"}\n"
    },
      "@version" => "1",
    "@timestamp" => 2024-09-02T06:42:37.082802348Z,
       "message" => "{\"id\":\"4\",\"danji_name\":\"강변우성\",\"sido\":\"서울특별시\",\"sigungu\":\"광진구\",\"dong\":\"구의동\",\"address2\":\"546-7\",\"new_address\":\"서울 광진구 구의강변로 42\",\"house_count\":\"354세대\",\"dong_count\":\"3개동\",\"build_ym\":\"1992년 12월\"}\n"
}
...
<Ctrl><C> 눌러주세요.
^C[WARN ] 2024-09-02 07:02:36.713 [SIGINT handler] runner - SIGINT received. Shutting down.
[INFO ] 2024-09-02 07:02:36.783 [[main]-pipeline-manager] javapipeline - Pipeline terminated {"pipeline.id"=>"main"}
[INFO ] 2024-09-02 07:02:37.727 [Converge PipelineAction::StopAndDelete<main>] pipelinesregistry - Removed pipeline from registry successfully {:pipeline_id=>:main}
[INFO ] 2024-09-02 07:02:37.735 [LogStash::Runner] runner - Logstash shut down.

# Logstash를 사용하여 S3에서 데이터를 읽어와 Kafka로 전송하는 작업을 수행하도록 구성
# 출력되는 S3 버킷의 내용은 kafka 서버로 전달하도록 output을 수정
[ec2-user@kafka-producer ~]$ vi ~/apartinfo_test.conf
input {
      s3 {
        sincedb_path => "/dev/null"
        access_key_id => "AKIARWPFIQ*******"
        secret_access_key => "wQruFZaMCNr8B*******"
        region => "ap-northeast-2"
        prefix => "ods/danji_master.json/" #bucket 하위 폴더를 지정해준다. 
        bucket => "es-kafka-test-bucket"          #bucket 명을 지정해준다.
        additional_settings => {
          force_path_style => true
          follow_redirects => false
        }
      }
    }

output {
    stdout { }
    kafka {
        codec => json
        topic_id => "apartinfo"
        bootstrap_servers =>  ["10.0.1.61:9092"] <- kafka-server의 private ip
    }  
}


# 수정후 다시 실행
[ec2-user@kafka-producer ~]$ logstash -f /home/ec2-user/apartinfo_test.conf
  • producer 에서 보내는 output을 server에서 잘 받고 있는지 확인 하기 위해 server에서 명령어 실행
[ec2-user@kafka-server kafka]$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic apartinfo --from-beginning
# 출력결과는 producer에서 나오는 결과와 동일한 데이터

4. kafka- Consumer 구성

  1. logstash 설치
[ec2-user@ip ~]$ sudo hostnamectl hostname consumer
[ec2-user@ip ~]$ exit

[ec2-user@kafka-consumer ~]$ sudo -i

# 리포지토리 인증서 등록 후, 설치
[root@kafka-consumer ~]# 
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
cat << END > /etc/yum.repos.d/logstash.repo
[logstash-8.x]
name=Elastic repository for 8.x packages
baseurl=https://artifacts.elastic.co/packages/8.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
END

yum install logstash -y

# PATH에 등록 (kafka 패스 등록한 것과 같은 이유)
[ec2-user@kafka-consumer ~]$ vi ~/.bash_profile
# 파일에 두줄 추가
export LS_HOME=/usr/share/logstash
PATH=$PATH:$LS_HOME/bin
exit 
# 파일 다시 로드해서 path 설정 즉시 적용
[ec2-user@kafka-consumer ~]$ source ~/.bash_profile

# 설치 확인
[ec2-user@kafka-consumer ~]$ logstash --version
Using bundled JDK: /usr/share/logstash/jdk
logstash 8.15.2
  1. logstash 구성
# Logstash를 사용하여 Kafka queue에서 apartinfo 토픽 데이터를 읽어와 화면에 STDOUT

[ec2-user@kafka-consumer ~]$ vi consumer.conf
input {
    kafka {
        bootstrap_servers => "10.0.1.61:9092" # kafka-server 의 private ip
        group_id => "apart_info"
        topics => ["apartinfo"] # Topic 이름 지정
        consumer_threads => 1 # Consumer 처리 Thread갯수 지정
        }
}

output {
  stdout { codec => rubydebug }
}
[ec2-user@kafka-consumer ~]$ sudo chmod 777 /usr/share/logstash/data
[ec2-user@kafka-consumer ~]$ logstash -f /home/ec2-user/consumer.conf

실행결과 소스 가져와서 출력하는것을 볼수 있음.
확인 했으면 Ctrl+c로 종료
  • 특정 키워드에 대해서 메세지 출력하는 기능 추가
[ec2-user@kafka-consumer ~]$ sudo yum install pip -y
[ec2-user@kafka-consumer ~]$ pip install confluent_kafka

#Kafka 브로커와 연결하여, 특정 토픽에서 들어오는 메시지를 지속적으로 폴링(polling)하고, 그 메시지를 콘솔에 출력하는 작업을 수행
[ec2-user@kafka-consumer ~]$ cat > consumer.py
from confluent_kafka import Consumer, KafkaError

# Define the Kafka consumer configuration
conf = {
    'bootstrap.servers': '10.0.1.61:9092',  # Replace with your Kafka bootstrap servers
    'group.id': 'apartinfo',           # Replace with your consumer group ID
    'auto.offset.reset': 'earliest',                # Set to 'earliest' to read from the beginning of the topic
}


# Create the Kafka consumer
consumer = Consumer(conf)

# Subscribe to the Kafka topic
topic = 'apartinfo'  # Replace with the Kafka topic you want to consume
consumer.subscribe([topic])

# Poll for messages
try:
    while True:
        msg = consumer.poll(1.0)  # Poll for messages, with a timeout of 1 second

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                print(f"Reached end of partition for {msg.topic()} [{msg.partition()}]")
            else:
                print(f"Error: {msg.error()}")
        else:
            # Process the received message
            print(f"Received message: {msg.value().decode('utf-8')}\n")

except KeyboardInterrupt:
    pass
finally:
    # Close down consumer to commit final offsets.
    consumer.close()

[ec2-user@kafka-consumer ~]$ python3 consumer.py
profile
지금 이 순간이 다시 넘겨볼 수 있는 한 페이지가 될 수 있게

0개의 댓글