[Spring Boot + Kafka] Spring Boot + WebSocket 채팅 서버 분산 처리(Scale out)

이홍준·2024년 3월 8일
2

Spring Boot

목록 보기
11/12
post-thumbnail

서론

MSA를 공부하는 중이었다. 그래서 채팅서비스를 구현하고 트래픽을 분산처리하도록 구성해보면서 관련 기술을 공부한것에 대해 정리하고자 작성해 보았다.

사전에 알아야 할 개념

  • java 17의 record
  • Eureka Server & Client (Service Discovery)
  • Pub/Sub 기본
  • Spring Cloud Gateway
  • STOMP, WebSocket
  • Apache kafka & Zookeeper
  • 서비스 간의 통신 개념

문제 정의

기존에 구축했던 프로젝트의 채팅 기능은 간단하게 구현하기 위해 WebSocket+ STOMP 방식을 채택했었다.

  1. 기존 방식(단일 서버)

    해당 방식은 그림과 같이 단일 서버에 해당하는 Topic을 구독해서 사용하는 방식이었다. 하지만 웹소켓 방식은 stateful하므로 리소스를 많이 사용한다는 단점이있다.

  1. 여러 인스턴스로 구성된 채팅 서비스

    서버의 리소스를 분산시키고 트래픽을 어느정도 분산시키고자 했다. 그래서 해당 인스턴스들을 Eureka Server(Service Discovery)에 등록하고 Spring Cloud Gateway로 로드밸런싱을 시켜주는 방식으로 구성했다.

하지만 Websocket은 기본적으로 양방향 방식이며 Stateful 방식이다. 그래서 로드밸런싱을 하더라도 클라이언트는 서버와 세션이 연결되어있다. 그래서 같은 채팅방에 접속하더라도 다른 서버 인스턴스를 구독하고있으면 실시간으로 채팅을 전달 받을 수 없다.

해결 방법

외부 Broker 사용(Kafka or RabbitMQ or Redis)

Broker를 서버 외부에 위치 시켜주고 서비스 인스턴스들이 Broker를 구독함으로써 모든 서버가 해당 메시지를 받을 수 있도록 해야 한다.

기능 정리

  • 해당 입장 된 채팅방(Path value)에 채팅 메시지 발송
  • 해당 채팅방에 입장 된 사용자들 모두 수신

