카프카 List<LinkedHashMap>를 List<Object> 형태로 파싱하기

zzung·2024년 12월 8일

카프카 컨슈머에서 List 형태의 메시지를 컨슘할 때, 역직렬화시 LinkedHashMap으로 파싱하여 List<LinkedHashMap<key, value>> 형태로 메시지가 들어오게 된다.

그래서 List로 생각하고 작성한 코드에서 LinkedHashMap 을 캐스트 할 수 없다는 오류가 발생하였다.

LinkedHashMap cannot be cast to class...  

List형태로 변경하려면 아래와 같이 추가하면 간단하게 해결 가능하다

방법1) convertValue로 형변환

List<TestDto> dataList = objectMapper.convertValue(record.value(), new TypeReference<>() { });

방법2) JsonDeserializer 커스텀

1 TestDtoDeserializer 생성하는 메서드

protected JsonDeserializer<TestDto> testDtoDeserializer() {
    ObjectMapper om = new ObjectMapper();
    JavaType javaType = om.getTypeFactory().constructParametricType(List.class, TestDto.class); // LinkedListHashMap => TestDto
    return new JsonDeserializer<>(javaType, om, false);
}

2 VALUE_DESERIALIZER_CLASS_CONFIG 설정

JsonDeserializer< TestDto>를 반환하는 testDtoDeserializer 호출하여 객체를 생성하고 VALUE_DESERIALIZER_CLASS_CONFIG로 설정한다.

@Bean
public ConsumerFactory<String, TestDto> testConsumerFactory() {
    JsonDeserializer<TestDto> deserializer = testDtoDeserializer(); // 1 JsonDeserializer<TestDto> 가져오기
    Map<String, Object> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, TEST_COLLECT_GROUP_ID);
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer); // 2 VALUE_DESERIALIZER_CLASS_CONFIG 로 설정
    properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
    properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
 
    return new DefaultKafkaConsumerFactory<>(properties, new StringDeserializer(), new ErrorHandlingDeserializer<>(deserializer));
}

0개의 댓글