캐시를 사용한다는 것은 결국 데이터 저장소가 하나 더 늘어나는 것이고 이는 DB같은 다른 데이터 저장소와의 데이터 불일치를 발생시키게 된다. 이를 해소하기 위해서 전통적인 캐시에서는 TTL을 지정하여 캐시 데이터에 기한을 지정해서 캐시 데이터에 유효 기간을 두곤 한다. 하지만 이 역시 TTL로 지정된 유효 기간 안에 다른 곳에서 해당 데이터를 수정하더라도 수정된 내용이 반영되지 않으므로 반쪽자리 해결책으로 볼 수 있다. (물론 실시간성이 보장되지 않아도 되는 데이터에서는 이 방법을 많이 사용하고 있다!)
Redis pub/sub은 SUBSCRIBE
, UNSUBCRIBE
, PUBLISH
라는 개념을 가지며 일반적인 publish/subscribe 개념을 사용한다. 즉, publish 된 메세지들은 어떤 subscribers 에게 전달될 지 모르는 채로 각 채널로 들어가고 subscribers는 원하는 채널에 있는 메세지들을 받는다. 이러한 구조는 publisher들과 subscriber 들을 deoupling 시켜 높은 확장성과 dynamic 한 네트워크 구성을 가능하게 한다. 주의할 점은 메세지는 한번만 publish되므로 네트워크 등의 외부 문제가 발생해서 메세지를 처리하지 못했다면 해당 메세지는 영원히 손실된다.
이제 개념을 정리했으니 자바 스프링 환경에서 어떻게 사용 가능한지 예제로 알아보자.
publish 쪽의 코드는 매우 간단하다. RedisTemplate.convertAndSend
메서드를 활용하여 메시지를 원하는 topic으로 전송한다.
@Service
public class RedisPublisher {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public void publish(ChannelTopic topic, RoomMessage message) {
redisTemplate.convertAndSend(topic.getTopic(), message);
}
}
subsribe에 대한 코드는 크게 두가지 부분으로 나눌 수 있다.
1. 해당 메시지를 받아서 처리하는 부분 (logic)
2. 메시지를 구독할 topic 및 redis connection에 대한 설정을 하는 부분 (configuration)
public class Receiver {
private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
private AtomicInteger counter = new AtomicInteger();
// 메시지를 받아서 어떻게 처리할 지에 대한 부분
public void receiveMessage(String message) {
LOGGER.info("Received <" + message + ">");
counter.incrementAndGet();
}
public int getCount() {
return counter.get();
}
}
설정할 것은 3개다.
1. connection factory
: message listener container, redis template 을 redis 서버에 연결
2. message listener container
: receiver 등록 (메시지를 받아 처리하는 로직적인 부분)
3. redis template
: 메시지 전송
// container 빈으로 등록
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// connection factory
container.setConnectionFactory(connectionFactory);
// listenerAdapter 를 이용해서 Listener 지정
// subscribe 할 topic 지정
container.addMessageListener(listenerAdapter, new PatternTopic("ChannelTopic 이름"));
return container;
}
// MessageListenerAdapter 에서 receiver 설정
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
// 처리하는 로직
@Bean
Receiver receiver() {
return new Receiver();
}
// redis template
@Bean
StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}
sequenceDiagram
participant A as Publisher
participant B as MessageListenerContainer
participant C as MessageListenerAdapter
participant D as Receiver
B->>B: connectionFactory, topic, MessageListenerAdapter 지정
C->>C: reciever 지정
A->>B: convertAndSend(topic, message)
B->>C: OnMessage(Message)
C->>D: OnMessage(Message)
D->>D: 메시지를 이용한 로직 수행
https://redis.io/docs/manual/pubsub/
https://spring.io/guides/gs/messaging-redis/