Kafka metric monitoring 1

홍기대·2022년 5월 6일
0

Kafka

목록 보기
4/5

이번 포스팅은 kafka를 통해 producer와 consumer가 데이터를 주고 받는 과정에서 kafka에서 발생하는 메트릭에 대해 모니텅 하고자 한다.
필자는 총 3대의 서버를 사용했으며, 각각의 역할은 kafka-client, broker, monitoring이다. kafka-client에는 producer와 consumer가 동작하고 있고, broker에는 zookeeper와 kafka가 동작하고 있으며, broker에서 발생하는 JMX 메트릭을 jconsole(노트북)과 ELK가 동작하고 있는 monitoring서버에서 모니터링하고자 한다. 전체구성도는 아래와 같다.

🚩 broker 구축

Java 설치 및 JAVA_HOME(환경변수) 설정은 생략한다. 필자는 java-1.8.0-openjdk 사용

1. apache kafka 설치

> curl -O https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz 
> tar xvf kafka_2.12-2.8.0.tgz
> rm -rf kafka_2.12-2.8.0.tgz
> cd kafka_2.12-2.8.0

> cd ./kafka_2.12-2.8.0
## producer 또는 consumer에서 broker에 접속할 때 사용 가능한 hostname 설정(필자는 broker01로 설정) 
> vi config/server.properties
advertised.listeners=PLAINTEXT://broker01:9092

2. kafka broker 구동

java.rmi.server.hostname은 본인의 외부 IP로 변경
rmi.server.hostname을 설정해야, 외부의 jconsole과 같은 도구에서 접속 가능

> export KAFKA_JMX_OPTS='-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false 
  -Dcom.sun.management.jmxremote.ssl=false 
  -Dcom.sun.management.jmxremote.port=9999 
  -Dcom.sun.management.jmxremote.rmi.port=9999
  -Djava.rmi.server.hostname=<broker01서버 외부 ip>'

# 1) Foregroud 
> env JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties

# 2) background 실행
> env JMX_PORT=9999 bin/kafka-server-start.sh -daemon config/server.properties

3. topic 생성

> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic kafka-hong \
--partitions 2 --replication-factor 1

# 생성된 topic 확인
> bin/kafka-topics.sh --list --bootstrap-server localhost:9092 

4. jconsole

kafka broker01의 설정은 다 되었다. 9999포트를 통해 JMX metric을 모니터링할 예정인데 방화벽이 있다면 9999포트를 허용해줘야 한다.


cmd 창에서 jconsole을 입력하면 아래와 같은 창이 뜰것이다.

아래 Romote Process 창에 broker01의 외부ip에 9999포트를 입력하면 (broker01의 ip:9999) kafka broker에서 발생하는 JMX 메트릭을 볼 수 있다.

🚩 kafka-client(producer)

Java 설치 및 JAVA_HOME(환경변수) 설정은 생략한다. 필자는 java-1.8.0-openjdk 사용
/etc/host에서 broker01 설정을 해준다.

1. logstash 설치

curl -LO https://artifacts.elastic.co/downloads/logstash/logstash-oss-7.15.0-linux-x86_64.tar.gz
> tar xvf logstash-oss-7.15.0-linux-x86_64.tar.gz  
> rm -rf logstash-oss-7.15.0-linux-x86_64.tar.gz
> cd logstash-7.15.0

2. Configure the logstash.yml

  • kafka producer로 동작하기 위해서, 일정 주기로 일정 메세지를 전달하도록 설정 필요.
  • logstash를 기본으로 사용하면, 전송 성능을 위해서 정의된 batch size 만큼을 보내도록 설정됨.
  • 따라서 본 실습을 위해서는 1개의 thread가 한번에 1개의 메세지만 보내도록 설정한다.
  • 관련 configuration
# logstash에서 동시에 실행 가능한 thread
pipeline.workers: 2
# logstash에서 한번에 전송할 batch size
pipeline.batch.size: 125
  • logstash 실행시 적용 가능한 option
