MQ 별 특징 3. - Redis Stream(2)

김예지·2024년 2월 12일
1

23-2 캡스톤 디자인

목록 보기
7/8

Redis Stream이란

이전 게시글에서 Redis에 대해서 알아보았다.
https://velog.io/@nwactris/MQ-별-특징-3.-Redis-Stream1-Redis란
(Redis Stream의 특징에 대해 이해하려면, Redis를 먼저 이해해야 합니다!)

Redis Stream은 Redis의 자료형 중 하나로 Redis 5.0에 추가되었으며, 단순한 브로드캐스팅만 지원하던 Redis Pub/Sub에서 더 나아가 이벤트 브로커로서의 기능을 갖추었다.

Redis Pub/Sub을 다룬 논문은 많았으나, Redis Stream은 선행 연구가 부족하여 이번 캡스톤 디자인의 목표로 삼았다.

아키텍쳐

  • Producer: 메시지를 생성하고 발송하는 주체
  • Consumer: 메시지를 수신하고 소비하는 주체
  • Consumer Group: 논리적으로 Consumer를 묶은 것으로, 그룹 내의 Consumer는 서로 다른 메시지를 소비한다.
    이름으로 그룹 내의 소비자를 구분하므로 통신이 끊긴 후 재접속하여도 동일 소비자임을 확인받을 수 있다.
    에러로 소비하지 못한 메시지를 다른 Consumer가 소비할 수 있다.

  • Stream: 다른 MQ의 Queue와 유사한 개념이다. key(Stream 이름)로 구분한다.
  • ID: 개별 메시지가 Stream에 저장되는 시간(Epoch Time)을 ID로 가진다. 따라서 Redis Pub/Sub과 달리 개별 메시지 구분 & 에러 처리가 가능해졌다.
  • Entry: 하나의 Entry는 (entry) ID + key-value 리스트로 구성된다. Map으로 구성된 메시지 하나와 비슷한 개념으로 이해해도 무방하다.
    append-only로 추가된 데이터라, 사용자가 삭제하지 않는 한 지워지지 않는다.

Redis Stream 명령어

Redis Stream의 명령어는 기존 Redis와 동일하게redis-cli에 직접 입력할 수 있으며, docs에 Java와 Python 사용 예제도 제시되어 있다.
Spring에서는 RedisTemplate을 통해StreamOperation에서 명령어를 사용할 수 있도록 인터페이스가 제공된다.

https://redis.io/commands/?group=stream
자세한 활용 방법은 docs에서 확인할 수 있으며, 간략하게는 다음과 같다.

  • XADD: 메시지를 Stream에 추가한다.
    Stream이 생성되어 있지 않다면, key에 대응하는 Stream을 생성한다.

  • XGROUP [CREATE, CREATECONSUMER, DELCONSUMER, DESTROY, SETID]: Consumer Group에 관한 명령어를 수행한다.

  • XRANGE: 하나의 Stream 내에서 지정된 ID 범위에 해당하는 Entry를 반환한다.

  • XREAD, XREADGROUP: 여러 Stream에서 요청된 ID 범위의 Entry를 읽어온다. XRANGE와 유사하게 사용될 수 있으며, BLOCK을 설정할 수 있다.(추후 설명)

  • XACK: Consumer가 해당 메시지를 소비했음을 Stream에 알린다.
    성공적으로 소비한 메시지 개수를 반환한다.

  • XDEL: Stream에서 지정된 ID의 Entry를 삭제하며, 삭제된 개수를 반환한다.

  • XPENDING: 소비에 실패해 Consumer Group에서 보류 중인 메시지를 관찰할 수 있다.

  • XCLAIM, XAUTOCLAIM: 보류중인 메시지를 동일 Consumer Group 내의 다른 Consumer에게 전달하도록 설정한다.

특징

https://redis.io/docs/data-types/streams/

확장성 및 내결함성

서버가 다운되면 모든 데이터가 삭제되는 Redis의 in-memory 단점과 메시지의 에러 처리 기능이 없는 Redis Pub/Sub의 한계를 극복하였다.

