kafka 설정

이상민·2023년 6월 1일
0

spark

목록 보기
16/17

kafka 설치

kafka에는 master, slave 개념이 없음
kafka는 worker에만 설치

$ wget -O /skybluelee/kafka_2.12-3.0.0.tgz https://archive.apache.org/dist/kafka/3.0.0/kafka_2.12-3.0.0.tgz
$ cd /skybluelee
$ tar xvfz kafka_2.12-3.0.0.tgz

zookeeper 설정

/skybluelee/kafka_2.12-3.0.0/bin안에 zookeeper에 대한 설정 파일이 존재
worker별로 노드를 다르게 설정한다.
현 위치: worker-1

$ mkdir -p /skybluelee/kafka_2.12-3.0.0/zookeeper_data
$ echo '1' > /skybluelee/kafka_2.12-3.0.0/zookeeper_data/myid

$ cat /skybluelee/kafka_2.12-3.0.0/zookeeper_data/myid
1

$ vi /skybluelee/kafka_2.12-3.0.0/config/zookeeper.properties
....
#dataDir=/tmp/zookeeper
dataDir=/skybluelee/kafka_2.12-3.0.0/zookeeper_data

initLimit=5
syncLimit=2

server.1=spark-worker-01:2888:3888
server.2=spark-worker-02:2888:3888
server.3=spark-worker-03:2888:3888

server1은 myid가 1인 worker-1을 의미, worker 2, 3도 myid를 변경할 것

kafka 설정

$ mkdir -p /skybluelee/kafka_2.12-3.0.0/kafka_logs
$ vi /skybluelee/kafka_2.12-3.0.0/config/server.properties
....
broker.id=1
....
advertised.listeners=PLAINTEXT://spark-worker-01:9092
....
#log.dirs=/tmp/kafka-logs
log.dirs=/skybluelee/kafka_2.12-3.0.0/kafka_logs

delete.topic.enable=true // version에 따라 파일 안에 있을 수도 있음
auto.create.topics.enable=false // 이 경우에는 추가
....
#zookeeper.connect=localhost:2181
zookeeper.connect=spark-worker-01:2181,spark-worker-02:2181,spark-worker-03:2181
....

다른 worker 설정

$ scp -r /skybluelee/kafka_2.12-3.0.0 spark-worker-02:/skybluelee/
$ scp -r /skybluelee/kafka_2.12-3.0.0 spark-worker-03:/skybluelee/

scp -r: 내부 파일 까지 전부 복사

worker-2

$ echo '2' > /skybluelee/kafka_2.12-3.0.0/zookeeper_data/myid
$ cat /skybluelee/kafka_2.12-3.0.0/zookeeper_data/myid
2

$ vi /skybluelee/kafka_2.12-3.0.0/config/server.properties
....
broker.id=2
....
advertised.listeners=PLAINTEXT://spark-worker-02:9092
....

worker-3

$ echo '3' > /skybluelee/kafka_2.12-3.0.0/zookeeper_data/myid
$ cat /skybluelee/kafka_2.12-3.0.0/zookeeper_data/myid
3


$ vi /skybluelee/kafka_2.12-3.0.0/config/server.properties
....
broker.id=3
....
advertised.listeners=PLAINTEXT://spark-worker-03:9092
....

zookeeper 실행

$ nohup /skybluelee/kafka_2.12-3.0.0/bin/zookeeper-server-start.sh /skybluelee/kafka_2.12-3.0.0/config/zookeeper.properties > /skybluelee/kafka_2.12-3.0.0/bin/nohup_zookeeper.out &

$ jps
1849 QuorumPeerMain

모든 worker에서 실행해야 함
zookeeper-server-start.sh를 실행, zookeeper.properties를 참조, & 백그라운드 환경에서 실행

kafka 실행

$ nohup /skybluelee/kafka_2.12-3.0.0/bin/kafka-server-start.sh /skybluelee/kafka_2.12-3.0.0/config/server.properties > /skybluelee/kafka_2.12-3.0.0/bin/nohup_kafka.out &
$ jps
1532 Kafka

모든 worker에서 실행해야 함

topic 생성

$ /skybluelee/kafka_2.12-3.0.0/bin/kafka-topics.sh --create --topic tweet --partitions 3 --replication-factor 3 --bootstrap-server spark-worker-01:9092,spark-worker-02:9092,spark-worker-03:9092

tweet이란 topic을 3개의 partition으로 나누어 생성하고, 문제 해결을 위해 3 copy

$ /skybluelee/kafka_2.12-3.0.0/bin/kafka-topics.sh --list --bootstrap-server spark-worker-01:9092,spark-worker-02:9092,spark-worker-03:9092
tweet

topic list 반환

$ /skybluelee/kafka_2.12-3.0.0/bin/kafka-topics.sh --describe --topic tweet --bootstrap-server spark-worker-01:9092,spark-worker-02:9092,spark-worker-03:9092
Topic: tweet    TopicId: AjkHOMV2REWuCzI_l5Lrxg PartitionCount: 3       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: tweet    Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
        Topic: tweet    Partition: 1    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: tweet    Partition: 2    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1

topic 상세 정보 확인(describe 사용)

- (필요시) 토픽 삭제....
$ /skybluelee/kafka_2.12-3.0.0/bin/kafka-topics.sh --delete --topic tweet --bootstrap-server spark-worker-01:9092,spark-worker-02:9092,spark-worker-03:9092

실행

콘솔 프로듀서 실행

worker-01

$ /skybluelee/kafka_2.12-3.0.0/bin/kafka-console-producer.sh --broker-list spark-worker-01:9092,spark-worker-02:9092,spark-worker-03:9092 --topic tweet
>

콘솔로 메시지 입력

콘솔 컨슈머 실행

worker-02, worker-03

$ /skybluelee/kafka_2.12-3.0.0/bin/kafka-console-consumer.sh --bootstrap-server spark-worker-01:9092,spark-worker-02:9092,spark-worker-03:9092 --topic tweet
// 아무것도 뜨지 않음

부가 설명

프로듀서는 topic(tweet)에 메시지를 넣고 컨슈머는 토픽을 바라본다. 이때 메시지가 들어오면 컨슈머가 능동적으로 메시지를 가지고 온다. kafka는 수동적으로 데이터가 들어오면 디스크에 쌓기만 하고 컨슈머가 요청하고 컨슈머가 가져감.

0개의 댓글