이번 포스팅에서는 이전 포스팅의 구현 방식에서 Redis Topic
공유를 통해 메시지 전송 프로세스를 간소화하는 작업을 진행하도록 하겠습니다.
package com.websocket.chat.config;
// import ... 내용 생략
@RequiredArgsConstructor
@Configuration
public class RedisConfig {
/**
* 단일 Topic 사용을 위한 Bean 설정
*/
@Bean
public ChannelTopic channelTopic() {
return new ChannelTopic("chatroom");
}
/**
* redis에 발행(publish)된 메시지 처리를 위한 리스너 설정
*/
@Bean
public RedisMessageListenerContainer redisMessageListener(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter,
ChannelTopic channelTopic) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, channelTopic);
return container;
}
/**
* 실제 메시지를 처리하는 subscriber 설정 추가
*/
@Bean
public MessageListenerAdapter listenerAdapter(RedisSubscriber subscriber) {
return new MessageListenerAdapter(subscriber, "sendMessage");
}
............. 내용 생략
}
ChannelTopic
와 RedisMessageListenerContainer
스프링 빈으로 등록하여 토픽과 메시지 리스너를 단일화 시켜주었습니다.
MessageListenerAdapter
를 생성하여 메시지를 구독자에게 보내는 역할을 하는 스프링 빈으로 추가해주었습니다.
메시지 리스너의 단일화 및 RedisTemplate
을 이용하여 메시지를 발행할 수 있기 때문에 기능 대체가 가능합니다.
package com.websocket.chat.pubsub;
// import ... 생략
@Slf4j
@RequiredArgsConstructor
@Service
public class RedisSubscriber {
private final ObjectMapper objectMapper;
private final SimpMessageSendingOperations messagingTemplate;
/**
* Redis에서 메시지가 발행(publish)되면 대기하고 있던 Redis Subscriber가 해당 메시지를 받아 처리한다.
*/
public void sendMessage(String publishMessage) {
try {
// ChatMessage 객채로 맵핑
ChatMessage chatMessage = objectMapper.readValue(publishMessage, ChatMessage.class);
// 채팅방을 구독한 클라이언트에게 메시지 발송
messagingTemplate.convertAndSend("/sub/chat/room/" + chatMessage.getRoomId(), chatMessage);
} catch (Exception e) {
log.error("Exception {}", e);
}
}
}
Redis
설정에서 추가한 설정으로 인해 메시지 리스너에 메시지가 수신되면 RedisSubscriber.sendMessage
가 수행되게 됩니다.
@Bean
public MessageListenerAdapter listenerAdapter(RedisSubscriber subscriber) {
return new MessageListenerAdapter(subscriber, "sendMessage");
}
수신된 메시지는 /sub/chat/room/{roomId}
를 구독한 WebSocket
클라이언트에게 메시지가 발송됩니다.
@MessageMapping("/chat/message")
public void message(ChatMessage message, @Header("token") String token) {
String nickname = jwtTokenProvider.getUserNameFromJwt(token);
// 로그인 회원 정보로 대화명 설정
message.setSender(nickname);
// 채팅방 입장시에는 대화명과 메시지를 자동으로 세팅한다.
if (ChatMessage.MessageType.ENTER.equals(message.getType())) {
message.setSender("[알림]");
message.setMessage(nickname + "님이 입장하셨습니다.");
}
// Websocket에 발행된 메시지를 redis로 발행(publish)
redisTemplate.convertAndSend(channelTopic.getTopic(), message);
}
RedisPublisher
가 삭제되었기 때문에 RedisTemplate
을 통해 바로 ChannelTopic
으로 메시지를 발행하도록 수정하였습니다.
@RequiredArgsConstructor
@Service
public class ChatRoomRepository {
// Redis
private static final String CHAT_ROOMS = "CHAT_ROOM";
private final RedisTemplate<String, Object> redisTemplate;
private HashOperations<String, String, ChatRoom> opsHashChatRoom;
@PostConstruct
private void init() {
opsHashChatRoom = redisTemplate.opsForHash();
}
// 모든 채팅방 조회
public List<ChatRoom> findAllRoom() {
return opsHashChatRoom.values(CHAT_ROOMS);
}
// 특정 채팅방 조회
public ChatRoom findRoomById(String id) {
return opsHashChatRoom.get(CHAT_ROOMS, id);
}
// 채팅방 생성 : 서버간 채팅방 공유를 위해 redis hash에 저장한다.
public ChatRoom createChatRoom(String name) {
ChatRoom chatRoom = ChatRoom.create(name);
opsHashChatRoom.put(CHAT_ROOMS, chatRoom.getRoomId(), chatRoom);
return chatRoom;
}
}
채팅방 입장 시 새로운 Topic
을 생성하고, RedisMessageListener
와 연동시키던 작업이 모두 필요없게 되었으므로 코드를 간소화하였습니다.