이번 글에서는 저번 글에 이어서, KafkaConsumer의 JavaDoc을 읽고, 해석하고 요약하고, 다른 지식들을 추가해서 정리해보겟다.
카프카 클라이언트는 컨슈머 그룹을 사용해서 consume을 로드 밸런싱 할 수 있도록, 브로커와 상호 작용한다.
컨슈머는 데이터를 가져오기(fetch data) 위해서 필요한 브로커들과 TCP연결을 유지한다. 컨슈머를 close 하지 못했을 경우, 이 연결에 대한 leak이 발생한다.
이 컨슈머(KafkaConsumer)는 Thread-Safe하지 않다.
이 부분은 버전마다 다르므로 직접 확인을 하는게 좋을것 같다.
중요한 것은 브로커 버전에 맞지 않는 API를 호출하면, UnsupportedVersionException
이 발생한다는 것이다.
중요한 내용이다.
Kafka는 파티션의 각 레코드에 대한 숫자 오프셋을 유지한다.(index, id은 개념인듯 하다.) 이 오프셋은 해당 파티션 내의 레코드에 대한 고유 식별자로 작용한하며, 파티션에서의 소비자의 위치를 표시하는데 사용된다.
실제로 두개의 위치 개념이 존재합니다.
컨슈머의 Position
는 다음에 받게될 오프셋을 반환합니다. 이것은 컨슈머가 그 파티션에서 받은(has been seen) 레코드의 중 가장 높은 오프셋보다 하나 많은 값입니다.
이것은 poll()
메서드를 호출해서 컨슈머가 메시지를 받을 때마다 자동적으로 갱신됩니다.(advanced)
이 글의 답변 에 의하면, 이 Position의 경우, 브로커가 아닌 컨슈머에 의해 결정된다고 한다.
즉 커밋을 진행하지 않아도, 리밸런싱이나 failure가 발생하지 않으면 계속해서 다음 Position을 추적할 수 있다.
이는 카프카 컨슈머에 관련되어서 설명을 적어놓은 한국 블로그들에서 혼동해서 많이 적어놓았다.(그래서 굉장히 혼란스러웠다.) 커밋을 한 포지션에 의해 다음 consume할 record가 정해지는 것이 아니라, 이 Position이라는 개념은 컨슈머가 관리한다.
이 개념은 마지막으로 안전하게 저장된 오프셋을 의미한다.(아마 브로커에 내부 토픽으로서 저장된다는 말인것 같다.)
프로세스가 실패하고 다시 시작하면, 다시 복구할 오프셋을 의미한다.
중요한 점은 committed Offset의 경우 컨슈머가 다음번에 읽어야 할 오프셋 넘버를 제공해야한다는 것이다. 아마 이는 클라이언트에서 관리하는 position과 비슷한 개념인것 같다. 이것은 뒷쪽 예제에서 더 나올 예정이다.
아무런 설정을 하지 않으면 auto-commit 전략이 선택되며, 일정 주기마다 컨슈머는 자동적으로 커밋을 진행한다.
auto-commit을 하지 않도록 설정하고, 커밋을 컨트롤 할 수도 있다. commitSync
나 commitAsync
같은 API를 통해 커밋을 진행할 수 있다.
이렇게 개념을 두개로 나눠 놓음으로서, 컨슈머는 어디까지가 소비된 레코드로 취급될 것인지 제어할 수 있습니다. 이와 관련된 개념은 아래에서 더 자세히 다룬다.
아래는 공식 JavaDoc에 다른 섹션에 나와있는 예시 코드이다. 하지만 committed position이 뭔지 확실하게 이해시켜준다
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
}
자동 커밋은 "최소 1회 배달"(at-least once delevery)을 제공할 수 있다.(그니까 여러번 전달 될 수도 있다는 말)
하지만 다음번 poll 호출이나 close가 되기 전에 받은 데이터를 다 소비된 상태여야합니다. (poll호출시 시간 경과를 체크하게 되고, 시간이 넘었을시 커밋을 진행하기 때문)
만약 소비되지 않은 상태에서 커밋이 이루어지고, 추후 레코드 처리가 실패했을 경우, 레코드가 무시되는(처리되지 않는)무시무시한 결과로 이어지게 됩니다.
카프카는 Consumer Group 이라는 개념을 가지고 있다. 이를 통해 작업을 공유해서 소비하고 처리할 프로세스 풀을 가질 수 있다.
이 프로세스(컨슈머) 들은 한개의 machine안에 존재할 수 있고, 수평적 확장(scalability)와 fault tolerance을 위해 많은 machine으로 분산되어 있을 수도 있다.
같은 group.id
를 사용하는 컨슈머 인스턴스는 컨슈머 그룹의 일부가 됩니다.
각각의 Consumer 들은 subscribe
API를 통해 동적으로 토픽들의 리스트를 설정할 수 있습니다.(dynamically set the list of topics)
카프카는 구독된 토픽의 메시지를 컨슈머 그룹 내의 하나의 프로세스에게 전달합니다.
이 말은 한개의 파티션은 컨슈머 그룹당 정확히 하나의 컨슈머에게 배정이 된다는 것을 의미합니다. 반대로 파티션 여러개가 하나의 컨슈머에게 배정되는 것은 가능합니다.
예를 들어서 4개의 파티션이 있고, 두개의 컨슈머 그룹이 각각 두개의 컨슈머를 가지고 있다면, 각 컨슈머는 두개의 파티션 씩을 배정받게 됩니다.
쉽게 말해서, 컨슈머 그룹단위로 배정이 관리되며, 파티션은 무조건 하나의 담당 컨슈머가 있다는 것을 의미합니다. 한개의 파티션을 두개의 컨슈머에서 소비하는 경우는 없습니다.
멤버쉽보다는 배정이라는 표현이 더 적절한 것같아서 그렇게 표현했습니다.
컨슈머 그룹내의 파티션 배정은 동적으로 관리됩니다.
자동적으로 리밸런싱이 일어나면, 컨슈머는 ConsumerRebalanceListener를 통해 알림을 보내며, 이는 알림을 받은 컨슈머 측이 필요한 애플리케이션 레벨 로직을 처리할 수 있도록 합니다.
대표적으로 상태 초기화, 수동 커밋, Offset 외부 저장 등의 작업들이 있습니다.
이와 관련된 자세한 설명은 저의 이전 포스트에 매우 자세히 나와있습니다.
assign
메서드를 활용하면, 수동적으로 특정 파티션을 지정하여 배정할 수도 있습니다. (좀 더 전통적인 컨슈머 처럼)
이 경우 동적 파티션 할당이나 컨슈머 그룹 재조정 같은 기능들이 비활성화 됩니다.
poll()
API를 호출하면 컨슈머는 자동적으로 컨슈머 그룹에 가입됩니다.poll()
API를 지속적으로 호출하는 한, 소비자는 그룹에 남아서 계속해서 메시지를 수신합니다.컨슈머가 아무런 작업을 하지 않는데 heartbeat만 보낼 경우, livelock 상황이 벌어질 수 있습니다.
이를 방지하기 위해서는 max.poll.interval.ms
을 설정하여야 합니다. 여기에 설정된 시간 이상보다 오래 poll이 호출되지 않으면, 클라이언트는 스스로 그룹을 떠나서 다른 컨슈머가 파티션의 레코드를 처리할 수 있도록 합니다.
이러한 경우 offset commit failure를 마주치게 됩니다. (commitSync()시 CommitFailedException
) 이러한 방법은 오직 활성화된 멤버만, 오프셋을 기록할 수 있도록 보장할 수 있습니다.
따라서 그룹 안에 계속해서 남아있기 위해서는 지속적으로 poll을 호출해야합니다.
max.poll.interval.ms
: 이 값을 증가시킴으로서, 컨슈머가 poll을 통해 가져온 배치를 처리하기 위해 허용된 시간을 늘릴 수있습니다.
대신 단점은 리밸런싱에 걸리는 시간을 지연시킬 수 있습니다. 왜냐하면, 컨슈머는 poll()을 호출할 때에만 리밸런스에 참여하기 때문입니다.
(즉 이 시간을 늘리면, poll()을 더 늦게 호출하는 것이 허용되므로, 리밸런싱이 완료되기 까지 지연을 유발한다는 것 같다. 나의 생각으로는 리밸런싱이 지연될 수록, 배치는 더 쌓이므로 악순환이 발생할 것 같다.)
max.poll.records
: 이 설정을 사용해서 poll()당 가져올 레코드의 갯수를 제한 시킬 수 있다. 이 설정을 통해 방금 말했던 문제를 해결할 수 있을 것 같다. 한번 poll()당 가져올 갯수를 평균 수행 시간으로 계산해서 poll interval을 결정해야할 것 같다.
아직은 모르지만, 이렇게 두 값을 고정시켜놓은 다음 scale out 에 관련된 부분은 브로커측에서 잉여량을 측정해서 scale out을 하는 방식으로 진행해야할 것 같다.
공식 문서에서는 사용 케이스에 대한 다른 경우도 제시하고 있는데, 만약 나의 생각처럼 처리시간이 고정되지 않고 굉장히 유동적인 경우 이러한 옵션들이 충분하지 않을 수도 있다고 말을 하고 있다. 이러한 사례를 처리하기 위해서는 메시지 처리를 다른 스레드로 옮기는 방법을 제안하고 있다.
하지만 이런 방법을 고려할 때는 committed offset 이 실제 처리된 offset을 넘어가지 않도록 주의가 필요하다고 한다. 이런 경우 자동 커밋을 비활성화하고, 처리가 끝난 경우에 대해서 수동으로 커밋을 하는 것을 제안하고 있다.
또한 이전에 받은 레코드들이 쓰레드에서 처리될 때까지 pause
메서드를 호출해서 poll
호출시 더이상 레코드를 받아오지 않도록 하는 처리도 필요하다고 말을 하고 있다.
Java Doc 에 보게되면, 좀더 세밀하게 커밋을 하는 법을 알려주고 있다.
그냥 commitSync만 호출하게되면, 자신에게 할당된 모든 파티션에 대해 offset commit을 시도한다.
때로는 이렇게 커밋을 하기 보다는, 각각의 파티션에 대해 세밀하게 커밋을 시도 하고 싶을 수 있다. 이 예시에서는 그 방법을 대략적으로 보여준다.
try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
이 코드에서 주의깊게 봐야할 점은 다음과 같다.
List partitionRecords
에 담겨오는 순서는 앞쪽이 먼저 발생한 오프셋이다. 따라서 맨 마지막 인덱스로 조회하면, 가장 높은 오프셋을 얻을 수 있다.ConsumerRecords<String, String> records
에 대해 records.partitions()
를 호출해서 현재 가져온 '파티션과 토픽'(TopicPartition)들 을 가져올 수 있다.records
에 대해 records(TopicPartition partition)
메서드를 호출해서 해당 기록들을 뽑아올 수 있다.문서에서는 수동 커밋에 관련된 정보를 제공하고 있다.
이전에 자주 사용하던 subscribe
api 를 활용해서 구독을 하게 되면, 카프카가 동적으로 토픽에 대한 파티션을 해당 컨슈머 그룹의 활성화된 컨슈머 들에게 나누어 주게된다.
하지만 특정 상황에서는 카프카가 consumer fail을 감지하고 동적으로 관리할 필요가 없을 수도 있다.
이러한 경우 수동으로 파티션을 지정해서 할당할 수 있으며, 이때는 subscribe
대신 assign
을 통해서 구독하면 된다.
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
이렇게 되면 poll을 호출할 수 있으며, 파티션의 할당은 오직 assign
의 호출에 의해서만 이루어집니다.(자동으로 할당/재할당 되지 않음)
이렇게 되면 각각의 컨슈머들은 컨슈머 그룹 아이디를 공유하고 있더라도 독립적으로 동작합니다. 커밋 과정에서 충돌을 방지하려면, 한 컨슈머 그룹안에서 같은 파티션에 할당되지 않도록 구현해야합니다. 또한 같은 파티션에 대해서는 모든 컨슈머가 유니크한 그룹 아이디를 사용하도록 관리해야합니다.
수동 배정과 자동 배정은 함께 사용할 수 없습니다
컨슈머 어플리케이션은 반드시 카프카의 빌트인 오프셋 스토리지를 사용할 필요는 없으며, 선택한 오프셋 스토리지를 사용할 수 있습니다.
대표적인 use case 는 어플리 케이션이 오프셋과 그 처리결과를 원자적으로 같이 저장되도록 하는 것입니다.
이것이 항상 가능한 것은 아니지만, 이것은 소비를 완전히 원자적으로 처리할 수 있고, "exactly once"을 보장할 수 있습니다. (기본 카프카의 커밋 기능은 "at-least once"를 보장합니다.)
다음과 같은 사용처가 있을 수 있습니다
다음과 같은 설정을 하여 사용할 수 있습니다.
ConsumerRecord
가 제공하는 오프셋을 활용해서 오프셋 저장하기seek(TopicPartition, long)
메서드를 활용해서 컨슈머의 position을 복원할 수 있습니다.보통 ConsumerRebalanceListener 를 통해서 revoke될 때 offset을 기록하고, assigned가 호출될때, 그 할당된 파티션에 대해 offset를 찾아서 로드하는 방식으로 많이 구현한다.
카프카는 컨슈머로 하여금 자신의 position을 조정하는 것을 허용한다. 따라서 같은 파티션 내에서 이전의 오프셋 레코드나 앞의 오프셋 레코드를 조회하는 것은 가능하다. 따라서 오래된 레코드를 다시 소비하거나, 가장 최근의 레코드 부터 소비하는 것 또한 가능하다.
이 외에도 컨슈머의 포지션을 제어함으로써 얻을 수 있는 유용한 점은 많이 있다.
이렇게 클라이언트의 position을 조정하기 위해 Kafka client는 seek(TopicPartion, long)
메서드를 제공한다.
또 seekToBeginning(Collection)
하고 seekToEnd(Collection)
을 통해 서버가 유지하고 있는 최신 또는 가장 오래된 오프셋으로 바꿀 수도 있다.
만약 한개의 컨슈머에 여러개의 파티션이 할당된 경우, 특정 파티션에서 최대속도로 받아오고 나머지 파티션은 후순위로 받아오고 싶을 수도 있다.
예를 들어, 다음과 같은 스트림 처리 과정이 있다고 해보자. 프로세서는 두개의 주제에서 스트림을 가져온 후 조인을 수행하는 과정이다. 하나의 주제가 다른 주제보다 한참 뒤쳐질때 프로세서는 앞서간 주제에 대해서 일시 중지 기능이 필요할 수 있다.
또 다른 예시는 컨슈머가 시작될 때 전체 데이터에 대한 부트스트래핑을 하는 경우이다. 만약 가져와야할 히스토리가 많은 경우, 다른 토픽을 가져오기 전에 애플리케이션은 일단 최신 데이터를 먼저 가져오고 싶을 수 있다.
이런 경우 카프카 컨슈머는, pause(Collection)
과 resume(Collection)
을 통해 일시정지와 재개를 제어할 수 있도록 해준다. 이렇게 되면 미래 poll 호출에 대해서 제어가 적용되게 된다.
Kafka Consumer는 Thread Safe하지 않다. 모든 네트워크 IO의 경우 호출한 쓰레드에서 작동하게 된다. 멀티 쓰레드에서 접근하는 것은 적절히 동기화 처리가 되어야한다.(Synchronized)
Un Synchronized 한 접근에 대해서는 ConcurrentModificationException
이 발생하게 된다.
단 한가지 유일한 예외는 wakeup()
메서드 이다. 유일하게 외부쓰레드에서 호출하여 오퍼레이터에 인터럽트를 거는 용도로 사용할 수 있다.
이 경우 블로킹 오퍼레이션에서 WakeupException
이 발생한다. 외부쓰레드에서 컨슈머를 종료시키는 용도로 사용할 수 있다.
이러한 스레드 특성 때문에 문서에서는 다음과 같은 옵션을 제시한다
글쎄 나는 개인적으로 별로인거 같다.
장점은 구현이 쉽다는 것이다. 그리고 파티션 별로 순차처리하는 절차를 쉽게 구현할 수 있다는 것이다.(동기방식으로 처리되므로)
단점은 쓰레드 수가 파티션 수에 제한된다는 것이다. 그리고 또한 소비자당 TCP연결이 생긴다. 일괄처리가 줄어들어서 처리량이 줄어들 수 있다고 한다.
모든 소비를 한쪽에서 수행하고, 실제로 처리하는 로직을 쓰레드풀로 빼는 방식이다.
장점은 소비자와 프로세서 수를 독립적으로 확장할 수 있다는 점이다. 파티션 수에 따른 제한에 걸리지 않는다.
단점은 프로세서간 순서 보장이 힘들다. 스레드의 스케쥴링에 의해 이전데이터 청크가 이후 데이터 청크 후에 처리될 수도 있다.
또 수동 커밋이 더 어려워진다.
글쎄 내가 생각했을 때 이런 방식은 어떤가 싶다.
기본적으로는 소비와 처리를 분리하고, 뒷단은 쓰레드 풀이 아니라, webflux처럼 reactive programming을 활용해서 처리하는 것이다. 애초에 이벤트에 반응해서 무언가가 처리되는 방식이니까, 이렇게 하면 구지 쓰레드를 잡고 있지 않아도, 이벤트 양에 유동적으로 대처할 수 있지않을까 싶다.
글 재미있게 봤습니다.