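I was setting up a Kafka integration by following other blogs, and they said to write the config using
org.springframework.kafka.support.serializer.JsonDeserializer
so that is what I did.
It looked like it was going to work, but then I hit this error: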
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.config.ConfigException: Invalid value org.springframework.kafka.support.serializer.JsonDeserializer@1c3a64b6 for configuration value.deserializer: Expected a Class instance or class name.
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.24.jar:5.3.24]
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.24.jar:5.3.24]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.24.jar:5.3.24]
at java.lang.Iterable.forEach(Iterable.java:75) ~[?:?]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.24.jar:5.3.24]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.24.jar:5.3.24]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.24.jar:5.3.24]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.24.jar:5.3.24]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:147) ~[spring-boot-2.7.7.jar:2.7.7]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:731) ~[spring-boot-2.7.7.jar:2.7.7]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:408) ~[spring-boot-2.7.7.jar:2.7.7]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:307) ~[spring-boot-2.7.7.jar:2.7.7]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1303) ~[spring-boot-2.7.7.jar:2.7.7]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1292) ~[spring-boot-2.7.7.jar:2.7.7]
I googled for a long time without finding anything, so I opened up the Kafka source files myself.
That is where I found this:
case CLASS:
    if (value instanceof Class)
        return value;
    else if (value instanceof String) {
        ClassLoader contextOrKafkaClassLoader = Utils.getContextOrKafkaClassLoader();
        // Use loadClass here instead of Class.forName because the name we use here may be an alias
        // and not match the name of the class that gets loaded. If that happens, Class.forName can
        // throw an exception.
        Class<?> klass = contextOrKafkaClassLoader.loadClass(trimmed);
        // Invoke forName here with the true name of the requested class to cause class
        // initialization to take place.
        return Class.forName(klass.getName(), true, contextOrKafkaClassLoader);
    } else
        throw new ConfigException(name, value, "Expected a Class instance or class name.");
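My config was falling through to the last else in this code: the value I passed was neither a Class nor a String, so Kafka threw the ConfigException.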
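To make that branch easier to see, here is a small stand-alone sketch that mimics the check using only the JDK. It is a simplified stand-in, not Kafka's actual ConfigDef code: the class name ClassConfigCheckDemo and the method parseClassValue are made up for illustration, and ArrayList stands in for the deserializer.

```java
import java.util.ArrayList;

public class ClassConfigCheckDemo {

    // Simplified stand-in for the CLASS branch above (hypothetical helper, not Kafka's API).
    // Accepts a Class or a class name; anything else, including an object instance, is rejected.
    public static Class<?> parseClassValue(String name, Object value) {
        if (value instanceof Class) {
            return (Class<?>) value;
        } else if (value instanceof String) {
            try {
                return Class.forName(((String) value).trim());
            } catch (ClassNotFoundException e) {
                throw new IllegalArgumentException("Class not found for configuration " + name, e);
            }
        } else {
            throw new IllegalArgumentException("Invalid value " + value + " for configuration "
                    + name + ": Expected a Class instance or class name.");
        }
    }

    public static void main(String[] args) {
        Object instance = new ArrayList<String>();

        // A Class, a class name, and instance.getClass() all pass the check.
        System.out.println(parseClassValue("value.deserializer", ArrayList.class));
        System.out.println(parseClassValue("value.deserializer", "java.util.ArrayList"));
        System.out.println(parseClassValue("value.deserializer", instance.getClass()));

        // The instance itself is neither a Class nor a String, so it lands in the last else.
        try {
            parseClassValue("value.deserializer", instance);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The point is only the shape of the check: an object instance in a slot that expects a Class or a class name is always rejected, no matter what type the instance has.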
Before explaining further, let me show the code I wrote.
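import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@EnableKafka
@Configuration
public class ListenerConfiguration {
    @Value("${kafka.consumer-bootstrap-server}")
    private String BOOTSTRAP_SERVER;
    @Value("${kafka.consumer-group-id}")
    private String GROUP_ID;
    @Value("${kafka.consumer-auto-offset-reset}")
    private String AUTO_OFFSET_RESET;

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(this.consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, KafkaMessage> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(this.consumerProps());
    }

    private Map<String, Object> consumerProps() {
        JsonDeserializer<KafkaMessage> deserializer = new JsonDeserializer<>();
        deserializer.addTrustedPackages("*");
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.BOOTSTRAP_SERVER);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, this.GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Problem line: an object instance is passed where Kafka expects a Class or a class name.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.AUTO_OFFSET_RESET);
        return props;
    }
}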
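I had been suspicious of the deserializer part of consumerProps the whole time, and comparing it with the Kafka code above made the cause clear: I was passing an instance, not a Class.
So in the VALUE_DESERIALIZER_CLASS_CONFIG line I changed
deserializer -> deserializer.getClass()
and ran it again.
This time it started up and worked normally.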
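One caveat about this fix: once only the class is passed, the Kafka consumer creates its own JsonDeserializer from that class, so anything configured on my instance, such as addTrustedPackages("*"), is not carried over. If that matters, one alternative is to leave the deserializers out of the props map and hand the instances to the DefaultKafkaConsumerFactory constructor instead. The following is a minimal sketch under assumptions, not a drop-in replacement: the class name ConsumerFactorySketch and the method parameters are made up for illustration, and KafkaMessage is the message class from the config above, so this fragment only compiles inside that project with spring-kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

// Hypothetical variant of the consumerFactory() bean; KafkaMessage is the post's own class.
public class ConsumerFactorySketch {

    static ConsumerFactory<String, KafkaMessage> consumerFactory(String bootstrapServer, String groupId) {
        // Configure the instance once; it is used as-is, so the trusted packages are kept.
        JsonDeserializer<KafkaMessage> deserializer = new JsonDeserializer<>(KafkaMessage.class);
        deserializer.addTrustedPackages("*");

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // No key.deserializer / value.deserializer entries here:
        // the instances are passed to the factory constructor instead.

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
    }
}
```

If you would rather keep everything in the props map, the other route is to pass JsonDeserializer.class as the value deserializer and set the trusted packages through the JsonDeserializer.TRUSTED_PACKAGES property in the same map.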
If you run into this problem, I hope you don't waste as much time on it as I did...