XACK & PEL

  • PEL(Pending Entry List): XREADXREADGROUP 요청 시, 처리되어야 할 메시지는 PEL에 들어가 관리된다.
    PEL은 Consumer Group별로 관리되며, 메시지가 소비되어 XACK를 받으면 해당 메시지는 PEL에서 삭제된다.(Stream에는 그대로 남아있다.)

XACK을 받지 못한(Consumer가 소비에 실패한) 메시지는 XPENDING을 통해 확인할 수 있다. XPENDING은 Consumer Group에서 보류 중인 메시지를 관찰할 수 있는 명령어이다.

XPENDING [Stream 이름] [Group 이름]

아래의 예시에서는, 메시지 ID의 범위를 - + 10로 제한해 읽어오도록 하였다.

> XPENDING race:italy italy_riders - + 10
1) 1) "1692632647899-0"
   2) "Bob"
   3) (integer) 74642
   4) (integer) 1
2) 1) "1692632662819-0"
   2) "Bob"
   3) (integer) 74642
   4) (integer) 1

XPENDING의 결과 값으로 소비에 실패한 메시지의 내용과 더불어, 3), 4)에서 각 메시지가 마지막으로 소비자에게 전달된 이후 경과한 유휴 시간 (밀리초), 메시지가 전송된 횟수도 확인할 수 있다.

XAUTOCLAIM [Stream 이름] [Group 이름] [Group 내 소비자 이름] [유휴시간] [읽어올 ID 범위] [시도 횟수]

이 때, 다른 Consumer가 실패한 메시지를 다시 자동적으로 읽어오도록 XAUTOCLAIM을 설정할 수 있다. 예시는 아래와 같다.

> XAUTOCLAIM race:italy italy_riders Alice 60000 0-0 COUNT 1
1) "0-0"
2) 1) 1) "1692632662819-0"
      2) 1) "rider"
         2) "Sam-Bodden"

race:italy Stream을 구독하는 italy_riders Group 내의 Alice라는 Conumser에 설정한 상황이며, 다른 Consumer가 소비한지 60,000ms이상 경과한 보류 메시지를 읽도록 한다.

메시지 자체에 오류가 있어 소비되지 못한 상황도 있으므로, COUNT를 1로 설정하여 한 번 실패한 메시지까지만 가져오게 할 수도 있다.
이 때, 오류가 지속적으로 발생한 메시지는, 일정 Consume COUNT에 도달하면 해당 메시지를 다른 스트림(오류 알림 전용)에 넣고 시스템에 알림을 보내도록 처리할 수 있다.

Redis Stream에서 RDB 및 AOF 적용

메시지의 영속성을 위해, Redis에서 지원하는 RDB와 AOF를 사용할 수 있다.

  • RDB(Redis DataBase): 스냅샷 방식으로, 일정 조건에 따라 메모리 내용을 바이너리 파일로 디스크에 저장한다.
  • AOF(Append-Only File): 레디스 서버에 데이터의 업데이트 명령이 수행될 때마다, 해당 명령어의 로그를 디스크에 appendonly.aof파일로 남긴다.

Redis Stream에서 RDB / AOF를 적용하고자 할 때 고려해야 할 사항은 다음과 같다.

  1. 애플리케이션에서 메시지 지속성이 중요한 경우 강력한 fsync 정책과 함께 AOF를 사용해야 한다. (fsync: AOF가 디스크에 동기화 되는 시간 간격)

  2. 기본적으로 비동기식 복제(BGSAVE)는 XADD명령 또는 Consumer Group 상태 변경 사항이 복제되도록 보장하지 않는다. 장애 조치 후 마스터에서 데이터를 수신하는 복제본의 능력에 따라 뭔가가 누락될 수 있다.

  3. WAIT명령은 변경 사항을 replica set에 강제로 업데이트하기 위해 사용될 수 있으며, 이렇게 하면 데이터가 손실될 가능성이 매우 낮다.
    하지만 Sentinel 또는 Redis Cluster에서 운영하는 Redis Failover 프로세스는 가장 업데이트된 복제본에 대한 Failover를 수행하기 때문에, 특정 장애 조건에서는 일부 데이터가 부족한 복제본으로 복구될 수 있다.

