
채팅 메세지를 보내면 RabbitMQ로 메세지 전송 후 MongoDB에 해당 메세지 백업 후 MySQL에 마지막으로 보낸 메세지와 날짜를 업데이트 한다.
이 3가지의 과정이 동기적으로 일어나므로, 이 또한 부하를 줄 수 있을 것으로 판단되어 비동기처리를 적용한다.
현재 RabbitMQ라는 메세지 큐를 사용하고 있기 때문에 비동기처리가 가능하다.
GET /api/v1/chat/rooms/{roomId}/messages 요청을 보낸다. (스크롤을 올릴 경우 ?lastMessageTimestamp=... 포함)ChatMessageController: 요청을 받아 ChatMessageReader(Service)를 호출한다.ChatMessageRepository: (roomId, timestamp) 복합 인덱스를 사용하여 MongoDB에서 과거 메시지를 효율적으로 조회(페이지네이션)한다.roomId에 대한 웹소켓 구독(subscribe)을 시작하여 실시간 메시지를 받을 준비 한다.실시간 전달과 영구 저장이 두 개의 경로로 나뉘어 동시에 처리된다.
[클라이언트 A] --(WebSocket SEND)--> [ChatController]
|
| (1. 메시지 발행)
V
[RabbitMQ Exchange]
|
+-----------------------------------+-----------------------------------+
| |
V (경로 A: 실시간 전달) V (경로 B: 비동기 저장)
[ChatMessageConsumer] --(2)--> [STOMP Broker] [ChatMessageBackupConsumer]
| |
| (3. 실시간 방송) | (4. 백그라운드 작업)
V V
[구독중인 모든 클라이언트 (A, B)] [MongoDB 저장 & RDBMS 업데이트]
ChatController는 findOrCreateRoom으로 roomId를 확정한 뒤, 받은 메시지를 RabbitMQ Exchange로 즉시 발행하고 자신의 역할을 마친다.ChatMessageConsumer가 메시지를 즉시 받아 STOMP 브로커를 통해 해당 roomId를 구독 중인 모든 클라이언트(A와 B 모두)에게 전달한다.ChatMessageBackupConsumer가 똑같은 메시지를 받아 백그라운드에서 MongoDB에 메시지를 저장하고, RDBMS의 lastMessage를 업데이트한다.현재 구조는 Topic Exchange 를 중심으로 하나의 메세지 발행을 통해 두 가지 다른 작업이 병렬적으로 처리되도록 설계되었다.
| 요소 | 이름 | 타입 | 역할 |
|---|---|---|---|
| Exchange | chat.exchange | Topic | 모든 채팅 메시지를 수신하여 적절한 큐로 라우팅하는 중앙 우체국 |
| Queue 1 | chat.queue | Queue | 실시간 메시지 중계를 위한 큐 |
| Queue 2 | chat.message.backup.queue | Queue | DB 저장을 위한 큐 |
| Binding 1 | chat.room.* | Binding | chat.exchange와 chat.queue를 연결. chat.room.1, chat.room.2 등과 일치. |
| Binding 2 | chat.room.# | Binding | chat.exchange와 backup.queue를 연결. chat.room으로 시작하는 모든 키와 일치. |
chat.room.1 O, chat.room.1.fast X)# (해시): 0개 이상의 단어를 대체 (예: chat.room.1 O, chat.room.1.fast O)@Configuration
class RabbitConfig(
private val rabbitProperties: RabbitProperties,
private val stompProperties: RabbitStompProperties,
) {
@Bean
fun chatQueue(): Queue {
return Queue(rabbitProperties.chatQueue.name, true)
}
@Bean
fun chatExchange(): TopicExchange {
return TopicExchange(rabbitProperties.chatExchange.name, true, false)
}
@Bean
fun chatBinding(): Binding {
return BindingBuilder
.bind(chatQueue())
.to(chatExchange())
.with(rabbitProperties.chatRouting.key + ".*")
}
@Bean
fun backupQueue(): Queue {
return Queue(rabbitProperties.backupQueue.name, true)
}
@Bean
fun backupBinding(chatExchange: TopicExchange, backupQueue: Queue): Binding {
return BindingBuilder
.bind(backupQueue)
.to(chatExchange)
.with(rabbitProperties.chatRouting.key + ".#")
}
...
}
chatExchange(): 모든 메세지가 처음 도착하는 TopicExchange를 생성chatQueue() / backupQueue(): 각각 “실시간 중계”와 “DB 백업”이라는 다른 목적을 가진 독립된 큐를 생성chatBinding() / backupBinding(): chat.exchange로 들어온 메세지를 어떤 규칙(라우팅 키)에 따라 각 큐에 복사하여 전달할지 정의한다. chat.room.# 바인딩 덕붕네 모든 채팅 메세지가 backupQueue에 전달되는 것이 보장된다.class ChatMessageConsumer(
private val messagingTemplate: SimpMessagingTemplate,
) {
@RabbitListener(queues = ["#{rabbitProperties.chatQueue.name}"])
fun receiveChatMessage(message: ChatMessage) {
messagingTemplate.convertAndSend("/topic/chat.${message.roomId}", message)
}
}
ChatMessageConsumer: 실시간 메시지 중계기@RabbitListener(queues = ...): chat.queue를 구독receiveChatMessage 메서드가 실행된다.messagingTemplate.convertAndSend(...): RabbitMQ로부터 받은 메시지를 STOMP 브로커를 통해 /topic/chat.{roomId} 토픽으로 전달한다.class ChatMessageBackupConsumer(
private val chatMessageService: ChatMessageService,
private val chatRoomService: ChatRoomService,
) {
private val logger = LoggerFactory.getLogger(ChatMessageBackupConsumer::class.java)
@RabbitListener(queues = ["#{rabbitProperties.backupQueue.name}"])
fun handleMessageBackup(message: ChatMessage) {
try {
chatMessageService.backupMessage(message)
chatRoomService.updateLastMessage(message.roomId, message.content)
logger.info("비동기 메시지 백업 및 업데이트 성공. Room ID: {}", message.roomId)
} catch (e: Exception) {
logger.error("비동기 메시지 백업 실패. Message: {}", message, e)
}
}
}
ChatMessageBackupConsumer: 비동기 데이터베이스 작업자@RabbitListener(queues = ...): chat.message.backup.queue를 구독handleMessageBackup(...): 이 메서드는 ChatController와 완전히 분리된 별도의 스레드에서 실행된다.chatMessageService.backupMessage(message): 메시지 본문을 MongoDB에 저장chatRoomService.updateLastMessage(...): 채팅방의 마지막 메시지 정보를 RDBMS에 업데이트ChatMessageConsumer와는 아무런 관련이 없으므로 사용자 경험에 영향을 주지 않는다.