스프링 카프카 컨슈머에서 오프셋을 커밋하기 전에 SIGTERM
, SIGINT
와 같은 종료 요청이 오면 어떻게 동작하는지 문득 궁금증이 생겨서 테스트를 진행해 보았다.
종료전에 처리한 오프셋을 커밋해서 graceful하게 shutdown 되는 기능을 제공할까? 결론부터 말하자면 graceful shutdown을 지원한다. 이에 대한 옵션은 spring.kafka.listener.immediate-stop
과 관련이 있다.
spring.kafka.consumer.max-poll-records=5
spring.kafka.listener.immediate-stop=false # 기본값
2023-04-13T19:05:53.248+09:00 INFO 36204 --- [ntainer#0-0-C-1] c.y.k.consumer.TestTopicConsumer : message=test-2
2023-04-13T19:05:58.255+09:00 INFO 36204 --- [ntainer#0-0-C-1] c.y.k.consumer.TestTopicConsumer : message=test-7
^C (SIGINT 호출)
2023-04-13T19:06:03.259+09:00 INFO 36204 --- [ntainer#0-0-C-1] c.y.k.consumer.TestTopicConsumer : message=test-8
2023-04-13T19:06:08.265+09:00 INFO 36204 --- [ntainer#0-0-C-1] c.y.k.consumer.TestTopicConsumer : message=test-9
2023-04-13T19:06:13.270+09:00 INFO 36204 --- [ntainer#0-0-C-1] c.y.k.consumer.TestTopicConsumer : message=test-11
2023-04-13T19:06:13.340+09:00 INFO 36204 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Revoke previously assigned partitions test-topic-0
SIGINT를 호출하면 실행중인 컨슈머는 바로 종료되지 않고, poll()로 가져온 모든 레코드를 처리하려한다. 모든 레코드를 처리하고 나서야 오프셋을 커밋후 종료한다.
파티션별로 5개의 레코드를 처리해 위와 같이 Current Offset이 5인 것을 확인할 수 있다.
spring.kafka.consumer.max-poll-records=100
spring.kafka.listener.immediate-stop=false # 기본값
2023-04-13T18:34:27.169+09:00 INFO 34367 --- [ntainer#0-2-C-1] c.y.k.consumer.TestTopicConsumer : message=test-16
^C (SIGINT 호출)
2023-04-13T18:34:32.173+09:00 INFO 34367 --- [ntainer#0-2-C-1] c.y.k.consumer.TestTopicConsumer : message=test-33
2023-04-13T18:34:37.178+09:00 INFO 34367 --- [ntainer#0-2-C-1] c.y.k.consumer.TestTopicConsumer : message=test-39
2023-04-13T18:34:42.183+09:00 INFO 34367 --- [ntainer#0-2-C-1] c.y.k.consumer.TestTopicConsumer : message=test-46
2023-04-13T18:34:47.189+09:00 INFO 34367 --- [ntainer#0-2-C-1] c.y.k.consumer.TestTopicConsumer : message=test-47
2023-04-13T18:34:52.189+09:00 INFO 34367 --- [ntainer#0-2-C-1] c.y.k.consumer.TestTopicConsumer : message=test-48
2023-04-13T18:34:57.194+09:00 INFO 34367 --- [ntainer#0-2-C-1] c.y.k.consumer.TestTopicConsumer : message=test-49
2023-04-13T18:34:57.772+09:00 INFO 34367 --- [ionShutdownHook] o.s.c.support.DefaultLifecycleProcessor : Failed to shut down 1 bean with phase value 2147483547 within timeout of 30000ms: [org.springframework.kafka.config.internalKafkaListenerEndpointRegistry]
SIGINT를 호출하면 실행중인 컨슈머는 바로 종료되지 않고, poll()로 가져온 모든 레코드를 처리하려한다.
하지만 SpringApplicationShutdownHook 스레드에서 Failed to shut down 1 bean with phase value 2147483547 within timeout of 30000ms
와 같이 timeout으로 인한 실패 로그를 확인할 수 있다.
기본 shutdown timeout값인 30초를 넘으면 오프셋 커밋에 실패한다는 것을 알 수 있다.
spring.kafka.consumer.max-poll-records=100
spring.kafka.listener.immediate-stop=true
2023-04-13T19:23:36.957+09:00 INFO 37318 --- [ntainer#0-0-C-1] c.y.k.consumer.TestTopicConsumer : message=test-2
^C (SIGINT 호출)
2023-04-13T19:23:41.963+09:00 INFO 37318 --- [ntainer#0-0-C-1] c.y.k.consumer.TestTopicConsumer : message=test-7
2023-04-13T19:23:41.994+09:00 INFO 37318 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Revoke previously assigned partitions test-topic-0
immediate-stop 옵션을 true로 설정하는 경우, 현재 레코드가 처리된 이후에 종료 처리를 진행한다. 위의 예시는 2번째 레코드 처리 도중 SIGINT 요청이 발생했고 2번째 레코드가 완전히 처리된 이후 종료된 것을 확인할 수 있다.
0번 파티션에서 2개의 레코드를 처리해 현재 Current Offset이 2인 것을 확인할 수 있다.
카프카 컨슈머에서는 어떻게 graceful shutdown을 처리하는 걸까? 답은 shutdown hook과 KafkaConsumer.wakeup()을 활용함에 있다. 자바의 경우 shutdown hook을 구현하여 안전한 종료를 명시적으로 구현할 수 있는데 이때 컨슈머의 wakeup()을 호출하면, 다음 poll() 메소드가 호출될 때 WakeupException 예외가 발생한다. 예외가 발생하면 catch 문에서 WakeupException을 받아 컨슈머 종료전에 offset을 커밋처리하도록 구현하면 된다.
public static void main(String[] args) {
Runtime.getRuntime().addShutdownHook(new ShutdonwThread());
}
static class ShutdownThread extends Thread {
public void run() {
consumer.wakeup();
}
}
url : https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html
private Map<TopicPartition, OffsetAndMetadata> currentOffsets =
new HashMap<>();
try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
logger.info("{}", record);
currentOffsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, null));
}
consumer.commitAsync(currentOffsets, null);
}
} catch (WakeupException e) {
// ignore, we're closing
} finally {
try {
consumer.commitSync(currentOffsets);
} finally {
consumer.close();
}
}
레코드를 처리할 때마다 currentOffsets을 갱신하고, WakeupException이 발생하여 컨슈머가 중단되면 currentOffsets을 최종 커밋하여 안전하게 종료처리할 수 있다.
KafkaMessageListenerContainer.java
@Override
protected void doStart() {
...
}
@Override
protected void doStop(final Runnable callback, boolean normal) {
if (isRunning()) {
this.listenerConsumerFuture.whenComplete(new StopCallback(callback));
setRunning(false);
this.listenerConsumer.wakeIfNecessaryForStop();
setStoppedNormally(normal);
}
}
void wakeIfNecessaryForStop() {
if (this.polling.getAndSet(false)) {
this.consumer.wakeup();
}
}
스프링 카프카 KafkaMessageListenerContainer 클래스의 경우에도 doStop() 메소드가 호출되는 경우 내부에서 consumer.wakeup()이 호출되는 것을 확인할 수 있다.
https://github.com/yellowsunn/learning-projects/tree/main/kafka-graceful-shutdown
참고자료
https://dgle.dev/kafkashutodwn
https://blog.voidmainvoid.net/264
https://jessyt.tistory.com/151