싱글 스레드

하나의 레디스 서버 내에 여러 Stream을 생성할 수 있지만, Redis 내부는 여전히 싱글스레드로 동작한다.

따라서, 빠른 성능을 위해서는 Redis Cluster를 생성하는 것이 좋다.

ID

1. Entry ID 구성
각 메시지(Entry)의 Entry ID는 두 부분으로 구성된다.

<millisecondsTime>-<sequenceNumber>
  • 밀리초 부분은 Redis 노드의 로컬 시간이지만, 현재 밀리초 시간이 이전 입력 시간보다 작은 경우 이전 입력 시간이 대신 사용된다.

  • 시퀀스 번호는 밀리초가 동일할 경우 사용된다. 시퀀스 번호는 64비트까지 지원된다.

2. 범위 지정

XRANGE [stream 이름] + - COUNT 1

ID가 타임스탬프로 생성되므로, 이러한 ID를 사용하여 XRANGE로 연관된 항목(시간순)을 검색하거나, 특정 시점 이후의 모든 Entry을 읽고 처리할 수 있다.

3. Consumer Group 구분
Consumer Group별 첫 ID가 있어, 그 이전의 데이터는 읽어오지 않도록 하여 Group을 구분한다.

XGROUP CREATE [stream 이름] [group 이름] $

($는 현재 스트림에서 가장 큰(가장 마지막) ID를 의미한다.)

여기서 Kafka의 Consumer Group과의 차이가 발생한다.

  • Kafka는 Consumer Group 내의 Consumer와 Partition은 성능상 1:1로 맺어지길 권장된다.
  • Redis Stream의 Consumer Group 내의 모든 Consumer는 동일한 Stream을 바라보고 있다. 같은 Stream에서 서로 다른 범위의 ID를 소비한다.

4. offset

Kafka의 offset처럼 Redis Stream은 자신이 읽은 가장 최신의 메시지 ID를 추적하고 저장하여 다음에 읽을 메시지의 시작위치를 알 수 있다.

메시지 삭제

Redis Stream은 RabbitMQ처럼 ACK를 사용하긴 하지만, 소비된 메시지가 바로 삭제되지 않고 Stream에 계속 남아있다는 점에서 Kafka와 유사하다.

따라서 Kafka처럼 여러 Consumer Group에서 같은 메시지에 접근할 수 있으며, Stream에 남아있지만 더이상 사용하지 않는 메시지를 처리하는 방법에 대한 이해가 필요하다

1. XDEL

명시적으로 삭제 명령을 할 수 있다. 만약, XACK를 보낸 메시지에 바로 XDEL(삭제) 명령어를 보내면 RabbitMQ와 비슷하게 동작할 수도 있다.

2. XADD의 MAXLEN

Redis stream에서는 XADD로 메시지를 추가할 때, MAXLEN 옵션을 사용하여 TTL(Time to Live)을 설정할 수 있다.

다만 Kafka에서는 메시지 별 생존 시간을 정하는 것과는 달리, Redis Stream에서는 하나의 스트림에 들어갈 수 있는 메시지의 최대 개수를 제한하여, MAXLEN을 초과해 들어갈 경우 가장 오래된 메시지는 삭제된다.

XADD mystream MAXLEN ~ 10000 * key value

위의 예시에서는 스트림에 들어온 메시지가 10,000개를 넘어가면 오래된 순서로 메시지를 삭제한다.

3. XTRIM

XTRIM key <MAXLEN | MINID> [= | ~] threshold [LIMIT count]
  • MAXLEN: 오래 된 순서로 MAXLEN 개수만큼 삭제
  • MINID: MINID이전의 메시지 삭제

이 때, threshold 앞에 ~를 붙이면 threshold 언저리 개수로 Stream을 자른다.

XTRIM mystream MAXLEN ~ 1000

Redis는 성능을 얻을 수 있을 때(예: 데이터 구조의 전체 매크로 노드를 제거할 수 없는 경우) 일찍 트리밍을 중지한다.

