Redis stream 정리

김영상 (dudtkd1221)·2026년 1월 18일

1. Redis Stream이란?

Redis 5.0부터 추가된 append-only 로그 형태의 자료구조다. 로그 파일처럼 데이터가 끝에만 추가되지만, O(1) 시간복잡도로 랜덤 액세스가 가능하고 Consumer Group을 통해 여러 소비자가 메시지를 나눠 처리할 수 있습니다.

타임스탬프 기반으로 고유 ID가 자동 생성되고, 블로킹 읽기를 지원해서 실시간 데이터 스트리밍에 적합합니다.

주로 이벤트 소싱, 센서 모니터링, 알림 시스템, 실시간 분석 같은 곳에서 사용합니다.


2. 기본 개념

2.1 Message ID 구조

Redis Stream의 각 엔트리는 고유한 ID를 가지며, 형식은 <밀리초>-<시퀀스번호>입니다.

1526919030474-0
└─────┬─────┘ └┬┘
  밀리초    시퀀스
  • 밀리초 부분: Redis 인스턴스의 Unix 시간(밀리초)
  • 시퀀스 번호: 동일 밀리초 내 생성된 엔트리 구분용 (64비트)

Redis는 ID의 단조 증가를 보장합니다. 시스템 시간이 과거로 점프하더라도, 이전 엔트리의 시간을 사용하여 순서를 유지합니다.

2.2 Entry 구조

각 엔트리는 field-value 쌍의 리스트로 구성된다. Redis Hash와 비슷한 구조라고 보면 된다.

1526919030474-0
  field1: value1
  field2: value2
  field3: value3

예를 들어 주문 데이터를 저장한다면:

1526919030474-0
  user_id: 123
  product: laptop
  quantity: 1
  price: 1500000

이런 식으로 한 메시지 안에 여러 필드를 넣을 수 있다. 최소 1개 이상의 field-value 쌍이 필요하고, 필드명과 값 모두 문자열로 저장된다.


3. Producer - 메시지 추가

3.1 XADD - 기본 사용법

XADD는 Stream에 데이터를 추가하는 유일한 명령어다.

XADD mystream * field1 value1 field2 value2
# 출력: "1526919030474-0"
  • *를 사용하면 Redis가 자동으로 ID 생성
  • 최소 1개 이상의 field-value 쌍 필요
  • field와 value는 평탄한(flat) 구조로 나열한다 (JSON이 아님)

중요: XADD는 JSON을 직접 받지 않는다. field-value를 순서대로 나열하는 방식이다:

# ✅ 올바른 방식 - 여러 필드로 분리
XADD orders * user_id 123 product laptop quantity 1 price 1500000

# ✅ JSON을 문자열로 저장하는 방식
XADD orders * data '{"user_id":123,"product":"laptop","quantity":1}'

# ❌ 잘못된 방식 - JSON 객체를 직접 전달할 수 없음
XADD orders * {"user_id":123,"product":"laptop"}

필드를 분리하는 첫 번째 방식이 권장된다. 나중에 필요한 필드만 쉽게 조회하고 인덱싱할 수 있기 때문이다.

3.2 명시적 ID 지정

XADD mystream 1526919030474-55 message "Hello"
  • 최소 유효 ID는 0-1
  • 반드시 Stream 내 기존 ID보다 큰 값이어야 함

3.3 MAXLEN - 스트림 크기 제한

# 정확히 1000개로 제한 (비효율적)
XADD mystream MAXLEN 1000 * field value

# 대략 1000개로 제한 (효율적, 권장)
XADD mystream MAXLEN ~ 1000 * field value

~ 옵션을 사용하면 내부 radix tree 구조로 인해 실제로는 1000개보다 약간 더 많을 수 있지만, 성능상 이점이 있습니다.

3.4 NOMKSTREAM 옵션 (Redis 6.2+)

# Stream이 없으면 에러 반환 (생성 방지)
XADD mystream NOMKSTREAM * field value

4. Consumer - 메시지 읽기 (단순)

4.1 XRANGE - 범위 조회

# 전체 조회
XRANGE mystream - +

# ID 범위 지정
XRANGE mystream 1526919030474-0 1526919030500-0

# 개수 제한
XRANGE mystream - + COUNT 10
  • -: 최소 ID (처음부터)
  • +: 최대 ID (끝까지)

4.2 XREVRANGE - 역순 조회

XREVRANGE mystream + - COUNT 10

최신 메시지부터 역순으로 조회합니다.

는 첫 XREAD 호출에만 사용하고, 이후엔 마지막으로 받은 메시지의 ID를 사용해야 중간 메시지를 놓치지 않는다.

