[Spring] WebSocket으로 채팅 구현하기 - Redis pub/sub을 이용한 여러 대의 채팅 서버 간 메시지 공유

김강욱·2024년 5월 14일
0

Spring

목록 보기
11/17
post-thumbnail

이번 포스팅은 지난 포스팅에 이어 Redis PUB/SUB을 이용하여 여러 대의 채팅 서버 간의 메시지를 공유하는 시간을 가져보도록 하겠습니다.

이전 포스팅 - [Spring] WebSocket으로 채팅 구현하기 - STOMP를 이용한 채팅 고도화

이전 포스팅에서 구현했던 방식에는 몇가지 문제가 발생합니다.

1. 서버 재시작 시 채팅방 정보 리셋

채팅방의 메인 저장소가 서버에서 관리하고 있기 때문에 서버 재시작 시 해당 메모리가 초기화되는 문제가 발생합니다.

이를 해결하기 위해 외부 데이터 저장소를 사용하여야합니다. 외부 데이터 저장소는 DB를 사용하거나 인메모리 저장소를 사용할 수 있습니다.

2. 채팅 서버가 여러 대일 경우 서버 간 채팅방을 공유할 수 없다

채팅방을 WebSocketStomp PUB/SUB을 이용하여 구현하였기 때문에 PUB/SUB가 발생한 서버 내에서만 메시지를 주고 받는 것이 가능합니다.

즉, 구독 대상인 채팅방(Topic)이 생성된 서버 안에서만 유효하므로 다른 서버로 접속한 클라이언트는 해당 채팅방이 보이지 않고 구독도 불가능합니다.

이를 해결하기 위해 공통으로 사용할 수 있는 PUB/SUB 시스템을 구축하고 모든 서버들이 해당 시스템을 통하여 PUB/SUB 메시지를 주고 받도록 Redis PUB/SUB 시스템을 사용하여 다중 서버 간 채팅 메시지 공유를 구현해보도록 하겠습니다.

🚗 Redis PUB/SUB 적용하기


build.gradle 설정하기

implementation 'org.springframework.boot:spring-boot-starter-data-redis'
//embedded-redis
// 버전 0.7.2를 써야한다 안그러면 loggin 중복 에러난다
compile group: 'it.ozimov', name: 'embedded-redis', version: '0.7.2'

로컬 환경에서 Redis 설치 없이 간단하게 스프링 부트의 Embedded Redis를 사용하여 환경 구축을 하였습니다.


Embedded Redis 서버 사용을 위한 설정

package com.websocket.chat.config;
// import 생략...

/**
 * 로컬 환경일경우 내장 레디스가 실행됩니다.
 */
@Profile("local")
@Configuration
public class EmbeddedRedisConfig {

    @Value("${spring.redis.port}")
    private int redisPort;

    private RedisServer redisServer;

    @PostConstruct
    public void redisServer() {
        redisServer = new RedisServer(redisPort);
        redisServer.start();
    }

    @PreDestroy
    public void stopRedis() {
        if (redisServer != null) {
            redisServer.stop();
        }
    }
}

채팅 서버가 실행될 때 Embedded Redis 서버도 동시에 실행될 수 있도록 EmbeddedRedisConfig 설정을 추가해줍니다.


Redis 설정

package com.websocket.chat.config;

// import 생략...

@Configuration
public class RedisConfig {

    /**
     * redis pub/sub 메시지를 처리하는 listener 설정
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListener(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }

    /**
     * 어플리케이션에서 사용할 redisTemplate 설정
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class));
        return redisTemplate;
    }
}

Redis PUB/SUB 기능을 이용하기 위해 MessageListener 설정을 추가해야합니다. 그리고 애플리케이션 코드에서 Redis를 사용하기 위해 RedisTemplate 설정도 추가합니다.


Redis 발행 서비스 구현

package com.websocket.chat.pubsub;
// import 생략...
@RequiredArgsConstructor
@Service
public class RedisPublisher {
    private final RedisTemplate<String, Object> redisTemplate;

    public void publish(ChannelTopic topic, ChatMessage message) {
        redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}

채팅방에 입장하여 메시지를 작성하면 해당 메시지를 Redis Topic에 발행하는 기능의 서비스입니다.

해당 서비스를 통해 메시지를 발행하면 대기하고 있던 Redis Sub 서비스가 메시지를 구독자들에게 전달하게 됩니다.


Redis 구독 서비스 구현

@Slf4j
@RequiredArgsConstructor
@Service
public class RedisSubscriber implements MessageListener {

    private final ObjectMapper objectMapper;
    private final RedisTemplate redisTemplate;
    private final SimpMessageSendingOperations messagingTemplate;

    /**
     * Redis에서 메시지가 발행(publish)되면 대기하고 있던 onMessage가 해당 메시지를 받아 처리한다.
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            // redis에서 발행된 데이터를 받아 deserialize
            String publishMessage = (String) redisTemplate.getStringSerializer().deserialize(message.getBody());
            // ChatMessage 객채로 맵핑
            ChatMessage roomMessage = objectMapper.readValue(publishMessage, ChatMessage.class);
            // Websocket 구독자에게 채팅 메시지 Send
            messagingTemplate.convertAndSend("/sub/chat/room/" + roomMessage.getRoomId(), roomMessage);
        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }
}

Redis에서 메시지가 발행될 때까지 대기하다가 메시지가 발행되는 시점에 해당 메시지를 수신하여 처리하는 리스너입니다.

MessageListener를 상속받아 onMessage 메서드를 재작성하게 됩니다.

Redis에서 메시지가 발행되면 해당 메세지를 ChatMessage로 역직렬화하고 MessagingTemplate을 이용하여 채팅방의 모든 WebSocket 클라이언트들에게 메시지를 전달하도록 구현하였습니다.


ChatController 수정

package com.websocket.chat.controller;

// import 생략...

@RequiredArgsConstructor
@Controller
public class ChatController {

    private final RedisPublisher redisPublisher;
    private final ChatRoomRepository chatRoomRepository;

    /**
     * websocket "/pub/chat/message"로 들어오는 메시징을 처리한다.
     */
    @MessageMapping("/chat/message")
    public void message(ChatMessage message) {
        if (ChatMessage.MessageType.ENTER.equals(message.getType())) {
            chatRoomRepository.enterChatRoom(message.getRoomId());
            message.setMessage(message.getSender() + "님이 입장하셨습니다.");
        }
        // Websocket에 발행된 메시지를 redis로 발행한다(publish)
        redisPublisher.publish(chatRoomRepository.getTopic(message.getRoomId()), message);
    }
}

