Redis 5.0부터 추가된 append-only 로그 형태의 자료구조다. 로그 파일처럼 데이터가 끝에만 추가되지만, O(1) 시간복잡도로 랜덤 액세스가 가능하고 Consumer Group을 통해 여러 소비자가 메시지를 나눠 처리할 수 있습니다.
타임스탬프 기반으로 고유 ID가 자동 생성되고, 블로킹 읽기를 지원해서 실시간 데이터 스트리밍에 적합합니다.
주로 이벤트 소싱, 센서 모니터링, 알림 시스템, 실시간 분석 같은 곳에서 사용합니다.
Redis Stream의 각 엔트리는 고유한 ID를 가지며, 형식은 <밀리초>-<시퀀스번호>입니다.
1526919030474-0
└─────┬─────┘ └┬┘
밀리초 시퀀스
Redis는 ID의 단조 증가를 보장합니다. 시스템 시간이 과거로 점프하더라도, 이전 엔트리의 시간을 사용하여 순서를 유지합니다.
각 엔트리는 field-value 쌍의 리스트로 구성된다. Redis Hash와 비슷한 구조라고 보면 된다.
1526919030474-0
field1: value1
field2: value2
field3: value3
예를 들어 주문 데이터를 저장한다면:
1526919030474-0
user_id: 123
product: laptop
quantity: 1
price: 1500000
이런 식으로 한 메시지 안에 여러 필드를 넣을 수 있다. 최소 1개 이상의 field-value 쌍이 필요하고, 필드명과 값 모두 문자열로 저장된다.
XADD는 Stream에 데이터를 추가하는 유일한 명령어다.
XADD mystream * field1 value1 field2 value2
# 출력: "1526919030474-0"
*를 사용하면 Redis가 자동으로 ID 생성중요: XADD는 JSON을 직접 받지 않는다. field-value를 순서대로 나열하는 방식이다:
# ✅ 올바른 방식 - 여러 필드로 분리
XADD orders * user_id 123 product laptop quantity 1 price 1500000
# ✅ JSON을 문자열로 저장하는 방식
XADD orders * data '{"user_id":123,"product":"laptop","quantity":1}'
# ❌ 잘못된 방식 - JSON 객체를 직접 전달할 수 없음
XADD orders * {"user_id":123,"product":"laptop"}
필드를 분리하는 첫 번째 방식이 권장된다. 나중에 필요한 필드만 쉽게 조회하고 인덱싱할 수 있기 때문이다.
XADD mystream 1526919030474-55 message "Hello"
0-1# 정확히 1000개로 제한 (비효율적)
XADD mystream MAXLEN 1000 * field value
# 대략 1000개로 제한 (효율적, 권장)
XADD mystream MAXLEN ~ 1000 * field value
~ 옵션을 사용하면 내부 radix tree 구조로 인해 실제로는 1000개보다 약간 더 많을 수 있지만, 성능상 이점이 있습니다.
# Stream이 없으면 에러 반환 (생성 방지)
XADD mystream NOMKSTREAM * field value
# 전체 조회
XRANGE mystream - +
# ID 범위 지정
XRANGE mystream 1526919030474-0 1526919030500-0
# 개수 제한
XRANGE mystream - + COUNT 10
-: 최소 ID (처음부터)+: 최대 ID (끝까지)XREVRANGE mystream + - COUNT 10
최신 메시지부터 역순으로 조회합니다.
는 첫 XREAD 호출에만 사용하고, 이후엔 마지막으로 받은 메시지의 ID를 사용해야 중간 메시지를 놓치지 않는다.
# 첫 번째 호출
XREAD BLOCK 1000 STREAMS orders $
# 반환: 1526919030474-5
# 두 번째 호출 (마지막 받은 ID 사용)
XREAD BLOCK 1000 STREAMS orders 1526919030474-5
# 반환: 1526919030474-6, 1526919030474-7
# ❌ 잘못된 방식 - $ 계속 사용하면 중간 메시지 놓칠 수 있음
XREAD BLOCK 1000 STREAMS orders $
XREAD COUNT 1 STREAMS stream1 stream2 0 0
Consumer Group은 여러 Consumer가 협력하여 Stream을 소비할 수 있게 하는 메커니즘입니다.
작동 방식:
1. 메시지가 Consumer Group 내 Consumer들에게 분배됩니다
2. 각 메시지는 한 Consumer에게만 전달됩니다 (라운드로빈)
3. 처리 완료 시 ACK 필요
Consumer Group 없이 XREAD를 사용하면: 모든 클라이언트가 모든 메시지를 받습니다 (Fan-out).
Consumer Group 사용 시: 메시지가 Consumer들에게 분산됩니다 (Work Queue).
PEL은 Consumer에게 전달되었지만 아직 ACK되지 않은 메시지 목록입니다.
# 처음부터 읽기
XGROUP CREATE mystream mygroup 0
# 새 메시지만 읽기
XGROUP CREATE mystream mygroup $
# Stream이 없어도 생성 (Redis 6.2+)
XGROUP CREATE mystream mygroup 0 MKSTREAM
매개변수:
0: 처음부터 소비$: Group 생성 이후 메시지만 소비# 새 메시지 읽기
XREADGROUP GROUP mygroup consumer1 COUNT 2 STREAMS mystream >
# Pending 메시지 확인
XREADGROUP GROUP mygroup consumer1 STREAMS mystream 0
# 블로킹 모드
XREADGROUP GROUP mygroup consumer1 BLOCK 2000 COUNT 10 STREAMS mystream >
특수 ID >: 아직 어떤 Consumer에게도 전달되지 않은 새 메시지만 요청합니다.
ID 0 사용: 현재 Consumer의 Pending 메시지를 확인할 수 있습니다 (crash 복구용).
XACK mystream mygroup 1526919030474-0
PEL에서 메시지를 제거하여 처리 완료를 표시합니다.
XREADGROUP GROUP mygroup consumer1 COUNT 10 NOACK STREAMS mystream >
메시지를 PEL에 추가하지 않습니다. 신뢰성이 필요 없고 occasional 메시지 유실이 허용되는 경우 사용합니다.
# 의사 코드
WHILE true:
entries = XREADGROUP GROUP mygroup consumer1 BLOCK 2000 COUNT 10 STREAMS mystream >
if entries == nil:
print("Timeout... try again")
continue
for message in entries:
process_message(message.id, message.fields)
# 처리 완료 후 ACK
XACK mystream mygroup message.id
요약 정보:
XPENDING mystream mygroup
# 출력:
# 1) (integer) 2 # 총 Pending 개수
# 2) "1526984818136-0" # 최소 ID
# 3) "1526984818136-1" # 최대 ID
# 4) 1) 1) "consumer1" # Consumer별 Pending 개수
# 2) "2"
상세 정보:
XPENDING mystream mygroup - + 10
# 각 메시지별 정보:
# 1) ID
# 2) Consumer 이름
# 3) Idle time (밀리초)
# 4) 전달 횟수
Idle time 필터링 (Redis 6.2+):
# 9초 이상 idle 메시지만 조회
XPENDING mystream mygroup IDLE 9000 - + 10
다른 Consumer가 실패했을 때 메시지를 가져와 재처리합니다.
# 1시간 이상 idle 메시지 claim
XCLAIM mystream mygroup Alice 3600000 1526569498055-0
동작 조건:
min-idle-time보다 커야 함옵션:
IDLE <ms>: idle time 설정TIME <unix-ms>: 절대 시간으로 idle time 설정RETRYCOUNT <count>: 재시도 카운터 설정FORCE: PEL에 없어도 강제로 claim (메시지는 존재해야 함)JUSTID: 메시지 내용 없이 ID만 반환 (retry counter 증가 안 됨)XCLAIM보다 효율적인 자동 claim 메커니즘입니다.
XAUTOCLAIM mystream mygroup consumer2 3600000 0-0 COUNT 10
Cursor 기반으로 동작하여 대량의 Pending 메시지를 순회하며 claim할 수 있습니다.
# 정확히 1000개로 trim
XTRIM mystream MAXLEN 1000
# 대략 1000개로 trim (효율적)
XTRIM mystream MAXLEN ~ 1000
# 특정 ID 이전 메시지 삭제
XTRIM mystream MINID 1526919030474-0
# LIMIT으로 한 번에 검사할 엔트리 수 제한 (Redis 6.2+)
XTRIM mystream MAXLEN ~ 1000 LIMIT 100
LIMIT 옵션: CPU 사용량을 제어하기 위해 한 번에 검사할 엔트리 수를 제한합니다. 기본값은 매크로 노드당 엔트리 수 × 100입니다.
XDEL mystream 1526919030474-0 1526919030474-1
지정한 ID의 메시지를 삭제합니다. PEL에 있는 메시지도 삭제 가능하지만, PEL 엔트리는 자동으로 제거되지 않습니다.
XLEN mystream
# 출력: (integer) 5
XINFO STREAM mystream
반환 정보:
FULL 옵션:
XINFO STREAM mystream FULL
모든 엔트리와 Consumer Group의 상세 정보를 포함합니다.
XINFO GROUPS mystream
반환 정보:
XINFO CONSUMERS mystream mygroup
반환 정보:
# Consumer Group 삭제
XGROUP DESTROY mystream mygroup
# Last delivered ID 설정
XGROUP SETID mystream mygroup 1526919030474-0
# Consumer 삭제
XGROUP DELCONSUMER mystream mygroup consumer1
# Consumer 생성
XGROUP CREATECONSUMER mystream mygroup consumer1
단일 Consumer Group에 여러 Worker를 두어 메시지를 분산 처리합니다.
# Worker 1, 2, 3이 동일한 Group 소속
XREADGROUP GROUP workers worker1 COUNT 10 STREAMS tasks >
XREADGROUP GROUP workers worker2 COUNT 10 STREAMS tasks >
XREADGROUP GROUP workers worker3 COUNT 10 STREAMS tasks >
여러 Consumer Group을 생성하여 모든 Group이 모든 메시지를 받습니다.
# 각기 다른 Group
XREADGROUP GROUP analytics consumer1 STREAMS events >
XREADGROUP GROUP notifications consumer2 STREAMS events >
XREADGROUP GROUP logging consumer3 STREAMS events >
재시도 횟수를 추적하여 일정 횟수 실패 시 별도 Stream으로 이동:
pending = XPENDING mystream mygroup - + 100
for message_id, consumer, idle_time, delivery_count in pending:
if delivery_count > 5:
# DLQ로 이동
data = XRANGE mystream message_id message_id
XADD dlq_stream * original_id message_id data ...
XACK mystream mygroup message_id
XDEL mystream message_id
XADD 시점에 자동으로 trim:
# Producer에서 메시지 추가 시 자동 trim
XADD mystream MAXLEN ~ 10000 * field value
Cron job이나 별도 프로세스로 주기적 정리:
# 매시간 실행
XTRIM mystream MAXLEN ~ 100000 LIMIT 1000
메시지 처리 후 명시적 삭제:
# 처리 완료
XACK mystream mygroup message_id
# 즉시 삭제
XDEL mystream message_id
주의: XACK는 PEL에서만 제거하며, Stream에서는 삭제하지 않습니다. 메모리 절약을 위해서는 XDEL이 필요합니다.
XADD: O(1) - trimming 없을 때XADD with trim: O(N) - N은 삭제될 엔트리 수XRANGE: O(N) - N은 반환될 엔트리 수XREAD/XREADGROUP: O(N) - N은 반환될 엔트리 수XACK: O(1)XPENDING: O(N) - N은 Consumer 수 (요약), Consumer당 Pending 수 (상세)XREADGROUP BLOCK 사용 시, XADD가 블로킹된 클라이언트 수만큼 O(N) 시간이 걸립니다. 많은 Consumer가 블로킹하면 XADD 성능에 영향을 줄 수 있습니다.
~ 사용 권장Redis는 인메모리 DB이므로 Stream이 무한정 커질 수 없습니다. 반드시 MAXLEN이나 XTRIM으로 크기 관리가 필요합니다.
Redis Stream은 at-least-once 전달을 보장합니다:
유사점:
차이점:
유사점:
차이점:
Redis Pub/Sub:
Redis Stream: