스프링 카프카 컨슈머는 graceful shutdown을 지원하는가?

yellowsunn·2023년 4월 13일
0
post-thumbnail

스프링 카프카 컨슈머에서 오프셋을 커밋하기 전에 SIGTERM, SIGINT 와 같은 종료 요청이 오면 어떻게 동작하는지 문득 궁금증이 생겨서 테스트를 진행해 보았다.

종료전에 처리한 오프셋을 커밋해서 graceful하게 shutdown 되는 기능을 제공할까? 결론부터 말하자면 graceful shutdown을 지원한다. 이에 대한 옵션은 spring.kafka.listener.immediate-stop 과 관련이 있다.

공통 조건

  • record 하나를 처리하는데 5초가 걸린다고 가정
  • poll() 한 레코드를 모두 처리하면 오프셋 커밋 (AckMode: Batch)

테스트 1 (immediate-stop=false, max.poll.size=5)

spring.kafka.consumer.max-poll-records=5
spring.kafka.listener.immediate-stop=false # 기본값

SIGINT 호출 결과

2023-04-13T19:05:53.248+09:00  INFO 36204 --- [ntainer#0-0-C-1] c.y.k.consumer.TestTopicConsumer         : message=test-2
2023-04-13T19:05:58.255+09:00  INFO 36204 --- [ntainer#0-0-C-1] c.y.k.consumer.TestTopicConsumer         : message=test-7
^C (SIGINT 호출)
2023-04-13T19:06:03.259+09:00  INFO 36204 --- [ntainer#0-0-C-1] c.y.k.consumer.TestTopicConsumer         : message=test-8
2023-04-13T19:06:08.265+09:00  INFO 36204 --- [ntainer#0-0-C-1] c.y.k.consumer.TestTopicConsumer         : message=test-9
2023-04-13T19:06:13.270+09:00  INFO 36204 --- [ntainer#0-0-C-1] c.y.k.consumer.TestTopicConsumer         : message=test-11
2023-04-13T19:06:13.340+09:00  INFO 36204 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Revoke previously assigned partitions test-topic-0

SIGINT를 호출하면 실행중인 컨슈머는 바로 종료되지 않고, poll()로 가져온 모든 레코드를 처리하려한다. 모든 레코드를 처리하고 나서야 오프셋을 커밋후 종료한다.

파티션별로 5개의 레코드를 처리해 위와 같이 Current Offset이 5인 것을 확인할 수 있다.

테스트 2 (immediate-stop=false, max.poll.size=100)

spring.kafka.consumer.max-poll-records=100
spring.kafka.listener.immediate-stop=false # 기본값

SIGINT 호출 결과

2023-04-13T18:34:27.169+09:00  INFO 34367 --- [ntainer#0-2-C-1] c.y.k.consumer.TestTopicConsumer         : message=test-16
^C (SIGINT 호출)
2023-04-13T18:34:32.173+09:00  INFO 34367 --- [ntainer#0-2-C-1] c.y.k.consumer.TestTopicConsumer         : message=test-33
2023-04-13T18:34:37.178+09:00  INFO 34367 --- [ntainer#0-2-C-1] c.y.k.consumer.TestTopicConsumer         : message=test-39
2023-04-13T18:34:42.183+09:00  INFO 34367 --- [ntainer#0-2-C-1] c.y.k.consumer.TestTopicConsumer         : message=test-46
2023-04-13T18:34:47.189+09:00  INFO 34367 --- [ntainer#0-2-C-1] c.y.k.consumer.TestTopicConsumer         : message=test-47
2023-04-13T18:34:52.189+09:00  INFO 34367 --- [ntainer#0-2-C-1] c.y.k.consumer.TestTopicConsumer         : message=test-48
2023-04-13T18:34:57.194+09:00  INFO 34367 --- [ntainer#0-2-C-1] c.y.k.consumer.TestTopicConsumer         : message=test-49
2023-04-13T18:34:57.772+09:00  INFO 34367 --- [ionShutdownHook] o.s.c.support.DefaultLifecycleProcessor  : Failed to shut down 1 bean with phase value 2147483547 within timeout of 30000ms: [org.springframework.kafka.config.internalKafkaListenerEndpointRegistry]

SIGINT를 호출하면 실행중인 컨슈머는 바로 종료되지 않고, poll()로 가져온 모든 레코드를 처리하려한다.
하지만 SpringApplicationShutdownHook 스레드에서 Failed to shut down 1 bean with phase value 2147483547 within timeout of 30000ms 와 같이 timeout으로 인한 실패 로그를 확인할 수 있다.
기본 shutdown timeout값인 30초를 넘으면 오프셋 커밋에 실패한다는 것을 알 수 있다.

테스트 3 (immediate-stop=true, max.poll.size=100)

spring.kafka.consumer.max-poll-records=100
spring.kafka.listener.immediate-stop=true
SIGINT 호출 결과
2023-04-13T19:23:36.957+09:00  INFO 37318 --- [ntainer#0-0-C-1] c.y.k.consumer.TestTopicConsumer         : message=test-2
^C (SIGINT 호출)
2023-04-13T19:23:41.963+09:00  INFO 37318 --- [ntainer#0-0-C-1] c.y.k.consumer.TestTopicConsumer         : message=test-7
2023-04-13T19:23:41.994+09:00  INFO 37318 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Revoke previously assigned partitions test-topic-0

immediate-stop 옵션을 true로 설정하는 경우, 현재 레코드가 처리된 이후에 종료 처리를 진행한다. 위의 예시는 2번째 레코드 처리 도중 SIGINT 요청이 발생했고 2번째 레코드가 완전히 처리된 이후 종료된 것을 확인할 수 있다.

0번 파티션에서 2개의 레코드를 처리해 현재 Current Offset이 2인 것을 확인할 수 있다.

작동 원리

카프카 컨슈머에서는 어떻게 graceful shutdown을 처리하는 걸까? 답은 shutdown hook과 KafkaConsumer.wakeup()을 활용함에 있다. 자바의 경우 shutdown hook을 구현하여 안전한 종료를 명시적으로 구현할 수 있는데 이때 컨슈머의 wakeup()을 호출하면, 다음 poll() 메소드가 호출될 때 WakeupException 예외가 발생한다. 예외가 발생하면 catch 문에서 WakeupException을 받아 컨슈머 종료전에 offset을 커밋처리하도록 구현하면 된다.

구현 예시

public static void main(String[] args) {
	Runtime.getRuntime().addShutdownHook(new ShutdonwThread());
}

static class ShutdownThread extends Thread {
	public void run() {
    	consumer.wakeup();
    }
}

url : https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html

private Map<TopicPartition, OffsetAndMetadata> currentOffsets =
    new HashMap<>();

try {
	while(true) {
    	ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
        	logger.info("{}", record);
            currentOffsets.put(
                 new TopicPartition(record.topic(), record.partition()),
                 new OffsetAndMetadata(record.offset() + 1, null));
        }
        consumer.commitAsync(currentOffsets, null);
    }
} catch (WakeupException e) {
	// ignore, we're closing
} finally {
	try {
        consumer.commitSync(currentOffsets);
    } finally {
        consumer.close();
    }
}

레코드를 처리할 때마다 currentOffsets을 갱신하고, WakeupException이 발생하여 컨슈머가 중단되면 currentOffsets을 최종 커밋하여 안전하게 종료처리할 수 있다.

스프링 카프카 컨슈머 코드

KafkaMessageListenerContainer.java

@Override
protected void doStart() {
	...
}    

@Override
protected void doStop(final Runnable callback, boolean normal) {
	if (isRunning()) {
    	this.listenerConsumerFuture.whenComplete(new StopCallback(callback));
        setRunning(false);
        this.listenerConsumer.wakeIfNecessaryForStop();
        setStoppedNormally(normal);
    }
}

void wakeIfNecessaryForStop() {
	if (this.polling.getAndSet(false)) {
		this.consumer.wakeup();
	}
}

스프링 카프카 KafkaMessageListenerContainer 클래스의 경우에도 doStop() 메소드가 호출되는 경우 내부에서 consumer.wakeup()이 호출되는 것을 확인할 수 있다.

소스코드

https://github.com/yellowsunn/learning-projects/tree/main/kafka-graceful-shutdown

참고자료
https://dgle.dev/kafkashutodwn
https://blog.voidmainvoid.net/264
https://jessyt.tistory.com/151

0개의 댓글