Redis Stream 사용 시기

  • 이벤트 소싱(예: 사용자 작업, 클릭 등 추적)
  • 센서 모니터링(예: 현장 장치의 판독값)
  • 알림(예: 각 사용자의 알림 기록을 별도의 스트림에 저장)

Spring 적용

https://docs.spring.io/spring-data/redis/reference/redis/redis-streams.html

Broker에서 Redis Stream 실행

docker run -d --name redis -p 6379:6379

간단히 도커로 실행할 수 있다.

docker run -d --name redis -p 6379:6379 -v /home/ubuntu/example/redis/redis.conf:/etc/redis/redis.conf redis redis-server /etc/redis/redis.conf

RDBAOF설정을 추가하고 싶다면 redis/redis.conf파일을 작성하여 볼륨마운팅 할 수 있다.

Producer & Consumer 공통

1. build.gradle

implementation 'org.springframework.boot:spring-boot-starter-data-redis'

2. Application class 어노테이션 추가

@EnableRedisRepositories
@SpringBootApplication
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

}

3. application.yml(선택)

spring:
  redis:
    host: ${redis.url}
    port: 6379
    publish:
      rate: 10000
    key: ${example.streamKey}
    group: ${example.group}
    consumer: ${example.consumerName}

4. RedisConfig class

@Configuration
public class RedisConfig {

    @Value("${spring.redis.host}")
    private String redisHost;

    @Value("${spring.redis.port}")
    private int redisPort;

    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        return new LettuceConnectionFactory(new RedisStandaloneConfiguration(redisHost, redisPort));
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory());
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());

        /* Java 기본 직렬화가 아닌 JSON 직렬화 설정 */
//        redisTemplate.setHashKeySerializer(new Jackson2JsonRedisSerializer(SmsRedisStream.class));
//        redisTemplate.setHashValueSerializer(new Jackson2JsonRedisSerializer(SmsRedisStream.class));

        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new StringRedisSerializer());

        return redisTemplate;
    }
}

Redis에서 제공하는 여러 직렬화 방법이 있으므로, 가장 적절한 방법을 사용하면 된다.
기본 Key,ValueHashKey,HashValue 각각 직렬화 방법을 설정하도록 되어있다.

5. RedisOperator class

RedisTemplate.opsForStream()로 접근할 수 있는 RedisOperations의 인터페이스를 좀 더 직관적으로 사용하기 위해 설정한다.

@Slf4j
@Component
@RequiredArgsConstructor
public class RedisOperator {
    private final RedisTemplate<String, Object> redisTemplate;

    public Object getRedisValue(String key, String field){
        return this.redisTemplate.opsForHash().get(key, field);
    }


    public long increaseRedisValue(String key, String field){
        return this.redisTemplate.opsForHash().increment(key, field, 1);
    }

    public void ackStream(String consumerGroupName, MapRecord<String, Object, Object> message){
        this.redisTemplate.opsForStream().acknowledge(consumerGroupName, message);
    }

    public void claimStream(PendingMessage pendingMessage, String consumerName){
        RedisAsyncCommands commands = (RedisAsyncCommands) this.redisTemplate
                .getConnectionFactory().getConnection().getNativeConnection();

        CommandArgs<String, String> args = new CommandArgs<>(StringCodec.UTF8)
                .add(pendingMessage.getIdAsString())
                .add(pendingMessage.getGroupName())
                .add(consumerName)
                .add("20")
                .add(pendingMessage.getIdAsString());
        commands.dispatch(CommandType.XCLAIM, new StatusOutput(StringCodec.UTF8), args);
    }

    public MapRecord<String, Object, Object> findStreamMessageById(String streamKey, String id){
        List<MapRecord<String, Object, Object>> mapRecordList = this.findStreamMessageByRange(streamKey, id, id);
        if(mapRecordList.isEmpty()) return null;
        return mapRecordList.get(0);
    }

    public List<MapRecord<String, Object, Object>> findStreamMessageByRange(String streamKey, String startId, String endId){
        return this.redisTemplate.opsForStream().range(streamKey, Range.closed(startId, endId));
    }

