채팅 기능 구현하기 (1) - feat. Redis Pub/Sub, Scheduler

bagt13·2024년 9월 15일
1

Project

목록 보기
18/19

이번 프로젝트는 유저들간에 소통이 필요한 앱이기 때문에, 요구사항에 채팅 기능이 포함되어 있다.

WebSocket, Spring Scheduler의 개념과 사용 방법에 대해서는 이전에 포스팅한 적이 있다.

Spring에서 웹 소켓(WebSocket) 사용하기

[Project] Spring Scheduler로 조회수 로직 캐싱 구현하기 (feat. Redis)


📚 Redis Pub/Sub를 사용한 이유

🔴 문제

사실, 단일 서버를 사용한다면 굳이 외부 메시지 브로커를 사용할 이유가 없다. 1개의 서버에서 메시지 전송 요청을 받고, publish 하면 모든 유저에게 메시지가 전달되기 때문이다.

하지만 다중 서버를 사용하는 경우에는 이 방법을 사용하면 안된다.

기본적으로 Stomp 통신 시 메시지 브로커와 메시지 큐는 스프링 서버 내부 메모리에 존재한다.

만일 서버가 여러개라면, 사용자들은 각각 다른 서버에 연결되어 웹 소켓 통신을 하고 있을 것이다. 따라서 서버 1을 통해 publish된 메시지는 서버 1에 연결되어 subscribe하는 유저들에게만 전달될 것이다. 하지만 데이터는 연결된 서버와 상관없이, 해당 channel을 subscribe 하고 있는 모든 유저에게 전송되어야 한다.

🟢 해결

이를 해결하기 위해서는 Kafka와 같은 외부 메시지 브로커를 사용하면 이 문제를 해결할 수 있다.

출처: https://www.linkedin.com/pulse/redis-pubsub-vs-streams-osama-ahmed/

Redis Pub/Sub의 경우 Kafka에 비해 단점이 있기 때문에 잘 사용하지 않지만, 나는 이미 Redis를 사용하고 있었고, 단지 학습 차원에서 메시지 브로커를 사용하는 것이기 때문에 Redis Pub/Sub를 사용하게 되었다.


♻️ ChatRedisConfig

토큰 저장을 위한 RedisConfig (TokenRedisConfig)와 분리하여 관리한다.

  • Redis Pub/Sub에 필요한 RedisTemplate, RedisMessageListenerContainer, MessageListenerAdapter를 등록한다.
@Configuration
public class ChatRedisConfig {

    @Value("${spring.redis.chat.host}")
    private String host;

    @Value("${spring.redis.chat.port}")
    private int port;

    @Bean
    @Primary
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(chatRedisConnectionFactory());
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class)); // Default serializer
        return redisTemplate;
    }


    @Bean
    public RedisConnectionFactory chatRedisConnectionFactory() {
        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
        redisStandaloneConfiguration.setHostName(host);
        redisStandaloneConfiguration.setPort(port);
        return new LettuceConnectionFactory(redisStandaloneConfiguration);
    }

    @Bean
    @Qualifier("chatRedisTemplate")
    public RedisTemplate<String, ChatMessageResponse> chatRedisTemplate() {
        RedisTemplate<String, ChatMessageResponse> chatRedisTemplate = new RedisTemplate<>();
        chatRedisTemplate.setConnectionFactory(chatRedisConnectionFactory());
        chatRedisTemplate.setKeySerializer(new StringRedisSerializer());
        chatRedisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(ChatMessageResponse.class));
        return chatRedisTemplate;
    }

    @Bean
    @Qualifier("chatMessageRedisTemplate")
    public RedisTemplate<String, ChatMessage> chatMessageRedisTemplate() {
        RedisTemplate<String, ChatMessage> chatMessageRedisTemplate = new RedisTemplate<>();
        chatMessageRedisTemplate.setConnectionFactory(chatRedisConnectionFactory());
        chatMessageRedisTemplate.setKeySerializer(new StringRedisSerializer());
        chatMessageRedisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(ChatMessage.class));
        return chatMessageRedisTemplate;
    }

    //Redis Channel(Topic)로부터 메시지를 받고, 주입된 리스너들에게 비동기적으로 dispatch 하는 역할을 수행하는 컨테이너이다.
    //즉, 발행된 메시지 처리를 위한 listenerAdapter를 설정할 수 있다.
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(MessageListenerAdapter listenerAdapter,
                                                                       ChannelTopic channelTopic) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(chatRedisConnectionFactory());
        container.addMessageListener(listenerAdapter, new PatternTopic("chatRoom:*"));
        return container;
    }

    //RedisMessageListenerContainer로부터 메시지를 받고, 실제 메시지를 처리하는 역할을 하는 Subscriber Bean을 추가해준다.
    @Bean
    public MessageListenerAdapter listenerAdapter(RedisSubscriber subscriber) {
        return new MessageListenerAdapter(subscriber, "onMessage");
    }

    @Bean
    public ChannelTopic channelTopic() {
        return new ChannelTopic("chatRoom:*");
    }
}

