RabbitMQ 기반 메세지 저장 비동기 처리

임동혁 Ldhbenecia·2025년 8월 28일

SpringBoot

목록 보기
25/28
post-thumbnail

개요

채팅 메세지를 보내면 RabbitMQ로 메세지 전송 후 MongoDB에 해당 메세지 백업 후 MySQL에 마지막으로 보낸 메세지와 날짜를 업데이트 한다.

이 3가지의 과정이 동기적으로 일어나므로, 이 또한 부하를 줄 수 있을 것으로 판단되어 비동기처리를 적용한다.

현재 RabbitMQ라는 메세지 큐를 사용하고 있기 때문에 비동기처리가 가능하다.

핵심 시나리오별 데이터 흐름

A. 채팅방 입장 및 과거 메시지 조회 (PULL)

  1. 클라이언트: 채팅방 목록에서 특정 방을 클릭한다.
  2. API 요청: 클라이언트는 서버에 GET /api/v1/chat/rooms/{roomId}/messages 요청을 보낸다. (스크롤을 올릴 경우 ?lastMessageTimestamp=... 포함)
  3. ChatMessageController: 요청을 받아 ChatMessageReader(Service)를 호출한다.
  4. ChatMessageRepository: (roomId, timestamp) 복합 인덱스를 사용하여 MongoDB에서 과거 메시지를 효율적으로 조회(페이지네이션)한다.
  5. 응답: 조회된 과거 메시지 목록이 클라이언트로 반환되고, 화면에 표시된다.
  6. 웹소켓 구독: 클라이언트는 해당 roomId에 대한 웹소켓 구독(subscribe)을 시작하여 실시간 메시지를 받을 준비 한다.

B. 메시지 전송 및 실시간 수신 (PUSH & 비동기 저장)

실시간 전달영구 저장이 두 개의 경로로 나뉘어 동시에 처리된다.

[클라이언트 A] --(WebSocket SEND)--> [ChatController]
                                             |
                                             | (1. 메시지 발행)
                                             V
                                     [RabbitMQ Exchange]
                                             |
         +-----------------------------------+-----------------------------------+
         |                                                                       |
         V (경로 A: 실시간 전달)                                                   V (경로 B: 비동기 저장)
 [ChatMessageConsumer] --(2)--> [STOMP Broker]                             [ChatMessageBackupConsumer]
         |                                                                       |
         | (3. 실시간 방송)                                                         | (4. 백그라운드 작업)
         V                                                                       V
 [구독중인 모든 클라이언트 (A, B)]                                            [MongoDB 저장 & RDBMS 업데이트]
  1. 메시지 발행: ChatControllerfindOrCreateRoom으로 roomId를 확정한 뒤, 받은 메시지를 RabbitMQ Exchange로 즉시 발행하고 자신의 역할을 마친다.
  2. 경로 A (실시간): ChatMessageConsumer가 메시지를 즉시 받아 STOMP 브로커를 통해 해당 roomId를 구독 중인 모든 클라이언트(A와 B 모두)에게 전달한다.
  3. 실시간 방송: 클라이언트 A와 B는 DB와 상관없이 거의 즉시 화면에서 새 메시지를 보게 된다.
  4. 경로 B (저장): 위 과정과 동시에, ChatMessageBackupConsumer가 똑같은 메시지를 받아 백그라운드에서 MongoDB에 메시지를 저장하고, RDBMS의 lastMessage를 업데이트한다.
    이 작업은 실시간 전달 경로에 전혀 영향을 주지 않는다.

어떻게 두 경로로 갈릴 수 있는가?

현재 구조는 Topic Exchange 를 중심으로 하나의 메세지 발행을 통해 두 가지 다른 작업이 병렬적으로 처리되도록 설계되었다.

핵심 RabbitMQ 구성 요소

요소이름타입역할
Exchangechat.exchangeTopic모든 채팅 메시지를 수신하여 적절한 큐로 라우팅하는 중앙 우체국
Queue 1chat.queueQueue실시간 메시지 중계를 위한 큐
Queue 2chat.message.backup.queueQueueDB 저장을 위한 큐
Binding 1chat.room.*Bindingchat.exchangechat.queue를 연결. chat.room.1, chat.room.2 등과 일치.
Binding 2chat.room.#Bindingchat.exchangebackup.queue를 연결. chat.room으로 시작하는 모든 키와 일치.
  • (애스터리스크): 단어 하나를 대체 (예: chat.room.1 O, chat.room.1.fast X)
  • # (해시): 0개 이상의 단어를 대체 (예: chat.room.1 O, chat.room.1.fast O)
