Redis 메시지 패턴

Always·5일 전

Backend&Devops

목록 보기
17/17
post-thumbnail

개요

4가지 Redis 메시징 패턴의 처리량(msg/sec) 및 장애 처리 로직등을 비교하는 것이 최근에 궁금해졌고, 이에 따라 이를 구현해보고 결과를 도출해보는 과정을 해보았다.


Redis 메시지 패턴 프로젝트

각 패턴별 명령어 및 redis 키

패턴Redis 명령어Redis 키
QueueLPUSH / BRPOPredis:queue
Stream XREADXADD / XREAD BLOCKredis:stream:xread
Stream GroupXADD / XREADGROUP + XACKredis:stream:group
Pub/SubPUBLISH / SUBSCRIBEredis:pubsub

프로젝트 구조

redis/
├── producer/               # 메시지 발행 서버 (Spring Boot, 포트 8080)
├── consumer/               # 메시지 구독 서버 (Spring Boot, 포트 8081)
├── docker-compose.yml      # Redis 컨테이너 설정
└── compare_benchmark.py    # 4가지 패턴 자동 벤치마크 및 비교 스크립트

Producer와 Consumer는 별도 프로세스로 실행해야 함.


기술 스택

모듈버전
Java17
Spring Boot (Producer)4.0.5
Spring Boot (Consumer)3.4.5
Spring Data Redis-
MySQL8.x (Consumer 벤치마크 결과 저장)
Redislatest (Docker)

사전 요구사항

  • Java 17+
  • Docker (Redis 컨테이너용)
  • MySQL (localhost:3306 실행 중, 데이터베이스명: benchmark)
  • Python 3.x + requests 라이브러리 (벤치마크 스크립트용)

시작하기

1. Redis 시작

docker-compose up -d
# → Redis: localhost:6379

2. Producer 시작

cd producer/producer
./gradlew bootRun
# → http://localhost:8080

3. Consumer 시작

cd consumer/consumer
./gradlew bootRun
# → http://localhost:8081

벤치마크 실행

Python 자동 스크립트 (권장)

Producer와 Consumer 서버가 모두 실행 중인 상태에서, 루트 디렉토리에서 compare_benchmark.py를 실행.

pip install requests   # 최초 1회만

python compare_benchmark.py

4가지 패턴을 순서대로 실행함 (queuestream-xreadstream-grouppubsub). 각 패턴마다:

  1. Consumer /start 호출
  2. N개의 메시지 전송 (기본값 N = 1000, 스크립트에서 변경 가능)
  3. Consumer /stop 호출 후 결과 출력

출력 예시:

[QUEUE] Start (1,000 messages)
  Consumer started
  Sending... 1,000/1,000
  Producer done: 2.34s (427 msg/sec)
  Consumer result: {'pattern': 'QUEUE', 'totalMessages': 1000, 'durationMs': 2100, 'throughput': '476.19 msg/sec'}

========== Final Results ==========
  queue           476.19 msg/sec            (total 1,000 / 2100ms)
  stream-xread    390.12 msg/sec            (total 1,000 / 2563ms)
  stream-group    381.45 msg/sec            (total 1,000 / 2621ms)
  pubsub          512.33 msg/sec            (total 1,000 / 1952ms)

참고: 위 수치는 예시이므로 실제 결과는 실행 환경에 따라 다를 수 있음.

수동 실행

각 패턴마다 아래 순서를 반드시 따라야 함.

1. Consumer /start  →  2. Producer /send × N  →  3. Consumer /stop
# 1. Consumer 시작
curl -X POST http://localhost:8081/queue/start

# 2. Producer로 메시지 전송
curl -X POST http://localhost:8080/queue/send -d "message-1"

# 3. Consumer 종료 및 결과 확인
curl -X POST http://localhost:8081/queue/stop

Pub/Sub 주의사항: Consumer가 구독 중인 동안 발행된 메시지만 수신됨. /start 이전에 전송된 메시지는 유실되므로, 반드시 Consumer를 먼저 시작할 것.


API

Producer (:8080)

메서드경로Redis 명령어
POST/queue/sendLPUSH
POST/stream-xread/sendXADD
POST/stream-group/sendXADD
POST/pubsub/sendPUBLISH
  • 요청 본문: 메시지 문자열 (생략 시 기본값 "benchmark-message")
  • 응답: 200 OK / ok

Consumer (:8081)

메서드경로설명
POST/{pattern}/startConsumer 시작
POST/{pattern}/stopConsumer 종료 + 최종 결과 반환
GET/{pattern}/stats실시간 통계 조회
GET/{pattern}/resultsDB에서 과거 결과 목록 조회

{pattern} = queue | stream-xread | stream-group | pubsub

/stop 응답 예시

