이번 포스팅은 지난 포스팅에 이어 Redis PUB/SUB
을 이용하여 여러 대의 채팅 서버 간의 메시지를 공유하는 시간을 가져보도록 하겠습니다.
이전 포스팅에서 구현했던 방식에는 몇가지 문제가 발생합니다.
채팅방의 메인 저장소가 서버에서 관리하고 있기 때문에 서버 재시작 시 해당 메모리가 초기화되는 문제가 발생합니다.
이를 해결하기 위해 외부 데이터 저장소를 사용하여야합니다. 외부 데이터 저장소는 DB를 사용하거나 인메모리 저장소를 사용할 수 있습니다.
채팅방을 WebSocket
과 Stomp PUB/SUB
을 이용하여 구현하였기 때문에 PUB/SUB
가 발생한 서버 내에서만 메시지를 주고 받는 것이 가능합니다.
즉, 구독 대상인 채팅방(Topic
)이 생성된 서버 안에서만 유효하므로 다른 서버로 접속한 클라이언트는 해당 채팅방이 보이지 않고 구독도 불가능합니다.
이를 해결하기 위해 공통으로 사용할 수 있는 PUB/SUB
시스템을 구축하고 모든 서버들이 해당 시스템을 통하여 PUB/SUB
메시지를 주고 받도록 Redis PUB/SUB
시스템을 사용하여 다중 서버 간 채팅 메시지 공유를 구현해보도록 하겠습니다.
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
//embedded-redis
// 버전 0.7.2를 써야한다 안그러면 loggin 중복 에러난다
compile group: 'it.ozimov', name: 'embedded-redis', version: '0.7.2'
로컬 환경에서 Redis
설치 없이 간단하게 스프링 부트의 Embedded Redis
를 사용하여 환경 구축을 하였습니다.
package com.websocket.chat.config;
// import 생략...
/**
* 로컬 환경일경우 내장 레디스가 실행됩니다.
*/
@Profile("local")
@Configuration
public class EmbeddedRedisConfig {
@Value("${spring.redis.port}")
private int redisPort;
private RedisServer redisServer;
@PostConstruct
public void redisServer() {
redisServer = new RedisServer(redisPort);
redisServer.start();
}
@PreDestroy
public void stopRedis() {
if (redisServer != null) {
redisServer.stop();
}
}
}
채팅 서버가 실행될 때 Embedded Redis
서버도 동시에 실행될 수 있도록 EmbeddedRedisConfig
설정을 추가해줍니다.
package com.websocket.chat.config;
// import 생략...
@Configuration
public class RedisConfig {
/**
* redis pub/sub 메시지를 처리하는 listener 설정
*/
@Bean
public RedisMessageListenerContainer redisMessageListener(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
/**
* 어플리케이션에서 사용할 redisTemplate 설정
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class));
return redisTemplate;
}
}
Redis PUB/SUB
기능을 이용하기 위해 MessageListener
설정을 추가해야합니다. 그리고 애플리케이션 코드에서 Redis
를 사용하기 위해 RedisTemplate
설정도 추가합니다.
package com.websocket.chat.pubsub;
// import 생략...
@RequiredArgsConstructor
@Service
public class RedisPublisher {
private final RedisTemplate<String, Object> redisTemplate;
public void publish(ChannelTopic topic, ChatMessage message) {
redisTemplate.convertAndSend(topic.getTopic(), message);
}
}
채팅방에 입장하여 메시지를 작성하면 해당 메시지를 Redis Topic
에 발행하는 기능의 서비스입니다.
해당 서비스를 통해 메시지를 발행하면 대기하고 있던 Redis Sub
서비스가 메시지를 구독자들에게 전달하게 됩니다.
@Slf4j
@RequiredArgsConstructor
@Service
public class RedisSubscriber implements MessageListener {
private final ObjectMapper objectMapper;
private final RedisTemplate redisTemplate;
private final SimpMessageSendingOperations messagingTemplate;
/**
* Redis에서 메시지가 발행(publish)되면 대기하고 있던 onMessage가 해당 메시지를 받아 처리한다.
*/
@Override
public void onMessage(Message message, byte[] pattern) {
try {
// redis에서 발행된 데이터를 받아 deserialize
String publishMessage = (String) redisTemplate.getStringSerializer().deserialize(message.getBody());
// ChatMessage 객채로 맵핑
ChatMessage roomMessage = objectMapper.readValue(publishMessage, ChatMessage.class);
// Websocket 구독자에게 채팅 메시지 Send
messagingTemplate.convertAndSend("/sub/chat/room/" + roomMessage.getRoomId(), roomMessage);
} catch (Exception e) {
log.error(e.getMessage());
}
}
}
Redis
에서 메시지가 발행될 때까지 대기하다가 메시지가 발행되는 시점에 해당 메시지를 수신하여 처리하는 리스너입니다.
MessageListener
를 상속받아 onMessage
메서드를 재작성하게 됩니다.
Redis
에서 메시지가 발행되면 해당 메세지를 ChatMessage
로 역직렬화하고 MessagingTemplate
을 이용하여 채팅방의 모든 WebSocket
클라이언트들에게 메시지를 전달하도록 구현하였습니다.
package com.websocket.chat.controller;
// import 생략...
@RequiredArgsConstructor
@Controller
public class ChatController {
private final RedisPublisher redisPublisher;
private final ChatRoomRepository chatRoomRepository;
/**
* websocket "/pub/chat/message"로 들어오는 메시징을 처리한다.
*/
@MessageMapping("/chat/message")
public void message(ChatMessage message) {
if (ChatMessage.MessageType.ENTER.equals(message.getType())) {
chatRoomRepository.enterChatRoom(message.getRoomId());
message.setMessage(message.getSender() + "님이 입장하셨습니다.");
}
// Websocket에 발행된 메시지를 redis로 발행한다(publish)
redisPublisher.publish(chatRoomRepository.getTopic(message.getRoomId()), message);
}
}
클라이언트가 채팅방 입장 시 채팅방(Topic
)에서 대화가 가능하도록 리스너를 연동해주는 enChatRoom
메서드를 사용합니다.
채팅방에 발행된 메시지는 서로 다른 서버에서 공유하기 위해 Redis Topic
으로 발행하게 됩니다.
@RequiredArgsConstructor
@Repository
public class ChatRoomRepository {
// 채팅방(topic)에 발행되는 메시지를 처리할 Listner
private final RedisMessageListenerContainer redisMessageListener;
// 구독 처리 서비스
private final RedisSubscriber redisSubscriber;
// Redis
private static final String CHAT_ROOMS = "CHAT_ROOM";
private final RedisTemplate<String, Object> redisTemplate;
private HashOperations<String, String, ChatRoom> opsHashChatRoom;
// 채팅방의 대화 메시지를 발행하기 위한 redis topic 정보. 서버별로 채팅방에 매치되는 topic정보를 Map에 넣어 roomId로 찾을수 있도록 한다.
private Map<String, ChannelTopic> topics;
@PostConstruct
private void init() {
opsHashChatRoom = redisTemplate.opsForHash();
topics = new HashMap<>();
}
public List<ChatRoom> findAllRoom() {
return opsHashChatRoom.values(CHAT_ROOMS);
}
public ChatRoom findRoomById(String id) {
return opsHashChatRoom.get(CHAT_ROOMS, id);
}
/**
* 채팅방 생성 : 서버간 채팅방 공유를 위해 redis hash에 저장한다.
*/
public ChatRoom createChatRoom(String name) {
ChatRoom chatRoom = ChatRoom.create(name);
opsHashChatRoom.put(CHAT_ROOMS, chatRoom.getRoomId(), chatRoom);
return chatRoom;
}
/**
* 채팅방 입장 : redis에 topic을 만들고 pub/sub 통신을 하기 위해 리스너를 설정한다.
*/
public void enterChatRoom(String roomId) {
ChannelTopic topic = topics.get(roomId);
if (topic == null) {
topic = new ChannelTopic(roomId);
redisMessageListener.addMessageListener(redisSubscriber, topic);
topics.put(roomId, topic);
}
}
public ChannelTopic getTopic(String roomId) {
return topics.get(roomId);
}
}
채팅방 정보는 초기화 되지 않도록 생성 시 Redis Hash
구조에 저장하도록 처리합니다.
채팅방 정보를 조회할 때는 Redis Hash
에 저장된 데이터를 불러오도록 메서드 내용을 수정합니다.
채팅방 입장 시에는 채팅방 Id로 Redis Topic
을 조회하여 Redis PUB/SUB
메시지 리스너와 연동하게 됩니다.
@Getter
@Setter
public class ChatRoom implements Serializable {
private static final long serialVersionUID = 6494678977089006639L;
private String roomId;
private String name;
public static ChatRoom create(String name) {
ChatRoom chatRoom = new ChatRoom();
chatRoom.roomId = UUID.randomUUID().toString();
chatRoom.name = name;
return chatRoom;
}
}
Redis
에 저장되는 객체들은 Serialize
가 가능해야 하므로 Serializable
을 참조하도록 선언하고 serialVersionUID
을 설정해줍니다.