Spark Structured Streaming에서 stateful 파이프라인을 다루다 보면 state 크기가 커질수록 OOM이나 GC pause 문제가 생긴다.
이게 HDFSBackedStateStore의 한계인데, 이걸 해결하는 게 RocksDB State Store다.
그래서 RocksDB가 뭔지, Spark에서 어떻게 동작하는지 정리해본다.
RocksDB는 Facebook이 Google의 LevelDB를 fork해서 만든 embedded key-value store다.
LSM-Tree 구조를 기반으로 하며, 별도 서버 프로세스 없이 라이브러리 형태로 애플리케이션에 내장된다.
Kafka, Flink, Spark, TiKV 등 다양한 분산 시스템에서 로컬 디스크 기반 고속 저장소로 사용한다.
RocksDB의 모든 성능 특성은 LSM-Tree (Log-Structured Merge Tree) 에서 비롯된다.
Write
↓
WAL (Write-Ahead Log) — crash 복구 보장
↓
MemTable (메모리, skip list 구조) — 실제 쓰기
↓
Immutable MemTable — flush 대기 상태
↓
L0 SSTable (디스크)
↓
Compaction → L1 → L2 → ... → Ln SSTable
모든 쓰기가 sequential append 방식이라 random write보다 훨씬 빠르다.
WAL 덕분에 MemTable이 메모리에 있는 상태에서 crash가 나도 복구 가능하다.
Read
↓
MemTable → Immutable MemTable → L0 → L1 → ... → Ln
↑
Bloom Filter로 각 레벨에서 key 없으면 스킵
최신 데이터일수록 상위 레이어에 있어 최신 key는 빠르게 찾는다.
오래된 key는 하위 레벨까지 탐색이 필요해 read amplification이 생길 수 있다.
| 전략 | 특징 | 적합한 경우 |
|---|---|---|
| Leveled (기본) | read 성능 우선, 각 레벨 크기 제한 | 읽기 빈도 높은 workload |
| Universal | write 성능 우선, 전체 크기 균등 유지 | 쓰기 빈도 높은 workload |
| FIFO | 오래된 파일 자동 삭제 | TTL 기반 데이터, 최신 N개만 유지 |
하나의 RocksDB 인스턴스 안에서 논리적으로 분리된 key-value 공간이다.
각 Column Family는 독립적인 MemTable, SSTable, Compaction 설정을 가진다.
RocksDB Instance
├── Column Family: default
├── Column Family: user_state
└── Column Family: session_state
Spark 4.0의 transformWithState에서 multiple state variables를 Column Family로 분리해서 관리하는 방식이 여기서 나온다.
Stateful streaming 연산(aggregation, deduplication, stream-stream join, transformWithState)을 하면 중간 상태를 어딘가에 저장해야 한다. Spark는 이걸 State Store 라는 추상 레이어로 관리하는데, 구현체가 두 가지다.
| HDFSBackedStateStore (기본) | RocksDBStateStore | |
|---|---|---|
| 저장 위치 | 메모리 (JVM heap) + HDFS checkpoint | 로컬 디스크 (RocksDB) + HDFS checkpoint |
| 메모리 사용 | state 전체를 heap에 올림 | 필요한 블록만 Block Cache에 올림 |
| 대용량 state | OOM 위험 | 안정적 |
| GC 영향 | 큼 | 거의 없음 (off-heap) |
| 적합한 경우 | state 크기 작고 단순한 경우 | state가 크거나 오래 유지되는 경우 |
microbatch N 처리
│
├── RocksDB (로컬 디스크)
│ ├── state read (key lookup + Bloom Filter)
│ └── state write (WAL → MemTable)
│
├── microbatch 완료 후
│ └── snapshot → checkpoint dir (HDFS / MinIO / S3)
│
└── executor 재시작 시
└── checkpoint에서 snapshot restore → RocksDB 재구성
spark = SparkSession.builder \
.config(
"spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
) \
.getOrCreate()
| 버전 | 주요 변화 |
|---|---|
| 3.2 | RocksDBStateStore 최초 도입 (experimental) |
| 4.0 | transformWithState 도입 — Column Family 기반 multiple state variables 지원 |
| 4.1 | lock 관리 revamp, Unified Memory Manager 통합, snapshot lag 감지, 파일 레벨 checksum 검증, automatic snapshot repair, state checkpoint format v2 지원 |
| 설정 키 | 설명 | 기본값 |
|---|---|---|
spark.sql.streaming.stateStore.rocksdb.blockCacheSize | Block Cache 크기 | 8MB |
spark.sql.streaming.stateStore.rocksdb.writeBufferSize | MemTable 크기 | 64MB |
spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber | Immutable MemTable 최대 수 | 2 |
spark.sql.streaming.stateStore.rocksdb.compactOnCommit | microbatch commit 시 compaction 수행 여부 | false |
spark.sql.streaming.stateStore.rocksdb.enableChangelogCheckpointing | changelog 기반 증분 checkpoint (format v2) | false |
기본 checkpoint 방식은 매 microbatch마다 전체 state snapshot을 업로드한다.
state가 커질수록 checkpoint I/O가 병목이 되는 문제가 있다.
enableChangelogCheckpointing=true 로 설정하면 변경분(delta)만 업로드한다.
기본 방식: 매 microbatch → 전체 snapshot 업로드 → I/O 스파이크
changelog: 매 microbatch → delta만 업로드 → 복구 시 changelog replay
트레이드오프: checkpoint I/O는 줄어들지만, 장애 복구 시 changelog를 replay해야 해서 restore 시간이 길어질 수 있다.
| 상황 | 선택 |
|---|---|
| state 크기가 수백 MB 이상 | ✅ RocksDB |
| 긴 세션 윈도우, 오랜 기간 유지되는 key | ✅ RocksDB |
| executor heap 제약이 있을 때 | ✅ RocksDB |
transformWithState로 다중 state 관리 | ✅ RocksDB |
| state 크기 작고 TTL 짧음 | HDFS (기본값으로 충분) |
| stateless 파이프라인 | State Store 불필요 |