{
  "pattern": "QUEUE",
  "running": false,
  "totalMessages": 100000,
  "durationMs": 4823,
  "throughput": "20733.47 msg/sec"
}

패턴별 동작 원리

Queue

  1. http://localhost:8081/queue/start 호출을 통해 consumer 프로세스 내부에서 queue-consumer라는 이름의 데몬 쓰레드를 생성하고 실행.
  2. http://localhost:8080/queue/send API를 통해 producer 프로세스가 Redis List(redis:queue)에 LPUSH 명령으로 메시지를 전송.
  3. consumer 쓰레드는 반복문 안에서 1초 timeout의 blocking BRPOP 방식으로 Redis queue에서 메시지를 대기.
  4. 메시지가 존재하면 이를 소비하고, DB I/O 저장 로직을 수행한 후 messageCount를 증가시킴.
  5. producer가 모든 메시지 전송을 마치면 http://localhost:8081/queue/stop API를 호출함. stop API는 consumer 종료 플래그를 내리고, join으로 consumer 쓰레드가 종료될 때까지 최대 30초 대기한 뒤 최종 결과 값을 반환.
mermaid-diagram

Stream XREAD

  1. http://localhost:8081/stream-xread/start 호출을 통해 consumer 프로세스 내부에서 stream-xread-consumer라는 이름의 데몬 쓰레드를 생성하고 실행. 이 시점에 readOffset을 $(latest)로 초기화하여 start 이후에 들어오는 신규 메시지만 읽도록 설정.
  2. http://localhost:8080/stream-xread/send API를 통해 producer 프로세스가 Redis Stream(redis:stream:xread)에 XADD 명령으로 메시지를 전송.
  3. consumer 쓰레드는 반복문 안에서 1초 timeout의 blocking XREAD 방식으로 Redis Stream에서 최대 100건씩 메시지를 대기.
  4. 메시지가 존재하면 수신된 records를 리스트로 변환하여 saveAll()로 일괄 DB I/O 저장한 후 messageCount를 일괄 증가시키고, 다음 읽기를 위해 readOffset을 마지막 수신 메시지 ID로 갱신.
  5. producer가 모든 메시지 전송을 마치면 http://localhost:8081/stream-xread/stop API를 호출함. stop API는 consumer 종료 플래그를 내리고, join으로 consumer 쓰레드가 종료될 때까지 최대 30초 대기함. 쓰레드 종료 전 남은 스트림 메시지를 non-blocking으로 드레인하여 DB 저장한 뒤 최종 결과 값을 반환.
mermaid-diagram (1)

Stream Group

  1. http://localhost:8081/stream-group/start 호출을 통해, 기존 컨슈머 그룹을 삭제 후 XGROUP CREATE ... $ MKSTREAM으로 재생성(PEL 오염 방지)하고, stream-group-consumer라는 이름의 데몬 쓰레드를 생성하고 실행.
  2. http://localhost:8080/stream-group/send API를 통해 producer 프로세스가 Redis Stream(redis:stream:group)에 XADD 명령으로 메시지를 전송.
  3. consumer 쓰레드는 반복문 안에서 1초 timeout의 blocking XREADGROUP 방식으로 ReadOffset >(미전달 신규 메시지)를 기준으로 Redis Stream에서 최대 100건씩 메시지를 대기.
  4. 메시지가 존재하면 수신된 records를 리스트로 변환하여 saveAll()로 일괄 DB I/O 저장한 후 messageCount를 일괄 증가시키고, 이후 XACK 명령으로 처리 완료를 그룹에 알림.
  5. producer가 모든 메시지 전송을 마치면 http://localhost:8081/stream-group/stop API를 호출함. stop API는 consumer 종료 플래그를 내리고, join으로 consumer 쓰레드가 종료될 때까지 최대 30초 대기함. 쓰레드 종료 전 남은 스트림 메시지를 non-blocking으로 드레인하여 DB 저장 및 ACK한 뒤 최종 결과 값을 반환.
mermaid-diagram (3)

Pub/Sub

  1. http://localhost:8081/pubsub/start 호출을 통해 Spring의 RedisMessageListenerContainer에 메시지 리스너를 등록. 별도의 데몬 쓰레드를 직접 생성하지 않으며, 컨테이너 내부 쓰레드 풀이 구독을 관리.
  2. http://localhost:8080/pubsub/send API를 통해 producer 프로세스가 Redis Pub/Sub 채널(redis:pubsub)에 PUBLISH 명령으로 메시지를 전송.
  3. consumer는 이벤트 드리븐 방식으로 동작함. 메시지가 채널에 발행되면 onMessage() 콜백이 즉시 호출됨.
  4. 콜백 내에서 메시지를 DB I/O 저장한 후 messageCount를 증가시킴. Pub/Sub 특성상 메시지는 영속되지 않으며 subscriber가 없으면 유실됨.
  5. producer가 모든 메시지 전송을 마치면 http://localhost:8081/pubsub/stop API를 호출함. stop API는 컨테이너에서 리스너를 제거(thread.join 없음)하고 즉시 최종 결과 값을 반환.
