기본 설정 외에 필요해서 추가한 설정
ConsumerConfiguration.java
예시@Bean
public ConcurrentKafkaListenerContainerFactory<String, Example> orderBoxTrackingListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Example> factory = new ConcurrentKafkaListenerContainerFactory<>();
ObjectMapper objectMapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
JsonDeserializer jsonDeserializer = new JsonDeserializer(Example.class, objectMapper);
factory.setConsumerFactory(getConsumerFactory(jsonDeserializer));
factory.setErrorHandler(errorLog());
return factory;
}
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
consumer 이하 에서 발생한 error handler 잡는 부분.
나는 error를 잡아서 로그로 찍는 부분만 구현해서 아래 메소드만 추가로 더 구현했었다.
private ErrorHandler errorLog() {
return (exception, data) -> log.error("Handle Consumer Exception: {} | topic: {} | key: {} | partition: {} | offset: {}",
exception.getMessage(), data.topic(), data.key(), data.partition(), data.offset(), exception);
}
DLT 토픽을 가지게 되는 경우 setErrorHandler()
이쪽에 아래처럼 구현하면 된다.
factory.setErrorHandler(new SeekToCurrentErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate),
new FixedBackOff(1000L, 2L)));