@Configuration
class RabbitConfig(
    private val rabbitProperties: RabbitProperties,
    private val stompProperties: RabbitStompProperties,
) {

    @Bean
    fun chatQueue(): Queue {
        return Queue(rabbitProperties.chatQueue.name, true)
    }

    @Bean
    fun chatExchange(): TopicExchange {
        return TopicExchange(rabbitProperties.chatExchange.name, true, false)
    }

    @Bean
    fun chatBinding(): Binding {
        return BindingBuilder
            .bind(chatQueue())
            .to(chatExchange())
            .with(rabbitProperties.chatRouting.key + ".*")
    }

    @Bean
    fun backupQueue(): Queue {
        return Queue(rabbitProperties.backupQueue.name, true)
    }

    @Bean
    fun backupBinding(chatExchange: TopicExchange, backupQueue: Queue): Binding {
        return BindingBuilder
            .bind(backupQueue)
            .to(chatExchange)
            .with(rabbitProperties.chatRouting.key + ".#")
    }

    ...
}
  • chatExchange(): 모든 메세지가 처음 도착하는 TopicExchange를 생성
  • chatQueue() / backupQueue(): 각각 “실시간 중계”와 “DB 백업”이라는 다른 목적을 가진 독립된 큐를 생성
  • chatBinding() / backupBinding(): chat.exchange로 들어온 메세지를 어떤 규칙(라우팅 키)에 따라 각 큐에 복사하여 전달할지 정의한다. chat.room.# 바인딩 덕붕네 모든 채팅 메세지가 backupQueue에 전달되는 것이 보장된다.
class ChatMessageConsumer(
    private val messagingTemplate: SimpMessagingTemplate,
) {

    @RabbitListener(queues = ["#{rabbitProperties.chatQueue.name}"])
    fun receiveChatMessage(message: ChatMessage) {
        messagingTemplate.convertAndSend("/topic/chat.${message.roomId}", message)
    }
}

ChatMessageConsumer: 실시간 메시지 중계기

  • @RabbitListener(queues = ...): chat.queue를 구독
    즉, 이 큐에 메시지가 들어올 때마다 receiveChatMessage 메서드가 실행된다.
  • messagingTemplate.convertAndSend(...): RabbitMQ로부터 받은 메시지를 STOMP 브로커를 통해 /topic/chat.{roomId} 토픽으로 전달한다.
    이 토픽을 구독하고 있는 모든 웹소켓 클라이언트(채팅방에 참여한 사용자들)는 이 메시지를 실시간으로 수신하게 된다.
class ChatMessageBackupConsumer(
    private val chatMessageService: ChatMessageService,
    private val chatRoomService: ChatRoomService,
) {

    private val logger = LoggerFactory.getLogger(ChatMessageBackupConsumer::class.java)

    @RabbitListener(queues = ["#{rabbitProperties.backupQueue.name}"])
    fun handleMessageBackup(message: ChatMessage) {
        try {
            chatMessageService.backupMessage(message)
            chatRoomService.updateLastMessage(message.roomId, message.content)
            logger.info("비동기 메시지 백업 및 업데이트 성공. Room ID: {}", message.roomId)
        } catch (e: Exception) {
            logger.error("비동기 메시지 백업 실패. Message: {}", message, e)
        }
    }
}

ChatMessageBackupConsumer: 비동기 데이터베이스 작업자

  • @RabbitListener(queues = ...): chat.message.backup.queue를 구독
  • handleMessageBackup(...): 이 메서드는 ChatController와 완전히 분리된 별도의 스레드에서 실행된다.
    1. chatMessageService.backupMessage(message): 메시지 본문을 MongoDB에 저장
    2. chatRoomService.updateLastMessage(...): 채팅방의 마지막 메시지 정보를 RDBMS에 업데이트
  • 장점: 이 작업들이 실패하거나 오래 걸리더라도, 실시간 메시지를 전달하는 ChatMessageConsumer와는 아무런 관련이 없으므로 사용자 경험에 영향을 주지 않는다.

0개의 댓글