mermaid-diagram (4)

실제 벤치마크 결과

로컬 환경(Windows 11, Docker Redis, MySQL)에서 각 5회 반복 측정한 결과임.

1,000 메시지 (5회 측정)

회차QueueStream XREADStream GroupPub/Sub
132.78 msg/sec115.83 msg/sec108.50 msg/sec82.95 msg/sec
252.20 msg/sec125.74 msg/sec152.16 msg/sec145.50 msg/sec
376.45 msg/sec200.20 msg/sec134.46 msg/sec140.96 msg/sec
447.81 msg/sec157.31 msg/sec147.84 msg/sec131.67 msg/sec
541.32 msg/sec139.68 msg/sec143.12 msg/sec142.21 msg/sec
평균50.11 msg/sec147.75 msg/sec137.22 msg/sec128.66 msg/sec

10,000 메시지 (5회 측정)

회차QueueStream XREADStream GroupPub/Sub
197.95 msg/sec145.58 msg/sec65.82 msg/sec85.21 msg/sec
267.56 msg/sec125.37 msg/sec121.77 msg/sec215.75 msg/sec
3131.97 msg/sec184.18 msg/sec175.27 msg/sec163.58 msg/sec
429.60 msg/sec302.21 msg/sec272.66 msg/sec255.51 msg/sec
5157.83 msg/sec269.98 msg/sec273.20 msg/sec220.51 msg/sec
평균96.98 msg/sec205.46 msg/sec181.74 msg/sec188.11 msg/sec

결과 분석

Queue가 가장 느린 이유

Queue는 BRPOP으로 메시지를 한 건씩 처리함. Stream 패턴과 달리 배치 처리가 없어 메시지 1,000건을 소비하려면 Redis 왕복을 1,000번 수행해야 함. DB I/O도 건별로 발생하므로 누적 지연이 큼. 또한 1초 timeout의 blocking 특성상 큐가 순간적으로 비면 최대 1초 낭비가 생김. 이로 인해 측정 편차도 크고(29 ~ 157 msg/sec), 같은 조건에서도 결과가 크게 흔들림.

Stream XREAD가 가장 빠른 이유

XREAD BLOCK 한 번에 최대 100건을 읽어 saveAll()로 일괄 DB 저장함. Redis 왕복 횟수와 DB 왕복 횟수가 모두 최대 1/100로 줄어들어 처리량이 높음. 메시지 수가 많을수록(10,000건 기준 평균 205 msg/sec) 배치 효율이 더 두드러짐.

Stream Group이 XREAD보다 약간 느린 이유

동일한 배치 처리 구조이지만, 메시지 처리 후 반드시 XACK를 호출해야 함. 배치당 한 번의 추가 Redis 명령이 발생하며, Consumer Group 상태 관리(PEL) 오버헤드도 더해짐. 기능(장애 재처리, 다중 컨슈머)을 얻는 대신 소폭 성능을 희생하는 구조임.

Pub/Sub의 특성

이벤트 드리븐 방식으로 메시지가 도착하는 즉시 onMessage() 콜백이 호출됨. 폴링 오버헤드가 없어 낮은 지연 특성을 가지지만, 콜백은 메시지 한 건씩 처리되므로 DB I/O 배치 효율은 Queue와 동일하게 낮음. 10,000건 기준 평균 188 msg/sec로 Stream Group(181 msg/sec)과 유사한 수준임. 단, 메시지가 영속되지 않아 Consumer가 오프라인 상태이면 유실됨.

분산이 큰 이유

모든 패턴에서 측정 편차가 상당함. 주요 원인은 다음과 같다고 예상.

  • MySQL 커넥션 풀 경쟁: DB 저장 부하가 높아질수록 커넥션 대기 지연이 발생함.
  • 로컬 실행 환경: Producer, Consumer, Redis, MySQL이 모두 같은 머신에서 동작하므로 OS 스케줄러 및 메모리 압박의 영향을 받음.

뭔가 결과를 무조건적으로 따른다기보다는 queue,stream-xread,stream-group,pubsub의 경향성에 대해서 생각을 해보는것이 좋을듯하다.


장애 처리 시나리오

장애 주입 방식

각 Consumer에는 4번째 메시지(또는 배치)마다 예외를 강제 발생시키는 Fault Injection 로직이 내장되어 있음 (processCount % 4 == 0). 이 시나리오로 패턴별 메시지 유실 여부와 재처리 가능성을 비교함.

