Pub/Sub Messaging 에서 잠깐 언급했던 것처럼 Redis에서 발행되는 메시지는 단 한번만 전송된다.
네트워크 장애 등으로 인해 메시지가 발행된 순간에 시스템이 정상이 아니라면 해당 메시지는 소실된다는 의미다.
이를 해결하기 위한 기술로 Redis Streams을 알아보자.
Redis Streams은 일종의 append-only 로그처럼 동작하는 데이터구조다. 이를 이용하면 실시간으로 이벤트를 기록하고 배포할 수 있다.
공식문서에서 소개하는 사용예시는 아래와 같다
Redis는 각 스트림 항목에 대한 고유 ID를 생성하고, 이를 바탕으로 관련 항목을 검색하거나 처리한다.
이때, 생성되는 ID는 시간과 관련되어 있다.
append-only log
추가전용 로그, 즉 새롭게 데이터를 추가하는 목적으로 사용되는 로그를 의미한다.
이 로그에 저장된 데이터는 DB에 새로운 데이터를 추가할 수 있지만 기존의 데이터를 변경하진 못한다.
Redis Streams에서 사용되는 기초적인 명령어는 XADD
, XREAD
, XRANGE
, XLEN
이 있다.
XADD
: 스트림에 새로운 entry 추가XREAD
: 주어진 위치에서 시작하여 시간에 따라 이동하며 하나 이상의 entry를 읽음XRANGE
: 주어진 두 ID 사이의 entry 반환XLEN
: 스트림의 길이 반환항목을 추가하는데 소요되는 시간은 O(1) 이다.
n이 ID의 길이라고 했을때, 단일 항목에 접근하는 데는 O(n) 이 소요된다.
즉, 효율적인 항목 삽입과 읽기를 지원한다.
기초 명령어 예시를 제외한 Group이나 실패복구와 같은 세부내역은 공식문서를 참고하자
자동차 레이서가 체크포인트를 통과할때마다 이름, 속도, 위치 등을 포함한 스트림 항목을 추가한다고 가정하자
위 형태처럼 XADD
명령을 통해 race:france 스트림에 각 항목들이 추가되는 것을 확인할 수 있다.
race:france
는 스트림의 식별자( 키 ) 이며 뒤에오는 *
은 redis server에게 ID 생성을 맡긴다는 의미를 가진다.
실제로 1729048158754-0
와 같은 형식의 고유 ID들이 생성되는 것을 확인할 수 있다.
자동 생성되는 ID는 <현재시간의 milli초>-<sequence 번호>
의 형태를 띈다.
시퀀스 번호는 동일한 milli초에 생성된 경우, 구분하기 위해 사용된다
XRANGE race:france 1692632086370-0 + COUNT 2
이 명령은 1729048158754-0
부터 2개의 항목을 읽는다는 명령이다.
실제로 1729048158754-0
부터 시작하여 각 항목들의 정보가 출력되는 것을 확인할 수 있다.
XREAD
명령을 사용하면 특정 스트림의 시작항목( $
or 1729048158754-0
) 부터 읽어들일 숫자( COUNT 100
) 읽을 항목이 없을 때 대기하는 시간(BLOCK 300
) 을 지정하여 항목들을 읽어올 수 있다.
유의해야할 부분은 시작점 이후의 항목( 자신보다 큰 ID )들을 읽어온다는 부분이 있다.
위 이미지에서 1729048158754-0
을 시작지점으로 삼았기 때문에 다음 항목인 1729048165862-0
부터 항목을 읽어오는 것을 볼 수 있다.
선택옵션을 제외한 기초적인 명령어 구조는 아래와 같다
XREAD STREAMS race:france 0
Redis Streams은 크게 record 추가와 record 소비의 두 기능으로 구분할 수 있다.
Pub/Sub Messaging과 유사하지만 메시지가 유실되지 않는다는 점이 특징이다.
또한, Pub/Sub의 경우 애플리케이션이 Redis서버에 구독하여 메시지를 자동으로 푸시받는 것과 대비하여
Streams은 클라이언트가 직접 서버에 요청해야 메시지가 온다는 점이 다르다.
Spring 에서 이 기능과 관련된 패키지는 org.springframework.data.redis.connection
와 org.springframework.data.redis.stream
가 있다.
record를 전달하기 위해서는 저수준( byte[]
)의 RedisConnection
와 고수준( 객체 )의 StreamOperations
을 사용할 수 있다.
두 객체모두 record와 대상 스트림을 인수로 받는 add()
메서드를 제공한다.
// connection을 통한 메시지 추가
RedisConnection con = …
byte[] stream = …
ByteRecord record = StreamRecords.rawBytes(…).withStreamKey(stream);
con.xAdd(record);
// RedisTemplate을 통한 메시지 추가
RedisTemplate template = …
StringRecord record = StreamRecords.string(…).withStreamKey("my-stream");
template.opsForStream().add(record);
위 예시와 같이 RedisConnection
이나 StreamOperations
을 사용하여 record를 스트림에 추가할 수 있다.
RedisTemplate
의opsForStream()
는StreamOperations
을 반환한다.
저수준에서의 소비는 Redis 명령어의 XREAD
와 XREADGROUP
명령과 매치되는 RedisConnection
의 xRead
나 xReadGroup
메서드를 사용할 수 있다.
( 정확히는 RedisConnection
이 구현하는 RedisStreamCommands
의 메서드다 )
스트림 메시지를 소비하려면 애플리케이션 코드의 메시지를 폴링하거나 Message Listener Containers의 비동기 수신을 사용하면 된다.
폴링방식은 애플리케이션에서 정기적으로 메시지가 새롭게 등록되었는지 확인해야하는 수동적인 방법이며,
Message Listener Containers은 컨테이너에서 메시지가 새롭게 등록되면 자동으로 애플리케이션 코드에 알림을 보낸다.
스트림 소비는 일반적으로 비동기 처리와 연관되지만, 동기적으로 메시지를 소비하는 것도 가능하다.
StreamOperations.read()
를 사용하면 아래 예시코드처럼 동기적으로 메시지를 소비할 수 있다.
RedisTemplate template = …
// my-stream 의 최근에 새로추가된 레코드 2개를 읽음
List<MapRecord<K, HK, HV>> messages = template.opsForStream().read(StreamReadOptions.empty().count(2),
StreamOffset.latest("my-stream"));
// my-group 그룹과 my-consumer 이 소비자로써 my-stream의 마지막으로 소비된 메시지부터 2개를 읽음
List<MapRecord<K, HK, HV>> messages = template.opsForStream().read(Consumer.from("my-group", "my-consumer"),
StreamReadOptions.empty().count(2),
StreamOffset.create("my-stream", ReadOffset.lastConsumed()))
Pub/Sub의 Message Listener Containers와 유사하게 Streams에서도 블로킹 문제를 해결하기 위해 메시지 리스너를 제공한다.
사용되는 주요 인터페이스는 StreamMessageListenerContainer
와 StreamReceiver
이며,
메시지 수신과 처리를 위한 스레드 관리와 리스너에게 메시지를 전달하는 역할을 수행한다.
StreamMessageListenerContainer
class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> {
@Override
public void onMessage(MapRecord<String, String, String> message) {
System.out.println("MessageId: " + message.getId());
System.out.println("Stream: " + message.getStream());
System.out.println("Body: " + message.getValue());
}
}
// 커넥션 팩토리
RedisConnectionFactory connectionFactory = …
StreamListener<String, MapRecord<String, String, String>> streamListener = …
// 컨테이서 생성 옵션 설정
StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainerOptions
.builder().pollTimeout(Duration.ofMillis(100)).build();
// 컨테이너 생성
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(connectionFactory,
containerOptions);
// 구독 등록
Subscription subscription = container.receive(StreamOffset.fromStart("my-stream"), streamListener);
StreamListener
를 구현하는 스트림 리스너를 생성하고, 이를 컨테이너에 제공함으로써 구독을 등록하는 방식으로 작성할 수 있다.
예를들어, 위 코드에서는 my-stream
으로 시작하는 스트림에 새로운 레코드가 등록되면 리스너가 동작한다.
기본적으로 Consumer Group을 통해 메시지를 확인한 경우,
서버는 주어진 메시지가 성공적으로 전달되었음을 기록하고 PEL( 전달되었으나 확인되지 않은 메시지 목록 )에 추가한다.
PEL에서 메시지를 제거하기 위해서는 StreamOperations.acknowledge
를 명령을 수행해야한다.
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = ...
container.receive(Consumer.from("my-group", "my-consumer"),
StreamOffset.create("my-stream", ReadOffset.lastConsumed()),
msg -> {
// ...
redisTemplate.opsForStream().acknowledge("my-group", msg);
});
만약, 자동으로 메시지를 acknowledge
처리하려면 receive
대신 receiveAutoAck
를 사용하면 된다.
스트림 읽기 작업은 특정한 위치에서부터 메시지를 읽기위한 ReadOffset
을 지원한다.
ReadOffset.latest()
,ReadOffset.from(…)
,ReadOffset.lastConsumed()
ReadOffset.lastConsumed()
의 경우, Consumer Group이 아닌경우 ReadOffset.from(…)
과 동일하게 동작한다.
스트림으로 전송되는 모든 레코드들은 바이너리 형식으로 직렬화되어야 한다.
스트림이 해시 데이터구조와 비슷하기 때문에 RedisTemplate
에 구성된 관련 직렬화기를 사용한다.
keySerializer
,hashKeySerializer
,hashValueSerializer
가 사용된다.
ObjectRecord<String, String> record = StreamRecords.newRecord()
.in("my-stream")
.ofObject("my-value");
redisTemplate()
.opsForStream()
.add(record);
List<ObjectRecord<String, String>> records = redisTemplate()
.opsForStream()
.read(String.class, StreamOffset.fromStart("my-stream"));
ObjectRecord
를 이용하면 간단한 값들을 스트림에 직접 추가할 수 있다.
이때 .add(record)
코드는 Redis 명령에서의 XADD my-stream * "_class" "java.lang.String" "_raw" "my-value"
에 해당된다.
복합적인 값은 3가지의 방법을 사용하여 스트림에 넣을 수 있다.
RedisSerializer
를 이용한 직렬화 사용HashMapper
를 이용하여 직렬화에 적절한 Map
으로 변환1번 방법은 가장 간단하지만 스트림 구조의 필드-값 쌍 기능을 무시하므로, 데이터 구조를 제대로 표현할 수 없다.
2번 방법은 간단하지만 모든 소비자가 해당 직렬화 매커니즘을 구현해야한다.
3번 방법은 조금 복잡하지만 데이터 구조를 단순화시킨다.
아래 예시는 3번 방법을 구현하는 코드이다.
ObjectRecord<String, User> record = StreamRecords.newRecord()
.in("user-logon")
.ofObject(new User("night", "angel"));
redisTemplate()
.opsForStream()
.add(record);
List<ObjectRecord<String, User>> records = redisTemplate()
.opsForStream()
.read(User.class, StreamOffset.fromStart("user-logon"));
위 코드가 의미하는 스트림에 레코드를 추가하는 명령은 아래와 같다
XADD user-logon * "_class" "com.example.User" "firstname" "night" "lastname" "angel"
StreamOperations
이 기본적으로 ObjectHashMapper
를 사용하기에 위 예시는 해당 mapper를 사용한다.
반면, 아래 예시처럼 원하는 mapper를 제공하여 동작시킬 수도 있다.
redisTemplate()
.opsForStream(new Jackson2HashMapper(true))
.add(record);
이 코드가 의미하는 명령은 아래와 같다.
XADD user-logon * "firstname" "night" "@class" "com.example.User" "lastname" "angel"