    public void createStreamConsumerGroup(String streamKey, String consumerGroupName){
        // if stream is not exist, create stream and consumer group of it
        if (Boolean.FALSE.equals(this.redisTemplate.hasKey(streamKey))){
            RedisAsyncCommands commands = (RedisAsyncCommands) this.redisTemplate
                    .getConnectionFactory()
                    .getConnection()
                    .getNativeConnection();

            CommandArgs<String, String> args = new CommandArgs<>(StringCodec.UTF8)
                    .add(CommandKeyword.CREATE)
                    .add(streamKey)
                    .add(consumerGroupName)
                    .add("0")
                    .add("MKSTREAM");

            commands.dispatch(CommandType.XGROUP, new StatusOutput(StringCodec.UTF8), args);
        }
        // stream is exist, create consumerGroup if is not exist
        else{
            if(!isStreamConsumerGroupExist(streamKey, consumerGroupName)){
                this.redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.from("0"), consumerGroupName);
            }
        }
    }

    public PendingMessages findStreamPendingMessages(String streamKey, String consumerGroupName, String consumerName){
        return this.redisTemplate.opsForStream()
                .pending(streamKey, Consumer.from(consumerGroupName, consumerName), Range.unbounded(), 100L);
    }

    public boolean isStreamConsumerGroupExist(String streamKey, String consumerGroupName){
        Iterator<StreamInfo.XInfoGroup> iterator = this.redisTemplate
                .opsForStream().groups(streamKey).stream().iterator();

        while(iterator.hasNext()){
            StreamInfo.XInfoGroup xInfoGroup = iterator.next();
            if(xInfoGroup.groupName().equals(consumerGroupName)){
                return true;
            }
        }
        return false;
    }

    public StreamMessageListenerContainer createStreamMessageListenerContainer(){
        return StreamMessageListenerContainer.create(
                this.redisTemplate.getConnectionFactory(),
                StreamMessageListenerContainer
                        .StreamMessageListenerContainerOptions.builder()
                        .hashKeySerializer(new StringRedisSerializer())
                        .hashValueSerializer(new StringRedisSerializer())
                        .pollTimeout(Duration.ofMillis(20))
                        .build()
        );
    }

}

Producer

@Slf4j
@Service
@RequiredArgsConstructor
@Transactional
public class ProduceService {
    private final RedisTemplate<String, Object> redisTemplate;

    @Value("${spring.redis.key}")
    private String streamKey;
    
    public String redisProducer(byte[] content) {

        try {
            ObjectMapper objectMapper = new ObjectMapper();
            HashMap<String, String> map = new HashMap<>();
            String value = objectMapper.writeValueAsString(content);

            map.put("info", value);
            map.put("claimTime",String.valueOf(System.currentTimeMillis()));

            this.redisTemplate.opsForStream().add(emailStream, map);

        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }

        return "success";

    }
}

Consumer

@Slf4j
@Service
@RequiredArgsConstructor
public class StreamConsumer implements StreamListener<String, MapRecord<String, Object, Object>>, InitializingBean, DisposableBean {

    private Subscription subscription;
    
    @Value("${spring.redis.key}")
    private String exampleStreamKey;
    
    @Value("${spring.redis.group}")
    private String exampleConsumerGroupName;
    
    @Value("${spring.redis.consumer}")
    private String exampleConsumerName;
    
    private final RedisOperator redisOperator;

    @Override
    public void onMessage(MapRecord<String, Object, Object> record) {
        
        Long sendTime = Long.parseLong((String) record.getValue().get("claimTime"));
        String info = (String)record.getValue().get("info");

        //Id를 이용해 stream에 들어갔을 때의 timestamp를 구할 수도 있다.
        Long brokerTime = record.getId().getTimestamp();
        
        // 처리할 로직 구현
        //..

        // 모든 로직이 완료되면, ack stream
        this.redisOperator.ackStream(consumerGroupName, record);
    }

