Kafka consumer를 사용하다보면, JsonDeserializer를 많이 사용하게 됩니다.
하지만 가끔 정의한 schema와 다른 value가 의도치않게 들어오는 경우가 생기는데 ( 이벤트 수기 발행이라던지, 프로듀서 쪽에서 이벤트를 잘못 던졌다던지.. ), 이 경우 spring-kafka 에서 DeserializeException 이 발생하며 무한 재시도를 하려고 합니다.
그럼 분명 대부분의 사람들은 ErrorHandler를 정의했는데, 왜 그것이 동작하지 않는 것인지 궁금해 할 것이라고 생각합니다.
그 이유와 해결법을 이제부터 알아보겠습니다.
일단, Spring kafka가 어떻게 동작하는지 부터 알아봐야 합니다.
Spring kafka consumer는 대략 아래의 4단계의 순서대로 진행됩니다.
Spring kafka를 사용할 때, 사용자가 정의하는 ErrorHandler는 3번 과정 ( kafkaListener ) 에서 처리 됩니다.
그렇기에, 3번 과정에서 실패하게 되면 정의 된 retry 횟수 만큼 재시도하고 4번 과정으로 넘어가 커밋을 하게 됩니다.
관련자료 : https://docs.spring.io/spring-kafka/reference/kafka/annotation-error-handling.html#error-handlers
하지만, DeserializeException 의 경우는 조금 다릅니다. DeserializeException은 2번 단계에서 발생하기 때문에, ErrorHandler에 도달하지 못합니다.
결국 commit이 되지 않아, 같은 offset에서 계속 재시도하게 됩니다. 이로 인해, spring-kafka는 ErrorHandlingDeserializer를 도입하기로 했습니다.
ErrorHandlingDeserializer는 우리가 사용하는 Deserializer를 한 번 래핑한 형태의 Deserializer입니다. 이 코드를 보시면 ErrorHandlingDeserializer가 delegate라는 변수로 실제 Deserializer를 가지고 있는 것을 알 수 있습니다.
그럼 ErrorHandlingDeserializer는 어떻게 동작할까요?
간단합니다. ErrorHandlingDeserializer는 사용자가 정의한 delegated Deserializer로 deserialize 시도를하고, deserialize 실패시 null을 반환하게 설계 되어있습니다.
이를 통해, 결과는 null이지만 deserialize는 성공하여 해당 데이터는 listener 까지 도달 할 수 있게 됩니다.
그 후 에러가 나면, 위 ErrorHandler 파트에서 말했듯이 이미 Listener에 도달했기 때문에 ErrorHandler가 그 에러를 처리하고 retry하다가 최종적으로 offset을 commit 하게 됩니다.
사용법은 아주 간단합니다.
Deserializer 클래스를 ErrorHandlingDeserializer로 지정하고, delegate에 원래 사용하던 serializer를 넣어주면 됩니다.
// error handling deserializer
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
// delegate 하는 deserializer
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
만약, deserializer를 consumerFactory 생성시에 동적으로 넘겨주고 있었다면 아래와 같이 하면 됩니다.
private fun <T> consumerFactory(deserializer: JsonDeserializer<T>): ConsumerFactory<String, T> {
val errorHandlingKeyDeserializer = ErrorHandlingDeserializer(StringDeserializer())
val errorHandlingValueDeserializer = ErrorHandlingDeserializer(deserializer)
val props = mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaConfigProperties.bootstrapServers
)
return DefaultKafkaConsumerFactory(props, errorHandlingKeyDeserializer, errorHandlingValueDeserializer)
}
잘 읽고 갑니다!