Spring Kafka ErrorHandlingDeserializer

kshired·2024년 1월 24일
1

Kafka consumer를 사용하다보면, JsonDeserializer를 많이 사용하게 됩니다.

하지만 가끔 정의한 schema와 다른 value가 의도치않게 들어오는 경우가 생기는데 ( 이벤트 수기 발행이라던지, 프로듀서 쪽에서 이벤트를 잘못 던졌다던지.. ), 이 경우 spring-kafka 에서 DeserializeException 이 발생하며 무한 재시도를 하려고 합니다.

그럼 분명 대부분의 사람들은 ErrorHandler를 정의했는데, 왜 그것이 동작하지 않는 것인지 궁금해 할 것이라고 생각합니다.

그 이유와 해결법을 이제부터 알아보겠습니다.

How spring-kafka consumer works?

일단, Spring kafka가 어떻게 동작하는지 부터 알아봐야 합니다.

Spring kafka consumer는 대략 아래의 4단계의 순서대로 진행됩니다.

  1. 직렬화된 데이터를 브로커로부터 가져오기
  2. 가져온 데이터를 역직렬화
  3. 역직렬화된 데이터를 처리
  4. 브로커에게 commit 요청

Spring-kafka ErrorHandler

Spring kafka를 사용할 때, 사용자가 정의하는 ErrorHandler는 3번 과정 ( kafkaListener ) 에서 처리 됩니다.

그렇기에, 3번 과정에서 실패하게 되면 정의 된 retry 횟수 만큼 재시도하고 4번 과정으로 넘어가 커밋을 하게 됩니다.

관련자료 : https://docs.spring.io/spring-kafka/reference/kafka/annotation-error-handling.html#error-handlers

DeserializeException

하지만, DeserializeException 의 경우는 조금 다릅니다. DeserializeException은 2번 단계에서 발생하기 때문에, ErrorHandler에 도달하지 못합니다.

결국 commit이 되지 않아, 같은 offset에서 계속 재시도하게 됩니다. 이로 인해, spring-kafka는 ErrorHandlingDeserializer를 도입하기로 했습니다.

ErrorHandlingDeserializer는 우리가 사용하는 Deserializer를 한 번 래핑한 형태의 Deserializer입니다. 이 코드를 보시면 ErrorHandlingDeserializer가 delegate라는 변수로 실제 Deserializer를 가지고 있는 것을 알 수 있습니다.

그럼 ErrorHandlingDeserializer는 어떻게 동작할까요?

간단합니다. ErrorHandlingDeserializer는 사용자가 정의한 delegated Deserializer로 deserialize 시도를하고, deserialize 실패시 null을 반환하게 설계 되어있습니다.

이를 통해, 결과는 null이지만 deserialize는 성공하여 해당 데이터는 listener 까지 도달 할 수 있게 됩니다.

참고1, 참고2

그 후 에러가 나면, 위 ErrorHandler 파트에서 말했듯이 이미 Listener에 도달했기 때문에 ErrorHandler가 그 에러를 처리하고 retry하다가 최종적으로 offset을 commit 하게 됩니다.

How to use?

사용법은 아주 간단합니다.

Deserializer 클래스를 ErrorHandlingDeserializer로 지정하고, delegate에 원래 사용하던 serializer를 넣어주면 됩니다.

// error handling deserializer
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);

// delegate 하는 deserializer
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);

만약, deserializer를 consumerFactory 생성시에 동적으로 넘겨주고 있었다면 아래와 같이 하면 됩니다.

private fun <T> consumerFactory(deserializer: JsonDeserializer<T>): ConsumerFactory<String, T> {
    val errorHandlingKeyDeserializer = ErrorHandlingDeserializer(StringDeserializer())
    val errorHandlingValueDeserializer = ErrorHandlingDeserializer(deserializer)

    val props = mapOf(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaConfigProperties.bootstrapServers
    )

    return DefaultKafkaConsumerFactory(props, errorHandlingKeyDeserializer, errorHandlingValueDeserializer)
}

TL;DR

  • DeserializeException은 spring에서 기본적으로 처리할 수 없음.
  • ErrorHandlingDeserializer를 사용하여, deserialize를 delegate 하는 방식을 사용하여 해결해야 함.

references

profile
글 쓰는 개발자

1개의 댓글

comment-user-thumbnail
2024년 10월 11일

잘 읽고 갑니다!

답글 달기