이번 프로젝트는 유저들간에 소통이 필요한 앱이기 때문에, 요구사항에 채팅 기능이 포함되어 있다.
WebSocket, Spring Scheduler의 개념과 사용 방법에 대해서는 이전에 포스팅한 적이 있다.
사실, 단일 서버를 사용한다면 굳이 외부 메시지 브로커를 사용할 이유가 없다. 1개의 서버에서 메시지 전송 요청을 받고, publish 하면 모든 유저에게 메시지가 전달되기 때문이다.
하지만 다중 서버를 사용하는 경우에는 이 방법을 사용하면 안된다.
기본적으로 Stomp 통신 시 메시지 브로커와 메시지 큐는 스프링 서버 내부 메모리에 존재
한다.
만일 서버가 여러개라면, 사용자들은 각각 다른 서버에 연결되어 웹 소켓 통신을 하고 있을 것이다. 따라서 서버 1을 통해 publish된 메시지는 서버 1에 연결되어 subscribe하는 유저들에게만 전달될 것이다. 하지만 데이터는 연결된 서버와 상관없이, 해당 channel을 subscribe 하고 있는 모든 유저에게 전송되어야 한다.
이를 해결하기 위해서는 Kafka
와 같은 외부 메시지 브로커를 사용하면 이 문제를 해결할 수 있다.
Redis Pub/Sub의 경우 Kafka에 비해 단점이 있기 때문에 잘 사용하지 않지만, 나는 이미 Redis를 사용하고 있었고, 단지 학습 차원에서 메시지 브로커를 사용하는 것이기 때문에 Redis Pub/Sub를 사용하게 되었다.
토큰 저장을 위한 RedisConfig (TokenRedisConfig)와 분리하여 관리한다.
RedisTemplate
, RedisMessageListenerContainer
, MessageListenerAdapter
를 등록한다.@Configuration
public class ChatRedisConfig {
@Value("${spring.redis.chat.host}")
private String host;
@Value("${spring.redis.chat.port}")
private int port;
@Bean
@Primary
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(chatRedisConnectionFactory());
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class)); // Default serializer
return redisTemplate;
}
@Bean
public RedisConnectionFactory chatRedisConnectionFactory() {
RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
redisStandaloneConfiguration.setHostName(host);
redisStandaloneConfiguration.setPort(port);
return new LettuceConnectionFactory(redisStandaloneConfiguration);
}
@Bean
@Qualifier("chatRedisTemplate")
public RedisTemplate<String, ChatMessageResponse> chatRedisTemplate() {
RedisTemplate<String, ChatMessageResponse> chatRedisTemplate = new RedisTemplate<>();
chatRedisTemplate.setConnectionFactory(chatRedisConnectionFactory());
chatRedisTemplate.setKeySerializer(new StringRedisSerializer());
chatRedisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(ChatMessageResponse.class));
return chatRedisTemplate;
}
@Bean
@Qualifier("chatMessageRedisTemplate")
public RedisTemplate<String, ChatMessage> chatMessageRedisTemplate() {
RedisTemplate<String, ChatMessage> chatMessageRedisTemplate = new RedisTemplate<>();
chatMessageRedisTemplate.setConnectionFactory(chatRedisConnectionFactory());
chatMessageRedisTemplate.setKeySerializer(new StringRedisSerializer());
chatMessageRedisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(ChatMessage.class));
return chatMessageRedisTemplate;
}
//Redis Channel(Topic)로부터 메시지를 받고, 주입된 리스너들에게 비동기적으로 dispatch 하는 역할을 수행하는 컨테이너이다.
//즉, 발행된 메시지 처리를 위한 listenerAdapter를 설정할 수 있다.
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(MessageListenerAdapter listenerAdapter,
ChannelTopic channelTopic) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(chatRedisConnectionFactory());
container.addMessageListener(listenerAdapter, new PatternTopic("chatRoom:*"));
return container;
}
//RedisMessageListenerContainer로부터 메시지를 받고, 실제 메시지를 처리하는 역할을 하는 Subscriber Bean을 추가해준다.
@Bean
public MessageListenerAdapter listenerAdapter(RedisSubscriber subscriber) {
return new MessageListenerAdapter(subscriber, "onMessage");
}
@Bean
public ChannelTopic channelTopic() {
return new ChannelTopic("chatRoom:*");
}
}
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry){
//stomp 연결 url -> ex)ws://api.mo-gether.site/ws
registry.addEndpoint("/ws")
.setAllowedOrigins(allowedOrigins)
.addInterceptors(new WebSocketHandshakeInterceptor());
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry){
registry.enableSimpleBroker("/sub"); //메시지 수신 요청 endpoint
registry.setApplicationDestinationPrefixes("/pub"); //메시지 발신 요청 endpoint
}
}
처음에는 ChatMessage가 유저 id 정도만 갖고 있어도 되지 않을까? 라고 생각했지만, 다음과 같은 고려사항들로 인해 닉네임과 이미지url까지 포함시켰다.
- 채팅 메시지를 불러올때 각 메시지마다 닉네임/이미지url이 필요하며, 추가적인 dto 변환 작업이 수행되어야 한다.
- 또한 현재 채팅방에 존재하지 않는 유저가 보냈던 메시지라면, RDB로의 추가 쿼리가 불가피하며, 채팅 메시지는 아주 많이 이루어지기 때문에 몇 개의 추가 쿼리가 생길지 가늠하기 힘들다.
@Entity
public class ChatMessage implements Serializable {
@Id
private String id;
private Long roomId;
private Long senderId;
private String senderNickname;
private String senderImageUrl;
private String message;
private String createdAt;
}
User - ChatRoom은 N:N 관계이기 때문에, 중간 테이블(ChatRoomUser)을 사이에 두어 매핑하였다.
@Entity
public class ChatRoom implements Serializable {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String name;
private String gatherType;
private Long gatherId;
@OneToMany(mappedBy = "chatRoom", cascade = REMOVE)
private List<ChatRoomUser> chatRoomUserList = new ArrayList<>();
public ChatRoom(String gatherType, Long gatherId, String roomName) {
this.gatherType = gatherType;
this.gatherId = gatherId;
this.name = roomName;
}
}
특정 채팅방의 채팅 목록 조회가 대부분이기 때문에,prefix + roomId 조합의 KEY를 지정
하였다.
chatMessageRedisTemplate 외에도 refreshTokenRedisTemplate 등 다른 redisTemplate이 존재하기 때문에, @Qualifier
를 통해 우선순위를 지정하였다.
@Repository
public class RedisChatMessageRepository {
private static final String ROOM_KEY_PREFIX = "chatMessage:room:";
private final RedisTemplate<String, ChatMessage> chatMessageRedisTemplate;
private final HashOperations<String, String, ChatMessage> hashOperations;
public RedisChatMessageRepository(@Qualifier("chatMessageRedisTemplate") RedisTemplate<String, ChatMessage> chatMessageRedisTemplate) {
this.chatMessageRedisTemplate = chatMessageRedisTemplate;
this.hashOperations = chatMessageRedisTemplate.opsForHash();
}
public Optional<ChatMessage> findById(Long roomId, String id) {
return Optional.ofNullable(hashOperations.get(getRoomKey(roomId), id));
}
public List<ChatMessage> findByRoomId(Long roomId) {
String roomKey = getRoomKey(roomId);
return hashOperations.values(roomKey);
}
public List<ChatMessage> findAll() {
Set<String> roomKeys = chatMessageRedisTemplate.keys(ROOM_KEY_PREFIX + "*");
List<ChatMessage> chatMessages = new ArrayList<>();
if (roomKeys != null) {
for (String roomKey : roomKeys) {
chatMessages.addAll(hashOperations.values(roomKey));
}
}
return chatMessages;
}
public void save(ChatMessage chatMessage) {
String roomKey = getRoomKey(chatMessage.getRoomId());
hashOperations.put(roomKey, chatMessage.getId(), chatMessage);
chatMessageRedisTemplate.expire(roomKey, Duration.ofHours(25));
}
public void saveAllToRedis(List<ChatMessage> messages) {
for (ChatMessage message : messages) {
String roomKey = getRoomKey(message.getRoomId());
hashOperations.put(roomKey, message.getId(), message);
chatMessageRedisTemplate.expire(roomKey, Duration.ofHours(25));
}
}
public void clearAll() {
Set<String> roomKeys = chatMessageRedisTemplate.keys(ROOM_KEY_PREFIX + "*");
if (roomKeys != null) {
for (String roomKey : roomKeys) {
chatMessageRedisTemplate.delete(roomKey);
}
}
}
public void deleteById(Long roomId, Long id) {
hashOperations.delete(getRoomKey(roomId), String.valueOf(id));
}
private String getRoomKey(Long roomId) {
return ROOM_KEY_PREFIX + roomId;
}
}
@MessageMapping
을 통해 메시지를 수신한다.@MessageMapping("/chat/message")
public void message(ChatMessageRequest chatMessageRequest) {
chatService.sendMessage(chatMessageRequest);
}
public void sendMessage(ChatMessageRequest request) {
//todo: user info caching
User user = userService.findById(request.getSenderId());
//채팅 생성/저장
ChatMessage chatMessage = createChatMessage(request, user);
redisChatMessageRepository.save(chatMessage);
String topic = createTopic(chatMessage.getRoomId());
ChatMessageResponse chatMessageResponse = ChatMessageResponse.of(chatMessage);
//publish
redisPublisher.publish(topic, chatMessageResponse);
}
@Service
public class RedisPublisher {
private final RedisTemplate<String, ChatMessageResponse> chatRedisTemplate;
public RedisPublisher(@Qualifier("chatRedisTemplate") RedisTemplate<String, ChatMessageResponse> chatRedisTemplate) {
this.chatRedisTemplate = chatRedisTemplate;
}
public void publish(String topic, ChatMessageResponse dto) {
chatRedisTemplate.convertAndSend(topic, dto);
}
}
/sub/chat/room/{roomId}
경로로 메시지를 전달하면, 해당 room을 subscribe한 유저들에게 메시지가 전달된다.@Slf4j
@Service
public class RedisSubscriber implements MessageListener {
private final ObjectMapper objectMapper;
private final RedisTemplate<String, ChatMessageResponse> chatRedisTemplate;
private final SimpMessageSendingOperations messagingTemplate;
public RedisSubscriber(@Qualifier("chatRedisTemplate") RedisTemplate<String, ChatMessageResponse> chatRedisTemplate,
ObjectMapper objectMapper,
SimpMessageSendingOperations messagingTemplate) {
this.chatRedisTemplate = chatRedisTemplate;
this.objectMapper = objectMapper;
this.messagingTemplate = messagingTemplate;
}
@Override
public void onMessage(Message message, byte[] pattern) {
try {
RedisSerializer<String> stringSerializer = chatRedisTemplate.getStringSerializer();
String publishMessage = stringSerializer.deserialize(message.getBody());
ChatMessageResponse chatMessageResponse = objectMapper.readValue(publishMessage, ChatMessageResponse.class);
messagingTemplate.convertAndSend("/sub/chat/room/" + chatMessageResponse.getRoomId(), chatMessageResponse);
} catch (Exception e) {
log.error("### {}", e.getMessage());
}
}
}
학습 차원에서 Spring Batch 사용도 고려했으나, 현재 요구사항에서는 필요하지 않기 때문에 Spring Scheduler만 사용하도록 했다.
사용자의 활동 시간이 가장 적은 새벽 시간에 insert 작업을 수행하도록 했다.
getLastSyncTime
- 이전에 동기화 작업이 진행된 시간을 가져온다.
redisChatMessageRepository.findMessagesAfter(lastSyncTime);
: getLastSyncTime()을 기준으로 이후에 저장된 메시지만 가져와서 RDB에 반영하도록 했다.
updateLastSyncTime()
동기화 작업이 끝난 후, 현재 시간을 기록한다.
@RequiredArgsConstructor
@Component
public class ChatMessageScheduler {
private final ChatMessageRepository chatMessageRepository;
private final RedisChatMessageRepository redisChatMessageRepository;
private final LastSyncTimeRepository lastSyncTimeRepository;
@Transactional
@Scheduled(cron = "0 0 4 * * *") //매일 4AM Redis-MySQL 동기화 작업
public void applyToRDB() {
log.info("### Scheduler 실행");
LocalDateTime lastSyncTime = getLastSyncTime();
List<ChatMessage> newMessages = redisChatMessageRepository.findMessagesAfter(lastSyncTime);
chatMessageRepository.saveAll(newMessages);
updateLastSyncTime();
}
private LocalDateTime getLastSyncTime() {
String lastSyncTime = lastSyncTimeRepository.getLastSyncTime();
return TimeConverter.toLocalDateTime(lastSyncTime);
}
private void updateLastSyncTime() {
String currentTime = TimeConverter.toString(LocalDateTime.now());
lastSyncTimeRepository.updateLastSyncTime(currentTime);
}
}
추가적으로 Redis의 데이터 손실을 고려해 RDB의 데이터를 Redis에 동기화하는 작업도 진행할 수 있을것 같다.