안녕하세요 이번 시간에는 Kafka에서 이미 처리한 토픽이 다른 API 실행 시 중복되어 처리되는 오류를 해결하는 포스팅을 해보도록 하겠습니다.
우선 발생한 오류를 먼저 말씀드리겠습니다. 다이어리 초대 요청을 보냈을 때 프로듀서에서 FCM 관련 토픽을 전달하면 컨슈머에서 해당 토픽을 캐치하여 로직을 처리합니다. 이후 메일 발송을 위해 프로듀서에서 메일 관련 토픽을 전달하면 메일 전송과 함께 FCM도 같이 전송되는 문제가 있었습니다.
문제의 원인을 찾아본 결과 Kafka에서 읽은 메시지가 커밋되지 않아 처리한 메시지가 읽지 않은 상태로 변경되어 지속적으로 해당 토픽을 listen한 것이 문제였습니다. Spring Boot의 @KafkaListner는 기본적으로 자동 커밋을 지원하지만 KafkaConfig를 새롭게 커스텀하는 과정에서 자동 커밋이 되지 않아 이런 문제가 발생했습니다.
그렇다면 Kafka에서는 어떻게 커밋을 이용하여 읽은 메시지를 처리할까요? Kafka의 경우 컨슈머가 메시지를 처리한 후 어디까지 읽었는지 Kafka 브로커에 알리는 프로세스로 커밋을 진행합니다. 이로 인해 메시지를 중복 처리하지 않고 장애 발생 시에도 마지막으로 메시지를 처리한 위치를 보존할 수 있어 안정적인 서비스를 구축할 수 있습니다.
Kafka의 커밋 프로세스는 다음과 같습니다.
Kafka는 커밋 주기를 설정하여 커밋이 자동적으로 주기적으로 수행될 수 있도록 설정할 수 있으며, 커밋이 실패할 경우 에러 처리 메커니즘을 사용하여 커밋을 다시 시도하거나 에러를 로그에 기록하고 관리자에게 알릴 수 있습니다.
저는 매 요청마다 커밋을 자동적으로 수행하는 ENABLE_AUTO_COMMIT 옵션을 활성화하기 위해 KafkaConfig 클래스를 수정했습니다. ConsumerFactory에서 자동 커밋 옵션을 관리하기 때문에 이곳에서 configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); 코드를 추가하여 자동 커밋을 수행합니다. 이를 적용하면 처리된 토픽은 다시 요청되지 않고 한번만 수행됩니다.
@Bean
public ConsumerFactory<String, byte[]> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
return new DefaultKafkaConsumerFactory<>(configProps);
}