[SpringBoot] 채팅의 pub/sub을 Redis로 처리하기 - 3

이의찬·2023년 7월 17일
0

Springboot

목록 보기
6/12

🐱 Github

기존에 ChatRoom을 HashMap에 저장하던 데이터를 Redis에 저장하면서 Redis의 pub/sub기능을 통해 메시지를 전송해보자.

이전 게시물과 채팅에 쓰일 레디스는 아래 문서에 있다.
채팅 기능에 Stomp 적용하기(+Vue) - 2
85. [SpringBoot] 채팅 서버에 사용될 Redis hashes

폴더 구조

구현

build.gradle

implementation 'org.springframework.boot:spring-boot-starter-data-redis'
implementation group: 'it.ozimov', name: 'embedded-redis', version: '0.7.2'

embedded-redis(Spring boot server 내장)를 사용하여서 구현할 것이다. 현재 컴퓨터에 redis가 다운받아져 있다면 두 번째 의존성과 이후 작성할 EmbeddedRedisConfig은 제외해도 된다.

(Embedded Redis는 일종의 H2 데이터베이스처럼 테스트 용도로 사용된다.)

application.yml

spring:
  redis:
    host: 127.0.0.1
    port: 6379

💡 Spring Boot 3.X.X application.yml 설정

spring:
  data:
    redis:
      host: 127.0.0.1
      port: 6379

EmbeddedRedisConfig

먼저 Embedded Redis의 Config를 스프링 빈에 등록해줘야 한다.

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import redis.embedded.RedisServer;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@Configuration
public class EmbeddedRedisConfig {
    @Value("${spring.redis.port}")
    private int redisPort;

    private RedisServer redisServer;

    @PostConstruct
    public void startRedisServer() {
        redisServer = new RedisServer(redisPort);
        redisServer.start();
    }

    @PreDestroy
    public void stopRedis() {
        if (redisServer != null) redisServer.stop();
    }
}

EmbeddedRedisConfig는 스프링 빈에 등록될 때 레디스 서버를 실행하고 생명주기가 끝날 때 서버를 멈추게 하였다.

RedisConfig

다음으로 레디스의 메시지 리스너와 RedisTemplate를 스프링 빈에 등록하자.

