[서버] Kafka 와 Spring Boot 애플리케이션 연동
[aws][카프카] aws msk(kafka) serverless 사용하기
왜 쓰나요??
서비스 중 가장 중요한 게 AI 모델을 통한 영상 분석
이다. 영상을 분석하면서 시간이 얼만큼 걸릴지 확신할 수 없다.
(얼마나 넓은 크기, 긴 시간동안 분석할건지에 따라 다르지만) 예상컨대 최소 3분 이상 걸린다. 하지만 서버에서는 몇분동안 클라이언트에게 응답을 안 줄 수는 없다.
따라서 응답은 바로 하되, 서버 백그라운드에서 AI 영상분석이 진행되도록 하기 위해서 Kafka 를 사용하기로 했다.
구현하는 것 자체는 어렵지 않았던 것 같다. 큰 에러없이 구현을 했는데, 테스트한다고 String 을 넘기는 Producer, Consumer 를 만들었다. 이걸 내가 원하는대로 바꾸려고 Dto 객체를 넘기려고 했을 때 에러가 발생했다.
Producer 에서 직렬화해서 넘기고, Consumer 에서 역직렬화해서 받는 형식이다.
Producer → Key :StringSerializer
Value :JsonSerializer
Consumer → Key :StringDeserializer
Value :JsonDeserializer
둘 다 올바르게 해줬는데도 에러가 발생해서 찾아봤더니 Kafka 정책(?), 보안(?) 측면에서 문제가 발생하였다. Kafka 는 알려지지 않은, 위험할 수도 있는 Package 는 받지 않도록 설정이 되어있기 때문에, 내가 만든 Dto 객체를 넘기는 것이 제한되어 있었다. 따라서 내가 넘기는 Package 가 안전해요!! 라는 걸 알려줘야한다. 그래서 나는 jsonDeserializer 를 세팅해서 KafkaConsumerFactory 에 넘겨주었다.
@Bean
public Map<String, Object> consumerConfig() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
return props;
}
@Bean
public ConsumerFactory<String, KafkaDto> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(this.consumerConfig(), new StringDeserializer(), jsonDeserializer());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, KafkaDto> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String,KafkaDto> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(this.consumerFactory());
return factory;
}
@Bean
public JsonDeserializer<KafkaDto> jsonDeserializer() {
JsonDeserializer<KafkaDto> deserializer = new JsonDeserializer<>(KafkaDto.class);
deserializer.addTrustedPackages("com.capstone.server.dto");
return deserializer;
}
‼️ 주의해야할 점
version
, kafka - image - wurstmeister/kafka:***
위 부분에서 버전 신경을 써야한다. springboot 3.2.3 을 쓰고 있어서, 맞는 버전을 찾아야한다.
🔗 springboot-kafka 호환 버전 공식 문서
kafka image version** 을 3.6.0 을 써야한다… 3.6.1이다… 등등 있었는데 나는 저렇게 하니까
정상 실행돼서 저렇게 하게 되었다.. 공식 문서에서는 springboot 3.2.x 는 kafka 3.6.0 을 쓰라고 하고,
실제로 실행될 때 kafka version 을 찍어보니 3.6.1 이었는데 아마 Docker image 는 버전이 다른 것 같다.
찾아보니 Docker Hub 에서 확인을 해보라고 나와서, 그냥 뒤에 아무것도 안 붙이면 latest 를 가져오는 것 같다.
version: "3" # docker-compose 버전 지정
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost # TODO : 환경에 맞게 ip 변경 ex) 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 # kafka가 zookeeper에 커넥션하기 위한 대상을 지정
volumes:
- /var/run/docker.sock:/var/run/docker.sock