thread 게수 : -w 또는 --pipeline.workers
batch 개수 : -b, --pipeline.batch.size

3. producer.conf 파일 작성

## Add producer config file 
> mkdir ./logstash_conf
> vi ./logstash_conf/producer.conf

#producer.conf 파일
input {
    file {
        path=>"데이터 경로"
        start_position => "beginning"
    } 
}

filter {
    sleep {
        time => "1"   # Sleep 1 second
        every => 1   # on every 1th event
    }
}

output {
    stdout{
        codec => rubydebug
    }

    kafka { 
        bootstrap_servers => "broker01:9092"
        topic_id =>  "kafka-hong"
        codec => plain {
            format => "%{message}"
        }
    }
}

4. logstash 실행

producer 또한 JMX 메트릭을 관찰 할 수 있다. 하고 싶으면 아래 명령어 입력

> export LS_JAVA_OPTS='-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false 
  -Dcom.sun.management.jmxremote.ssl=false 
  -Dcom.sun.management.jmxremote.port=9998 
  -Dcom.sun.management.jmxremote.rmi.port=9998 
  -Djava.rmi.server.hostname=<kakfa-client 외부 ip>'
# thread 1개, batch size 1개로 데이터를 kafka로 전송
> ./logstash-7.15.0/bin/logstash -w 1 -b 1 -f ./logstash_conf/producer.conf
# 각 파일의 위치는 잘 파악하고 알아서 지정해준다.

🚩 kafka-client(Consumer)

1. consumer.conf 파일 작성

## Add consumer config file 
> mkdir ~/logstash_conf
> vi ~/logstash_conf/consumer.conf
#consumer.conf 파일
input {
    kafka {
      bootstrap_servers => "broker01:9092"
      group_id => "consumer_group_1"
      topics => ["kafka-hong"]
      consumer_threads => 1
    }
}

filter {
    sleep {
        time => "1"   # Sleep 1 second
        every => 1   # on every 1th event
    }
}

output {
    stdout{
        codec => rubydebug
    }
}

2. logstash 실행

producer와 마찬가지로 consumer도 메트릭 관찰 가능

> export LS_JAVA_OPTS='-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false 
  -Dcom.sun.management.jmxremote.ssl=false 
  -Dcom.sun.management.jmxremote.port=9997 
  -Dcom.sun.management.jmxremote.rmi.port=9997 
  -Djava.rmi.server.hostname=<kakfa-client 외부 ip>'
  • thread 1개, batch size 1개로 데이터를 kafka로 전송
  • logstash 실행시 -b (batch.size) 옵션을 1로 설정해야, broker에 데이터가 1건이라로 도착하면, logstash에서 consumer thread로 데이터를 전송하고, 이를 화면에 출력한다.
  • -b 128로 하면, broker에서 128건을 가져올 때 까지 기다린 후 화면에 출력함.
  • path.data 옵션 : 각 logstash 프로세스에서 내부적으로 관리하기 위한 데이터를 저장하기 위한 공간
  • 이전 producer에서 이미 default 경로(./data)를 사용하고 있으므로, consumer용 logstash에서 사용하기 위한 경로를 지정한다.
> mkdir ~/data
> ./logstash-7.15.0/bin/logstash -w 1 -b 1 --path.data ~/data/consumer_data -f ./logstash_conf/consumer.conf

왼쪽이 producer 오른쪽이 consumer 출력 화면

3. LS_JAVA_OPT 설정을 logstash 시작시에 기본으로 적용하는 방식

> vi logstash-7.15.0/config/jvm.options

## 위 파일 내용에 아래 jmx 관련된 내용을 LS_JAVA_OPTS에 추가
-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9998 -Dcom.sun.management.jmxremote.rmi.port=9998 -Djava.rmi.server.hostname=<kakfa-client 외부 ip>

그리고 broker 모니터링과 똑같이 지정해준 포트와 kafka-client 외부 ip를 이용하여 jconsole에서 모니터링 할 수 있다.

profile
열심히 살자

0개의 댓글