구현 방법

  1. Kafka & Zookeeper 서버작동 방법은 여기를 참고하길 바란다.

  2. build.gradle

    dependencies {
        implementation 'org.springframework.boot:spring-boot-starter-websocket' // STOMP
        implementation 'org.springframework.kafka:spring-kafka' // kafka
        implementation 'org.springframework.cloud:spring-cloud-starter-netflix-eureka-client' // eureka client 사용시
    }
  3. application.yml

    server:
      port: 0 
    spring:
      application:
        name: chat-service
      datasource:
        url: jdbc:mysql://localhost:3306/mysql?useSSL=false&useUnicode=true&serverTimezone=Asia/Seoul&allowPublicKeyRetrieval=true
        username: [username]
        password: [password]
        driver-class-name: com.mysql.cj.jdbc.Driver
    eureka:
      instance:
        prefer-ip-address: true
        instance-id: ${spring.cloud.client.ip-address}:${spring.application.instance_id:${random.value}}
        lease-renewal-interval-in-seconds: 10
        lease-expiration-duration-in-seconds: 10
      client:
        register-with-eureka: true
        fetch-registry: true
        service-url:
          defaultZone: http://127.0.0.1:8761/eureka # eureka server 
  4. WebSocketConfig: websocket 관련 config

    @Configuration
    @EnableWebSocketMessageBroker
    @RequiredArgsConstructor
    public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
    
        @Override
        public void registerStompEndpoints(StompEndpointRegistry registry) {
            registry.addEndpoint("/chat")
                    .setAllowedOriginPatterns("*");
        }
    
        @Override
        public void configureMessageBroker(MessageBrokerRegistry registry) {
            registry.setApplicationDestinationPrefixes("/app");
        }
    
        @Override
        public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
            registry.setMessageSizeLimit(160 * 64 * 1024);
            registry.setSendTimeLimit(100 * 10000);
            registry.setSendBufferSizeLimit(3 * 512 * 1024);
        }
    }
  5. ProducerConfiguration: producer 설정

    @EnableKafka
    @Configuration
    @RequiredArgsConstructor
    @Slf4j
    public class ProducerConfiguration {
        @Bean
        public ProducerFactory<String, Object> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigurations());
        }
    
        // Kafka Producer 구성을 위한 설정값들을 포함한 맵을 반환하는 메서드
        @Bean
        public Map<String, Object> producerConfigurations() {
            return ImmutableMap.<String, Object>builder()
                    .put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
                    .put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                    .put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class)
                    .build();
        }
    
        // KafkaTemplate을 생성하는 Bean 메서드
        @Bean
        public KafkaTemplate<String, Object> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }
    
  6. ConsumerConfiguration: consumer 설정

    @EnableKafka
    @Configuration
    @RequiredArgsConstructor
    @Slf4j
    public class ConsumerConfiguration {
        // KafkaListener 컨테이너 팩토리를 생성하는 Bean 메서드
        @Bean
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    
        // Kafka ConsumerFactory를 생성하는 Bean 메서드
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            JsonDeserializer<String> deserializer = new JsonDeserializer<>(String.class);
            deserializer.addTrustedPackages("*");
            // 패키지 신뢰 오류로 인해 모든 패키지를 신뢰하도록 작성
            // ErrorHandlingDeserializer로 감싸기
            ErrorHandlingDeserializer<String> errorHandlingValueDeserializer = new ErrorHandlingDeserializer<>(deserializer);
            // Kafka Consumer 구성을 위한 설정값들을 설정 -> 변하지 않는 값이므로 ImmutableMap을 이용하여 설정
            Map<String, Object> consumerConfigurations =
                    ImmutableMap.<String, Object>builder()
                            .put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
                            .put(ConsumerConfig.GROUP_ID_CONFIG, KafkaChatConstants.GROUP_ID) // GROUP_ID를 인스턴스마다 구분
                            .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                            .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, errorHandlingValueDeserializer)
                            .put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, deserializer.getClass().getName())
                            .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
                            .build();
    
            return new DefaultKafkaConsumerFactory<>(consumerConfigurations, new StringDeserializer(), errorHandlingValueDeserializer);
        }
    }
  7. KafkaChatConstants ( kafka 관련 상수 설정)

    public class KafkaChatConstants {
        private static String name = UUID.randomUUID().toString(); // group id을 식별하기 위함
        public static final String GROUP_ID = name;
    }
    
  8. ChatMessageRequest: 채팅 메시지 요청 DTO

    @Builder
    public record ChatMessageRequest(String from, String text) implements Serializable {
    }

    kafka 통신하는 메시지는 네트워크를 통해 전송될 수 있도록 Serializable 을 상속 받아야 한다.

  9. ChatMessageResponse: 채팅 메시지 응답 DTO

    @Builder
    public record ChatMessageResponse(Long id, Long roomId, String content, String writer) implements Serializable {
    }
  10. ChatMessageCreateCommand: 채팅 메시지 DB 저장을 위한 요청 entity

    @Builder
    public record ChatMessageCreateCommand(Long roomId, String content, String from) {
    }
    
  11. ChatMessageCreateUseCase(Service): ChatMessge를 저장하기 위한 Service

    @RequiredArgsConstructor
    class KafkaChatMessageConsumer {
        private final SimpMessagingTemplate simpMessagingTemplate;
        @KafkaListener(topics = "chat.room.message.sending")
        public void sendMessage(ChatMessageResponse chatMessageResponse) {
            simpMessagingTemplate.convertAndSend("/topic/public/rooms/"+chatMessageResponse.roomId(), chatMessageResponse);
        }
    }
    
  12. KafkaProducer: kafka Client의 producer

    @Component
    @RequiredArgsConstructor
    class KafkaProducer {
        private final KafkaTemplate<String, Object> kafkaTemplate;
    
        public void publishMessage(String topic, Object message) {
            kafkaTemplate.send(topic, message);
        }
    }
  13. KafkaChatMessageConsumer: kafka Client의 consumer

    @RequiredArgsConstructor
    @Component
    class KafkaChatMessageConsumer {
        private final SimpMessagingTemplate simpMessagingTemplate;
        @KafkaListener(topics = "chat.room.message.sending")
        public void sendMessage(ChatMessageResponse chatMessageResponse) {
            simpMessagingTemplate.convertAndSend("/topic/public/rooms/"+chatMessageResponse.roomId(), chatMessageResponse);
        }
    }
    
  14. ChatController: 채팅 메시지 컨트롤러

    import com.vandemarket.chatservice.chat.adapter.in.web.dto.ChatMessageRequest;
    import com.vandemarket.chatservice.chat.adapter.in.web.dto.ChatMessageResponse;
    import com.vandemarket.chatservice.chat.application.port.in.ChatMessageCreateUseCase;
    import com.vandemarket.chatservice.chat.application.port.in.command.ChatMessageCreateCommand;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.messaging.handler.annotation.*;
    import org.springframework.stereotype.Controller;
    
    @Controller
    @RequiredArgsConstructor
    @Slf4j
    class ChatController {
        private final ChatMessageCreateUseCase chatMessageCreateUseCase; // service(MVC) 계층
        private final KafkaProducer kafkaProducer;
    
        public void sendMessage(@DestinationVariable Long roomId, @Payload ChatMessageRequest chatMessage) {
            ChatMessageCreateCommand chatMessageCreateCommand = ChatMessageCreateCommand.builder()
                    .content(chatMessage.text())
                    .from(chatMessage.from())
                    .roomId(roomId)
                    .build();
            Long chatId = chatMessageCreateUseCase.createChatMessage(chatMessageCreateCommand); // DB에 등록 후 Chat Message Id 반환
            ChatMessageResponse chatMessageResponse = ChatMessageResponse.builder()
                    .id(chatId)
                    .roomId(roomId)
                    .content(chatMessage.text())
                    .writer(chatMessage.from())
                    .build();
            kafkaProducer.publishMessage("chat.room.message.sending", chatMessageResponse);
        }
    }

실행 화면

  1. Eureka Server 에 등록된 Chat Service 3개의 인스턴스들

  2. 채팅방 화면

Architecture

현재 구성된 아키텍처 구조이다.

후기

채팅 서비스와 같은 websocket방식은 실시간성을 위해 양방향으로 연결되어야 하기 때문에 여러 인스턴스에 대한 세션 동기화에 대해 고민해보았다. 그래서 kafka 및 pub/sub 방식 그리고 세션동기화하는 방식에 대해 고민함으로써 아키텍처를 좀 더 크게 생각하게 되는 계기가 되었다.


References

profile
I'm a web developer.

3개의 댓글

comment-user-thumbnail
2024년 6월 3일

chatMessageCreateUseCase 코드가 잘못되어 있는걸로 보입니다~

1개의 답글
comment-user-thumbnail
2024년 10월 8일

안녕하세요 글 잘봤습니다.
혹시 채팅 개발 하시면서 예외처리는 어떻게 하셨나요?
HTTP는 통신 같은 경우에는 ExceptionHandler, ControllerAdvice 같은 필살기(?)가 있는데 websocket은 어떻게 처리할지 궁금하네요

답글 달기