카프카 컨슈머에서 List 형태의 메시지를 컨슘할 때, 역직렬화시 LinkedHashMap으로 파싱하여 List<LinkedHashMap<key, value>> 형태로 메시지가 들어오게 된다.
그래서 List로 생각하고 작성한 코드에서 LinkedHashMap 을 캐스트 할 수 없다는 오류가 발생하였다.
LinkedHashMap cannot be cast to class...
List형태로 변경하려면 아래와 같이 추가하면 간단하게 해결 가능하다
List<TestDto> dataList = objectMapper.convertValue(record.value(), new TypeReference<>() { });
protected JsonDeserializer<TestDto> testDtoDeserializer() {
ObjectMapper om = new ObjectMapper();
JavaType javaType = om.getTypeFactory().constructParametricType(List.class, TestDto.class); // LinkedListHashMap => TestDto
return new JsonDeserializer<>(javaType, om, false);
}
JsonDeserializer< TestDto>를 반환하는 testDtoDeserializer 호출하여 객체를 생성하고 VALUE_DESERIALIZER_CLASS_CONFIG로 설정한다.
@Bean
public ConsumerFactory<String, TestDto> testConsumerFactory() {
JsonDeserializer<TestDto> deserializer = testDtoDeserializer(); // 1 JsonDeserializer<TestDto> 가져오기
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, TEST_COLLECT_GROUP_ID);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer); // 2 VALUE_DESERIALIZER_CLASS_CONFIG 로 설정
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
return new DefaultKafkaConsumerFactory<>(properties, new StringDeserializer(), new ErrorHandlingDeserializer<>(deserializer));
}