# 첫 번째 호출
XREAD BLOCK 1000 STREAMS orders $
# 반환: 1526919030474-5

# 두 번째 호출 (마지막 받은 ID 사용)
XREAD BLOCK 1000 STREAMS orders 1526919030474-5
# 반환: 1526919030474-6, 1526919030474-7

# ❌ 잘못된 방식 - $ 계속 사용하면 중간 메시지 놓칠 수 있음
XREAD BLOCK 1000 STREAMS orders $

4.4 여러 Stream 동시 읽기

XREAD COUNT 1 STREAMS stream1 stream2 0 0

5. Consumer Group - 핵심 개념

5.1 Consumer Group이란?

Consumer Group은 여러 Consumer가 협력하여 Stream을 소비할 수 있게 하는 메커니즘입니다.

작동 방식:
1. 메시지가 Consumer Group 내 Consumer들에게 분배됩니다
2. 각 메시지는 한 Consumer에게만 전달됩니다 (라운드로빈)
3. 처리 완료 시 ACK 필요

Consumer Group 없이 XREAD를 사용하면: 모든 클라이언트가 모든 메시지를 받습니다 (Fan-out).

Consumer Group 사용 시: 메시지가 Consumer들에게 분산됩니다 (Work Queue).

5.2 PEL (Pending Entry List)

PEL은 Consumer에게 전달되었지만 아직 ACK되지 않은 메시지 목록입니다.

  • Consumer Group 레벨: 전체 Pending 메시지
  • Consumer 레벨: 각 Consumer의 Pending 메시지

6. Consumer Group 명령어

6.1 XGROUP CREATE - Consumer Group 생성

# 처음부터 읽기
XGROUP CREATE mystream mygroup 0

# 새 메시지만 읽기
XGROUP CREATE mystream mygroup $

# Stream이 없어도 생성 (Redis 6.2+)
XGROUP CREATE mystream mygroup 0 MKSTREAM

매개변수:

  • 0: 처음부터 소비
  • $: Group 생성 이후 메시지만 소비

6.2 XREADGROUP - Group에서 읽기

# 새 메시지 읽기
XREADGROUP GROUP mygroup consumer1 COUNT 2 STREAMS mystream >

# Pending 메시지 확인
XREADGROUP GROUP mygroup consumer1 STREAMS mystream 0

# 블로킹 모드
XREADGROUP GROUP mygroup consumer1 BLOCK 2000 COUNT 10 STREAMS mystream >

특수 ID >: 아직 어떤 Consumer에게도 전달되지 않은 새 메시지만 요청합니다.

ID 0 사용: 현재 Consumer의 Pending 메시지를 확인할 수 있습니다 (crash 복구용).

6.3 XACK - 메시지 처리 완료

XACK mystream mygroup 1526919030474-0

PEL에서 메시지를 제거하여 처리 완료를 표시합니다.

6.4 NOACK 옵션

XREADGROUP GROUP mygroup consumer1 COUNT 10 NOACK STREAMS mystream >

메시지를 PEL에 추가하지 않습니다. 신뢰성이 필요 없고 occasional 메시지 유실이 허용되는 경우 사용합니다.

6.5 기본 Consumer 패턴

# 의사 코드
WHILE true:
    entries = XREADGROUP GROUP mygroup consumer1 BLOCK 2000 COUNT 10 STREAMS mystream >
    
    if entries == nil:
        print("Timeout... try again")
        continue
    
    for message in entries:
        process_message(message.id, message.fields)
        # 처리 완료 후 ACK
        XACK mystream mygroup message.id

7. 장애 처리 - Claim 메커니즘

7.1 XPENDING - Pending 메시지 확인

요약 정보:

XPENDING mystream mygroup
# 출력:
# 1) (integer) 2          # 총 Pending 개수
# 2) "1526984818136-0"    # 최소 ID
# 3) "1526984818136-1"    # 최대 ID
# 4) 1) 1) "consumer1"    # Consumer별 Pending 개수
#       2) "2"

상세 정보:

XPENDING mystream mygroup - + 10
# 각 메시지별 정보:
# 1) ID
# 2) Consumer 이름
# 3) Idle time (밀리초)
# 4) 전달 횟수

Idle time 필터링 (Redis 6.2+):

# 9초 이상 idle 메시지만 조회
XPENDING mystream mygroup IDLE 9000 - + 10

7.2 XCLAIM - 메시지 소유권 가져오기

다른 Consumer가 실패했을 때 메시지를 가져와 재처리합니다.

# 1시간 이상 idle 메시지 claim
XCLAIM mystream mygroup Alice 3600000 1526569498055-0

