Spring Data Redis Streams 정리

김형준·2025년 2월 28일
0

Redis Stream

Redis 5.0에서 도입된 Message Broker 역할의 자료구조

저장되는 데이터들을 시간 순서대로 관리하며(FIFO), 소비자 그룹을 통해 여러 클라이언트가 동일한 스트림을 소비할 수 있도록 한다.

또한 새로운 데이터 항목이 추가될 때 까지 대기하는 blocking read 기능도 제공한다.

Stream에 적용 가능한 연산들은 아래와 같다.

https://redis.io/docs/latest/commands/?group=stream

이 중 XTRIM을 통해 스트림의 최대 길이를 설정 가능하였고, 여러 Consumer들이 스트림을 병렬로 처리 가능하였기에 redis pub/sub 대신 사용하였다.

Spring Data Redis Stream

Spring Data Redis에서 Redis Stream에 대한 접근 방식을 제공하는 패키지(org.springframework.data.redis.stream, org.springframework.data.redis.connection)

설정

@Configuration
class MyConfig {

  // …
  
  // 구독할 채널 키값을 주입하기 위함
  // @Value 등을 활용해도 됨
  @Bean
	ChannelTopic topic() {
	    return new ChannelTopic("messageQueue");
	}
	
	// spring data redis의 설정
	@Bean
  public LettuceConnectionFactory lettuceConnectionFactory() {...}
  @Bean
  public RedisTemplate<String, Object> createTemplate(LettuceConnectionFactory lettuceConnectionFactory) {...}
	
	@Bean
  public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(
				  RedisConnectionFactory redisConnectionFactory
  ) {
		  StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
				      StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
				              .batchSize(10)
                      .errorHandler(t -> System.err.println("에러 발생: " + t.getMessage()))
                      .pollTimeout(Duration.ZERO)
                      .build();

        return StreamMessageListenerContainer.create(redisConnectionFactory, options);
    }
}

Publisher

RedisTemplate.opsForStream().add()을 활용해 Stream에 값을 발행하는 코드를 Publisher라고 생각하면 된다.

여기서 add()에 들어가는 값은 Consumer가 상속하는 StreamListener의 제네릭 타입에 맞춰야 한다.

Map<String, String> message = Collections.singletonMap(key, "Message-Value");

// Stream 크기가 1000 미만일 경우에만 add하겠다는 뜻
XAddOptions options = XAddOptions.maxlen(1000);
// Redis Stream에 대해 XADD 실행
Object addedId = redisTemplate.opsForStream().add(topic.getTopic(), message, options);

if (addedId == null) {
		throw new RedisSystemException("스트림이 가득 차서 메시지를 추가할 수 없습니다.", new RuntimeException());
}

Consumer

Stream Consumer는 StreamListener 인터페이스에 기반해 구현된다.

@Override
public void onMessage(ObjectRecord<String, String> message) {
    
    String stream = message.getStream();
    String recordId = message.getId().getValue();
    
    try {
        // 처리할 로직 구현
        if (StringUtils.isNotEmpty(message.getValue())) {
            // To-Do
        }
        // 이후, ack stream
        this.redisTemplate.opsForStream().acknowledge(streamKey, consumerGroupName, recordId);
        
    } catch (Exception e) {
        // TODO: handle exception
        e.printStackTrace();
        this.redisOperator.delete(stream, recordId);
    }
}

또한 InitializingBean, DisposableBean을 활용해 Consumer Group을 설정할 수 있다.

// InitializingBean 상속
@Override
public void afterPropertiesSet() {

		String streamKey = "stream-key";
		// stream의 group 목록 조회
    StreamInfo.XInfoGroups groups = redisTemplate.opsForStream().groups(streamKey);
    boolean groupExists = groups.stream().anyMatch(group -> group.groupName().equals(GROUP_NAME));

    if (!groupExists) {
		    // consumer group 생성
		    redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.from("0"), GROUP_NAME);
    }
        
    // Stream의 최대 길이 설정
    redisTemplate.opsForStream().trim(streamKey, maxStreamLength);

		// stream 구독
    this.subscription = listenerContainer.receive(
		    Consumer.from(GROUP_NAME, CONSUMER_NAME),
        StreamOffset.create(topic.getTopic(), ReadOffset.lastConsumed()),
        this
    );

		// consuming 시작
    this.listenerContainer.start();
}

// DisposableBean 상속
@Override
public void destroy() {
    if (subscription != null) {
		    // 구독 해제
        subscription.cancel();
    }

    if (listenerContainer != null) {
		    // consuming 중단
        listenerContainer.stop();
		}
}

이렇게 Stream을 등록한 경우 acknowledge() 없이 Stream에서 발행되는 Message가 자동으로 onMessage()에 전달된다.

참고

https://redis.io/docs/latest/develop/data-types/streams/

https://dev.gmarket.com/113

https://docs.spring.io/spring-data/redis/reference/redis/redis-streams.html

0개의 댓글

관련 채용 정보