    @Override
    public void destroy() throws Exception {
        if(this.subscription != null){
            this.subscription.cancel();
        }
        if(this.listenerContainer != null){
            this.listenerContainer .stop();
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // Stream 기본 정보
        this.streamKey = exampleStreamKey;
        this.consumerGroupName = exampleConsumerGroupName;
        this.consumerName = exampleConsumerName;

        // Consumer Group 설정
        this.redisOperator.createStreamConsumerGroup(streamKey, consumerGroupName);

        // StreamMessageListenerContainer 설정
        this.listenerContainer = this.redisOperator.createStreamMessageListenerContainer();

        //Subscription 설정
        this.subscription = this.listenerContainer.receive(
                Consumer.from(this.consumerGroupName, consumerName),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
                this
        );

        // 10밀리초 마다 정보를 가져오게 함.
        // BLOCK과는 다름
        //this.subscription.await(Duration.ofMillis(10));

        // redis listen 시작
        this.listenerContainer.start();
    }
}

더 나아가서

1. Radix Tree

  • Stream에 항목을 추가하는 것: O(1)
  • 단일 Entry에 엑세스: O(n)

여기서 n은 ID의 길이이다. ID는 일반적으로 짧고 고정된 길이이므로 이는 사실상 일정한 시간 조회로 줄어든다. Stream은 Radix Tree로 구현된다.

2. BLOCK

XRANGE, XREAD, XREADGROUP는 기본적으로 Non-Blocking & Synchronous로 동작하지만, BLOCK를 추가할 경우에는 다르게 동작한다. (READ 계열에만 추가할 수 있다.)

BLOCK 이란
Consumer가 메시지를 읽어올 때, 일부 Consumer는 Stream에서 당장 소비할 수 있는 메시지가 없을 수 있다. 이 경우, 계속 메시지를 polling하는 것은 Redis Server에 부하가 생기므로 Consumer을 일정 시간동안 Block시킨다.

  1. 새로운 메시지가 계속 들어오지 않으면, 설정된 시간동안 Blocking 후 다시 받을 메시지를 확인한다.

  2. Blocking 중 새로운 메시지가 들어왔다면, XADDsignalKeyAsReady()를 호출해, 새로 들어온 메시지를 처리해야 하는 키 list에 넣는다.
    XADD동작이 완료되고 다시 READ 동작을 하는 이벤트 루프로 돌아가기 전, 처리해야 하는 키 list에 들어가 있는 ID와, Blocking중인 Consumer 목록을 비교해 소비해야 할 메시지 ID가 있는 경우 Consumer는 새로운 데이터를 ⭐즉시⭐ 받게 된다.

1. XREAD

Blocking + Asynchronous 동작이다.

XREAD BLOCK <timeout> STREAMS <key> <ID>

예를 들어 BLOCK 6000으로 설정했을 때, Stream에 <ID>이후로 읽어올 메시지가 없다면 요청한 Consumer는 6초 동안 대기하게 된다.

Consumer는 BLOCK 대기중에도 다른 명령어를 처리할 수 있으므로 Asynchronous하다.

여러 Consumer에서 동일한 Stream에게 XREAD BLOCK 요청을 하는 상황에서, 만약 Stream에 읽을 수 있는 새로운 메시지가 들어온다면 FIFO처럼 가장 먼저 BLOCK된 Consumer 순으로 메시지가 제공된다.

2. XREADGROUP

Blocking + Synchronous 동작

반면, XREADGROUP은 Consumser Group이 소속된 Stream에서 새로운 메시지를 읽고 처리하고자 할 때 사용되므로, 메시지를 소비하는 작업이 그룹 단위로 동기적으로 진행되어야 한다.

따라서 BLOCK 대기중에 Consumer는 다른 명령어를 수행할 수 없다.

Redis Streams 강의(Python 예제)

https://university.redis.com/courses/ru202/?_ga=2.153838648.1465733428.1707635655-1344845187.1707465847&_gl=1*d2qua9*_ga*MTM0NDg0NTE4Ny4xNzA3NDY1ODQ3*_ga_8BKGRQKRPV*MTcwNzYzNTY1NC4yLjEuMTcwNzY0Mjg0Mi42MC4wLjA.*_gcl_au*MTA0NTY4OTE1My4xNzA3NDY1ODQ3

0개의 댓글

관련 채용 정보