이번 포스팅은 kafka를 통해 producer와 consumer가 데이터를 주고 받는 과정에서 kafka에서 발생하는 메트릭에 대해 모니텅 하고자 한다.
필자는 총 3대의 서버를 사용했으며, 각각의 역할은 kafka-client, broker, monitoring이다. kafka-client에는 producer와 consumer가 동작하고 있고, broker에는 zookeeper와 kafka가 동작하고 있으며, broker에서 발생하는 JMX 메트릭을 jconsole(노트북)과 ELK가 동작하고 있는 monitoring서버에서 모니터링하고자 한다. 전체구성도는 아래와 같다.
Java 설치 및 JAVA_HOME(환경변수) 설정은 생략한다. 필자는 java-1.8.0-openjdk 사용
> curl -O https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz
> tar xvf kafka_2.12-2.8.0.tgz
> rm -rf kafka_2.12-2.8.0.tgz
> cd kafka_2.12-2.8.0
> cd ./kafka_2.12-2.8.0
## producer 또는 consumer에서 broker에 접속할 때 사용 가능한 hostname 설정(필자는 broker01로 설정)
> vi config/server.properties
advertised.listeners=PLAINTEXT://broker01:9092
java.rmi.server.hostname은 본인의 외부 IP로 변경
rmi.server.hostname을 설정해야, 외부의 jconsole과 같은 도구에서 접속 가능
> export KAFKA_JMX_OPTS='-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.port=9999
-Dcom.sun.management.jmxremote.rmi.port=9999
-Djava.rmi.server.hostname=<broker01서버 외부 ip>'
# 1) Foregroud
> env JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties
# 2) background 실행
> env JMX_PORT=9999 bin/kafka-server-start.sh -daemon config/server.properties
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic kafka-hong \
--partitions 2 --replication-factor 1
# 생성된 topic 확인
> bin/kafka-topics.sh --list --bootstrap-server localhost:9092
kafka broker01의 설정은 다 되었다. 9999포트를 통해 JMX metric을 모니터링할 예정인데 방화벽이 있다면 9999포트를 허용해줘야 한다.
cmd 창에서 jconsole을 입력하면 아래와 같은 창이 뜰것이다.
아래 Romote Process 창에 broker01의 외부ip에 9999포트를 입력하면 (broker01의 ip:9999) kafka broker에서 발생하는 JMX 메트릭을 볼 수 있다.
Java 설치 및 JAVA_HOME(환경변수) 설정은 생략한다. 필자는 java-1.8.0-openjdk 사용
/etc/host에서 broker01 설정을 해준다.
curl -LO https://artifacts.elastic.co/downloads/logstash/logstash-oss-7.15.0-linux-x86_64.tar.gz
> tar xvf logstash-oss-7.15.0-linux-x86_64.tar.gz
> rm -rf logstash-oss-7.15.0-linux-x86_64.tar.gz
> cd logstash-7.15.0
- kafka producer로 동작하기 위해서, 일정 주기로 일정 메세지를 전달하도록 설정 필요.
- logstash를 기본으로 사용하면, 전송 성능을 위해서 정의된 batch size 만큼을 보내도록 설정됨.
- 따라서 본 실습을 위해서는 1개의 thread가 한번에 1개의 메세지만 보내도록 설정한다.
- 관련 configuration
# logstash에서 동시에 실행 가능한 thread pipeline.workers: 2 # logstash에서 한번에 전송할 batch size pipeline.batch.size: 125
- logstash 실행시 적용 가능한 option
thread 게수 : -w 또는 --pipeline.workers batch 개수 : -b, --pipeline.batch.size
## Add producer config file
> mkdir ./logstash_conf
> vi ./logstash_conf/producer.conf
#producer.conf 파일
input {
file {
path=>"데이터 경로"
start_position => "beginning"
}
}
filter {
sleep {
time => "1" # Sleep 1 second
every => 1 # on every 1th event
}
}
output {
stdout{
codec => rubydebug
}
kafka {
bootstrap_servers => "broker01:9092"
topic_id => "kafka-hong"
codec => plain {
format => "%{message}"
}
}
}
producer 또한 JMX 메트릭을 관찰 할 수 있다. 하고 싶으면 아래 명령어 입력
> export LS_JAVA_OPTS='-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.port=9998
-Dcom.sun.management.jmxremote.rmi.port=9998
-Djava.rmi.server.hostname=<kakfa-client 외부 ip>'
# thread 1개, batch size 1개로 데이터를 kafka로 전송
> ./logstash-7.15.0/bin/logstash -w 1 -b 1 -f ./logstash_conf/producer.conf
# 각 파일의 위치는 잘 파악하고 알아서 지정해준다.
## Add consumer config file
> mkdir ~/logstash_conf
> vi ~/logstash_conf/consumer.conf
#consumer.conf 파일
input {
kafka {
bootstrap_servers => "broker01:9092"
group_id => "consumer_group_1"
topics => ["kafka-hong"]
consumer_threads => 1
}
}
filter {
sleep {
time => "1" # Sleep 1 second
every => 1 # on every 1th event
}
}
output {
stdout{
codec => rubydebug
}
}
producer와 마찬가지로 consumer도 메트릭 관찰 가능
> export LS_JAVA_OPTS='-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.port=9997
-Dcom.sun.management.jmxremote.rmi.port=9997
-Djava.rmi.server.hostname=<kakfa-client 외부 ip>'
- thread 1개, batch size 1개로 데이터를 kafka로 전송
- logstash 실행시 -b (batch.size) 옵션을 1로 설정해야, broker에 데이터가 1건이라로 도착하면, logstash에서 consumer thread로 데이터를 전송하고, 이를 화면에 출력한다.
- -b 128로 하면, broker에서 128건을 가져올 때 까지 기다린 후 화면에 출력함.
- path.data 옵션 : 각 logstash 프로세스에서 내부적으로 관리하기 위한 데이터를 저장하기 위한 공간
- 이전 producer에서 이미 default 경로(./data)를 사용하고 있으므로, consumer용 logstash에서 사용하기 위한 경로를 지정한다.
> mkdir ~/data
> ./logstash-7.15.0/bin/logstash -w 1 -b 1 --path.data ~/data/consumer_data -f ./logstash_conf/consumer.conf
왼쪽이 producer 오른쪽이 consumer 출력 화면
> vi logstash-7.15.0/config/jvm.options
## 위 파일 내용에 아래 jmx 관련된 내용을 LS_JAVA_OPTS에 추가
-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9998 -Dcom.sun.management.jmxremote.rmi.port=9998 -Djava.rmi.server.hostname=<kakfa-client 외부 ip>
그리고 broker 모니터링과 똑같이 지정해준 포트와 kafka-client 외부 ip를 이용하여 jconsole에서 모니터링 할 수 있다.