♻️ WebSocketConfig

  • stomp 연결 endpoint, allowedOrigins 등을 추가한다.
  • 또한, websocket handshake 시 토큰 검증을 위한 커스텀 인터셉터를 추가한다.
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry){
        //stomp 연결 url -> ex)ws://api.mo-gether.site/ws
        registry.addEndpoint("/ws")
                .setAllowedOrigins(allowedOrigins)
                .addInterceptors(new WebSocketHandshakeInterceptor());
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry){
        registry.enableSimpleBroker("/sub"); //메시지 수신 요청 endpoint
        registry.setApplicationDestinationPrefixes("/pub");   //메시지 발신 요청 endpoint
    }
}


Domain

♻️ ChatMessage

처음에는 ChatMessage가 유저 id 정도만 갖고 있어도 되지 않을까? 라고 생각했지만, 다음과 같은 고려사항들로 인해 닉네임과 이미지url까지 포함시켰다.

  • 채팅 메시지를 불러올때 각 메시지마다 닉네임/이미지url이 필요하며, 추가적인 dto 변환 작업이 수행되어야 한다.
  • 또한 현재 채팅방에 존재하지 않는 유저가 보냈던 메시지라면, RDB로의 추가 쿼리가 불가피하며, 채팅 메시지는 아주 많이 이루어지기 때문에 몇 개의 추가 쿼리가 생길지 가늠하기 힘들다.
@Entity
public class ChatMessage implements Serializable {

    @Id
    private String id;

    private Long roomId;
    private Long senderId;
    private String senderNickname;
    private String senderImageUrl;
    private String message;

    private String createdAt;
}

♻️ ChatRoom

User - ChatRoom은 N:N 관계이기 때문에, 중간 테이블(ChatRoomUser)을 사이에 두어 매핑하였다.

@Entity
public class ChatRoom implements Serializable {

     @Id
     @GeneratedValue(strategy = GenerationType.IDENTITY)
     private Long id;

     private String name;

     private String gatherType;
     private Long gatherId;

     @OneToMany(mappedBy = "chatRoom", cascade = REMOVE)
     private List<ChatRoomUser> chatRoomUserList = new ArrayList<>();

     public ChatRoom(String gatherType, Long gatherId, String roomName) {
          this.gatherType = gatherType;
          this.gatherId = gatherId;
          this.name = roomName;
     }
}

♻️ RedisChatRoomRepository

  • 특정 채팅방의 채팅 목록 조회가 대부분이기 때문에,prefix + roomId 조합의 KEY를 지정하였다.

  • chatMessageRedisTemplate 외에도 refreshTokenRedisTemplate 등 다른 redisTemplate이 존재하기 때문에, @Qualifier를 통해 우선순위를 지정하였다.

@Repository
public class RedisChatMessageRepository {

