[Kafka] Deserializer 관련 이슈 - Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'

rockstar·2023년 6월 12일
1

Issue

목록 보기
2/7

이슈

2023-06-12 18:26:11 org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'
2023-06-12 18:26:11     at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-6.0.8.jar!/:6.0.8]
2023-06-12 18:26:11     at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-6.0.8.jar!/:6.0.8]
2023-06-12 18:26:11     at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
2023-06-12 18:26:11     at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-6.0.8.jar!/:6.0.8]
2023-06-12 18:26:11     at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-6.0.8.jar!/:6.0.8]
2023-06-12 18:26:11     at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:934) ~[spring-context-6.0.8.jar!/:6.0.8]
2023-06-12 18:26:11     at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:587) ~[spring-context-6.0.8.jar!/:6.0.8]
2023-06-12 18:26:11     at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146) ~[spring-boot-3.0.6.jar!/:3.0.6]
2023-06-12 18:26:11     at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:732) ~[spring-boot-3.0.6.jar!/:3.0.6]
2023-06-12 18:26:11     at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:434) ~[spring-boot-3.0.6.jar!/:3.0.6]
2023-06-12 18:26:11     at org.springframework.boot.SpringApplication.run(SpringApplication.java:310) ~[spring-boot-3.0.6.jar!/:3.0.6]
2023-06-12 18:26:11     at org.springframework.boot.SpringApplication.run(SpringApplication.java:1304) ~[spring-boot-3.0.6.jar!/:3.0.6]
2023-06-12 18:26:11     at org.springframework.boot.SpringApplication.run(SpringApplication.java:1293) ~[spring-boot-3.0.6.jar!/:3.0.6]
2023-06-12 18:26:11     at com.hexagonal.order.OrderApplication.main(OrderApplication.java:10) ~[classes!/:0.0.1-SNAPSHOT]
2023-06-12 18:26:11     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
2023-06-12 18:26:11     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
2023-06-12 18:26:11     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
2023-06-12 18:26:11     at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
2023-06-12 18:26:11     at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49) ~[app.jar:0.0.1-SNAPSHOT]
2023-06-12 18:26:11     at org.springframework.boot.loader.Launcher.launch(Launcher.java:95) ~[app.jar:0.0.1-SNAPSHOT]
2023-06-12 18:26:11     at org.springframework.boot.loader.Launcher.launch(Launcher.java:58) ~[app.jar:0.0.1-SNAPSHOT]
2023-06-12 18:26:11     at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:65) ~[app.jar:0.0.1-SNAPSHOT]
2023-06-12 18:26:11 Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
2023-06-12 18:26:11     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:830) ~[kafka-clients-3.4.0.jar!/:na]
2023-06-12 18:26:11     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665) ~[kafka-clients-3.4.0.jar!/:na]
2023-06-12 18:26:11     at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:483) ~[spring-kafka-3.0.4.jar!/:3.0.4]
2023-06-12 18:26:11     at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:451) ~[spring-kafka-3.0.4.jar!/:3.0.4]
2023-06-12 18:26:11     at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:427) ~[spring-kafka-3.0.4.jar!/:3.0.4]
2023-06-12 18:26:11     at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:394) ~[spring-kafka-3.0.4.jar!/:3.0.4]
2023-06-12 18:26:11     at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:371) ~[spring-kafka-3.0.4.jar!/:3.0.4]
2023-06-12 18:26:11     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:852) ~[spring-kafka-3.0.4.jar!/:3.0.4]
2023-06-12 18:26:11     at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:381) ~[spring-kafka-3.0.4.jar!/:3.0.4]
2023-06-12 18:26:11     at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:531) ~[spring-kafka-3.0.4.jar!/:3.0.4]
2023-06-12 18:26:11     at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:226) ~[spring-kafka-3.0.4.jar!/:3.0.4]
2023-06-12 18:26:11     at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:531) ~[spring-kafka-3.0.4.jar!/:3.0.4]
2023-06-12 18:26:11     at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:383) ~[spring-kafka-3.0.4.jar!/:3.0.4]
2023-06-12 18:26:11     at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:328) ~[spring-kafka-3.0.4.jar!/:3.0.4]
2023-06-12 18:26:11     at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-6.0.8.jar!/:6.0.8]
2023-06-12 18:26:11     ... 21 common frames omitted
2023-06-12 18:26:11 Caused by: org.apache.kafka.common.KafkaException: class org.springframework.kafka.support.serializer.JsonSerializer is not an instance of org.apache.kafka.common.serialization.Deserializer
2023-06-12 18:26:11     at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:405) ~[kafka-clients-3.4.0.jar!/:na]
2023-06-12 18:26:11     at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:436) ~[kafka-clients-3.4.0.jar!/:na]
2023-06-12 18:26:11     at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:421) ~[kafka-clients-3.4.0.jar!/:na]
2023-06-12 18:26:11     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:716) ~[kafka-clients-3.4.0.jar!/:na]
2023-06-12 18:26:11     ... 35 common frames omitted
2023-06-12 18:26:11 

분명 컨테이너에 프로젝트를 빌드해서 올리기 전에 실행했을 때는 발생하지 않던 문제였는데, 왜 그런지 Docker에서는 해당 에러가 나기 시작했다. 문제의 원인을 찾고 싶었지만 구글링을 해도 검색결과가 그렇게 많지 않았어서 애를 먹었고, 하루 종일 해결 방법을 찾아본 것 같다. 지금은 문제 원인을 찾아서 해결하긴 했는데, 정말 허탈할 정도지만 우선 정리를 해보려고 한다.

원인

Spring Kafka Dependency를 추가해서 사용하고 있고, Consumer쪽에서 받고 싶은 메시지가 Map<String, Object> Type이었다. Object자리에는 다른 도메인에서 필요한 정보들을 담은 객체 타입을 넣고 싶어서 fasterxml의 JsonDeserializer로 설정했는데 그로 인해 나온 에러였다. 사실 Kafka의 JsonDeserializer는 fasterxml에서 파생된 것인데 기존의 기능을 가져온 후 Kafka에 특화된 추가적인 기능까지 더해져있는 Deserializer라고 보면 된다. 그러한 차이 때문에 에러가 날 수도 있겠다는 생각이 들었고 위에서 아래로 수정을 하게 되었다.

수정 전
'com.fasterxml.jackson.databind'의 JsonDeserializer

수정 후
'org.springframework.kafka.support.serializer'

만약 Spring Kafka를 사용하지 않고, Apache Kafka를 사용하는 경우에는 기본적으로 지원하는 Deserializer가 없을 수 있기 때문에 아래처럼 Gson/Jackson 라이브러리를 추가 후 CustomDeserializer를 만들어서 사용하면 된다.

package com.example.kafka;

import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Deserializer;

public class CustomJsonDeserializer<T> implements Deserializer<T> {
	private final Gson gson = new Gson();
	private final Class<T> targetType;
	
	public CustomJsonDeserializer(Class<T> targetType) {
		this.targetType = targetType;
	}

	@Override
	public T deserialize(String topic, byte[] data) {
		System.out.println("Received message from topic: " + topic);
		return gson.fromJson(new String(data), targetType);
	}
}

Configuration 클래스는 아래와 같이 작성하면 된다.

Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "bootstrap-servers");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomJsonDeserializer.class);

Consumer<String, YourValueType> consumer = new KafkaConsumer<>(consumerProps);

잘못된 정보는 지적해주시면 감사하겠습니다.

0개의 댓글