MSA를 공부하는 중이었다. 그래서 채팅서비스를 구현하고 트래픽을 분산처리하도록 구성해보면서 관련 기술을 공부한것에 대해 정리하고자 작성해 보았다.
기존에 구축했던 프로젝트의 채팅 기능은 간단하게 구현하기 위해 WebSocket+ STOMP 방식을 채택했었다.
기존 방식(단일 서버)
해당 방식은 그림과 같이 단일 서버에 해당하는 Topic을 구독해서 사용하는 방식이었다. 하지만 웹소켓 방식은 stateful하므로 리소스를 많이 사용한다는 단점이있다.
여러 인스턴스로 구성된 채팅 서비스
서버의 리소스를 분산시키고 트래픽을 어느정도 분산시키고자 했다. 그래서 해당 인스턴스들을 Eureka Server(Service Discovery)에 등록하고 Spring Cloud Gateway로 로드밸런싱을 시켜주는 방식으로 구성했다.
하지만 Websocket은 기본적으로 양방향 방식이며 Stateful 방식이다. 그래서 로드밸런싱을 하더라도 클라이언트는 서버와 세션이 연결되어있다. 그래서 같은 채팅방에 접속하더라도 다른 서버 인스턴스를 구독하고있으면 실시간으로 채팅을 전달 받을 수 없다.
외부 Broker 사용(Kafka or RabbitMQ or Redis)
Broker를 서버 외부에 위치 시켜주고 서비스 인스턴스들이 Broker를 구독함으로써 모든 서버가 해당 메시지를 받을 수 있도록 해야 한다.
Kafka & Zookeeper 서버작동 방법은 여기를 참고하길 바란다.
build.gradle
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-websocket' // STOMP
implementation 'org.springframework.kafka:spring-kafka' // kafka
implementation 'org.springframework.cloud:spring-cloud-starter-netflix-eureka-client' // eureka client 사용시
}
application.yml
server:
port: 0
spring:
application:
name: chat-service
datasource:
url: jdbc:mysql://localhost:3306/mysql?useSSL=false&useUnicode=true&serverTimezone=Asia/Seoul&allowPublicKeyRetrieval=true
username: [username]
password: [password]
driver-class-name: com.mysql.cj.jdbc.Driver
eureka:
instance:
prefer-ip-address: true
instance-id: ${spring.cloud.client.ip-address}:${spring.application.instance_id:${random.value}}
lease-renewal-interval-in-seconds: 10
lease-expiration-duration-in-seconds: 10
client:
register-with-eureka: true
fetch-registry: true
service-url:
defaultZone: http://127.0.0.1:8761/eureka # eureka server
WebSocketConfig: websocket 관련 config
@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/chat")
.setAllowedOriginPatterns("*");
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
registry.setMessageSizeLimit(160 * 64 * 1024);
registry.setSendTimeLimit(100 * 10000);
registry.setSendBufferSizeLimit(3 * 512 * 1024);
}
}
ProducerConfiguration: producer 설정
@EnableKafka
@Configuration
@RequiredArgsConstructor
@Slf4j
public class ProducerConfiguration {
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigurations());
}
// Kafka Producer 구성을 위한 설정값들을 포함한 맵을 반환하는 메서드
@Bean
public Map<String, Object> producerConfigurations() {
return ImmutableMap.<String, Object>builder()
.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class)
.build();
}
// KafkaTemplate을 생성하는 Bean 메서드
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
ConsumerConfiguration: consumer 설정
@EnableKafka
@Configuration
@RequiredArgsConstructor
@Slf4j
public class ConsumerConfiguration {
// KafkaListener 컨테이너 팩토리를 생성하는 Bean 메서드
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
// Kafka ConsumerFactory를 생성하는 Bean 메서드
@Bean
public ConsumerFactory<String, String> consumerFactory() {
JsonDeserializer<String> deserializer = new JsonDeserializer<>(String.class);
deserializer.addTrustedPackages("*");
// 패키지 신뢰 오류로 인해 모든 패키지를 신뢰하도록 작성
// ErrorHandlingDeserializer로 감싸기
ErrorHandlingDeserializer<String> errorHandlingValueDeserializer = new ErrorHandlingDeserializer<>(deserializer);
// Kafka Consumer 구성을 위한 설정값들을 설정 -> 변하지 않는 값이므로 ImmutableMap을 이용하여 설정
Map<String, Object> consumerConfigurations =
ImmutableMap.<String, Object>builder()
.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaChatConstants.GROUP_ID) // GROUP_ID를 인스턴스마다 구분
.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, errorHandlingValueDeserializer)
.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, deserializer.getClass().getName())
.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
.build();
return new DefaultKafkaConsumerFactory<>(consumerConfigurations, new StringDeserializer(), errorHandlingValueDeserializer);
}
}
KafkaChatConstants ( kafka 관련 상수 설정)
public class KafkaChatConstants {
private static String name = UUID.randomUUID().toString(); // group id을 식별하기 위함
public static final String GROUP_ID = name;
}
ChatMessageRequest: 채팅 메시지 요청 DTO
@Builder
public record ChatMessageRequest(String from, String text) implements Serializable {
}
kafka 통신하는 메시지는 네트워크를 통해 전송될 수 있도록 Serializable
을 상속 받아야 한다.
ChatMessageResponse: 채팅 메시지 응답 DTO
@Builder
public record ChatMessageResponse(Long id, Long roomId, String content, String writer) implements Serializable {
}
ChatMessageCreateCommand: 채팅 메시지 DB 저장을 위한 요청 entity
@Builder
public record ChatMessageCreateCommand(Long roomId, String content, String from) {
}
ChatMessageCreateUseCase(Service): ChatMessge를 저장하기 위한 Service
@RequiredArgsConstructor
class KafkaChatMessageConsumer {
private final SimpMessagingTemplate simpMessagingTemplate;
@KafkaListener(topics = "chat.room.message.sending")
public void sendMessage(ChatMessageResponse chatMessageResponse) {
simpMessagingTemplate.convertAndSend("/topic/public/rooms/"+chatMessageResponse.roomId(), chatMessageResponse);
}
}
KafkaProducer: kafka Client의 producer
@Component
@RequiredArgsConstructor
class KafkaProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
public void publishMessage(String topic, Object message) {
kafkaTemplate.send(topic, message);
}
}
KafkaChatMessageConsumer: kafka Client의 consumer
@RequiredArgsConstructor
@Component
class KafkaChatMessageConsumer {
private final SimpMessagingTemplate simpMessagingTemplate;
@KafkaListener(topics = "chat.room.message.sending")
public void sendMessage(ChatMessageResponse chatMessageResponse) {
simpMessagingTemplate.convertAndSend("/topic/public/rooms/"+chatMessageResponse.roomId(), chatMessageResponse);
}
}
ChatController: 채팅 메시지 컨트롤러
import com.vandemarket.chatservice.chat.adapter.in.web.dto.ChatMessageRequest;
import com.vandemarket.chatservice.chat.adapter.in.web.dto.ChatMessageResponse;
import com.vandemarket.chatservice.chat.application.port.in.ChatMessageCreateUseCase;
import com.vandemarket.chatservice.chat.application.port.in.command.ChatMessageCreateCommand;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.handler.annotation.*;
import org.springframework.stereotype.Controller;
@Controller
@RequiredArgsConstructor
@Slf4j
class ChatController {
private final ChatMessageCreateUseCase chatMessageCreateUseCase; // service(MVC) 계층
private final KafkaProducer kafkaProducer;
public void sendMessage(@DestinationVariable Long roomId, @Payload ChatMessageRequest chatMessage) {
ChatMessageCreateCommand chatMessageCreateCommand = ChatMessageCreateCommand.builder()
.content(chatMessage.text())
.from(chatMessage.from())
.roomId(roomId)
.build();
Long chatId = chatMessageCreateUseCase.createChatMessage(chatMessageCreateCommand); // DB에 등록 후 Chat Message Id 반환
ChatMessageResponse chatMessageResponse = ChatMessageResponse.builder()
.id(chatId)
.roomId(roomId)
.content(chatMessage.text())
.writer(chatMessage.from())
.build();
kafkaProducer.publishMessage("chat.room.message.sending", chatMessageResponse);
}
}
Eureka Server 에 등록된 Chat Service 3개의 인스턴스들
채팅방 화면
현재 구성된 아키텍처 구조이다.
채팅 서비스와 같은 websocket방식은 실시간성을 위해 양방향으로 연결되어야 하기 때문에 여러 인스턴스에 대한 세션 동기화에 대해 고민해보았다. 그래서 kafka 및 pub/sub 방식 그리고 세션동기화하는 방식에 대해 고민함으로써 아키텍처를 좀 더 크게 생각하게 되는 계기가 되었다.
chatMessageCreateUseCase 코드가 잘못되어 있는걸로 보입니다~