[Spring] 실시간 채팅 기능 회고 - 3

일상 회고록·2024년 1월 16일
0

브릿지 프로젝트

목록 보기
3/4
post-thumbnail
post-custom-banner

안녕하세요!

지난 채팅회고 - 1 , 채팅회고 - 2 를 통해 WebSocket + Stomp 로 채팅 기능을 구현 후, 이벤트 브로커인 Kafka 도입 배경까지 알아봤습니다.

Kafka 관련 자료들은 굉장히 많기 때문에 Kafka로 브로커를 전환하는 과정은 순조롭게 이루어졌는데요

하지만 언제나 개발은 마음대로 흘러가지 않죠,,


그럼 트러블슈팅 시작하겠습니다!

0. 환경

일단 Kafka를 사용하기 위해서 Producer 와 Consumer 설정을 해주어야 합니다.

  • Producer
    ```java
    @EnableKafka
    @Configuration
    public class ProducerConfig {
    
        @Bean
        public ProducerFactory<String, ChatMessageRequest> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigurations());
        }
    
        @Bean
        public Map<String, Object> producerConfigurations() {
            return ImmutableMap.<String, Object>builder()
                    .put(BOOTSTRAP_SERVERS_CONFIG, Constant.BOOTSTRAP_SERVER)
                    .put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                    .put(VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class)
                    .build();
        }
    
        @Bean
        public KafkaTemplate<String, ChatMessageRequest> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }
    ```

  • Consumer
    @EnableKafka
    @Configuration
    public class ConsumerConfig {
    
        @Bean
        ConcurrentKafkaListenerContainerFactory<String, ChatMessageRequest> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, ChatMessageRequest> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    
        @Bean
        public ConsumerFactory<String, ChatMessageRequest> consumerFactory() {
            JsonDeserializer<ChatMessageRequest> deserializer = new JsonDeserializer<>();
            deserializer.addTrustedPackages("*");
    
            Map<String, Object> consumerConfigurations =
                    ImmutableMap.<String, Object>builder()
                            .put(BOOTSTRAP_SERVERS_CONFIG, Constant.BOOTSTRAP_SERVER)
                            .put(GROUP_ID_CONFIG, Constant.GROUP_ID)
                            .put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                            .put(VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
                            .put(AUTO_OFFSET_RESET_CONFIG, "latest")
                            .build();
    
            return new DefaultKafkaConsumerFactory<>(consumerConfigurations, new StringDeserializer(), deserializer);
        }
    }

위와 같이 설정해주고,

만들어진 Bean들을 이용해 실질적으로 메세지(프레임)을 받고, 보내주는 메소드를 작성합니다.

  • MessageSender
    @Service
    @RequiredArgsConstructor
    public class MessageSender {
    
        private final KafkaTemplate<String, ChatMessageRequest> kafkaTemplate;
    
        public void send(String topic, ChatMessageRequest message) {
    
            // Kafka Template 을 사용하여 메세지를 지정된 토픽으로 전송
            kafkaTemplate.send(topic, message);
        }
    }

  • MessageReceiver
    @Service
    @RequiredArgsConstructor
    public class MessageReceiver {
    
        private final SimpMessagingTemplate template;
    
        @KafkaListener(topics = Constant.KAFKA_TOPIC, groupId = Constant.GROUP_ID)
        public void receiveMessage(ChatMessageRequest message) {
           
            // 메세지객체 내부의 채팅방 ID 참조 -> 구독자에게 메세지 발송
            template.convertAndSend("/sub/chat/room/" + message.getChatRoomId(), message);
        }
    }

위 코드를 통해 Producer가 메세지를 Kafka의 토픽으로 보내면(이벤트 발생) @KafkaListener 를 통해 감지하고, 해당 메세지를 구독중인 채팅방으로 보내줍니다.

이제 세팅이 끝났으니 배포 전 로컬에서 수행해볼까요?

설정한 groupId인 group1이 파티션에 정상적으로 할당되었습니다.

이후, 테스트 메세지를 보내보면,

세팅한 로그가 찍히면서 아주 성공적으로 잘 전송되었습니다. 👏🏻👏🏻👏🏻

자 그럼 이젠 로컬이 아닌 배포 서버에서 Kafka를 적용해 보겠습니다.

1. 이슈

브릿지 프로젝트의 경우 AWS EC2 + Code Deploy + Git Actions 를 통해 배포 자동화 CI/CD가 구축되어 있는 상태였습니다.

즉, 이미 배포가 이루어진 후 새로운 메세지 브로커인 Kafka로 교체하는 상황이었습니다.

배포 서버에 Kafka를 적용하는 방법은 두가지 선택지가 있었는데요

  1. 직접 Kafka 설치 후 적용
  2. Docker 를 통해 적용

도커를 통한 적용은 단계가 많아 일단 비교적 간단한 첫번째 방법으로 시도해보았습니다.

(과정은 구글링 시 어렵지 않게 찾아볼 수 있습니다)

Kafka를 설치 후, localhost 를 배포 서버 IP로 바꾸어 주고 동일하게 서버를 배포한 후 테스트 메세지를 보내본 결과,

분명 Kafka로 Producer가 메세지를 전송하는 단계까지는 성공했지만, Consumer가 Topic 으로부터 메세지를 읽어오지 못하는 문제가 발생했습니다.

즉, 보내지기만 하고 가져오지를 못했습니다. (오열)

1-1. 삽질

무엇이 문제일까 한참을 고민했습니다.

혹시나 서버 설정이 잘못되었나 싶어 Kafka 삭제 → 재설치를 반복하고 꼼꼼하게 설정을 해주었지만, 문제는 해결되지 않았습니다.

로컬에선 정상적으로 돌아갔기 때문에 Kafka 자체의 설치,설정 관련 문제는 아니라고 생각했습니다.

몇일 동안 SpringBoot 부터 천천히 살펴본 결과 문제를 파악할 수 있었습니다.

1-2. 원인

문제의 핵심 원인은 @KafkaListener 에서 containerFactory 설정을 누락한 것이었습니다.

containerFactory는 Kafka Listener가 사용할 ConsumerFactory를 설정해주는데 사용됩니다.

이부분을 설정해주지 않는다면, SpringBoot는 기본적으로 로컬 환경에 맞게 구성된 containerFactory를 사용하게 됩니다.

이 경우, Kafka Listeners는 실제 Kafka 서버가 아닌 로컬 서버에서 데이터를 읽으려 시도하게 되며, 결과적으로 Topic로부터 데이터를 제대로 가져오지 못하게 됩니다. (로컬환경에서는 정상적이었던 이유)

문제 원인을 알았으니 해결해볼까요?

2.해결

  • MessageReceiver

위의 코드에서 @KafkaListener 부분에 containerFactory 설정을 추가해주면 됩니다.

설정하는 ConsumerFactory의 이름은

ConsumerConfig.java에서 제가 설정한 kafkaListenerContainerFactory 와 동일하게 적어주도록 합니다.

최종적으로 @KafkaListener에 containerFactory 설정이 추가된 것을 확인할 수 있습니다.

설정 완료 후 재배포한 후 테스트 메세지를 보내본 결과, Consumer가 토픽으로부터 정상적으로 메세지를 가져오는 것을 확인할 수 있었습니다.

이렇게 또 하나를 배워가는 것 같습니다.

이번 회고는 여기서 마무리하도록 하겠습니다!

다음은 마지막 회고인 채팅회고 - 4에서 뵙도록 하겠습니다!!

읽어주셔서 감사합니다.




References

트러블 슈팅 : Kafka 리스너(컨슈머)가 Kafka 서버에서 데이터를 읽지 못하는 문제 발생 및 해결

profile
하고 싶은 것들이 많은 개발자입니다
post-custom-banner

0개의 댓글