Kafka client

Log·2022년 10월 30일
0

Kafka

목록 보기
7/9
post-thumbnail

해당 문서의 경우, 책에서는 모두 java로 짜여져 있으나, 작성자의 경우 python을 주 언어로 사용하기 때문에 해당 책의 코드를 모두 python으로 변경했습니다.

> python version : python 3.8.13

Kafka client

카프카 클라이언트 라이브러리를 사용하여 카프카 클러스터에 명령을 내리거나 데이터를 송수신을 하며, 이를 이용하여 애플리케이션을 개발한다.

Kafka with Python

크게 3가지로 나뉘는데, 여기서 진행할 때에는 kafka-python을 사용하고자 한다.

  • confluent-kafka-python: 퍼포먼스가 가장 좋음. confluent의 공식 클라이언트.
  • kafka-python: pure python. confluent-python에 비해서는 속도가 느림
  • pykafka: 2018년 이후 업데이트가 잘 안 된다고 한다.

install

pip install kafka-python

Producer

카프카의 데이터 시작점은 프로듀서라고 보면 된다. 프로듀서 애플리케이션은 필요 데이터 선언 및 브로커의 특정 토픽의 파티션에 전송한다. 데이터를 전송할 떄 리더 파티션을 가지고 있는 카프카 브로커와 직접 통신하고, 직렬화하여 카프카 브로커로 보낸다. 직렬화로 동영상, 이미지와 같은 바이너리 데이터도 프로듀서를 통해 전송할 수 있다.

simple-kafka-producer

Code
from kafka.producer import KafkaProducer

TOPIC_NAME = "kafka.client.tutorial" # producer는 생성한 레코드를 전송하기 위해 전송하고자 하는 토픽을 알고 있어야 한다.
BOOTSTRAP_SERVER_HOST = "kafka_tutorial:9092" # 전송하고자 하는 카프카 클러스터 서버의 host와 IP를 지정
KEY_SERIALIZER = str.encode
VALUE_SERIALIZER = str.encode


producer = KafkaProducer(
    bootstrap_servers=[BOOTSTRAP_SERVER_HOST],
    key_serializer=KEY_SERIALIZER,
    value_seriakuzer=VALUE_SERIALIZER
)
# test message without key
test_message_value = "testMessage for python with Kafka"

# send는 즉각 전송이 아닌 배치 전송이다.
producer.send(topic=TOPIC_NAME, value=test_message_value)

# flush를 통해 내부 버퍼에 가지고 있던 레코드 배치를 브로커에 전송
producer.flush()

# producer 리소스 종료
producer.close()
실행
# topic 생성
bin/kafka-topics.sh --bootstrap-server kafka_tutorial:9092 --create --topic kafka.client.tutorial --partitions 3
## WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
## Created topic kafka.client.tutorial.

# producer 실행
python ${PYTHON_CODE_PATH}/simple-kafka-producer/simple-kafka-producer.py

# consumer에 제대로 들어왔는지 확인
bin/kafka-console-consumer.sh --bootstrap-server kafka_tutorial:9092 --topic kafka.client.tutorial --from-beginning           
## testMessage for python with Kafka

책에 있는 코드를 한 번 python으로 바꿔봄..



Consumer

producer가 전송한 데이터는 broker에 적재되며, consumer는 broker로부터 데이터를 가져와 필요한 처리를 한다.

컨슈머 운영 방법

컨슈머 그룹으로 운영
컨슈머를 각 컨슈머 그룹으로부터 격리된 환경에서 안전하게 운영할 수 있도록 도와주는 카프카의 독특한 방식
토픽의 1개 이상 파티션들에 할당되어 데이터를 가져갈 수 있으며, 컨슈머 그룹으로 묶인 컨슈머가 토픽을 구독해서 데이터를 가져갈 때, 1개의 파티션은 최대 1개의 컨슈머에 할당 가능하여, 컨슈머 그룹의 컨슈머 개수는 토픽의 파티션 개수보다 같거나 작아야 한다.
컨슈머 그룹은 다른 컨슈머 그룹과 격리되는 특징을 가지고 있어서, 프로듀서가 보낸 데이터를 각기 다른 역할을 하는 컨슈머 그룹끼리 영향을 받지 않게 처리할 수 있다.
만약, 컨슈머 그룹의 컨슈머에 장애가 발생하면 장애가 발생한 컨슈머에 할당된 파티션은 장애가 발생하지 않은 컨슈머에 소유권을 넘기는 리밸런싱이 이루어진다. 리밸런싱은 크게 2가지 상황에서 일어나는데, 컨슈머가 추가되는 상황과 제외되는 상황에서 발생한다. 리밸런싱은 유용하지만 자주 일어나서는 안되는데, 리밸런싱이 발생할 때 파티션의 소유권을 컨슈머로 재할당하는 과정에서 해당 컨슈머 그룹의 컨슈머들이 토픽의 데이터를 읽을 수 없기 때문이다.

컨슈머 작동 방식

