[Probee] 스프링부트 & 카프카 (Springboot & Kafka)

suhwani·2024년 5월 27일
0
post-thumbnail

Kafka 도입


  • 카프카 동작 테스트

[서버] Kafka 와 Spring Boot 애플리케이션 연동

  • AWS msk(kafka) 사용

[aws][카프카] aws msk(kafka) serverless 사용하기

Kafka 를 도입하자


업로드중..

왜 쓰나요??

서비스 중 가장 중요한 게 AI 모델을 통한 영상 분석 이다. 영상을 분석하면서 시간이 얼만큼 걸릴지 확신할 수 없다.

(얼마나 넓은 크기, 긴 시간동안 분석할건지에 따라 다르지만) 예상컨대 최소 3분 이상 걸린다. 하지만 서버에서는 몇분동안 클라이언트에게 응답을 안 줄 수는 없다.

따라서 응답은 바로 하되, 서버 백그라운드에서 AI 영상분석이 진행되도록 하기 위해서 Kafka 를 사용하기로 했다.

구현하면서 어려웠던 점


구현하는 것 자체는 어렵지 않았던 것 같다. 큰 에러없이 구현을 했는데, 테스트한다고 String 을 넘기는 Producer, Consumer 를 만들었다. 이걸 내가 원하는대로 바꾸려고 Dto 객체를 넘기려고 했을 때 에러가 발생했다.

Producer 에서 직렬화해서 넘기고, Consumer 에서 역직렬화해서 받는 형식이다.
Producer → Key : StringSerializer Value : JsonSerializer
Consumer → Key : StringDeserializer Value : JsonDeserializer

둘 다 올바르게 해줬는데도 에러가 발생해서 찾아봤더니 Kafka 정책(?), 보안(?) 측면에서 문제가 발생하였다. Kafka 는 알려지지 않은, 위험할 수도 있는 Package 는 받지 않도록 설정이 되어있기 때문에, 내가 만든 Dto 객체를 넘기는 것이 제한되어 있었다. 따라서 내가 넘기는 Package 가 안전해요!! 라는 걸 알려줘야한다. 그래서 나는 jsonDeserializer 를 세팅해서 KafkaConsumerFactory 에 넘겨주었다.

    @Bean
    public Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        return props;
    }
    
    @Bean
    public ConsumerFactory<String, KafkaDto> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(this.consumerConfig(), new StringDeserializer(), jsonDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaDto> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String,KafkaDto> factory
                = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(this.consumerFactory());
        return factory;
    }

    @Bean
    public JsonDeserializer<KafkaDto> jsonDeserializer() {
        JsonDeserializer<KafkaDto> deserializer = new JsonDeserializer<>(KafkaDto.class);
        deserializer.addTrustedPackages("com.capstone.server.dto");
        return deserializer;
    }

Kafka 를 Docker 로!!


docker-compose.yml

‼️ 주의해야할 점

version, kafka - image - wurstmeister/kafka:***
위 부분에서 버전 신경을 써야한다. springboot 3.2.3 을 쓰고 있어서, 맞는 버전을 찾아야한다.
🔗 springboot-kafka 호환 버전 공식 문서

kafka image version** 을 3.6.0 을 써야한다… 3.6.1이다… 등등 있었는데 나는 저렇게 하니까
정상 실행돼서 저렇게 하게 되었다.. 공식 문서에서는 springboot 3.2.x 는 kafka 3.6.0 을 쓰라고 하고,
실제로 실행될 때 kafka version 을 찍어보니 3.6.1 이었는데 아마 Docker image 는 버전이 다른 것 같다.
찾아보니 Docker Hub 에서 확인을 해보라고 나와서, 그냥 뒤에 아무것도 안 붙이면 latest 를 가져오는 것 같다.

version: "3" # docker-compose 버전 지정
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost # TODO : 환경에 맞게 ip 변경 ex) 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 # kafka가 zookeeper에 커넥션하기 위한 대상을 지정
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

도커 카프카 커넥트

profile
Backend-Developer

0개의 댓글