Redis는 SUBSCRIBE
, UNSUBSCRIBE
와 PUBLISH
명령어를 이용한 메시지 발행 및 구독기능을 제공한다. 즉, 메시지 발행 및 구독을 통해 서로다른 노드( 애플리케이션 )에 실시간으로 이벤트를 전파시킬 수 있다.
SUBSCRIBE channel11 ch:00
와 같이 다수의 채널을 구독하는게 가능하며, 다른 클라이언트가 해당 채널로 메시지를 발행하면 해당 메시지를 수신할 수 있다. 단, 하나 이상의 채널을 구독한 구독자는 SUBSCRIBE
와 UNSUBSCRIBE
를 제외한 다른 Redis 명령을 수행할 수 없다.
redis-cli 에서는 구독모드에 들어가면
UNSUBSCRIBE
나PUNSUBSCRIBE
를 사용할 수 없다.
대신,ctrl + C
로 종료할 수 있다.
발행된 메시지는 한번만 전달되며, 만약 메시지를 처리할 수 없는 상황이라면 메시지가 소실된다. 메시지 소실을 막고싶다면 Redis Streams에 대해 알아보도록 하자
Redis Pub/Sub에서 메시지는 3개의 요소로 구성된 array-reply 형식을 띈다.
SUBSCRIBE first second // first와 secound 채널 구독
*3 // 3개의 항목으로 이루어진 배열 응답을 의미
$9 // 다음 오는 문자의 바이트 정보
subscribe // 이후 주어지는 요소에 대한 성공적인 구독을 의미
$5 // 다음 오는 문자의 바이트 정보
first // 클라이언트가 구독한 채널명
:1 // 현재 클라이언트가 구독하고 있는 채널 수
*3 // 3개의 항목으로 이루어진 배열 응답을 의미
$9 // 다음 오는 문자의 바이트 정보
subscribe // 이후 주어지는 요소에 대한 성공적인 구독을 의미
$6 // 다음 오는 문자의 바이트 정보
second // 클라이언트가 구독한 채널명
:2 // 현재 클라이언트가 구독하고 있는 채널 수
위 예시에서는 직접적인 채널명을 사용하여 구독하였지만, 패턴을 통한 채널구독도 사용할 수 있다.
이 경우,PSUBSCRIBE
나PUNSUBSCRIBE
명령을 사용하면 된다.
Spring Data Redis에서는 org.springframework.data.redis.connection
와 org.springframework.data.redis.listener
패키지에서 핵심기능을 제공한다.
메시지 발행을 위해서는 RedisConnection
나 RedisOperations
를 사용할 수 있다.
두 객체는 모두 채널명을 인수로 받는 publish()
메서드를 제공한다.
RedisConnection
는 byte[]
같은 저수준의 데이터 처리에 적합하며,
RedisOperations
는 객체같은 고수준의 데이터 처리에 적합하다.
// connection을 통한 메시지 발행
RedisConnection con = …
byte[] msg = …
byte[] channel = …
con.pubSubCommands().publish(msg, channel);
// RedisOperations을 통한 메시지 발행
RedisOperations operations = …
Long numberOfClients = operations.convertAndSend("hello!", "world");
저수준에서의 메시지 구독은 RedisConnection
를 사용할 수 있다.
subscribe
나 pSubscribe
메서드를 이용하여 채널명이나 패턴을 이용한 채널구독이 가능하며, 현재 구독중인 채널목록이나 현재 Connection이 구독모드인지 확인할 수 있는 getSubscription
와 isSubscribed
메서드를 제공한다.
한번 구독이 이뤄지고나면 메시지를 수신하기 위해 Connection
이 구독모드에 들어간다.
이 상태에서는 특정 명령을 제외하고는 명령수행이 불가능하다.
이를 해결하기 위해서는 비동기 메시지 처리가 필요하며, 이는 3가지의 주요 클래스로 구성된다
MessageListener
RedisMessageListenerContainer
MessageListener
의 관리와 스레드 및 Connection 관리MessageListenerAdapter
저수준에서의 메시지 구독은 앞서 살펴봤듯이 구독모드에 의해 스레드가 블로킹되는 문제가 있다.
이를 해결하기 위해서는 MessageListener
콜백을 통한 비동기 메시지 처리를 구현해야한다.
해당 인터페이스는 새로운 메시지가 도착할 때마다 호출되는 콜백메서드 역할인 onMessage()
로 구성되어 있다.
MessageListener
는SubscriptionListener
를 추가적으로 구현함으로써 구독과 구독취소에 따른 알람을 받을 수도 있다.
이는 호출동기화 상황에서 유용하게 사용될 수 있다.
RedisMessageListenerContainer
는 이러한 MessageListener
들을 관리하는 컨테이너 역할을 한다.
메시지 프로토콜과 메시지 제공자 사이에서 메시지 수신등록, 리소스 확보 및 해제, 예외 변환등의 작업을 처리하며, 메시지 수신과 관련된 스레드들을 관리한다. 채널마다 하나의 connection
과 하나의 스레드를 사용하도록 구성하여 구독자들이 많아지더라도 런타임에 발생하는 비용을 절약한다. 이러한 구성은 런타임 시에 변경이 가능하므로 구독자를 추가하거나 제거하는 등의 동작이 가능하며, 지연 구독 방식을 통해 필요할 때만 리소스를 사용하여 더욱 효율적인 자원 관리를 지원한다.
MessageListenerAdapter
를 이용하면 Redis 관련 의존성없는 POJO를 MDP로 구성할 수 있다.
// 1
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
// pass the channel/pattern as well
void handleMessage(Serializable message, String channel);
}
// 2
public class DefaultMessageDelegate implements MessageDelegate {
// …
}
// 3
@Configuration
class MyConfig {
// …
@Bean
DefaultMessageDelegate listener() {
return new DefaultMessageDelegate();
}
@Bean
MessageListenerAdapter messageListenerAdapter(DefaultMessageDelegate listener) {
return new MessageListenerAdapter(listener, "handleMessage");
}
@Bean
RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageListenerAdapter listener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listener, ChannelTopic.of("chatroom"));
return container;
}
}
예를들어, 위 예시에서는 MessageDelegate
와 DefaultMessageDelegate
로 구성된 순수한 자바 객체를 Bean으로 등록시킨다.
이를 이용하여 Adapter Bean을 등록하고, 최종적으로 RedisMessageListenerContainer
를 등록한다.