컨슈머는 커밋을 통해 브로커로부터 데이터를 어디까지 가져갔는지 기록한다. __consumer_offsets를 통해 특정 토픽의 파티션을 어떤 컨슈머 그룹이 몇 번째 가져갔는지 브로커 내부에서 가용되는 내부토픽에 기록된다. 만약 __consumer_offsets토픽에 어느 레코드 까지 읽어갔는지 오프셋 커핏이 기록되지 못했다며느, 데이터 처리가 중복이 발생하기 때문에 컨슈머 애플리케이션이 오프셋 커밋을 정상적으로 처리했는지 검증해야 한다.
오프셋 커밋의 경우 명시적, 비명시적으로 수행할 수 있다. 기본 옵션은 poll()메소드가 수행될 때 일정 간경마다 오프셋을 커밋하도록 enable.auto.commit=true로 설정 되어 있는데 이렇게 일정 간격마다 자동으로 커밋되는 것을 비명시 오프셋 커밋이라고 부른다. poll() 메서드가 auto.commit.interval.ms에 설정된 값 이상이 지났을 때 그 시점까지 읽은 레코드의 오프셋을 커밋하는 방식인데, 코드상에서 따로 커밋 관련 코드를 작성할 필요가 없어 편리하지만 호출 이후에 리벨런싱 또는 컨슈머 강제종료 발생 시 컨슈머가 처리하는 데이터가 중복 또는 유실될 수 있는 취약점이 있다. 데이터 중복이나 유실을 허용하지 않는 서비스라면 이를 사용해서는 안 된다.
명시적으로 오프셋 커밋의 경우 commitSync()메소드를 호출하면 된다. 해당 메소드의 경우, poll()메소드를 통해 반환된 레코드의 가장 마지막 오프셋을 기준으로 커밋을 수행한다. 이는 브로커에 커밋 요청을 보내고, 정상 처리되었는지 응답까지 기다리는데 이는 컨슈머의 처리량에 영향을 끼친다. 이를 위해 commitAsnc()메서드를 사용할 수 있는데, 이는 커밋 요청을 전송하고 응답이 오기 전까지 데이터 처리를 수행할 수 있다. 이러한 비동기 커밋은 처리 중인 데이터의 순서를 보장하지 않으며 데이터의 중복 처리가 발생할 수도 있다.
컨슈머의 내부 구조를 보게 되면, 컨슈머 애플리케이션을 실행하게 되면 내부에서 Fetcher 인스턴스가 생성되어 poll()메서드를 호출하기 전에 미리 레코드들을 내부 큐로 가져오고, 메서드를 호출하면 그때 내부 큐에 있는 레코드들을 반환받아 처리를 수행한다.


책에 있는 코드를 한 번 python으로 바꿔봄..
여기서 컨슈머의 안전한 종료인 kafka-consumer-sync-offset-commit-shutdown-hook.py의 경우, 현재 내가 알고 있는 지식으로 작성했기 때문에 정확하지 않고 수정이 필요합니다.

Thread safety

  • The KafkaProducer can be used across threads without issue, unlike the KafkaConsumer which cannot.
  • While it is possible to use the KafkaConsumer in a thread-local manner, multiprocessing is recommended.

Admin API

실제 운영환경에서 프로듀서와 컨슈머를 통해 데이터를 주고받는 것 외에 카프카에 설정된 내부 옵션을 설정하고 확인하는 것이 중요하다. AdminClient를 통해 클러스터 옵션 관련된 부부을 자동화할 수 있다.

admin api를 사용할 때 클러스터의 버전과 클라이언트의 버전을 맞춰서 사용해줘야 한다.


요약

  • python을 이용하여 kafka producer, consumer, admin api를 진행하였다.
  • Producer
    • 프로듀서는 브로커로 데이터를 전송할 때 내부적으로 파티셔너, 배치 생성 단계를 거친다.
    • 프로듀서 인스턴스가 send() 메서드를 호출하면 ProducerRecord는 파티셔너에서 토픽의 어느 파티션으로 전송될 것인지 정해지며, 따로 설정하지 않으면 DefaultPartitioner로 설정된다.
    • Accumulator에 데이터를 전송하기 전에 버퍼로 쌓아놓고 발송하며 배치로 묶어 전송함으로써 처리량을 향상시키는데 도움을 준다.
    • 압축 옵션을 통해 브로커로 전송 시 압축방식을 정할 수 있으나, 네트워크 처리량에 이득을 볼 수 있으나 CPU/메모리 리소스를 사용하므로 적절한 압축 옵션을 사용해야 한다.
  • Consumer
    • 컨슈머늘 브로커에 적재된 데이터를 가져와 처리를 한다.
    • 컨슈머 그룹을 이용하게 되면 각 컨슈머 그룹끼리 격리된 환경에서 안정적으로 운영할 수 있도록 도와주며, 컨슈머 개수는 가져가고자 하는 토픽의 파티션 개수보다 같거나 작아야 한다.
    • 컨슈머 그룹의 컨슈머에 장애 발생 시 리밸런싱이 일어나는데, 자주 일어날 경우 행이 걸릴 수도 있다.(리밸런싱 과정에서 컨슈머들은 토픽의 데이터를 읽을 수 없기 때문)
    • 오프셋 커밋을 이용하여 컨슈머는 어느 오프셋까지 데이터를 가져갔는 지 확인 가능하다.
    • 자동 오프셋 커밋의 경우 사용에 편리하지만, 데이터 중복/유실에 대한 가능성이 있다.(async 방식은 중복 처리 발생 가능성이 있음)
    • 컨슈머 애플리케이션은 안전하게 종료되어야 하며, 정상적 종료되지 않은 컨슈머는 세션 타임아웃이 발생할때까지 컨슈머 그룹에 남게 된다.
  • Admin API
    • 내부 옵션 확인 및 토픽 추가 등을 할 수 있다.
    • 카프카 CLI의 경우 일회성 작업에 그치지만, 이를 이용하면 자동화가 가능하다.

참고 문서

profile
열심히 정리하는 습관 기르기..

0개의 댓글