    private static final String ROOM_KEY_PREFIX = "chatMessage:room:";

    private final RedisTemplate<String, ChatMessage> chatMessageRedisTemplate;
    private final HashOperations<String, String, ChatMessage> hashOperations;

    public RedisChatMessageRepository(@Qualifier("chatMessageRedisTemplate") RedisTemplate<String, ChatMessage> chatMessageRedisTemplate) {
        this.chatMessageRedisTemplate = chatMessageRedisTemplate;
        this.hashOperations = chatMessageRedisTemplate.opsForHash();
    }

    public Optional<ChatMessage> findById(Long roomId, String id) {
        return Optional.ofNullable(hashOperations.get(getRoomKey(roomId), id));
    }

    public List<ChatMessage> findByRoomId(Long roomId) {
        String roomKey = getRoomKey(roomId);
        return hashOperations.values(roomKey);
    }

    public List<ChatMessage> findAll() {
        Set<String> roomKeys = chatMessageRedisTemplate.keys(ROOM_KEY_PREFIX + "*");
        List<ChatMessage> chatMessages = new ArrayList<>();

        if (roomKeys != null) {
            for (String roomKey : roomKeys) {
                chatMessages.addAll(hashOperations.values(roomKey));
            }
        }

        return chatMessages;
    }

    public void save(ChatMessage chatMessage) {
        String roomKey = getRoomKey(chatMessage.getRoomId());
        hashOperations.put(roomKey, chatMessage.getId(), chatMessage);
        chatMessageRedisTemplate.expire(roomKey, Duration.ofHours(25));
    }

    public void saveAllToRedis(List<ChatMessage> messages) {
        for (ChatMessage message : messages) {
            String roomKey = getRoomKey(message.getRoomId());
            hashOperations.put(roomKey, message.getId(), message);
            chatMessageRedisTemplate.expire(roomKey, Duration.ofHours(25));
        }
    }

    public void clearAll() {
        Set<String> roomKeys = chatMessageRedisTemplate.keys(ROOM_KEY_PREFIX + "*");
        if (roomKeys != null) {
            for (String roomKey : roomKeys) {
                chatMessageRedisTemplate.delete(roomKey); 
            }
        }
    }

    public void deleteById(Long roomId, Long id) {
        hashOperations.delete(getRoomKey(roomId), String.valueOf(id));
    }

    private String getRoomKey(Long roomId) {
        return ROOM_KEY_PREFIX + roomId;
    }
}

♻️ ChatController, ChatService

  • Stomp 통신을 위해 @MessageMapping을 통해 메시지를 수신한다.
@MessageMapping("/chat/message")
    public void message(ChatMessageRequest chatMessageRequest) {
        chatService.sendMessage(chatMessageRequest);
    }
  • 메시지를 전송할때마다 유저 정보를 조회하는 비효율을 개선하기 위해 유저 정보도 caching 한다.
  • 채팅 메시지를 생성하고, redis에 저장한다. (이후 스케쥴러를 통해 RDB에 insert한다)
  • roomId를 기반으로 topic을 생성하고, subscribe 한 유저들에게 전송할 응답 dto를 생성 후 publish 한다.
public void sendMessage(ChatMessageRequest request) {
        //todo: user info caching
        User user = userService.findById(request.getSenderId());

        //채팅 생성/저장
        ChatMessage chatMessage = createChatMessage(request, user);
        redisChatMessageRepository.save(chatMessage);

        String topic = createTopic(chatMessage.getRoomId());
        ChatMessageResponse chatMessageResponse = ChatMessageResponse.of(chatMessage);

        //publish
        redisPublisher.publish(topic, chatMessageResponse);
    }

♻️ Publisher, Subscriber

  • Redis Channel에 메시지를 publish 하는 역할을 한다.
@Service
public class RedisPublisher { 
	
    private final RedisTemplate<String, ChatMessageResponse> chatRedisTemplate;

