Redis 5.0에서 도입된 Message Broker 역할의 자료구조
저장되는 데이터들을 시간 순서대로 관리하며(FIFO), 소비자 그룹을 통해 여러 클라이언트가 동일한 스트림을 소비할 수 있도록 한다.
또한 새로운 데이터 항목이 추가될 때 까지 대기하는 blocking read 기능도 제공한다.
Stream에 적용 가능한 연산들은 아래와 같다.
https://redis.io/docs/latest/commands/?group=stream
이 중 XTRIM을 통해 스트림의 최대 길이를 설정 가능하였고, 여러 Consumer들이 스트림을 병렬로 처리 가능하였기에 redis pub/sub 대신 사용하였다.
Spring Data Redis에서 Redis Stream에 대한 접근 방식을 제공하는 패키지(org.springframework.data.redis.stream
, org.springframework.data.redis.connection
)
@Configuration
class MyConfig {
// …
// 구독할 채널 키값을 주입하기 위함
// @Value 등을 활용해도 됨
@Bean
ChannelTopic topic() {
return new ChannelTopic("messageQueue");
}
// spring data redis의 설정
@Bean
public LettuceConnectionFactory lettuceConnectionFactory() {...}
@Bean
public RedisTemplate<String, Object> createTemplate(LettuceConnectionFactory lettuceConnectionFactory) {...}
@Bean
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(
RedisConnectionFactory redisConnectionFactory
) {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.batchSize(10)
.errorHandler(t -> System.err.println("에러 발생: " + t.getMessage()))
.pollTimeout(Duration.ZERO)
.build();
return StreamMessageListenerContainer.create(redisConnectionFactory, options);
}
}
RedisTemplate.opsForStream().add()을 활용해 Stream에 값을 발행하는 코드를 Publisher라고 생각하면 된다.
여기서 add()에 들어가는 값은 Consumer가 상속하는 StreamListener의 제네릭 타입에 맞춰야 한다.
Map<String, String> message = Collections.singletonMap(key, "Message-Value");
// Stream 크기가 1000 미만일 경우에만 add하겠다는 뜻
XAddOptions options = XAddOptions.maxlen(1000);
// Redis Stream에 대해 XADD 실행
Object addedId = redisTemplate.opsForStream().add(topic.getTopic(), message, options);
if (addedId == null) {
throw new RedisSystemException("스트림이 가득 차서 메시지를 추가할 수 없습니다.", new RuntimeException());
}
Stream Consumer는 StreamListener 인터페이스에 기반해 구현된다.
@Override
public void onMessage(ObjectRecord<String, String> message) {
String stream = message.getStream();
String recordId = message.getId().getValue();
try {
// 처리할 로직 구현
if (StringUtils.isNotEmpty(message.getValue())) {
// To-Do
}
// 이후, ack stream
this.redisTemplate.opsForStream().acknowledge(streamKey, consumerGroupName, recordId);
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
this.redisOperator.delete(stream, recordId);
}
}
또한 InitializingBean, DisposableBean을 활용해 Consumer Group을 설정할 수 있다.
// InitializingBean 상속
@Override
public void afterPropertiesSet() {
String streamKey = "stream-key";
// stream의 group 목록 조회
StreamInfo.XInfoGroups groups = redisTemplate.opsForStream().groups(streamKey);
boolean groupExists = groups.stream().anyMatch(group -> group.groupName().equals(GROUP_NAME));
if (!groupExists) {
// consumer group 생성
redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.from("0"), GROUP_NAME);
}
// Stream의 최대 길이 설정
redisTemplate.opsForStream().trim(streamKey, maxStreamLength);
// stream 구독
this.subscription = listenerContainer.receive(
Consumer.from(GROUP_NAME, CONSUMER_NAME),
StreamOffset.create(topic.getTopic(), ReadOffset.lastConsumed()),
this
);
// consuming 시작
this.listenerContainer.start();
}
// DisposableBean 상속
@Override
public void destroy() {
if (subscription != null) {
// 구독 해제
subscription.cancel();
}
if (listenerContainer != null) {
// consuming 중단
listenerContainer.stop();
}
}
이렇게 Stream을 등록한 경우 acknowledge() 없이 Stream에서 발행되는 Message가 자동으로 onMessage()에 전달된다.
https://redis.io/docs/latest/develop/data-types/streams/
https://docs.spring.io/spring-data/redis/reference/redis/redis-streams.html