클라이언트가 채팅방 입장 시 채팅방(Topic)에서 대화가 가능하도록 리스너를 연동해주는 enChatRoom 메서드를 사용합니다.

채팅방에 발행된 메시지는 서로 다른 서버에서 공유하기 위해 Redis Topic으로 발행하게 됩니다.


ChatRoomRepository 수정

@RequiredArgsConstructor
@Repository
public class ChatRoomRepository {
    // 채팅방(topic)에 발행되는 메시지를 처리할 Listner
    private final RedisMessageListenerContainer redisMessageListener;
    // 구독 처리 서비스
    private final RedisSubscriber redisSubscriber;
    // Redis
    private static final String CHAT_ROOMS = "CHAT_ROOM";
    private final RedisTemplate<String, Object> redisTemplate;
    private HashOperations<String, String, ChatRoom> opsHashChatRoom;
    // 채팅방의 대화 메시지를 발행하기 위한 redis topic 정보. 서버별로 채팅방에 매치되는 topic정보를 Map에 넣어 roomId로 찾을수 있도록 한다.
    private Map<String, ChannelTopic> topics;

    @PostConstruct
    private void init() {
        opsHashChatRoom = redisTemplate.opsForHash();
        topics = new HashMap<>();
    }

    public List<ChatRoom> findAllRoom() {
        return opsHashChatRoom.values(CHAT_ROOMS);
    }

    public ChatRoom findRoomById(String id) {
        return opsHashChatRoom.get(CHAT_ROOMS, id);
    }

    /**
     * 채팅방 생성 : 서버간 채팅방 공유를 위해 redis hash에 저장한다.
     */
    public ChatRoom createChatRoom(String name) {
        ChatRoom chatRoom = ChatRoom.create(name);
        opsHashChatRoom.put(CHAT_ROOMS, chatRoom.getRoomId(), chatRoom);
        return chatRoom;
    }

    /**
     * 채팅방 입장 : redis에 topic을 만들고 pub/sub 통신을 하기 위해 리스너를 설정한다.
     */
    public void enterChatRoom(String roomId) {
        ChannelTopic topic = topics.get(roomId);
        if (topic == null) {
            topic = new ChannelTopic(roomId);
            redisMessageListener.addMessageListener(redisSubscriber, topic);
            topics.put(roomId, topic);
        }
    }

    public ChannelTopic getTopic(String roomId) {
        return topics.get(roomId);
    }
}

채팅방 정보는 초기화 되지 않도록 생성 시 Redis Hash 구조에 저장하도록 처리합니다.

채팅방 정보를 조회할 때는 Redis Hash에 저장된 데이터를 불러오도록 메서드 내용을 수정합니다.

채팅방 입장 시에는 채팅방 Id로 Redis Topic을 조회하여 Redis PUB/SUB 메시지 리스너와 연동하게 됩니다.


ChatRoom Serialize

@Getter
@Setter
public class ChatRoom implements Serializable {

    private static final long serialVersionUID = 6494678977089006639L;

    private String roomId;
    private String name;

    public static ChatRoom create(String name) {
        ChatRoom chatRoom = new ChatRoom();
        chatRoom.roomId = UUID.randomUUID().toString();
        chatRoom.name = name;
        return chatRoom;
    }
}

Redis에 저장되는 객체들은 Serialize가 가능해야 하므로 Serializable을 참조하도록 선언하고 serialVersionUID을 설정해줍니다.


참고 자료
daddyprogrammer님의 WebSocket 채팅 서버 구현 시리즈

profile
TO BE DEVELOPER

0개의 댓글

관련 채용 정보