kafka에는 master, slave 개념이 없음
kafka는 worker에만 설치
$ wget -O /skybluelee/kafka_2.12-3.0.0.tgz https://archive.apache.org/dist/kafka/3.0.0/kafka_2.12-3.0.0.tgz
$ cd /skybluelee
$ tar xvfz kafka_2.12-3.0.0.tgz
/skybluelee/kafka_2.12-3.0.0/bin
안에 zookeeper에 대한 설정 파일이 존재
worker별로 노드를 다르게 설정한다.
현 위치: worker-1
$ mkdir -p /skybluelee/kafka_2.12-3.0.0/zookeeper_data
$ echo '1' > /skybluelee/kafka_2.12-3.0.0/zookeeper_data/myid
$ cat /skybluelee/kafka_2.12-3.0.0/zookeeper_data/myid
1
$ vi /skybluelee/kafka_2.12-3.0.0/config/zookeeper.properties
....
#dataDir=/tmp/zookeeper
dataDir=/skybluelee/kafka_2.12-3.0.0/zookeeper_data
initLimit=5
syncLimit=2
server.1=spark-worker-01:2888:3888
server.2=spark-worker-02:2888:3888
server.3=spark-worker-03:2888:3888
server1은 myid가 1인 worker-1을 의미, worker 2, 3도 myid를 변경할 것
$ mkdir -p /skybluelee/kafka_2.12-3.0.0/kafka_logs
$ vi /skybluelee/kafka_2.12-3.0.0/config/server.properties
....
broker.id=1
....
advertised.listeners=PLAINTEXT://spark-worker-01:9092
....
#log.dirs=/tmp/kafka-logs
log.dirs=/skybluelee/kafka_2.12-3.0.0/kafka_logs
delete.topic.enable=true // version에 따라 파일 안에 있을 수도 있음
auto.create.topics.enable=false // 이 경우에는 추가
....
#zookeeper.connect=localhost:2181
zookeeper.connect=spark-worker-01:2181,spark-worker-02:2181,spark-worker-03:2181
....
$ scp -r /skybluelee/kafka_2.12-3.0.0 spark-worker-02:/skybluelee/
$ scp -r /skybluelee/kafka_2.12-3.0.0 spark-worker-03:/skybluelee/
scp -r: 내부 파일 까지 전부 복사
$ echo '2' > /skybluelee/kafka_2.12-3.0.0/zookeeper_data/myid
$ cat /skybluelee/kafka_2.12-3.0.0/zookeeper_data/myid
2
$ vi /skybluelee/kafka_2.12-3.0.0/config/server.properties
....
broker.id=2
....
advertised.listeners=PLAINTEXT://spark-worker-02:9092
....
$ echo '3' > /skybluelee/kafka_2.12-3.0.0/zookeeper_data/myid
$ cat /skybluelee/kafka_2.12-3.0.0/zookeeper_data/myid
3
$ vi /skybluelee/kafka_2.12-3.0.0/config/server.properties
....
broker.id=3
....
advertised.listeners=PLAINTEXT://spark-worker-03:9092
....
$ nohup /skybluelee/kafka_2.12-3.0.0/bin/zookeeper-server-start.sh /skybluelee/kafka_2.12-3.0.0/config/zookeeper.properties > /skybluelee/kafka_2.12-3.0.0/bin/nohup_zookeeper.out &
$ jps
1849 QuorumPeerMain
모든 worker에서 실행해야 함
zookeeper-server-start.sh를 실행, zookeeper.properties를 참조, & 백그라운드 환경에서 실행
$ nohup /skybluelee/kafka_2.12-3.0.0/bin/kafka-server-start.sh /skybluelee/kafka_2.12-3.0.0/config/server.properties > /skybluelee/kafka_2.12-3.0.0/bin/nohup_kafka.out &
$ jps
1532 Kafka
모든 worker에서 실행해야 함
$ /skybluelee/kafka_2.12-3.0.0/bin/kafka-topics.sh --create --topic tweet --partitions 3 --replication-factor 3 --bootstrap-server spark-worker-01:9092,spark-worker-02:9092,spark-worker-03:9092
tweet이란 topic을 3개의 partition으로 나누어 생성하고, 문제 해결을 위해 3 copy
$ /skybluelee/kafka_2.12-3.0.0/bin/kafka-topics.sh --list --bootstrap-server spark-worker-01:9092,spark-worker-02:9092,spark-worker-03:9092
tweet
topic list 반환
$ /skybluelee/kafka_2.12-3.0.0/bin/kafka-topics.sh --describe --topic tweet --bootstrap-server spark-worker-01:9092,spark-worker-02:9092,spark-worker-03:9092
Topic: tweet TopicId: AjkHOMV2REWuCzI_l5Lrxg PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: tweet Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: tweet Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: tweet Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
topic 상세 정보 확인(describe 사용)
- (필요시) 토픽 삭제....
$ /skybluelee/kafka_2.12-3.0.0/bin/kafka-topics.sh --delete --topic tweet --bootstrap-server spark-worker-01:9092,spark-worker-02:9092,spark-worker-03:9092
worker-01
$ /skybluelee/kafka_2.12-3.0.0/bin/kafka-console-producer.sh --broker-list spark-worker-01:9092,spark-worker-02:9092,spark-worker-03:9092 --topic tweet
>
콘솔로 메시지 입력
worker-02, worker-03
$ /skybluelee/kafka_2.12-3.0.0/bin/kafka-console-consumer.sh --bootstrap-server spark-worker-01:9092,spark-worker-02:9092,spark-worker-03:9092 --topic tweet
// 아무것도 뜨지 않음
프로듀서는 topic(tweet)에 메시지를 넣고 컨슈머는 토픽을 바라본다. 이때 메시지가 들어오면 컨슈머가 능동적으로 메시지를 가지고 온다. kafka는 수동적으로 데이터가 들어오면 디스크에 쌓기만 하고 컨슈머가 요청하고 컨슈머가 가져감.