Pub/Sub Messaging

뾰족머리삼돌이·2024년 10월 15일
0

Spring Data Redis

목록 보기
6/12

Redis Pub/Sub

Redis는 SUBSCRIBE, UNSUBSCRIBEPUBLISH 명령어를 이용한 메시지 발행 및 구독기능을 제공한다. 즉, 메시지 발행 및 구독을 통해 서로다른 노드( 애플리케이션 )에 실시간으로 이벤트를 전파시킬 수 있다.

SUBSCRIBE channel11 ch:00 와 같이 다수의 채널을 구독하는게 가능하며, 다른 클라이언트가 해당 채널로 메시지를 발행하면 해당 메시지를 수신할 수 있다. 단, 하나 이상의 채널을 구독한 구독자는 SUBSCRIBEUNSUBSCRIBE를 제외한 다른 Redis 명령을 수행할 수 없다.

redis-cli 에서는 구독모드에 들어가면 UNSUBSCRIBEPUNSUBSCRIBE를 사용할 수 없다.
대신, ctrl + C 로 종료할 수 있다.

발행된 메시지는 한번만 전달되며, 만약 메시지를 처리할 수 없는 상황이라면 메시지가 소실된다. 메시지 소실을 막고싶다면 Redis Streams에 대해 알아보도록 하자

메시지 형식

Redis Pub/Sub에서 메시지는 3개의 요소로 구성된 array-reply 형식을 띈다.

SUBSCRIBE first second		// first와 secound 채널 구독
*3							// 3개의 항목으로 이루어진 배열 응답을 의미
$9							// 다음 오는 문자의 바이트 정보
subscribe					// 이후 주어지는 요소에 대한 성공적인 구독을 의미
$5							// 다음 오는 문자의 바이트 정보
first						// 클라이언트가 구독한 채널명
:1							// 현재 클라이언트가 구독하고 있는 채널 수
*3							// 3개의 항목으로 이루어진 배열 응답을 의미
$9							// 다음 오는 문자의 바이트 정보
subscribe					// 이후 주어지는 요소에 대한 성공적인 구독을 의미
$6							// 다음 오는 문자의 바이트 정보
second						// 클라이언트가 구독한 채널명
:2							// 현재 클라이언트가 구독하고 있는 채널 수

위 예시에서는 직접적인 채널명을 사용하여 구독하였지만, 패턴을 통한 채널구독도 사용할 수 있다.
이 경우, PSUBSCRIBEPUNSUBSCRIBE 명령을 사용하면 된다.

Spring에서의 Pub/Sub

Spring Data Redis에서는 org.springframework.data.redis.connectionorg.springframework.data.redis.listener 패키지에서 핵심기능을 제공한다.

메시지 발행

메시지 발행을 위해서는 RedisConnectionRedisOperations를 사용할 수 있다.
두 객체는 모두 채널명을 인수로 받는 publish() 메서드를 제공한다.

RedisConnectionbyte[]같은 저수준의 데이터 처리에 적합하며,
RedisOperations는 객체같은 고수준의 데이터 처리에 적합하다.

// connection을 통한 메시지 발행
RedisConnection con =byte[] msg =byte[] channel = …
con.pubSubCommands().publish(msg, channel);

// RedisOperations을 통한 메시지 발행
RedisOperations operations =Long numberOfClients = operations.convertAndSend("hello!", "world");

메시지 구독

저수준에서의 메시지 구독은 RedisConnection를 사용할 수 있다.

subscribepSubscribe 메서드를 이용하여 채널명이나 패턴을 이용한 채널구독이 가능하며, 현재 구독중인 채널목록이나 현재 Connection이 구독모드인지 확인할 수 있는 getSubscriptionisSubscribed 메서드를 제공한다.

한번 구독이 이뤄지고나면 메시지를 수신하기 위해 Connection이 구독모드에 들어간다.
이 상태에서는 특정 명령을 제외하고는 명령수행이 불가능하다.

이를 해결하기 위해서는 비동기 메시지 처리가 필요하며, 이는 3가지의 주요 클래스로 구성된다

  • MessageListener
    : 메시지의 수신 및 처리
  • RedisMessageListenerContainer
    : MessageListener의 관리와 스레드 및 Connection 관리
  • MessageListenerAdapter
    : POJO 객체를 MDP수신에 사용할 수 있도록 변환

Message Listener Containers

저수준에서의 메시지 구독은 앞서 살펴봤듯이 구독모드에 의해 스레드가 블로킹되는 문제가 있다.

이를 해결하기 위해서는 MessageListener 콜백을 통한 비동기 메시지 처리를 구현해야한다.
해당 인터페이스는 새로운 메시지가 도착할 때마다 호출되는 콜백메서드 역할인 onMessage()로 구성되어 있다.

MessageListenerSubscriptionListener를 추가적으로 구현함으로써 구독과 구독취소에 따른 알람을 받을 수도 있다.
이는 호출동기화 상황에서 유용하게 사용될 수 있다.

RedisMessageListenerContainer 는 이러한 MessageListener들을 관리하는 컨테이너 역할을 한다.

메시지 프로토콜과 메시지 제공자 사이에서 메시지 수신등록, 리소스 확보 및 해제, 예외 변환등의 작업을 처리하며, 메시지 수신과 관련된 스레드들을 관리한다. 채널마다 하나의 connection과 하나의 스레드를 사용하도록 구성하여 구독자들이 많아지더라도 런타임에 발생하는 비용을 절약한다. 이러한 구성은 런타임 시에 변경이 가능하므로 구독자를 추가하거나 제거하는 등의 동작이 가능하며, 지연 구독 방식을 통해 필요할 때만 리소스를 사용하여 더욱 효율적인 자원 관리를 지원한다.

MessageListenerAdapter

MessageListenerAdapter를 이용하면 Redis 관련 의존성없는 POJO를 MDP로 구성할 수 있다.

// 1
public interface MessageDelegate {
  void handleMessage(String message);
  void handleMessage(Map message);
  void handleMessage(byte[] message);
  void handleMessage(Serializable message);
  // pass the channel/pattern as well
  void handleMessage(Serializable message, String channel);
 }
 
// 2
public class DefaultMessageDelegate implements MessageDelegate {
	// …
}

// 3
@Configuration
class MyConfig {

  // …

  @Bean
  DefaultMessageDelegate listener() {
    return new DefaultMessageDelegate();
  }

  @Bean
  MessageListenerAdapter messageListenerAdapter(DefaultMessageDelegate listener) {
    return new MessageListenerAdapter(listener, "handleMessage");
  }

  @Bean
  RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageListenerAdapter listener) {

    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.addMessageListener(listener, ChannelTopic.of("chatroom"));
    return container;
  }
}

예를들어, 위 예시에서는 MessageDelegateDefaultMessageDelegate로 구성된 순수한 자바 객체를 Bean으로 등록시킨다.
이를 이용하여 Adapter Bean을 등록하고, 최종적으로 RedisMessageListenerContainer를 등록한다.

0개의 댓글

관련 채용 정보