해당 문서의 경우, 책에서는 모두 java로 짜여져 있으나, 작성자의 경우 python을 주 언어로 사용하기 때문에 해당 책의 코드를 모두 python으로 변경했습니다.
> python version : python 3.8.13
카프카 클라이언트 라이브러리를 사용하여 카프카 클러스터에 명령을 내리거나 데이터를 송수신을 하며, 이를 이용하여 애플리케이션을 개발한다.
크게 3가지로 나뉘는데, 여기서 진행할 때에는 kafka-python
을 사용하고자 한다.
pip install kafka-python
카프카의 데이터 시작점은 프로듀서라고 보면 된다. 프로듀서 애플리케이션은 필요 데이터 선언 및 브로커의 특정 토픽의 파티션에 전송한다. 데이터를 전송할 떄 리더 파티션을 가지고 있는 카프카 브로커와 직접 통신하고, 직렬화하여 카프카 브로커로 보낸다. 직렬화로 동영상, 이미지와 같은 바이너리 데이터도 프로듀서를 통해 전송할 수 있다.
from kafka.producer import KafkaProducer
TOPIC_NAME = "kafka.client.tutorial" # producer는 생성한 레코드를 전송하기 위해 전송하고자 하는 토픽을 알고 있어야 한다.
BOOTSTRAP_SERVER_HOST = "kafka_tutorial:9092" # 전송하고자 하는 카프카 클러스터 서버의 host와 IP를 지정
KEY_SERIALIZER = str.encode
VALUE_SERIALIZER = str.encode
producer = KafkaProducer(
bootstrap_servers=[BOOTSTRAP_SERVER_HOST],
key_serializer=KEY_SERIALIZER,
value_seriakuzer=VALUE_SERIALIZER
)
# test message without key
test_message_value = "testMessage for python with Kafka"
# send는 즉각 전송이 아닌 배치 전송이다.
producer.send(topic=TOPIC_NAME, value=test_message_value)
# flush를 통해 내부 버퍼에 가지고 있던 레코드 배치를 브로커에 전송
producer.flush()
# producer 리소스 종료
producer.close()
# topic 생성
bin/kafka-topics.sh --bootstrap-server kafka_tutorial:9092 --create --topic kafka.client.tutorial --partitions 3
## WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
## Created topic kafka.client.tutorial.
# producer 실행
python ${PYTHON_CODE_PATH}/simple-kafka-producer/simple-kafka-producer.py
# consumer에 제대로 들어왔는지 확인
bin/kafka-console-consumer.sh --bootstrap-server kafka_tutorial:9092 --topic kafka.client.tutorial --from-beginning
## testMessage for python with Kafka
책에 있는 코드를 한 번 python으로 바꿔봄..
producer가 전송한 데이터는 broker에 적재되며, consumer는 broker로부터 데이터를 가져와 필요한 처리를 한다.
컨슈머 그룹으로 운영
컨슈머를 각 컨슈머 그룹으로부터 격리된 환경에서 안전하게 운영할 수 있도록 도와주는 카프카의 독특한 방식
토픽의 1개 이상 파티션들에 할당되어 데이터를 가져갈 수 있으며, 컨슈머 그룹으로 묶인 컨슈머가 토픽을 구독해서 데이터를 가져갈 때, 1개의 파티션은 최대 1개의 컨슈머에 할당 가능하여, 컨슈머 그룹의 컨슈머 개수는 토픽의 파티션 개수보다 같거나 작아야 한다.
컨슈머 그룹은 다른 컨슈머 그룹과 격리되는 특징을 가지고 있어서, 프로듀서가 보낸 데이터를 각기 다른 역할을 하는 컨슈머 그룹끼리 영향을 받지 않게 처리할 수 있다.
만약, 컨슈머 그룹의 컨슈머에 장애가 발생하면 장애가 발생한 컨슈머에 할당된 파티션은 장애가 발생하지 않은 컨슈머에 소유권을 넘기는 리밸런싱이 이루어진다. 리밸런싱은 크게 2가지 상황에서 일어나는데, 컨슈머가 추가되는 상황과 제외되는 상황에서 발생한다. 리밸런싱은 유용하지만 자주 일어나서는 안되는데, 리밸런싱이 발생할 때 파티션의 소유권을 컨슈머로 재할당하는 과정에서 해당 컨슈머 그룹의 컨슈머들이 토픽의 데이터를 읽을 수 없기 때문이다.
컨슈머는 커밋을 통해 브로커로부터 데이터를 어디까지 가져갔는지 기록한다. __consumer_offsets
를 통해 특정 토픽의 파티션을 어떤 컨슈머 그룹이 몇 번째 가져갔는지 브로커 내부에서 가용되는 내부토픽에 기록된다. 만약 __consumer_offsets
토픽에 어느 레코드 까지 읽어갔는지 오프셋 커핏이 기록되지 못했다며느, 데이터 처리가 중복이 발생하기 때문에 컨슈머 애플리케이션이 오프셋 커밋을 정상적으로 처리했는지 검증해야 한다.
오프셋 커밋의 경우 명시적, 비명시적으로 수행할 수 있다. 기본 옵션은 poll()
메소드가 수행될 때 일정 간경마다 오프셋을 커밋하도록 enable.auto.commit=true
로 설정 되어 있는데 이렇게 일정 간격마다 자동으로 커밋되는 것을 비명시 오프셋 커밋이라고 부른다. poll()
메서드가 auto.commit.interval.ms
에 설정된 값 이상이 지났을 때 그 시점까지 읽은 레코드의 오프셋을 커밋하는 방식인데, 코드상에서 따로 커밋 관련 코드를 작성할 필요가 없어 편리하지만 호출 이후에 리벨런싱 또는 컨슈머 강제종료 발생 시 컨슈머가 처리하는 데이터가 중복 또는 유실될 수 있는 취약점이 있다. 데이터 중복이나 유실을 허용하지 않는 서비스라면 이를 사용해서는 안 된다.
명시적으로 오프셋 커밋의 경우 commitSync()
메소드를 호출하면 된다. 해당 메소드의 경우, poll()
메소드를 통해 반환된 레코드의 가장 마지막 오프셋을 기준으로 커밋을 수행한다. 이는 브로커에 커밋 요청을 보내고, 정상 처리되었는지 응답까지 기다리는데 이는 컨슈머의 처리량에 영향을 끼친다. 이를 위해 commitAsnc()
메서드를 사용할 수 있는데, 이는 커밋 요청을 전송하고 응답이 오기 전까지 데이터 처리를 수행할 수 있다. 이러한 비동기 커밋은 처리 중인 데이터의 순서를 보장하지 않으며 데이터의 중복 처리가 발생할 수도 있다.
컨슈머의 내부 구조를 보게 되면, 컨슈머 애플리케이션을 실행하게 되면 내부에서 Fetcher 인스턴스가 생성되어 poll()
메서드를 호출하기 전에 미리 레코드들을 내부 큐로 가져오고, 메서드를 호출하면 그때 내부 큐에 있는 레코드들을 반환받아 처리를 수행한다.
책에 있는 코드를 한 번 python으로 바꿔봄..
여기서 컨슈머의 안전한 종료인 kafka-consumer-sync-offset-commit-shutdown-hook.py
의 경우, 현재 내가 알고 있는 지식으로 작성했기 때문에 정확하지 않고 수정이 필요합니다.
Thread safety
- The KafkaProducer can be used across threads without issue, unlike the KafkaConsumer which cannot.
- While it is possible to use the KafkaConsumer in a thread-local manner, multiprocessing is recommended.
실제 운영환경에서 프로듀서와 컨슈머를 통해 데이터를 주고받는 것 외에 카프카에 설정된 내부 옵션을 설정하고 확인하는 것이 중요하다. AdminClient를 통해 클러스터 옵션 관련된 부부을 자동화할 수 있다.
admin api를 사용할 때 클러스터의 버전과 클라이언트의 버전을 맞춰서 사용해줘야 한다.