🐱 Github
기존에 ChatRoom을 HashMap에 저장하던 데이터를 Redis에 저장하면서 Redis의 pub/sub기능을 통해 메시지를 전송해보자.
이전 게시물과 채팅에 쓰일 레디스는 아래 문서에 있다.
채팅 기능에 Stomp 적용하기(+Vue) - 2
85. [SpringBoot] 채팅 서버에 사용될 Redis hashes
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
implementation group: 'it.ozimov', name: 'embedded-redis', version: '0.7.2'
embedded-redis(Spring boot server 내장)를 사용하여서 구현할 것이다. 현재 컴퓨터에 redis가 다운받아져 있다면 두 번째 의존성과 이후 작성할 EmbeddedRedisConfig
은 제외해도 된다.
(Embedded Redis는 일종의 H2 데이터베이스처럼 테스트 용도로 사용된다.)
spring:
redis:
host: 127.0.0.1
port: 6379
💡 Spring Boot 3.X.X application.yml 설정
spring:
data:
redis:
host: 127.0.0.1
port: 6379
먼저 Embedded Redis의 Config를 스프링 빈에 등록해줘야 한다.
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import redis.embedded.RedisServer;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@Configuration
public class EmbeddedRedisConfig {
@Value("${spring.redis.port}")
private int redisPort;
private RedisServer redisServer;
@PostConstruct
public void startRedisServer() {
redisServer = new RedisServer(redisPort);
redisServer.start();
}
@PreDestroy
public void stopRedis() {
if (redisServer != null) redisServer.stop();
}
}
EmbeddedRedisConfig는 스프링 빈에 등록될 때 레디스 서버를 실행하고 생명주기가 끝날 때 서버를 멈추게 하였다.
다음으로 레디스의 메시지 리스너와 RedisTemplate를 스프링 빈에 등록하자.
설정해야하는 부분이 있기 때문에 @Bean
으로 등록하였다.
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
/**
* redis pub/sub 메시지를 처리하는 listener 설정
*/
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
/**
* 어플리케이션에서 사용할 redisTemplate 설정
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class));
return redisTemplate;
}
}
먼저 RedisMessageListenerContainer
는 pub/sub 구조에서 항상 발행된(pub) 데이터가 있는지 확인해야 하고 처리해야하기 때문에 Listener를 등록해줘야 한다.
RedisTemplate를 통해 SpringBoot에서 redis에 데이터를 넣고 뺄 수 있다.
Redis를 이용하여 채팅방 정보를 저장하기 때문에 repository를 생성하자
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Repository;
import site.leui.chat_example.chat.dto.ChatRoom;
import javax.annotation.PostConstruct;
import java.util.*;
@RequiredArgsConstructor
@Repository
public class ChatRoomRepository {
private static final String CHAT_ROOMS = "CHAT_ROOM";
private final RedisTemplate<String, Object> redisTemplate;
private HashOperations<String, String, ChatRoom> opsHashChatRoom;
@PostConstruct
private void init() {
opsHashChatRoom = redisTemplate.opsForHash();
redisTemplate.delete(CHAT_ROOMS);
for (int i = 0; i < 5; i++) {
ChatRoom chatRoom = ChatRoom.of("test_" + i);
opsHashChatRoom.put(CHAT_ROOMS, chatRoom.getRoomId(), chatRoom);
}
}
public List<ChatRoom> findAll() {
return opsHashChatRoom.values(CHAT_ROOMS);
}
public ChatRoom findById(String id) {
return opsHashChatRoom.get(CHAT_ROOMS, id);
}
public void save(ChatRoom chatRoom) {
opsHashChatRoom.put(CHAT_ROOMS, chatRoom.getRoomId(), chatRoom);
}
}
RedisTemplate와 HashOperations에 대해서는 85. [SpringBoot] 채팅 서버에 사용될 Redis hashes 이 문서에 정리해 두었다.
v2와 같은 코드이다. Redis를 통해 publish를 하지만 여전히 Stomp를 사용하기 때문에 필요하다. (messageTemplate를 통해 메시지를 전달하는 방식은 v2와 같다.)
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.*;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/sub");
registry.setApplicationDestinationPrefixes("/pub");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws-stomp")
.withSockJS();
}
}
redis publisher는 레디스로 topic과 메시지를 가지고 전송할 객체이다. 여기서 topic은 roomId
이다.
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.stereotype.Service;
import site.leui.chat_example.chat.dto.ChatMessage;
@RequiredArgsConstructor
@Service
public class RedisPublisher {
private final RedisTemplate<String, Object> redisTemplate;
public void publish(ChannelTopic topic, ChatMessage message) {
redisTemplate.convertAndSend(topic.getTopic(), message);
}
}
RedisSubscriber는 채팅방이 생성되었을 때 생성되어 위에 있는 RedisConfig
에 RedisMessageListenerContainer
에 메시지 리스너로서 들어가게 된다.
ChatRoomService enterChatRoom()
redis publisher가 publish를 하게되면 RedisMessageListenerContainer
에 있는 리스너 중에 같은 topic을 가진 리스너의 onMessage()
를 실행하게 된다.
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.stereotype.Service;
import site.leui.chat_example.chat.dto.ChatMessage;
@Slf4j
@RequiredArgsConstructor
@Service
public class RedisSubscriber implements MessageListener {
private final ObjectMapper objectMapper;
private final RedisTemplate<String, Object> redisTemplate;
private final SimpMessageSendingOperations messageTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
try {
String publishMessage = redisTemplate
.getStringSerializer()
.deserialize(message.getBody());
ChatMessage roomMessage = objectMapper.readValue(publishMessage, ChatMessage.class);
messageTemplate.convertAndSend("/sub/chat/room/" + roomMessage.getRoomId(), roomMessage);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}
onMessage()
를 보면 messageTemplate를 사용하는 것을 볼 수 있다. 이는 v2에서 다루었던 방식과 일치한다.
/pub/chat/message url로 들어온 발행을 처리할 수 있는 컨트롤러를 만들어야 한다. v2와 거의 일치하는데 publish를 redis를 통해 하게된다.
import lombok.RequiredArgsConstructor;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import site.leui.chat_example.chat.dto.ChatMessage;
import site.leui.chat_example.chat.service.ChatRoomService;
import site.leui.chat_example.base.redis.service.RedisPublisher;
@RequiredArgsConstructor
@Controller
public class ChatController {
private final RedisPublisher redisPublisher;
private final ChatRoomService chatRoomService;
@MessageMapping("/chat/message")
public void sendMessage(ChatMessage message) {
if (isJoin(message)) {
chatRoomService.enterChatRoom(message.getRoomId());
message.setMessage(message.getSender() + "님이 입장하셨습니다.");
}
redisPublisher.publish(chatRoomService.getTopic(message.getRoomId()), message);
}
private boolean isJoin(ChatMessage messageType) {
return messageType.getMessageType().equals(ChatMessage.MessageType.JOIN);
}
}
redisPublisher.publish()
를 하게 되면 redisTemplate
를 통해 publish
가 되고 적절한 리스너인 RedisSubscriber
를 찾아서 onMessage()
를 실행하게 된다.
onMessage()
에서는 v2와 마찬가지로 messageTemplate.convertAndSend()
를 통해 메시지를 구독자에게 전달한다.
이제 전체 흐름을 보자
/pub/chat/message
url로 chat message가 들어오면 redis에 roomId
(topic)을 가지고 publish를 한다.핵심 코드만 설명되어 있기 때문에 전체 코드는 github에서 확인할 수 있다.
글 잘 봤습니다, 많은 도움이 되었습니다.