[Spring Boot] Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.config.ConfigException

이상협 · August 17, 2023

I was wiring up Kafka while following a few other blog posts,

and they said to write the config using org.springframework.kafka.support.serializer.JsonDeserializer.

It looked like everything was working, but then this error showed up:

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.config.ConfigException: Invalid value org.springframework.kafka.support.serializer.JsonDeserializer@1c3a64b6 for configuration value.deserializer: Expected a Class instance or class name.
	at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.24.jar:5.3.24]
	at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.24.jar:5.3.24]
	at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.24.jar:5.3.24]
	at java.lang.Iterable.forEach(Iterable.java:75) ~[?:?]
	at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.24.jar:5.3.24]
	at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.24.jar:5.3.24]
	at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.24.jar:5.3.24]
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.24.jar:5.3.24]
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:147) ~[spring-boot-2.7.7.jar:2.7.7]
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:731) ~[spring-boot-2.7.7.jar:2.7.7]
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:408) ~[spring-boot-2.7.7.jar:2.7.7]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:307) ~[spring-boot-2.7.7.jar:2.7.7]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1303) ~[spring-boot-2.7.7.jar:2.7.7]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1292) ~[spring-boot-2.7.7.jar:2.7.7]

After a long stretch of googling turned up nothing, I dug into the Kafka source files myself.
That is where I found this:

case CLASS:
	if (value instanceof Class)
		return value;
	else if (value instanceof String) {
		ClassLoader contextOrKafkaClassLoader = Utils.getContextOrKafkaClassLoader();
		// Use loadClass here instead of Class.forName because the name we use here may be an alias
		// and not match the name of the class that gets loaded. If that happens, Class.forName can
		// throw an exception.
		Class<?> klass = contextOrKafkaClassLoader.loadClass(trimmed);
		// Invoke forName here with the true name of the requested class to cause class
		// initialization to take place.
		return Class.forName(klass.getName(), true, contextOrKafkaClassLoader);
	} else
		throw new ConfigException(name, value, "Expected a Class instance or class name.");

The exception was being thrown from the final else in this code.
Before explaining, here is the code I had written:

@EnableKafka
@Configuration
public class ListenerConfiguration {

    @Value("${kafka.consumer-bootstrap-server}")
    private String BOOTSTRAP_SERVER;
    @Value("${kafka.consumer-group-id}")
    private String GROUP_ID;
    @Value("${kafka.consumer-auto-offset-reset}")
    private String AUTO_OFFSET_RESET;

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(this.consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, KafkaMessage> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(this.consumerProps());
    }

    private Map<String, Object> consumerProps() {
        JsonDeserializer<KafkaMessage> deserializer = new JsonDeserializer<>();
        deserializer.addTrustedPackages("*");

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.BOOTSTRAP_SERVER);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, this.GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
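        // Passes a JsonDeserializer instance, not a Class or class name: this is what triggers the ConfigException.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);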
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.AUTO_OFFSET_RESET);

        return props;
    }
}

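I had suspected the deserializer part of consumerProps all along, and comparing it against the Kafka code above made the cause clear: value.deserializer expects a Class (or a class name as a String), but I was passing a JsonDeserializer instance.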
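To make the three branches concrete, here is a minimal stand-alone sketch of that check. It is a simplified imitation of the Kafka snippet above, not the real ConfigDef code; ClassTypeCheck and parseClass are names made up for illustration, and ArrayList stands in for the deserializer.

```java
import java.util.ArrayList;

// Simplified stand-in for the CLASS branch quoted above; not Kafka's actual implementation.
class ClassTypeCheck {

    // Returns the resolved Class, or throws if the value is neither a Class nor a class name.
    static Class<?> parseClass(String name, Object value) {
        if (value instanceof Class) {
            return (Class<?>) value;
        } else if (value instanceof String) {
            try {
                return Class.forName(((String) value).trim());
            } catch (ClassNotFoundException e) {
                throw new IllegalArgumentException("Class " + value + " could not be found.", e);
            }
        } else {
            throw new IllegalArgumentException("Invalid value " + value + " for configuration "
                    + name + ": Expected a Class instance or class name.");
        }
    }

    public static void main(String[] args) {
        // A Class object is accepted by the first branch.
        System.out.println(parseClass("value.deserializer", ArrayList.class));
        // A fully qualified class name is accepted by the second branch.
        System.out.println(parseClass("value.deserializer", "java.util.ArrayList"));
        // An instance matches neither branch and ends up in the final else.
        try {
            parseClass("value.deserializer", new ArrayList<String>());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The third call is the situation my config was in: an object was handed over where only a Class or a String is accepted.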

So I changed deserializer to deserializer.getClass(), ran it again, and everything worked as expected.
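One caveat, as far as I understand it: when only a class is given, Kafka creates its own JsonDeserializer by reflection, so the addTrustedPackages("*") call on the instance in consumerProps is not carried over to the deserializer that actually runs. A possible alternative is to keep the instance and hand it to the DefaultKafkaConsumerFactory constructor that accepts deserializer instances. The following is only a sketch under that assumption; ConsumerFactorySketch and its method parameters are hypothetical names, and KafkaMessage is the payload class from the configuration above.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

// Sketch only: requires spring-kafka on the classpath and the KafkaMessage class from the post.
public class ConsumerFactorySketch {

    static ConsumerFactory<String, KafkaMessage> consumerFactory(String bootstrapServer, String groupId) {
        // Configure the instance in code; the factory uses this exact object.
        JsonDeserializer<KafkaMessage> deserializer = new JsonDeserializer<>(KafkaMessage.class);
        deserializer.addTrustedPackages("*");

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // No *_DESERIALIZER_CLASS_CONFIG entries: the deserializer instances are passed directly.
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
    }
}
```

With this shape the props map contains only plain values, so the "Expected a Class instance or class name" check never sees a deserializer object.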

If you run into this problem, I hope you don't end up wasting as much time on it as I did...
