RabbitMQ를 활용한 채팅방 구현 (2)

LSH·2025년 6월 23일

RabbitMQ

목록 보기
2/2

핵심 코드만 추가하였고, 전체 코드는 깃허브를 통해 확인이 가능합니다.

서비스 구성

앞서 설계한 서비스(RabbitMQ, MongoDB, Redis, MySQL)를 실행하기 위해 docker-compose를 작성
RabbitMQ는 STOMP 플러그인 활성화를 위한 명령어를 추가

version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    ports:
      - "5672:5672"
      - "15672:15672"
      - "61613:61613"
    command: >
      /bin/bash -c "rabbitmq-plugins enable --offline rabbitmq_management rabbitmq_stomp && rabbitmq-server"
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest

  mongodb:
    image: mongo:latest
    container_name: mongodb
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: 1234
      MONGO_INITDB_DATABASE: rabbitmq
    ports:
      - "27017:27017"
    networks:
      - my_network

  redis:
    image: redis:alpine
    container_name: redis
    hostname: redis
    ports:
      - "6379:6379"
    networks:
      - my_network

  mysql:
    image: mysql:8.0
    container_name: mysql
    environment:
      MYSQL_ROOT_PASSWORD: 1234
      MYSQL_DATABASE: rabbitmq
    ports:
      - "3306:3306"
    networks:
      - my_network

networks:
  my_network:

연동

Spring과 각 서비스를 연동

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/rabbitmq?serverTimezone=Asia/Seoul&characterEncoding=UTF-8
    username: root
    password: 1234
  jpa:
    hibernate:
      ddl-auto: create
  data:
    mongodb:
      uri: mongodb://root:1234@localhost:27017/rabbitmq?authSource=admin&authMechanism=SCRAM-SHA-1
      username: root
      password: 1234
    redis:
      host: localhost
      port: 6379

jwt:
  secret: lsh2613 #jwt secret key
  access:
    expiration: 1800000
    header: Authorization
  refresh:
    expiration: 259200000
    header: Authorization-Refresh

rabbitmq:
  host: "localhost"
  port: 5672
  virtual-host: "/"
  username: "guest"
  password: "guest"
  relay:
    port: 61613
    system-login: guest
    system-passcode: guest
    client-login: guest
    client-passcode: guest
  chat:
    queue:
      name: "chat.queue"
    exchange:
      name: "chat.exchange"
    routing:
      key: "*.room."

logging:
  level:
    org.springframework.data.redis.connection: DEBUG
    org.springframework.cache: DEBUG
@Configuration
@EnableRabbit
@RequiredArgsConstructor
public class RabbitConfig {

    @Value("${rabbitmq.chat.queue.name}")
    private String chatQueueName;
    @Value("${rabbitmq.chat.exchange.name}")
    private String chatExchangeName;
    @Value("${rabbitmq.chat.routing.key}")
    private String routingKey;

    @Value("${rabbitmq.host}")
    private String host;
    @Value("${rabbitmq.port}")
    private int port;
    @Value("${rabbitmq.virtual-host}")
    private String virtualHost;
    @Value("${rabbitmq.username}")
    private String username;
    @Value("${rabbitmq.password}")
    private String password;

    // Queue 등록
    @Bean
    public Queue queue() {
        return new Queue(chatQueueName, true);
    }

    // Exchange 등록
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(chatExchangeName);
    }

    // Exchange와 Queue바인딩
    @Bean
    public Binding binding(Queue queue, TopicExchange exchange){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(routingKey);
    }

    // RabbitMQ와의 메시지 통신을 담당하는 클래스
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());

        rabbitTemplate.setExchange(chatExchangeName);
        return rabbitTemplate;
    }

    // RabbitMQ와의 연결을 관리하는 클래스
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setVirtualHost(virtualHost);
        factory.setUsername(username);
        factory.setPassword(password);
        return factory;
    }
}
  • new TopicExchange(chatExchangeName) -> 설정한 이름으로 TopicExchange를 생성
  • return BindingBuilder.bind(queue).to(exchange).with(routingKey) -> queue와 exchange를 routingKey(여기선 .room.)를 통해 바인딩
  • rabbitTemplate.setExchange(chatExchangeName) -> 이 세팅을 통해 RabbitTemplate을 통해 메시지 전송 시 따로 전송할 exchangeName을 설정할 필요 없음
