Redis Publish/Subscribe uses a ChannelTopic to deliver messages to every client subscribed to the same topic. When a chat room was first created, I named its topic "room" + roomId.toString(), then registered RedisSubscriber (which implements MessageListener) together with that topic on the RedisMessageListenerContainer, so the pair could be used to send and receive the room's messages.
@Service
@Slf4j
@RequiredArgsConstructor
public class RoomService {
    private final MemberService memberService;
    private final RoomRepository roomRepository;
    private final Map<String, ChannelTopic> topics;
    private final RedisMessageListenerContainer redisMessageListener;
    private final RedisSubscriber redisSubscriber;

    public Long createRoom(long receiverId, MemberDetails memberDetails) {
        Member receiver = memberService.validateVerifyMember(receiverId);
        Member sender = memberService.validateVerifyMember(memberDetails.getMemberId());
        ChatRoom chatRoom =
                ChatRoom
                        .builder()
                        .sender(sender)
                        .receiver(receiver)
                        .build();
        ChatRoom saveChatRoom = roomRepository.save(chatRoom);

        // Create the topic for this room and register the listener on it
        String topicRoomId = "room" + saveChatRoom.getRoomId();
        if (!topics.containsKey(topicRoomId)) {
            ChannelTopic topic = new ChannelTopic(topicRoomId);
            redisMessageListener.addMessageListener(redisSubscriber, topic);
            topics.put(topicRoomId, topic);
        }
        return saveChatRoom.getRoomId();
    }

    public ChatRoom findRoom(long roomId) {
        ChatRoom chatRoom = findExistRoom(roomId);
        return chatRoom;
    }
}
@Configuration
@RequiredArgsConstructor
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> chatRedisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> chatRedisTemplate = new RedisTemplate<>();
        chatRedisTemplate.setConnectionFactory(connectionFactory);
        chatRedisTemplate.setKeySerializer(new StringRedisSerializer());
        chatRedisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class));
        return chatRedisTemplate;
    }

    // Listener container that handles Redis pub/sub messages
    @Bean
    public RedisMessageListenerContainer redisMessageListener(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }
}
@Service
@RequiredArgsConstructor
public class RedisPublisher {
    @Resource(name = "chatRedisTemplate")
    private final RedisTemplate<String, Object> redisTemplate;

    public void publish(ChannelTopic topic, PublishMessage message) {
        redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}
@Service
@Slf4j
@RequiredArgsConstructor
public class RedisSubscriber implements MessageListener {
    private final ObjectMapper objectMapper;
    @Resource(name = "chatRedisTemplate")
    private final RedisTemplate<String, Object> redisTemplate;
    private final SimpMessageSendingOperations messagingTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            // Deserialize the raw Redis payload into a JSON string, then into the chat message
            String publishMessage = (String) redisTemplate.getStringSerializer().deserialize(message.getBody());
            PublishMessage chatMessage = objectMapper.readValue(publishMessage, PublishMessage.class);
            // Relay the message to the WebSocket subscribers of this room
            messagingTemplate.convertAndSend("/sub/chats/" + chatMessage.getRoomId(), chatMessage);
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is not lost
            log.error("Failed to relay chat message", e);
        }
    }
}
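🚨 Sending messages worked, but after the server was restarted, messages were no longer delivered to chat rooms that already existed.
After adding logs in several places, I found the cause: the ChannelTopic and MessageListener registrations were not stored anywhere, so they were lost on restart.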
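The failure can be reproduced without Spring or Redis. The following is only a minimal sketch: InMemoryListenerContainer and RestartDemo are hypothetical names, not classes from this project or from Spring Data Redis. It assumes that listener registrations live only in the memory of the running process, which is what the logs above suggested.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical stand-in for a listener container: registrations live only in this object's memory. */
class InMemoryListenerContainer {
    private final Map<String, List<Consumer<String>>> listeners = new HashMap<>();

    void addMessageListener(String topic, Consumer<String> listener) {
        listeners.computeIfAbsent(topic, t -> new ArrayList<>()).add(listener);
    }

    /** Delivers the payload to every listener of the topic and returns how many received it. */
    int publish(String topic, String payload) {
        List<Consumer<String>> targets = listeners.getOrDefault(topic, Collections.emptyList());
        targets.forEach(listener -> listener.accept(payload));
        return targets.size();
    }
}

class RestartDemo {
    /** Returns {deliveries before the restart, deliveries after the restart}. */
    static int[] run() {
        List<String> inbox = new ArrayList<>();

        InMemoryListenerContainer beforeRestart = new InMemoryListenerContainer();
        beforeRestart.addMessageListener("room1", inbox::add); // what createRoom does
        int before = beforeRestart.publish("room1", "hello");

        // A "restart" builds a brand-new container. The room still exists in the
        // database, but nothing registers a listener for "room1" again.
        InMemoryListenerContainer afterRestart = new InMemoryListenerContainer();
        int after = afterRestart.publish("room1", "hello again");

        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println("before restart: " + result[0] + ", after restart: " + result[1]);
    }
}
```

In this sketch the first publish reaches one listener and the second reaches none, which mirrors the symptom: the room is still there, but nobody is listening on its topic.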
@Service
@Slf4j
@RequiredArgsConstructor
public class RoomService {
    private final MemberService memberService;
    private final RoomRepository roomRepository;
    private final Map<String, ChannelTopic> topics;
    private final RedisMessageListenerContainer redisMessageListener;
    private final RedisSubscriber redisSubscriber;

    public Long createRoom(long receiverId, MemberDetails memberDetails) {
        Member receiver = memberService.validateVerifyMember(receiverId);
        Member sender = memberService.validateVerifyMember(memberDetails.getMemberId());
        ChatRoom chatRoom =
                ChatRoom
                        .builder()
                        .sender(sender)
                        .receiver(receiver)
                        .build();
        ChatRoom saveChatRoom = roomRepository.save(chatRoom);
        String topicRoomId = "room" + saveChatRoom.getRoomId();
        createTopic(topicRoomId);
        return saveChatRoom.getRoomId();
    }

    // Find a single chat room
    public ChatRoom findRoom(long roomId) {
        ChatRoom chatRoom = findExistRoom(roomId);
        String topicRoomId = "room" + chatRoom.getRoomId();
        createTopic(topicRoomId);
        return chatRoom;
    }
    ...
    // Register the listener for a room's topic unless it is already registered
    private void createTopic(String topicRoomId) {
        if (!topics.containsKey(topicRoomId)) {
            ChannelTopic topic = new ChannelTopic(topicRoomId);
            redisMessageListener.addMessageListener(redisSubscriber, topic);
            topics.put(topicRoomId, topic);
        }
    }
}
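🚨 So what if the topic is created and the MessageListener registered not only when a room is created, but also whenever a room's information is fetched?
Fortunately this worked, and messages were sent and received smoothly even after a server restart.
🤔 Still, something felt off: creating a topic that already exists and registering it with the MessageListener again did not seem efficient.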
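As a side note, the check-then-register step in createTopic is not atomic, so two concurrent requests for the same room could both register a listener. A possible variant, sketched below in plain Java, uses ConcurrentHashMap.computeIfAbsent so that the registration runs at most once per topic name. TopicRegistry and its counter are hypothetical and exist only for illustration; the counter stands in for the call to addMessageListener.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical registry: remembers which topic names already have a listener. */
class TopicRegistry {
    private final Map<String, String> topics = new ConcurrentHashMap<>();
    private final AtomicInteger registrations = new AtomicInteger();

    /** Registers the topic on the first call for a given name; later calls are plain lookups. */
    String createTopic(String topicRoomId) {
        return topics.computeIfAbsent(topicRoomId, name -> {
            // Stand-in for redisMessageListener.addMessageListener(redisSubscriber, topic)
            registrations.incrementAndGet();
            return name;
        });
    }

    int registrationCount() {
        return registrations.get();
    }
}

class TopicRegistryDemo {
    public static void main(String[] args) {
        TopicRegistry registry = new TopicRegistry();
        registry.createTopic("room1");
        registry.createTopic("room1"); // already registered, no second registration
        registry.createTopic("room2");
        System.out.println("registrations: " + registry.registrationCount());
    }
}
```

This only removes the duplicate registration within one running process. Every room still needs its own lookup on each fetch, and the registry is still empty after a restart.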
When a message is published, the RedisSubscriber class receives it and relays it through SimpMessageSendingOperations:
messagingTemplate.convertAndSend("/sub/chats/" + chatMessage.getRoomId(), chatMessage);
The destination URL of the target chat room is built from roomId, so there is no need to create a separate Topic and MessageListener for each chat room.
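The routing idea can be checked in isolation. The sketch below is plain Java with hypothetical names (RoutedMessage, SharedTopicRouter); a map stands in for the STOMP broker, and RoutedMessage is a simplified stand-in for PublishMessage. It assumes only what the line above shows: the destination is "/sub/chats/" followed by the roomId carried in the message.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified message type: only the fields needed for routing. */
final class RoutedMessage {
    final long roomId;
    final String content;

    RoutedMessage(long roomId, String content) {
        this.roomId = roomId;
        this.content = content;
    }
}

/** Single listener for one shared topic: the room is chosen from the payload, not from the topic. */
class SharedTopicRouter {
    // destination -> delivered contents; stands in for the WebSocket broker
    private final Map<String, List<String>> delivered = new HashMap<>();

    void onMessage(RoutedMessage message) {
        String destination = "/sub/chats/" + message.roomId;
        delivered.computeIfAbsent(destination, d -> new ArrayList<>()).add(message.content);
    }

    List<String> deliveredTo(String destination) {
        return delivered.getOrDefault(destination, Collections.emptyList());
    }
}

class SharedTopicDemo {
    public static void main(String[] args) {
        SharedTopicRouter router = new SharedTopicRouter();
        router.onMessage(new RoutedMessage(1L, "hi"));
        router.onMessage(new RoutedMessage(2L, "hello"));
        router.onMessage(new RoutedMessage(1L, "again"));
        System.out.println(router.deliveredTo("/sub/chats/1"));
        System.out.println(router.deliveredTo("/sub/chats/2"));
    }
}
```

Because the listener never looks at a per-room topic name, one registration made at startup covers rooms created before and after a restart.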
Therefore, I registered a single ChannelTopic and MessageListener, shared by all rooms, as beans.
@Configuration
@RequiredArgsConstructor
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> chatRedisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> chatRedisTemplate = new RedisTemplate<>();
        chatRedisTemplate.setConnectionFactory(connectionFactory);
        chatRedisTemplate.setKeySerializer(new StringRedisSerializer());
        chatRedisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class));
        return chatRedisTemplate;
    }

    // Register the shared ChannelTopic
    @Bean
    public ChannelTopic channelTopic() {
        return new ChannelTopic("chatroom");
    }

    // Register the MessageListenerAdapter, which delegates to RedisSubscriber.sendMessage
    @Bean
    public MessageListenerAdapter listenerAdapter(RedisSubscriber redisSubscriber) {
        return new MessageListenerAdapter(redisSubscriber, "sendMessage");
    }

    // Register the RedisMessageListenerContainer with the ChannelTopic and MessageListenerAdapter above
    @Bean
    public RedisMessageListenerContainer redisMessageListener(RedisConnectionFactory connectionFactory,
                                                              MessageListenerAdapter listenerAdapter,
                                                              ChannelTopic channelTopic) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, channelTopic);
        return container;
    }
}
@Service
@Slf4j
@RequiredArgsConstructor
public class RedisSubscriber {
    private final ObjectMapper objectMapper;
    private final SimpMessageSendingOperations messagingTemplate;

    // Invoked by MessageListenerAdapter with the payload already deserialized to a String
    public void sendMessage(String message) {
        try {
            PublishMessage publishMessage = objectMapper.readValue(message, PublishMessage.class);
            messagingTemplate.convertAndSend("/sub/chats/" + publishMessage.getRoomId(), publishMessage);
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is not lost
            log.error("Failed to relay chat message", e);
        }
    }
}
@Service
@Slf4j
@RequiredArgsConstructor
public class RoomService {
    private final MemberService memberService;
    private final RoomRepository roomRepository;

    public Long createRoom(long receiverId, MemberDetails memberDetails) {
        Member receiver = memberService.validateVerifyMember(receiverId);
        Member sender = memberService.validateVerifyMember(memberDetails.getMemberId());
        ChatRoom chatRoom =
                ChatRoom
                        .builder()
                        .sender(sender)
                        .receiver(receiver)
                        .build();
        ChatRoom saveChatRoom = roomRepository.save(chatRoom);
        return saveChatRoom.getRoomId();
    }

    public ChatRoom findRoom(long roomId) {
        ChatRoom chatRoom = findExistRoom(roomId);
        return chatRoom;
    }
}
☝️ RedisPublisher was merged into MessageController.
@RestController
@RequiredArgsConstructor
public class MessageController {
    private final ChatService chatService;
    private final ChatMapper mapper;
    private final ChannelTopic topic;
    @Resource(name = "chatRedisTemplate")
    private final RedisTemplate<String, Object> redisTemplate;

    @MessageMapping("/chats/messages/{room-id}")
    public void message(@DestinationVariable("room-id") Long roomId, MessageDto messageDto) {
        PublishMessage publishMessage =
                new PublishMessage(messageDto.getRoomId(), messageDto.getSenderId(), messageDto.getContent(), LocalDateTime.now());
        // Publish to the single shared topic; the subscriber routes by roomId
        redisTemplate.convertAndSend(topic.getTopic(), publishMessage);
        chatService.saveMessage(messageDto, roomId);
    }
}
These are personal study notes, so they may contain mistakes.
If you spot one, please let me know in the comments and I will fix it right away. 🙇