
ZipInputStream → ZipFile 전환 + Spring Batch Partitioning Step 도입대규모 이미지 데이터셋을 받아 오토라벨링 파이프라인으로 흘려보내는 서비스를 만들 때였다. 클라이언트는 수십만~수백만 장의 이미지를 하나의 ZIP 파일로 묶어 올리고, 우리 서버는 이것을 항목별로 풀어 MinIO에 저장한다.
초기 구현은 ZipInputStream으로 순차 스트리밍 처리였다. 처리 속도는 약 10개/초. 100만 장이면 약 27시간이다.
ZipInputStream → Entry 순서대로 읽기 (seek 불가)
→ LargeFileChunkProcessor (검증)
→ LargeFileWriter → MinIO PUT
(완전 순차)
당장의 고객 규모에서는 버틸 수 있겠지만, 향후 대응 및 3D 센서 파이프라인 확장으로 단일 ZIP이 1M → 10M+ 이미지로 커질 것이 예정되어 보였다.
27시간짜리 작업은 실패 시 재시작 비용을 최대한 줄이고 싶었다!
taskExecutor 병렬화 롤백가장 먼저 시도한 것은 단순했다. Spring Batch StepBuilder에 taskExecutor를 끼워 청크를 병렬로 돌리는 방식.
// 최초 시도 (실패)
.taskExecutor(threadPoolTaskExecutor)
.throttleLimit(10)
💥 결과: "Stream closed" 예외 폭발.
원인은 ZipInputStream의 근본적인 설계에 있었다!
java.util.zip.ZipInputStream:
- 내부적으로 단일 스트림 포인터 유지
- 현재 Entry를 읽는 중 다른 스레드가 next() 호출 → 포인터 충돌
- Thread-unsafe, seek 불가
당시 결정이 ADR-02: ZipInputStream 순차 처리 (taskExecutor 제거)다. 안정성을 위해 순차로 돌아갔고, 이 결정은 이후 ZipFile 기반 병렬화로 Superseded 될 때까지 유지됐다.
ZipFile의 thread-safety 보장결국 돌파구는 Java 표준 라이브러리 Javadoc에 있었다.
java.util.zip.ZipFile (JDK 8+):
"Reading from a ZipFile from multiple threads is safe"
⭐ ZipInputStream은 순차 스트림이지만, ZipFile은 Central Directory를 통해 임의 Entry에 독립적인 InputStream을 반환할 수 있다. 각 getInputStream(entry) 호출이 서로 독립적인 스트림이므로, 여러 스레드가 동시에 서로 다른 Entry를 읽어도 안전할 수 있었다!
// ✅ ZipFile: 각 호출이 독립 스트림 반환
ZipFile zipFile = new ZipFile(tempFile);
InputStream stream1 = zipFile.getInputStream(entry1); // Worker 1
InputStream stream2 = zipFile.getInputStream(entry2); // Worker 2 (안전)
이것이 옵션 B — ZipFile random-access + Spring Batch Partitioning 선택의 근거다.
| 옵션 | 방식 | 디스크 추가 비용 | 채택 |
|---|---|---|---|
| A | ZIP을 먼저 압축 해제 → 일반 파일 병렬 처리 | +2배 | ❌ |
| B | ZipFile random-access + Partitioning | +0 | ✅ |
옵션 A는 구현이 단순하지만 100만 장 ZIP 기준 디스크를 2배 잡아먹는다.
옵션 B는 ZIP 파일을 그대로 두고 여러 Worker가 각자의 Entry 범위만 읽는 방식이라 디스크 비용이 없다.
Batch Job 시작
│
▼
Master Step: ZipFile로 Central Directory 1회 스캔
→ Entry 목록 전체를 BatchFileJobEntry 테이블에 INSERT (status=PENDING)
→ Entry 목록을 gridSize개로 균등 분할
│
▼
TaskExecutorPartitionHandler
├── Worker 0: partition 0번 Entry 처리 (ZipFile.getInputStream → MinIO PUT)
├── Worker 1: partition 1번 Entry 처리
├── ...
└── Worker 9: partition 9번 Entry 처리
│
▼
Master AfterStep (PartitionAggregator)
→ BatchFileJobEntry COUNT(COMPLETED) → successCount 집계
→ BatchFileJob 1회 update
BatchFileJobEntry순수 JobExecutionContext에 Entry 목록을 올리는 방식은 직렬화 한계로 100만 Entry를 다룰 수 없다. 별도 테이블이 필요했다.
CREATE TABLE batch_file_job_entry (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
batch_job_id BIGINT NOT NULL,
partition_id INT NOT NULL,
entry_name VARCHAR(500) NOT NULL,
status VARCHAR(20) NOT NULL, -- PENDING/PROCESSING/COMPLETED/FAILED
saved_file_id VARCHAR(64), -- 성공 시 MinIO UUID
...
INDEX idx_bfje_job_partition (batch_job_id, partition_id, status)
);
이 테이블 덕분에 재시작(Resume) 도 가능해졌다. 50% 처리 중 서버가 죽어도, 재시작 시 status=COMPLETED Entry는 skip하고 PENDING/PROCESSING Entry만 다시 처리한다.
public class ZipEntryPartitioner implements Partitioner {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
List<BatchFileJobEntry> entries = repository.findByBatchJobId(batchJobId);
// entries를 gridSize개 그룹으로 균등 분할
// 각 그룹 → ExecutionContext { partitionId: 0, 1, ..., N-1 }
...
}
}
@StepScope)@StepScope
@Component
public class PartitionedZipReader implements ItemReader<FileResource> {
@Value("#{stepExecutionContext['partitionId']}") int partitionId;
@Override
public FileResource read() {
// 자기 partitionId의 PENDING Entry만 fetch
// ZipFile.getInputStream(entry) → readAllBytes() → FileResource.of(name, type, bytes)
}
}
@StepScope이 핵심이다. Worker마다 독립적인 Bean 인스턴스가 생성되어 partitionId가 각자에게 올바르게 주입된다.
public class PartitionAggregator implements JobExecutionListener {
@Override
public void afterJob(JobExecution jobExecution) {
// 모든 Worker 완료 후 1회 집계
long success = entryRepository.countByBatchJobIdAndStatus(batchJobId, COMPLETED);
long fail = entryRepository.countByBatchJobIdAndStatus(batchJobId, FAILED);
batchFileJobService.updateResult(batchJobId, success, fail);
}
}
✅ "청크마다 DB를 update하지 말고, 마지막에 1회 집계"라는 원칙을 구현한 컴포넌트다.
Phase 1 PoC가 성공적으로 완료되고 dev 환경에 배포했을 때, 두 개의 심각한 버그가 연달아 발생했다.
CHUNK_PROCESSING 영구 고착현상: 배치가 완료됐는데 상태가 CHUNK_PROCESSING에서 COMPLETED로 전환되지 않는다.
원인 추적:
BatchProcessingStatus.isAllDone() 조건:
return totalChunks == completedChunks;
문제:
PartitionAggregator.afterJob()이 집계를 수행하기 전에
totalChunks 필드가 설정되지 않은 채로 실행 → totalChunks = 0
결과:
completedChunks가 10이 돼도 '0 == 10'은 false
→ isAllDone() 항상 false → 상태 전환 불가
수정:
@Override
public void afterJob(JobExecution jobExecution) {
// ⭐ 집계 전에 totalChunks를 먼저 확정
batchFileUploadService.updateTotalChunks(batchJobId, gridSize);
// 이후 집계
long success = entryRepository.countByBatchJobIdAndStatus(...);
...
}
실행 순서가 중요한 전형적인 초기화 선행 문제였다.
successCount=0 / status=FAILED현상: 배치가 끝났고, MinIO에 파일도 다 올라가 있는데 DB에는 successCount=0, status=FAILED로 남는다.
원인 추적:
시나리오:
LargeFileWriter (Worker 모드)에서
chunk 처리 완료 후 BatchFileJobEntry의 status를
PROCESSING → COMPLETED/FAILED로 확정하지 않음
결과:
PartitionAggregator.afterJob()이 실행될 때
모든 Entry가 여전히 PROCESSING 상태
→ COUNT(WHERE status=COMPLETED) = 0
→ successCount = 0 → 안전 판정 → status = FAILED
수정:
// LargeFileWriter.write() 완료 시점에 벌크 상태 확정
@Override
public void write(Chunk<? extends ProcessedFileResult> chunk) {
// ... 처리 로직 ...
// ⭐ 청크 완료 시 Entry 상태를 벌크 UPDATE
updateEntryStatuses(successFileIds, failedEntryIds);
}
Writer가 처리 결과를 "DB에 반영하는 책임"까지 갖고 있어야 했는데, 병렬화 과정에서 집계 책임을 Aggregator로 분리하면서 이 단계가 누락됐다.
버그를 잡고 안정화가 됐는데 이번엔 모니터링 경보(BatchResourceSpikeAspect WARN)가 폭주했다.
PartitionedZipReader는 readAllBytes()로 이미지 전체를 힙에 로드한다.
byte[] bytes = zipFile.getInputStream(entry).readAllBytes();
FileResource resource = FileResource.of(name, type, bytes); // 클로저에 byte[] 보관
Spring Batch는 chunkSize개가 쌓일 때까지 Reader를 계속 호출한다. 즉 write()가 호출되는 시점에는 chunkSize × 평균 파일크기 만큼의 힙이 순간 점유된다.
chunkSize=500, 평균 이미지 1.15MB
→ 순간 heap_delta = 573MB → WARN 다발
chunkSize를 줄이면:
- heap_delta = 200 × 최대 2MB = 400MB (허용 범위)
- 트랜잭션 롤백 단위 축소 (chunk 실패 시 재처리 대상 감소)
- DB 커밋 횟수 2.5배 증가 (MinIO I/O 바운드 환경에서 영향 미미 — 실측 확인)
단순히 chunkSize만 줄인다고 끝이 아니었다. 원래 임계값 자체도 순차 단일 스레드 시절 기준이었다.
| 조정 시점 | HEAP_DELTA_WARN_MB | THREAD_WARN_COUNT | 변경 이유 |
|---|---|---|---|
| 초기 설계 (순차) | 200 | 30 | 단일 스레드 기준 |
| 병렬화 초안 | 200 | 50 | gridSize=10 예측치 |
| 1차 실측 후 | 300 | 80 | heap_delta=573MB 실측 |
| chunkSize=200 + 최종 | 450 | 90 | heap_delta=369MB + 실측 스레드 81개 |
최종 임계값 산출 공식:
HEAP_DELTA_WARN_MB = chunkSize × maxExpectedFileSizeMB × 1.125 (안전 마진)
= 200 × 2MB × 1.125 = 450MB
THREAD_WARN_COUNT = 실측 최대 스레드(81) + 보유 마진(9) = 90
heap_delta WARN(청크 크기 특성상 구조적으로 발생)과 heap_ratio=85% WARN(실제 OOM 위험 경보)을 분리한 것이 핵심이다. 후자는 절대로 건드리지 않는다.
dev 환경, 2,400장 ZIP, gridSize=10, chunkSize=200
수정 전: 5분
수정 후: 3분 (~40% 단축)
이론상 100만 장 기준으로는 27시간 → 약 3~5시간대를 기대할 수 있다 (MinIO TPS 한계에 의존).
| 항목 | 값 |
|---|---|
parallel.enabled | true |
grid-size | 10 |
chunk-size | 200 (ADR-05) |
HEAP_DELTA_WARN_MB | 450 (ADR-06) |
THREAD_WARN_COUNT | 90 (ADR-06) |
| Phase 1~4 | ✅ 전부 완료 |
1. thread-safety는 Javadoc을 먼저 읽어라
ZipInputStream이 thread-unsafe라는 것은 Javadoc에 명시되어 있다. ZipFile의 thread-safety 보장도 마찬가지다. 구현하기 전에 표준 라이브러리 Javadoc 한 줄이 27시간짜리 문제를 풀었다.
2. 집계 책임 분리는 실행 순서를 명시적으로 보장해야 한다
Bug #3 (totalChunks=0)과 Bug #4 (successCount=0)는 모두 "집계를 Aggregator로 분리했지만, 그 Aggregator가 올바른 선행 조건 하에서 실행되는지"를 검증하지 않아서 발생했다. 책임 분리 후에는 실행 순서 계약을 코드와 테스트로 명시해야 한다.
3. 모니터링 임계값은 아키텍처 변경 시 함께 바꿔야 한다
임계값은 한번 설정하면 잊기 쉽다. 순차 처리 기준으로 잡은 숫자가 병렬화 후 전혀 다른 의미를 갖게 됐다. 임계값 조정 이력을 ADR로 남긴 것이 나중에 chunkSize나 gridSize를 바꿀 때 재산출 공식을 찾을 수 있게 해준다.
4. @StepScope은 병렬 처리의 기본 전제다
Spring Batch에서 싱글톤 Bean으로 Worker Step을 구현하면 각 partition이 같은 인스턴스를 공유하여 상태가 뒤섞인다. 병렬 Step에 참여하는 모든 Stateful Bean은 @StepScope이어야 한다.
이상이 블로그 초안