    public RedisPublisher(@Qualifier("chatRedisTemplate") RedisTemplate<String, ChatMessageResponse> chatRedisTemplate) {
        this.chatRedisTemplate = chatRedisTemplate;
    }

    public void publish(String topic, ChatMessageResponse dto) {
        chatRedisTemplate.convertAndSend(topic, dto);
    }
}
  • Publisher를 통해 전송된 메세지를 받는 역할을 수행한다.
  • publish 된 메세지가 존재할때, 해당 메시지에 대한 작업을 수행한다.
  • /sub/chat/room/{roomId} 경로로 메시지를 전달하면, 해당 room을 subscribe한 유저들에게 메시지가 전달된다.
@Slf4j
@Service
public class RedisSubscriber implements MessageListener {

    private final ObjectMapper objectMapper;
    private final RedisTemplate<String, ChatMessageResponse> chatRedisTemplate;
    private final SimpMessageSendingOperations messagingTemplate;

    public RedisSubscriber(@Qualifier("chatRedisTemplate") RedisTemplate<String, ChatMessageResponse> chatRedisTemplate,
                           ObjectMapper objectMapper,
                           SimpMessageSendingOperations messagingTemplate) {
        this.chatRedisTemplate = chatRedisTemplate;
        this.objectMapper = objectMapper;
        this.messagingTemplate = messagingTemplate;
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            RedisSerializer<String> stringSerializer = chatRedisTemplate.getStringSerializer();
            String publishMessage = stringSerializer.deserialize(message.getBody());

            ChatMessageResponse chatMessageResponse = objectMapper.readValue(publishMessage, ChatMessageResponse.class);
            messagingTemplate.convertAndSend("/sub/chat/room/" + chatMessageResponse.getRoomId(), chatMessageResponse);
        } catch (Exception e) {
            log.error("### {}", e.getMessage());
        }
    }
}

♻️ Scheduler

학습 차원에서 Spring Batch 사용도 고려했으나, 현재 요구사항에서는 필요하지 않기 때문에 Spring Scheduler만 사용하도록 했다.

  • 사용자의 활동 시간이 가장 적은 새벽 시간에 insert 작업을 수행하도록 했다.

  • getLastSyncTime - 이전에 동기화 작업이 진행된 시간을 가져온다.

  • redisChatMessageRepository.findMessagesAfter(lastSyncTime); : getLastSyncTime()을 기준으로 이후에 저장된 메시지만 가져와서 RDB에 반영하도록 했다.

  • updateLastSyncTime() 동기화 작업이 끝난 후, 현재 시간을 기록한다.

@RequiredArgsConstructor
@Component
public class ChatMessageScheduler {

    private final ChatMessageRepository chatMessageRepository;
    private final RedisChatMessageRepository redisChatMessageRepository;
	private final LastSyncTimeRepository lastSyncTimeRepository;

    @Transactional
    @Scheduled(cron = "0 0 4 * * *") //매일 4AM Redis-MySQL 동기화 작업
    public void applyToRDB() {
        log.info("### Scheduler 실행");
        LocalDateTime lastSyncTime = getLastSyncTime();
        List<ChatMessage> newMessages = redisChatMessageRepository.findMessagesAfter(lastSyncTime);

        chatMessageRepository.saveAll(newMessages);
        updateLastSyncTime();
    }

    private LocalDateTime getLastSyncTime() {
        String lastSyncTime = lastSyncTimeRepository.getLastSyncTime();
        return TimeConverter.toLocalDateTime(lastSyncTime);
    }

    private void updateLastSyncTime() {
        String currentTime = TimeConverter.toString(LocalDateTime.now());
        lastSyncTimeRepository.updateLastSyncTime(currentTime);
    }
}

추가적으로 Redis의 데이터 손실을 고려해 RDB의 데이터를 Redis에 동기화하는 작업도 진행할 수 있을것 같다.

profile
주니어 백엔드 개발자입니다😄

0개의 댓글