이번 포스팅을 통해서 카프카의 토픽으로 메시지를 보내는 역할을 하는 프로듀서(Producer)
의 주요 기능을 알아보고 주요 옵션에는 무엇이 있는지를 살펴보겠습니다.
프로듀서의 주요 기능은 각각의 메시지를 토픽 파티션에 매핑하고 파티션의 리더에 요청을 보내는 것입니다. 키 값을 정해 해당 키를 가진 모든 메시지를 동일한 파티션으로 전송할 수 있습니다. 만약 키 값을 입력하지 않으면, 파티션은 라운드 로빈(round-robin) 방식으로 파티션에 균등하게 분배합니다.
- 각각의 메시지를 토픽 파티션에 매핑하고 리더에 요청을 보냄
- 키 값을 정해 해당 키를 가진 모든 메시지를 동일한 파티션으로 전송
- 키 값 없는 경우 라운드 방식으로 파티션에 균등하게 분배
명령어의 위치는 카프카 설치 경로인 (usr/local/kafka)
의 하위 디렉토리인 bin 디렉토리이고, 명령어는 kafka-topics.sh
입니다.
- --zookeeper : 주키퍼 리스트 작성
- --topic : 원하는 토픽 이름 작성
- --partitions : 토픽의 파티션을 작성
- --replication-factor : 프로듀서가 전달하는 메시지의 복제본을 얼마나 둘것인가 작성
- --create
## 토픽 생성 명령어
/user/local/kafka/bin/kafka-topics.sh \
--zookeepeer peter-zk001:2181,peter-zk002:2181,peter-zk003:2181/peter-kafka \
--topic peter-topic \
--partitions 1 \
--replication-factor 3 \
--create
이제 생성하고 난 후 만들어진 토픽에 대한 상세 정보를 확인해보겠습니다. 생성하는 명령어와 동일한 kafka-topics.sh
를 이용하고, --zookeeper
옵션에는 주키퍼 리스트를, --topic
옵션에는 방금 만든 토픽 이름을 입력하고, --describe
을 하면 됩니다.
## 토픽 상세정보 출력 명령어
/usr/local/kafka/bin/kafka-topics.sh \
--zookeeper peter-zk001:2181/peter-kafka \
--topic peter-topic --describe
peter-topic은 1개의 파티션으로 구성되어 있거, 리플리케이션 팩터는 3이라는 내용이 출력되어있습니다. 0번 파티션의 정보는 리더가 2번에 위치하고, 리더와 ISR는 2,1,3에 위치하고 있다는 내용을 나타냅니다.
카프카에서는 테스트 목적 등으로 토픽에 메시지를 보낼 수 있는 콘솔을 제공합니다.
위치는 /usr/local/kafka
의 하위 디렉터리인 bin 디렉터리
에 있고, 명령어는 kafka-console-producer.sh
입니다.
- --broker-list : 카프카 클러스터 내 모든 브로커 리스트 작성
- --topic : 메시지를 보내고자 하는 토픽이름 작성
## 프로듀서 콘솔 명령어
/usr/local/kafka/bin/kafka-console-producer.sh \
--broker-list peter-kafka001:9092,peter-kafka002:9092,peter-kafka003:9092 \
--topic peter-topic
보낸 메시지가 잘 전송되었는지 메시지를 가져와서 확인하는 명령어를 확인해보겠습니다.
위치는 동일하며, 명령어는 kafka-console-consumer.sh
입니다.
- --bootstrap-server
- --topic : 메시지를 확인하고자 하는 토픽 이름 작성
- --from-beginning
## 컨슈머 콘솔 명령어
/usr/local/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server peter-kafka001:9092,peter-kafka002:9092,peter-kafka003:9092 \
--topic peter-topic \
--from-beginning
이번에는 콘솔을 통해서 메시지를 전송하는 것이 아닌 python code를 통해서 전송하는 기본적인 방법을 알아보겠습니다. 여러 샘플들이 존재하는데 reference는 https://github.com/dpkp/kafka-python입니다. (https://github.com/confluentinc/confluent-kafka-python 도 있습니다.)
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='peter-kafka001:9092,peter-kafka002:9092,peter-kafka003:9092')
for _ in range(100):
producer.send('peter-topic', b'some_message_bytes')
- bootstrap.servers : 카프카 클러스터는 클러스터 마스터라는 개념이 없기 때문에 클러스터 내 모든 서버가 클라이언의 요청을 받을 수 있습니다. 해당 옵션은 카프카 클러스터에 처음 연결을 하기 위한 호스트와 포트 정보로 구성된 리스트 정보를 나타냅니다.
- acks : 프로듀서가 카프카 토픽의 리더에게 메시지를 보낸 후 요청을 완료하기 전 ack의 수입니다.
- acks=0 : 프로듀서는 서버로부터 어떠한 ack도 기다리지 않습니다. 이 경우 서버가 데이터를 받았는지 보장하지 않고, 클라이언트는 전송 실패에 대한 결과를 알지 못하기 때문에 재요청 설정도 적용되지 않습니다. 메시지가 손실될 수 있지만, 서버로부터 ack에 대한 응답을 기다리지 않기 때문에 매우 빠르게 메시지를 보낼 수 있어 높은 처리량을 얻을 수 있습니다.
- acks=1 : 리더는 데이터를 기록하지만, 모든 팔로워는 확인하지 않습니다. 이 경우 일부 데이터의 손실이 발생할 수 있습니다.
- acks=all or -1 : 리더는 ISR의 팔로워로부터 데이터에 대한 ack를 기다립니다. 하나의 팔로워가 있는 한 데이터는 손실되지 않으며, 데이터 무손실에 대해 가장 강력하게 보장합니다.
- buffer.memory : 프로듀서가 카프카 서버로 데이터를 보내기 위해 잠시 대기할 수 있는 전체 메모리 바이트입니다.
- compression.type : 데이터를 압축해서 보낼 수 있는데, 어떤 타입으로 압출할지를 정할 수 있습니다. 옵션으로는 none, gzip, snappy, lz4 같은 다양한 포맷 중 하나를 선택할 수 있습니다.
- retries : 일시적인 오류로 인해 전송에 실패한 데이터를 다시 보내는 횟수입니다.
- batch.size : 같은 파티션으로 보내는 여러 데이터를 함께 배치로 보내려고 시도합니다. 이러한 동작은 클라이언트와 서버 양쪽에 성능적인 측면에서 도움이 됩니다. 이 설정을 통해서 배치 크기 바이트를 조정합니다. (배치 사이즈를 채우기 전에 장애가 발생하면 메시지는 날아갑니다.)
- linger.ms : 배치 형태의 메시지를 보내기 전에 추가적인 메시지들을 위해 기다리는 시간을 조정합니다. 카프카 프로듀서는 지정된 배치 사이즈에 도달하면 이 옵션과 관계없이 즉시 메시지를 전송하고, 배치 사이즈에 도달하지 못한 상황에서 linger.ms 제한 시간에 도달했을 때 메시지들을 전송합니다.(DEFAULT 0)
- max.request.size : 보낼 수 있는 메시지의 최대 바이크 사이즈입니다. (DEFAULT 1MB)
해당 글의 모든 레퍼런스는 "카프카, 데이터 플랫폼의 최강자" (고승범, 공용준 지음) 을 알립니다.
"이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다."