아파치 카프카 2

justindevcode·2024년 5월 7일

카프카

목록 보기
1/3
post-thumbnail

아파치 카프카 2

지난 시간과 다르게 좀더 새부적인 내용을 알아보자

Kafka Cluster

카프카는 Kafka Cluster안에 Broker가 여러개 존재한다고 생각하면된다. 이는 데이터 유실방지 때문인데 Broker가 하나의 카프카 서버인데 이를 여러개 띄워 서로의 복사본을 가지고 있는 하나의 Kafka Cluster가 된다.

1

클러스터 내에서 파티션 각각이 서버에 분산된 형태로 나누어 처리될 수 있다고 표현한 이유는 위 그림이 담고 있다. 기본적으로 토픽 내의 파티션이 추가 생성되면 카프카는 등록된 브로커 서버에 분산된 형태로 새 파티션을 배치(?) 시켜주는 개념이다.
1개 파티션은 리더와 팔로워가 존재한다. 카프카에서는 파티션마다 replica 를 설정해서 이를 실현하며, 사실 이 특징은 브로커 노드의 장애나 fail 에 대비하는 것에 목표를 두었다.
replication factor 가 3이라면, 클러스터 내에 최소 3대의 브로커 서버 등록이 보장되어야 하며 1개의 리더 노드와 2개의 팔로워 노드로 구성될 수 있다.

replication factor 가 2 이상으로 설정된 파티션은 반드시 팔로워 노드가 존재하는 데, 리더 노드는 클라이언트(Producer, Consumer) 를 통한 데이터 write (쓰기 연산) / read (읽기 연산) 를, 팔로워 노드는 데이터 복제만을 담당한다.
즉, 리더 노드에 발행된 데이터를 팔로워 노드가 복제만 해가는 구조이다. 이걸 부하 분산의 관점에서 다시 바라보면, 결국 각 파티션의 리더가 클러스터 내의 브로커들에 균등하게 분배되도록 설계되었다는걸 알 수 있다.

여기서 설정하게 되는 replication factor 는 파티션 각각에 대해 카프카 클러스터 내의 모든 브로커에 동일한 값으로 설정되어야 하는 값이다. 이 값으로 클러스터 내의 몇 개 브로커에 데이터를 저장 / 복제해둘지를 결정한다.

토픽 내의 파티션은 이렇게 리더 - 팔로워 구조를 유지하고 있다가, 리더 노드로 등록된 브로커가 죽었을 때 팔로워들 중 하나를 다시 리더로 선출하여 데이터의 유실을 방지하고 복구를 진행할 수 있도록 한다. 따라서 N+1 의 replication factor 를 가진 TopicPartition 이라면 N 번의 장애까지 견딜 수 있다. 개인적으로는 카프카가 가용성을 확보하기 위한 수단 중 하나라고 생각한다.

Producer

Producer 는 카프카 클러스터에서 호스팅하는 토픽에 데이터 입력을 요청할 수 있는 클라이언트이다. 정확히는, 토픽 파티션의 리더 노드에게 쓰기 연산에 대한 요청을 보내서 지정된 토픽 파티션에 메시지를 발행하는 주체이다.
이 때 메시지에는 header (String key, byte[] value), key, value 가 포함된다. 만약 key 를 별도로 지정하지 않는다면 카프카는 라운드로빈 방식으로 파티션에 메시지를 분배하여 발행하게 된다.

Producer config(설정할 수 있는값들) 중 주요한것

  • acks (default = 1)
    개인적으로는 카프카에서 QoS 를 보장하는 수단이 될 수 있는 옵션이라 생각한다.
    Producer 가 토픽 파티션의 리더 노드에게 메시지 발행을 요청한 후, 해당 요청을 완료하는 것에 대한 기준이 되는 설정이다.
    acks 의 값이 클수록 퍼포먼스는 느려지고 안정성은 증가할 수 있다.
    acks = 0 이면 프로듀서가 어떤 acks 응답도 기다리지 않는다. 즉, 카프카 서버가 데이터를 받았는지를 보장하지 않았으므로 전송 (요청) 실패에 따른 재요청도 없다.
    acks = 1 이면 리더 노드가 메시지 발행 요청을 받은 건 확인하지만, 팔로워 노드가 그걸 복제해갔는지에 대해선 확인하지 않는다. 속도와 안정성 측면에서 가장 많이 쓰인다고 한다.
    acks = all (-1) 이면 리더 노드 뿐만 아니라 모든 팔로워 노드들의 응답을 기다리므로 데이터가 손실되지 않는다.
  • max.in.flight.requests.per.connection (default = 5)
    공식문서 정의 : The maximum number of unacknowledged requests the client will send on a single connection before blocking.
    프로듀서가 입력하는 데이터의 최소 단위는 batch.size 에 지정한 값이 되는데, 이 옵션으로는 발행될 이벤트에 대한 batch 단위의 순서를 보장할 수 있다. (물론 1개 파티션 내에서의 이야기가 되겠다.)
    예를 들어, 2개의 batch 레코드 셋이 1개 파티션으로 발행 요청된 상황이 있다고 가정해보자. 이 상황에서 첫번째에 발행된 레코드 셋은 요청에 실패하여 retry 를 시도하고 있지만, 두번째 발행된 레코드 셋은 곧바로 성공하였다.
    주어진 상황에서 max.in.flight.requests.per.connection 이 1 보다 큰 값으로 설정되어 있다면, 두번째 batch 레코드셋이 먼저 발행에 성공할 수 있게 된다.
    * max.in.flight.requests.per.connection 을 1 로 설정한다면, 두번째 레코드셋은 첫번째에서 retry 를 성공할 때까지 발행 요청을 진행하지 않으므로, 파티션 내에서 프로듀서가 이벤트 발행을 요청한 순서를 보장한다. 다만 개인적으로는, 이렇게 되면 flexible 한 가용성을 충족시켜주진 못할 것 같다.