설정해야하는 부분이 있기 때문에 @Bean으로 등록하였다.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {

    /**
     * redis pub/sub 메시지를 처리하는 listener 설정
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }

    /**
     * 어플리케이션에서 사용할 redisTemplate 설정
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class));
        return redisTemplate;
    }

}

먼저 RedisMessageListenerContainer는 pub/sub 구조에서 항상 발행된(pub) 데이터가 있는지 확인해야 하고 처리해야하기 때문에 Listener를 등록해줘야 한다.

RedisTemplate를 통해 SpringBoot에서 redis에 데이터를 넣고 뺄 수 있다.

ChatRoomRepository

Redis를 이용하여 채팅방 정보를 저장하기 때문에 repository를 생성하자

import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Repository;
import site.leui.chat_example.chat.dto.ChatRoom;

import javax.annotation.PostConstruct;
import java.util.*;

@RequiredArgsConstructor
@Repository
public class ChatRoomRepository {
    private static final String CHAT_ROOMS = "CHAT_ROOM";
    private final RedisTemplate<String, Object> redisTemplate;
    private HashOperations<String, String, ChatRoom> opsHashChatRoom;

    @PostConstruct
    private void init() {
        opsHashChatRoom = redisTemplate.opsForHash();
        redisTemplate.delete(CHAT_ROOMS);
        for (int i = 0; i < 5; i++) {
            ChatRoom chatRoom = ChatRoom.of("test_" + i);
            opsHashChatRoom.put(CHAT_ROOMS, chatRoom.getRoomId(), chatRoom);
        }
    }

    public List<ChatRoom> findAll() {
        return opsHashChatRoom.values(CHAT_ROOMS);
    }

    public ChatRoom findById(String id) {
        return opsHashChatRoom.get(CHAT_ROOMS, id);
    }

    public void save(ChatRoom chatRoom) {
        opsHashChatRoom.put(CHAT_ROOMS, chatRoom.getRoomId(), chatRoom);
    }
}

RedisTemplate와 HashOperations에 대해서는 85. [SpringBoot] 채팅 서버에 사용될 Redis hashes 이 문서에 정리해 두었다.

WebSocketConfig

v2와 같은 코드이다. Redis를 통해 publish를 하지만 여전히 Stomp를 사용하기 때문에 필요하다. (messageTemplate를 통해 메시지를 전달하는 방식은 v2와 같다.)

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.*;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/sub");
        registry.setApplicationDestinationPrefixes("/pub");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws-stomp")
                .withSockJS();
    }
}

RedisPublisher

redis publisher는 레디스로 topic과 메시지를 가지고 전송할 객체이다. 여기서 topic은 roomId이다.

import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.stereotype.Service;
import site.leui.chat_example.chat.dto.ChatMessage;

@RequiredArgsConstructor
@Service
public class RedisPublisher {
    private final RedisTemplate<String, Object> redisTemplate;

    public void publish(ChannelTopic topic, ChatMessage message) {
        redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}

RedisSubscriber

RedisSubscriber는 채팅방이 생성되었을 때 생성되어 위에 있는 RedisConfigRedisMessageListenerContainer에 메시지 리스너로서 들어가게 된다.

ChatRoomService enterChatRoom()

redis publisher가 publish를 하게되면 RedisMessageListenerContainer에 있는 리스너 중에 같은 topic을 가진 리스너의 onMessage()를 실행하게 된다.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.stereotype.Service;
import site.leui.chat_example.chat.dto.ChatMessage;

@Slf4j
@RequiredArgsConstructor
@Service
public class RedisSubscriber implements MessageListener {
    private final ObjectMapper objectMapper;
    private final RedisTemplate<String, Object> redisTemplate;
    private final SimpMessageSendingOperations messageTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            String publishMessage = redisTemplate
                    .getStringSerializer()
                    .deserialize(message.getBody());

            ChatMessage roomMessage = objectMapper.readValue(publishMessage, ChatMessage.class);
            messageTemplate.convertAndSend("/sub/chat/room/" + roomMessage.getRoomId(), roomMessage);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}

onMessage()를 보면 messageTemplate를 사용하는 것을 볼 수 있다. 이는 v2에서 다루었던 방식과 일치한다.

ChatController

/pub/chat/message url로 들어온 발행을 처리할 수 있는 컨트롤러를 만들어야 한다. v2와 거의 일치하는데 publish를 redis를 통해 하게된다.

import lombok.RequiredArgsConstructor;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import site.leui.chat_example.chat.dto.ChatMessage;
import site.leui.chat_example.chat.service.ChatRoomService;
import site.leui.chat_example.base.redis.service.RedisPublisher;

@RequiredArgsConstructor
@Controller
public class ChatController {

    private final RedisPublisher redisPublisher;
    private final ChatRoomService chatRoomService;

    @MessageMapping("/chat/message")
    public void sendMessage(ChatMessage message) {
        if (isJoin(message)) {
            chatRoomService.enterChatRoom(message.getRoomId());
            message.setMessage(message.getSender() + "님이 입장하셨습니다.");
        }
        redisPublisher.publish(chatRoomService.getTopic(message.getRoomId()), message);
    }

    private boolean isJoin(ChatMessage messageType) {
        return messageType.getMessageType().equals(ChatMessage.MessageType.JOIN);
    }
}

redisPublisher.publish()를 하게 되면 redisTemplate를 통해 publish가 되고 적절한 리스너인 RedisSubscriber를 찾아서 onMessage()를 실행하게 된다.

onMessage()에서는 v2와 마찬가지로 messageTemplate.convertAndSend()를 통해 메시지를 구독자에게 전달한다.

흐름

이제 전체 흐름을 보자

  1. WebSocketConfig를 통해 구독과 발행에 대해 설정해 두었다.
  2. /pub/chat/messageurl로 chat message가 들어오면 redis에 roomId(topic)을 가지고 publish를 한다.
  3. Redis Message Listener Container에서 이 topic을 가지고 messageTemplate을 통해 전송한다.
  4. Vue에서 이미 구독을 하고 있던 session들에게 메시지를 전송한다.

핵심 코드만 설명되어 있기 때문에 전체 코드는 github에서 확인할 수 있다.

1개의 댓글

comment-user-thumbnail
2023년 7월 17일

글 잘 봤습니다, 많은 도움이 되었습니다.

답글 달기