패턴별 장애 처리 특성

Queue

BRPOP으로 큐에서 꺼내는 순간 메시지가 제거됨. 처리 중 장애가 발생하면 해당 메시지는 즉시 유실되며 복구 불가. 장애 처리를 위해서는 원본 큐 → processing 큐로 먼저 옮긴 뒤 처리 완료 시 삭제하는 별도 구조가 필요함.

Pub/Sub

메시지를 저장하지 않으므로 처리 중 장애가 나면 유실. 구독 중이 아닐 때 발행된 메시지도 유실됨. 장애 처리를 위해서는 Queue 또는 Stream과 병행 저장하는 구조가 필요함.

Stream XREAD

Stream에 메시지가 남아 있으므로 즉시 유실은 아님. 하지만 Consumer Group 기능(ACK, PEL)이 없어, 장애 발생 시 마지막으로 성공한 메시지 ID(offset)를 직접 저장하고 해당 위치부터 재처리해야 함. offset 갱신을 처리 성공 후에만 수행하면 재시도는 가능하지만, /finish 호출 시 드레인 단계에서 예외 발생 시 바로 루프를 빠져나가 남은 메시지를 모두 처리하지 못하는 케이스가 발생함.

Stream Group (XREADGROUP)

Consumer Group 기반으로 ACK되지 않은 메시지는 PEL(Pending Entries List)에 잔류함. 장애 발생 시 메시지가 유실되지 않고, 같은 Consumer가 "0-0" offset으로 재읽어 재처리할 수 있음. Consumer 자체가 죽은 경우에는 XAUTOCLAIM으로 다른 Consumer가 PEL을 가져가 복구 가능. 단, 재처리/멱등성 로직은 직접 구현해야 함.

이 프로젝트의 StreamGroupConsumer에는 다음 재시도 로직이 구현되어 있음:

  • 장애 발생 → ACK 생략 → PEL에 메시지 잔류
  • 다음 루프에서 "0-0" offset으로 PEL 재읽기
  • 지수 백오프(100ms → 200ms → 400ms)로 재시도
  • MAX_PEL_RETRIES(3회) 초과 시 해당 배치 포기 (운영 환경에서는 DLQ/알람 필요)

장애 주입 벤치마크 결과 (100 메시지, 5회 측정)

회차QueueStream XREADStream GroupPub/Sub
13건 / 2.67 msg/sec6건 / 8.34 msg/sec100건 / 89.37 msg/sec75건 / 94.70 msg/sec
227건 / 45.76 msg/sec6건 / 9.84 msg/sec100건 / 166.39 msg/sec75건 / 114.16 msg/sec
330건 / 42.98 msg/sec9건 / 6.36 msg/sec100건 / 146.20 msg/sec75건 / 114.85 msg/sec
430건 / 51.81 msg/sec6건 / 9.71 msg/sec100건 / 164.47 msg/sec75건 / 76.61 msg/sec
530건 / 47.85 msg/sec6건 / 9.52 msg/sec100건 / 167.50 msg/sec75건 / 98.04 msg/sec

결과 분석

  • Queue: 4번째 메시지마다 팝 후 유실. /finish 이후 드레인 단계에서 첫 장애 발생 시 루프 중단 → 잔여 메시지 전부 미처리. 1회차처럼 드레인 시작 직후 장애가 나면 3건만 저장됨.
  • Stream XREAD: offset 갱신은 저장 성공 후 수행하므로 재시도 구조는 갖춰져 있으나, 드레인 단계에서 예외 발생 시 루프를 바로 탈출 → 매 실행 6~9건에 그침.
  • Stream Group: PEL 재처리 덕분에 5회 모두 100건 완전 복구. 장애가 발생해도 ACK 없이 PEL에 잔류 → "0-0" 재읽기 → 재처리 → ACK로 이어지는 재시도 루프가 동작함.
  • Pub/Sub: 4번째마다 유실, 복구 불가. 100 × 3/4 = 75건이 일관되게 저장됨 (이벤트 드리븐 특성상 드레인 단계 없이 바로 결과 반환).

느낀점

현재의 Stream Group의 유실시 재시도 로직시 XAUTOCLAIM을 통해서는 다른 컨슈머에서의 메시지 복구 가능이 redis를 msa 환경에서 사용할 때 매우 중요한 포인트 일것 같다.
또한 현재는 같은 네트워크 상에서 프로세스 끼리 프로듀서,컨슈머의 역할을 하기 때문에 네트워크 레이턴시 등이 벤치 마크 결과에 반영되어있지않다.

다음번에는 실제 msa환경에서 테스트해야겠다.

깃허브 링크

https://github.com/Munhangyeol/Redis-messaging-benchmark

profile
🐶개발 블로그

0개의 댓글