동작 조건:

  • 메시지의 idle time이 지정한 min-idle-time보다 커야 함
  • 메시지가 Stream의 PEL에 존재해야 함

옵션:

  • IDLE <ms>: idle time 설정
  • TIME <unix-ms>: 절대 시간으로 idle time 설정
  • RETRYCOUNT <count>: 재시도 카운터 설정
  • FORCE: PEL에 없어도 강제로 claim (메시지는 존재해야 함)
  • JUSTID: 메시지 내용 없이 ID만 반환 (retry counter 증가 안 됨)

7.3 XAUTOCLAIM - 자동 Claim (Redis 6.2+)

XCLAIM보다 효율적인 자동 claim 메커니즘입니다.

XAUTOCLAIM mystream mygroup consumer2 3600000 0-0 COUNT 10

Cursor 기반으로 동작하여 대량의 Pending 메시지를 순회하며 claim할 수 있습니다.


8. Stream 관리 명령어

8.1 XTRIM - Stream 크기 줄이기

# 정확히 1000개로 trim
XTRIM mystream MAXLEN 1000

# 대략 1000개로 trim (효율적)
XTRIM mystream MAXLEN ~ 1000

# 특정 ID 이전 메시지 삭제
XTRIM mystream MINID 1526919030474-0

# LIMIT으로 한 번에 검사할 엔트리 수 제한 (Redis 6.2+)
XTRIM mystream MAXLEN ~ 1000 LIMIT 100

LIMIT 옵션: CPU 사용량을 제어하기 위해 한 번에 검사할 엔트리 수를 제한합니다. 기본값은 매크로 노드당 엔트리 수 × 100입니다.

8.2 XDEL - 특정 메시지 삭제

XDEL mystream 1526919030474-0 1526919030474-1

지정한 ID의 메시지를 삭제합니다. PEL에 있는 메시지도 삭제 가능하지만, PEL 엔트리는 자동으로 제거되지 않습니다.

8.3 XLEN - Stream 길이 확인

XLEN mystream
# 출력: (integer) 5

9. 모니터링 명령어

9.1 XINFO STREAM - Stream 정보

XINFO STREAM mystream

반환 정보:

  • length: Stream 길이
  • radix-tree-keys/nodes: 내부 자료구조 정보
  • last-generated-id: 마지막 생성 ID
  • entries-added: 누적 추가된 엔트리 수
  • groups: Consumer Group 개수
  • first-entry/last-entry: 첫/마지막 엔트리

FULL 옵션:

XINFO STREAM mystream FULL

모든 엔트리와 Consumer Group의 상세 정보를 포함합니다.

9.2 XINFO GROUPS - Consumer Group 정보

XINFO GROUPS mystream

반환 정보:

  • name: Group 이름
  • consumers: Consumer 수
  • pending: Pending 메시지 수
  • last-delivered-id: 마지막 전달 ID
  • entries-read: Group이 읽은 엔트리 수 (휴리스틱)
  • lag: 아직 전달되지 않은 메시지 수

9.3 XINFO CONSUMERS - Consumer 정보

XINFO CONSUMERS mystream mygroup

반환 정보:

  • name: Consumer 이름
  • pending: Pending 메시지 수
  • idle: 마지막 활동 이후 시간 (밀리초)
  • inactive: 마지막 성공적인 상호작용 이후 시간

9.4 XGROUP 관리 명령어

# Consumer Group 삭제
XGROUP DESTROY mystream mygroup

# Last delivered ID 설정
XGROUP SETID mystream mygroup 1526919030474-0

# Consumer 삭제
XGROUP DELCONSUMER mystream mygroup consumer1

# Consumer 생성
XGROUP CREATECONSUMER mystream mygroup consumer1

10. 실전 패턴

10.1 Work Queue 패턴

단일 Consumer Group에 여러 Worker를 두어 메시지를 분산 처리합니다.

# Worker 1, 2, 3이 동일한 Group 소속
XREADGROUP GROUP workers worker1 COUNT 10 STREAMS tasks >
XREADGROUP GROUP workers worker2 COUNT 10 STREAMS tasks >
XREADGROUP GROUP workers worker3 COUNT 10 STREAMS tasks >

10.2 Fan-out 패턴

여러 Consumer Group을 생성하여 모든 Group이 모든 메시지를 받습니다.

# 각기 다른 Group
XREADGROUP GROUP analytics consumer1 STREAMS events >
XREADGROUP GROUP notifications consumer2 STREAMS events >
XREADGROUP GROUP logging consumer3 STREAMS events >

10.3 Dead Letter Queue 구현

재시도 횟수를 추적하여 일정 횟수 실패 시 별도 Stream으로 이동:

pending = XPENDING mystream mygroup - + 100

for message_id, consumer, idle_time, delivery_count in pending:
    if delivery_count > 5:
        # DLQ로 이동
        data = XRANGE mystream message_id message_id
        XADD dlq_stream * original_id message_id data ...
        XACK mystream mygroup message_id
        XDEL mystream message_id

11. 메모리 관리 전략

11.1 자동 Trimming

XADD 시점에 자동으로 trim:

# Producer에서 메시지 추가 시 자동 trim
XADD mystream MAXLEN ~ 10000 * field value

11.2 주기적 Trimming

Cron job이나 별도 프로세스로 주기적 정리:

# 매시간 실행
XTRIM mystream MAXLEN ~ 100000 LIMIT 1000

11.3 ACK 후 삭제

메시지 처리 후 명시적 삭제:

# 처리 완료
XACK mystream mygroup message_id
# 즉시 삭제
XDEL mystream message_id

주의: XACK는 PEL에서만 제거하며, Stream에서는 삭제하지 않습니다. 메모리 절약을 위해서는 XDEL이 필요합니다.


12. 성능 고려사항

12.1 시간 복잡도

  • XADD: O(1) - trimming 없을 때
  • XADD with trim: O(N) - N은 삭제될 엔트리 수
  • XRANGE: O(N) - N은 반환될 엔트리 수
  • XREAD/XREADGROUP: O(N) - N은 반환될 엔트리 수
  • XACK: O(1)
  • XPENDING: O(N) - N은 Consumer 수 (요약), Consumer당 Pending 수 (상세)

12.2 블로킹 성능

XREADGROUP BLOCK 사용 시, XADD가 블로킹된 클라이언트 수만큼 O(N) 시간이 걸립니다. 많은 Consumer가 블로킹하면 XADD 성능에 영향을 줄 수 있습니다.

12.3 최적화 팁

  1. COUNT 적절히 설정: 너무 크면 메모리 사용량 증가, 너무 작으면 왕복 횟수 증가
  2. ~ 옵션 사용: MAXLEN 사용 시 항상 ~ 사용 권장
  3. Pipeline 활용: 여러 XADD를 pipeline으로 묶어 네트워크 오버헤드 감소
  4. Connection Pooling: Connection 재사용

13. 주의사항 및 제약

13.1 메모리

Redis는 인메모리 DB이므로 Stream이 무한정 커질 수 없습니다. 반드시 MAXLEN이나 XTRIM으로 크기 관리가 필요합니다.

13.2 순서 보장

  • Consumer Group 내에서 여러 Consumer가 동시 처리 시 처리 완료 순서는 보장되지 않습니다
  • 엄격한 순서가 필요하면 단일 Consumer 사용 또는 파티셔닝 필요

13.3 중복 처리

Redis Stream은 at-least-once 전달을 보장합니다:

  • Consumer 장애 시 메시지가 다른 Consumer에게 재전달될 수 있음
  • Application에서 멱등성(idempotency) 구현 필요

13.4 PEL 관리

  • ACK되지 않은 메시지는 PEL에 계속 남아 메모리 사용
  • 주기적으로 XPENDING 확인하고 오래된 메시지 claim 또는 삭제 필요

13.5 Persistence

  • RDB/AOF를 사용하여 Stream 데이터 영속화 가능
  • Consumer Group 상태(PEL 포함)도 함께 영속화됩니다
  • Replica에도 모든 상태가 복제됩니다

14. 다른 메시지 큐와 비교

Kafka와 비교

유사점:

  • Consumer Group 개념
  • Append-only log 구조
  • Offset(ID) 기반 읽기

차이점:

  • Redis: 인메모리(빠르지만 메모리 제약), 간단한 설정
  • Kafka: 디스크 기반(대용량), 복잡하지만 기능 풍부

RabbitMQ와 비교

유사점:

  • 메시지 라우팅
  • ACK 메커니즘

차이점:

  • Redis: Pull 모델, 메시지 영구 보존 가능
  • RabbitMQ: Push 모델, 소비 후 삭제, 더 다양한 라우팅

Redis Pub/Sub와 비교

Redis Pub/Sub:

  • 메시지 영속성 없음
  • Consumer 연결 안 되어 있으면 메시지 유실
  • 단순 브로드캐스트

Redis Stream:

  • 메시지 영속성
  • Consumer 오프라인이어도 나중에 소비 가능
  • Consumer Group으로 분산 처리

참고 자료

profile
아직 배고프다

0개의 댓글