Consumer

1
Consumer 를 설명할 때에 빠질 수 없는 건 consumer group 이다. 1개 토픽에 대해서 여러개 컨슈머 그룹이 각각 다른 목적으로 존재할 수 있으며, 1개 토픽에 발행된 데이터는 그 발행 횟수에 관계 없이 여러 컨슈머 그룹이 각자의 목적에 맞게 처리하기 위해 여러번 읽어갈 수 있다.

덧붙이자면, offset 을 지정한다면 1개 컨슈머 그룹이 같은 토픽 내의 데이터를 n 번 읽어갈 수 있으며 이 또한 발행 횟수와는 관계 없다.

이는 카프카가 가진 스토리지 특성과도 관련이 있다. 카프카는 컨슈머에 의해 처리된 메시지를 곧바로 삭제하지 않고 그대로 저장했다가 수명이 지나면 삭제처리를 하는 시스템이다. 따라서 메시지 처리 도중에 문제가 생겼거나, 로직에 변경이 생겼을 경우 컨슈머가 처음부터 데이터를 다시 처리할 수도 있다.

다시 offset 과 컨슈머 그룹에 대한 이야기로 돌아와보겠다. 각 컨슈머 그룹은 토픽 파티션에 대한 offset 정보를 관리하는데, 컨슈머 클라이언트 각각은 config 옵션 중 auto.offset.reset 을 통해 해당 offset 정보를 참조하여 데이터를 읽고 처리할 수 있다. 옵션 값으로 none 이 지정되면 컨슈머 그룹이 저장하고 있는 offset 정보를 읽어와서 해당 offset 부터 데이터를 읽어올 수 있게 된다.

Consumer config(설정할 수 있는값들) 중 주요한것

  • max.poll.interval.ms
  • 컨슈머는 실제 데이터 polling 을 진행하고 있지 않을 때에도 컨슈머 그룹 멤버로서 지속적으로 존재하기 위해 카프카 클러스터 (coordinator) 에게 heartbeat request 를 전송한다.
  • 하지만 컨슈머 클라이언트가 클러스터에 heartbeat 는 보내지만 실제 데이터는 읽어가지 않는 상황이 길어질 경우, 파티션이 무한정으로 점유될 수 있다.
  • 이 같은 상황을 방지하기 위해 이 설정을 추가하여, 해당 값 (시간) 이 만료될 때 까지 message polling 을 시도하지 않는다면 컨슈머 그룹 구성에 변화가 생겼음 (해당 컨슈머가 장애라고 판단하고 그룹에서 제외시킬 수 있다) 을 인지할 수 있도록 한다.

레코드 리스너(MessageListener): 단 1개의 레코드를 처리합니다. (스프링 카프카 컨슈머의 기본 리스너 타입)
배치 리스너(BatchMessageListener): 한 번에 여러 개 레코드들을 처리합니다.

  • max.poll.size (배치 리스너 일때?)
    컨슈머 클라이언트가 특정 토픽을 구독한 뒤부터 poll 작업이 이루어질 수 있는데, 이 때 poll() 한 번에 따라 가져올 수 있는 최대 레코드 수를 지정한다.
    이 때 카프카를 사용하는 애플리케이션에서는 poll 메소드를 호출해서 레코드를 가져오고, 특정 스레드에서 그걸 전부 처리해준 뒤 다시 poll 메소드를 호출해 새로운 레코드를 가져오게 된다.
    * 이 속성의 기본값은 500이므로, 기본 값을 사용하는 경우 poll 메소드로 한번에 최대 500개 레코드까지 가져올 수 있다.

max.poll.size 속성을 통해 컨슈머의 poll 메소드 호출 간격을 결정할 수 있다. 특히 이 호출 간격이 길어진다는 것은 리벨런싱과 직접적으로 연관이 되기 때문에 배포되는 서비스의 경우 중요성을 인지할 필요가 있다.

참조

https://www.geuni.tech/ko/kafka/kafka_introduce_install_cluster/
https://velog.io/@hyeondev/Apache-Kafka-%EC%9D%98-%EA%B8%B0%EB%B3%B8-%EC%95%84%ED%82%A4%ED%85%8D%EC%B3%90
https://zeroco.tistory.com/105
https://hanseom.tistory.com/174#recentComments
https://devoong2.tistory.com/entry/Kafka-%EC%BB%A8%EC%8A%88%EB%A8%B8%EC%9D%98-Poll-%EB%8F%99%EC%9E%91%EA%B3%BC%EC%A0%95-%EB%B0%8F-maxpollrecords-%EC%97%90-%EB%8C%80%ED%95%9C-%EC%98%A4%ED%95%B4
https://devlog-wjdrbs96.tistory.com/442

profile
("Hello World!");

0개의 댓글