@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Value("${rabbitmq.host}")
    private String host;
    @Value("${rabbitmq.relay.port}")
    private int port;
    @Value("${rabbitmq.relay.system-login}")
    private String systemLogin;
    @Value("${rabbitmq.relay.client-passcode}")
    private String systemPasscode;
    @Value("${rabbitmq.relay.client-login}")
    private String clientLogin;
    @Value("${rabbitmq.relay.client-passcode}")
    private String clientPasscode;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // socketJs 클라이언트가 WebSocket 핸드셰이크를 하기 위해 연결할 endpoint를 지정할 수 있다.
        registry.addEndpoint("/chat/inbox")
                .setAllowedOriginPatterns("*"); // cors 허용을 위해 꼭 설정해주어야 함. setCredential() 설정시에 AllowedOrigin 과 같이 사용될 경우 오류가 날 수 있으므로 OriginPatterns 설정으로 사용하였음
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // 메시지 브로커 설정
        registry.setPathMatcher(new AntPathMatcher(".")); // url을 chat/room/3 -> chat.room.3으로 참조하기 위한 설정

        registry.enableStompBrokerRelay("/queue", "/topic", "/exchange", "/amq/queue")
                .setRelayHost(host)
                .setRelayPort(port)
                .setSystemLogin(systemLogin)
                .setSystemPasscode(systemPasscode)
                .setClientLogin(clientLogin)
                .setClientPasscode(clientPasscode);

        // 클라이언트로부터 메시지를 받을 api의 prefix를 설정함
        // publish
        registry.setApplicationDestinationPrefixes("/pub");

    }
}
  • registry.addEndpoint("/chat/inbox") -> ws://{서버ip}:{port}/chat/inbox를 통해 소켓 연결
  • registry.enableStompBrokerRelay(...) -> stomp 외부 메시지 브로커 사용 허가
  • registry.setApplicationDestinationPrefixes("/pub") -> 메시지 보낼 저장될 큐의 prefix

채팅방 입장/퇴장(CONNECT/DISCONNECT)

채팅방 입장 시 처리해야 되는 로직은 다음과 같다.
1. Redis에 채팅방 입장 유저 추가
2. 나의 마지막 접속 시간 이후에 생성된 메시지 존재 && 현재 채팅방에 입장해있는 유저가 존재한다면, 먼저 입장해있는 유저에게 '싱크 요청' 메시지 전송

먼저 이 프로젝트에서는 STOMP 헤더에 JWT, chat-room-id를 추가하여 유저를 식별하고, 접속하려는 채팅방 정보를 전달받았다.

public class JwtAuthenticationInterceptor implements ChannelInterceptor {
    private final TokenUtil tokenUtil;
    private final StompHeaderAccessorUtil stompHeaderAccessorUtil;
    private final ChatRoomService chatRoomService;

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getCommand();

        if (command==StompCommand.CONNECT)
            handleConnect(accessor);
        if (command==StompCommand.DISCONNECT)
            handleDisconnect(accessor);

        return message;
    }

    private void handleConnect(StompHeaderAccessor accessor) {
        String accessToken = tokenUtil.extractToken(accessor, TokenType.ACCESS_TOKEN);
        Long memberId = tokenUtil.validateTokenAndGetMemberId(accessToken);
        stompHeaderAccessorUtil.setMemberIdInSession(accessor, memberId);

        Long chatRoomId = stompHeaderAccessorUtil.getChatRoomIdInHeader(accessor);
        stompHeaderAccessorUtil.setChatRoomIdInSession(accessor, chatRoomId);

        chatRoomService.enterChatRoom(memberId, chatRoomId);
    }

    private void handleDisconnect(StompHeaderAccessor accessor) {
        Long memberId = stompHeaderAccessorUtil.removeMemberIdInSession(accessor);
        Long chatRoomId = stompHeaderAccessorUtil.removeChatRoomIdInSession(accessor);
        chatRoomService.exitChatRoom(memberId, chatRoomId);
    }
}
public class ChatRoomService {
    public void enterChatRoom(Long memberId, Long chatRoomId) {
        Member member = entityFacade.getMember(memberId);

        ChatRoom chatRoom = entityFacade.getChatRoom(chatRoomId);

        redisChatUtil.enterChatRoom(chatRoomId, memberId);
        readUnreadMessages(chatRoom, member.getId());
    }

    private void readUnreadMessages(ChatRoom chatRoom, Long memberId) {
        ChatRoomMember chatRoomMember = chatRoomMemberRepository.findByChatRoomIdAndMemberId(chatRoom.getId(), memberId)
                .orElseThrow(() -> new RuntimeException("채팅방 참가자를 찾을 수 없습니다"));
        LocalDateTime lastEntryTime = chatRoomMember.getLastEntryTime();

        boolean existsUnreadMessage = chatMessageRepository.existsByChatRoomIdAndCreatedAtAfter(chatRoom.getId(), lastEntryTime);
        boolean existsOnlineChatRoomMember = redisChatUtil.getOnlineChatRoomMemberCnt(chatRoom.getId()) > 1; // 1은 본인

        if (existsUnreadMessage && existsOnlineChatRoomMember)
            sendChatSyncRequestMessage(chatRoom.getId());
    }
    
        @Transactional
    public void exitChatRoom(Long memberId, Long chatRoomId) {
        Member member = entityFacade.getMember(memberId);

        ChatRoom chatRoom = entityFacade.getChatRoom(chatRoomId);

        ChatRoomMember chatRoomMember = chatRoomMemberRepository.findByChatRoomIdAndMemberId(chatRoom.getId(), memberId)
                .orElseThrow(() -> new RuntimeException("채팅방 참가자를 찾을 수 없습니다"));

        chatRoomMember.updateLastEntryTime();

        redisChatUtil.exitChatRoom(chatRoom.getId(), member.getId());
    }

채팅 메시지 전송

JwtAuthenticationInterceptor에서 유저를 인증하고 세션에 userId, chatRoomId를 저장해뒀기 때문에 message만 전달받아 그대로 전송해주기만 하면 된다.

중요한 점은, 메시지를 보낼 때도 '안 읽은 수'를 계산해줘야 한다.
이 계산은 메시지 조회 API와 다르게 '채팅방 인원' - '현재 해당 채팅방에 참가 중인 유저 수'로 계산된다. 따라서 조회 시점과, 전송 시점으로 구분되어 메소드명을 만들어놨다.

public class ChatMessageController {
    private final ChatMessageService chatMessageService;

    /**
     * Destination Queue: /pub/chat.message를 통해 호출 후 처리 되는 로직
     */
    @MessageMapping("chat.message")
    public void sendMessage(StompHeaderAccessor accessor, ChatMessageReq message) {
        chatMessageService.sendMessage(accessor, message);
    }

public class ChatMessageService {
    @Transactional
    public void sendMessage(StompHeaderAccessor accessor, ChatMessageReq req) {
        Long memberId = stompHeaderAccessorUtil.getMemberIdInSession(accessor);
        Member member = entityFacade.getMember(memberId);

        Long chatRoomId = stompHeaderAccessorUtil.getChatRoomIdInSession(accessor);
        ChatRoom chatRoom = entityFacade.getChatRoom(chatRoomId);

        ChatMessage chatMessage = saveChatMessage(req, chatRoom, member);

        int unreadCnt = calculateUnreadCntAtPublish(chatRoom.getId());
        sendMessage(member, chatMessage, unreadCnt, chatRoom);
    }
}

채팅 조회

채팅 조회는 성능상 모두 조회보단 무한스크롤로 적용해야 되지만, 편의상 생략했다.

'안 읽은 수'는 앞서 설명했던 것처럼 '채팅방 인원' - '현재 입장 중인 유저 수' - '현재 입장 중이지 않은 유저 중 마지막 입장 시간>메시지 생성시간을 만족하는 유저의 수'로 계산된다.

public class ChatMessageService {

    @Transactional(readOnly = true)
    public List<MessageRes> getChatMessages(Long chatRoomId) {
        ChatRoom chatRoom = entityFacade.getChatRoom(chatRoomId);

        List<ChatMessage> chatMessages = chatMessageRepository.findByChatRoomIdOrderByCreatedAtAsc(chatRoom.getId());

        List<MessageRes> messageResList = chatMessages.stream()
                .map(chatMessage -> {
                    /*
                    현재 접속 중인 채팅방 유저를 제외하고 나머지 채팅방 유저의 마지막 입장 시간을 가져옴
                    그 중에서 메시지 생성 시간보다 늦은 시간에 입장한 유저(해당 메시지를 읽지 않았다는 의미)의 수를 세어줌
                    unreadCnt = 채팅방 유저 수 - 현재 접속 중인 유저 수 - 메시지 생성 시간보다 늦은 시간에 입장한 유저 수
                     */
                    int unreadCnt = calculateUnreadCntAtReadTime(chatRoom.getId(), chatMessage.getCreatedAt());
                    Member member = entityFacade.getMember(chatMessage.getMemberId());
                    return ChatMessageRes.createRes(member.getNickname(), chatMessage, unreadCnt);
                })
                .toList();

        return messageResList;
    }
        private int calculateUnreadCntAtReadTime(Long chatRoomId, LocalDateTime messageCreatedAt) {
        List<ChatRoomMember> chatRoomMembers = chatRoomMemberRepository.findAllByChatRoomId(chatRoomId);
        Set<Long> onlineChatRoomMembers = redisChatUtil.getOnlineChatRoomMembers(chatRoomId);
        List<LocalDateTime> lastEntryTimes = getLastEntryTimesExcludingOnlineMembers(chatRoomMembers, onlineChatRoomMembers);

        int memberCntAfterMessageCreated = (int) lastEntryTimes.stream()
                .filter(time -> time.isAfter(messageCreatedAt))
                .count();

        return chatRoomMembers.size() - onlineChatRoomMembers.size() - memberCntAfterMessageCreated;
    }

    private List<LocalDateTime> getLastEntryTimesExcludingOnlineMembers(List<ChatRoomMember> chatRoomMembers, Set<Long> onlineMemberIds) {
        return chatRoomMembers.stream()
                .filter(chatRoomMember -> !onlineMemberIds.contains(chatRoomMember.getMember().getId()))
                .map(ChatRoomMember::getLastEntryTime)
                .toList();
    }
}

시연 영상

채팅 테스트를 위해 직접 STOMP.js를 활용하여 페이지를 제작했다. 하는 김에 간단한 API docs도 추가했다.

핵심 기능

  1. 메시지 발행/조회 시 안 읽은 수 계산
  2. 새로운 유저 입장 시 채팅 싱크 맞추기

0개의 댓글