
Spring Batch 공식문서 공부 3번째 포스팅이다.
이번 포스팅에서는 Spring Batch에서 Step을 구성하는 방법을 알아보겠다.
Spring Batch의 Step은 Chunk 지향 처리 방식을 사용한다.
Chunk 지향 처리는 데이터를 한 번에 하나씩 읽고 트랜잭션 경계 내에서 기록되는 Chunk를 생성한다.
단계는 다음과 같은 방식으로 구성할 수 있다.
@Bean
public Step sampleStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.build();
}
위와 같은 방식으로 StepBuilder를 사용해서 단계를 구성한다.
위처럼 단계에서 사용되는 ItemReader, ItemWriter는 필수 구성 요소이지만, ItemProcessor는 선택적으로 구성할 수 있다.
또한 Chunk를 구성할 때는 입력, 출력 단위와 한 Chunk의 갯수, transactionManager를 설정한다. 위 코드는 입력 타입 String, 출력 타입 String, 한 청크는 10개의 아이템으로 구성된다. 따라서 한 번의 트랜잭션 범위 안에서 처리할 아이템의 개수는 10개로 구성된다.
Step의 처리가 시작될 때 트랜잭션이 시작된다.
또한 read가 호출될때마다 카운터가 증가하고, 카운터가 아이템의 개수 (위의 예시에는 10)에 도달하면 집계된 항목 목록들이 전달되고 ItemWriter 트랜잭션이 커밋된다.
Restart는 Step에 많은 영향을 미치므로 특정 구성이 필요할 수 있다.
Step의 시작 횟수를 제어해야 하는 상황이 존재한다. StartLimit을 설정하는 방법은 다음과 같다.
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.startLimit(1)
.build();
}
예를 들어, 데이터 중복/오염 발생 위험 때문에 해당 Step이 한번만 실행되어야 하는 경우, 위의 설정처럼 startLimit를 1로 설정할 수 있다.
이 경우에 만약 실행 횟수가 1을 초과했을때 StartLimitExceededException 이 던져진다.
startLimit의 기본값은 int의 최댓값으로 설정된다.
재시작 가능한 작업의 경우, 처음 성공 여부와 관계없이 항상 실행되어야 하는 단계가 있을 수 있다. 예를 들어, 유효성 검사 단계나 Step 처리 전 리소스 정리 단계가 있다. 이 경우에는 아래와 같이 구성할 수 있다.
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.allowStartIfComplete(true)
.build();
}
처리 중 발생한 오류가 실패로 이어지지 않고 건너뛰어야 하는 경우가 많다. 예를 들어, 재무 데이터는 송금으로 이어지기 때문에 건너뛸 수 없으며, 송금은 정확하게 정확해야 한다. 반면 공급업체 목록을 로드하는 경우에는 건너뛸 수 있다. 이처럼 문제가 없는 경우에는 Step을 스킵할 수 있다. 방법은 아래와 같다.
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(flatFileItemReader())
.writer(itemWriter())
.faultTolerant()
.skipLimit(10)
.skip(FlatFileParseException.class)
.build();
}
위의 설정을 보면 FlatFileParseException 이 발생하면 해당 Step은 건너뛰어지고 총 건너뛰기 제한은 10으로 설정된다. (건너뛰기 제한의 기본 설정은 10이다. 위 설정은 명시적으로 설정한 것임)
만약 특정 예외가 발생했을 때 스킵을 제외하고 싶다면 다음과 같이 설정하면 된다.
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(flatFileItemReader())
.writer(itemWriter())
.faultTolerant()
.skipLimit(10)
.skip(Exception.class)
.noSkip(FileNotFoundException.class)
.build();
}
위의 예시는 noSkip 속성으로 FileNotFoundException을 지정했다. Exception이 발생하면 skip하지만, FileNotFoundException이 발생했으면 skip하지 않는다. 이처럼 skip을 구체화 할 때 사용한다. (skip이 없이 noSkip 단독으로 사용되지 않는다)
대부분의 경우 예외가 건너뛰기 또는 Step실패를 유발하기를 원한다. 그러나 모든 예외가 결정적인 것은 아니다.
만약 동시성제어의 잠금에서 예외가 발생했을 경우, 기다렸다가 다시 시도하면 성공할 수 있다.
재시도 설정은 다음과 같이 구성하면 된다.
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.faultTolerant()
.retryLimit(3)
.retry(DeadlockLoserDataAccessException.class)
.build();
}
위 설정은 DeadlockLoserDataAccessException이 발생했을 때 재시도를 시도하며, 3번까지 재시도가 가능하다.
롤백은 다음과 같이 제어할 수 있다.
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.faultTolerant()
.noRollback(ValidationException.class)
.build();
}
Spring Batch에서 ItemReader가 트랜잭션 경계 안에서 실행되는지 여부를 제어하는 설정이다.
공식문서만 봐서는 이 설정이 왜 필요한지 충분한 설명이 없어서 추가하자면, 이 설정은 롤백 시 Reader의 작업도 무효화하여 재시도 안정성을 확보하는데 의의가 있다.
다음과 같이 설정할 수 있다.
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.readerIsTransactionalQueue()
.build();
}
readerIsTransactionalQueue 설정은 Step의 청크 처리에서 ItemReader를 트랜잭션 내에서 실행되도록 지정한다. 기본값은 false이며, 이 설정을 사용하면 청크 트랜잭션이 시작된 이후에 Reader가 실행되도록 보장한다.
트랜잭션 속성을 설정하는 방법은 다음과 같다.
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setPropagationBehavior(Propagation.REQUIRED.value());
attribute.setIsolationLevel(Isolation.DEFAULT.value());
attribute.setTimeout(30);
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.transactionAttribute(attribute)
.build();
}
Spring Batch에서 ItemStream을 Step에 등록하는 것은 재시작 가능한 배치 처리를 위해 매우 중요하다. ItemStream은 open(), update(), close() 메서드를 통해 실행 상태를 관리하며, 실패 시 재시작에 필요한 상태 정보를 제공한다.
ItemStream 인터페이스는 배치 처리 중 실행 상태를 유지하고 복원하는 데 사용된다. ItemReader, ItemWriter, ItemProcessor 중 일부는 이 인터페이스를 구현하여 자체적으로 상태를 관리한다.
예를 들어, FlatFileItemReader는 현재 읽고 있는 라인 번호를 저장하여 실패 후 재시작 시 해당 위치부터 다시 읽을 수 있다.
ItemReader, ItemWriter, ItemProcessor가 직접 ItemStream을 구현한 경우, Spring Batch는 이를 자동으로 인식하고 등록한다.CompositeItemWriter와 같이 내부에 여러 ItemWriter를 위임(delegation)하는 구조에서는, 위임된 구성 요소들이 ItemStream을 구현하더라도 자동으로 등록되지 않는다. 이 경우, 각 구성 요소를 명시적으로 Step에 등록해야 한다.ItemStream 등록은 아래와 같이 설정하면 된다.
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(itemReader())
.writer(compositeItemWriter())
.stream(fileItemWriter1()) // 수동 등록
.stream(fileItemWriter2()) // 수동 등록
.build();
}
@Bean
public CompositeItemWriter<String> compositeItemWriter() {
CompositeItemWriter<String> writer = new CompositeItemWriter<>();
writer.setDelegates(Arrays.asList(fileItemWriter1(), fileItemWriter2()));
return writer;
}
Spring Batch에서 Step 실행을 가로채는 기능은 배치 처리의 다양한 단계에서 사용자 정의 로직을 삽입할 수 있도록 도와준다. 이를 통해 로깅, 모니터링, 오류 처리, 상태 저장 등의 작업을 유연하게 수행할 수 있다.
리스너는 아래와 같이 등록할 수 있다.
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(reader())
.writer(writer())
.listener(chunkListener())
.build();
}
다음은 주요 Listener 인터페이스와 기능을 알아보겠다.
public interface StepExecutionListener extends StepListener {
void beforeStep(StepExecution stepExecution);
ExitStatus afterStep(StepExecution stepExecution);
}
Step 시작 전과 종료 후에 로직을 실행한다. afterStep은 ExitStatus를 반환하여 Step의 종료 상태를 조정할 수 있다.
public interface ChunkListener extends StepListener {
void beforeChunk(ChunkContext context);
void afterChunk(ChunkContext context);
void afterChunkError(ChunkContext context);
}
트랜잭션 내에서 청크 처리의 시작과 종료 시점에 로직을 실행한다. 예를 들어, 청크 처리 전 초기화 작업이나 청크 처리 후 정리 작업을 수행할 수 있다.
public interface ItemReadListener<T> extends StepListener {
void beforeRead();
void afterRead(T item);
void onReadError(Exception ex);
}
아이템 읽기 전후 및 읽기 오류 발생 시 로직을 실행한다. 예를 들어, 읽기 오류 발생 시 로그를 남기거나 오류를 처리할 수 있다.
public interface ItemProcessListener<T, S> extends StepListener {
void beforeProcess(T item);
void afterProcess(T item, S result);
void onProcessError(T item, Exception e);
}
아이템 처리 전후 및 처리 오류 발생 시 로직을 실행한다. 예를 들어, 처리된 결과를 검증하거나 오류를 처리할 수 있다.
public interface ItemWriteListener<S> extends StepListener {
void beforeWrite(List<? extends S> items);
void afterWrite(List<? extends S> items);
void onWriteError(Exception exception, List<? extends S> items);
}
아이템 쓰기 전후 및 쓰기 오류 발생 시 로직을 실행한다. 예를 들어, 쓰기 전 데이터 검증이나 쓰기 오류 처리 등을 수행할 수 있다.
public interface SkipListener<T,S> extends StepListener {
void onSkipInRead(Throwable t);
void onSkipInProcess(T item, Throwable t);
void onSkipInWrite(S item, Throwable t);
}
읽기, 처리, 쓰기 단계에서 스킵된 아이템에 대한 로직을 실행한다. 예를 들어, 스킵된 아이템을 로그에 기록하거나 별도로 저장할 수 있다.
Step을 처리할 때 Chunk지향 처리 방식 외에도 Tasklet 기반으로 처리할 수 있다.
Takslet은 Chunk방식과 다르게 매우 단순한 형태로, 하나의 작업을 수행할 때 사용된다. Tasklet은 Chunk방식에 비해, 구현이 간단하고 직관적이다. 따라서 복잡한 Reader, Processor, Writer 구성을 필요로하지 않는다는 장점이 있다.
TaskletStep 설정 방법은 다음과 같다
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.tasklet(myTasklet(), transactionManager)
.build();
}
StepBuilder의 tasklet() 메서드에 Tasklet 구현체를 전달해야 한다.
많은 배치 작업에는 주요 처리가 시작되기 전에 다양한 리소스를 설정하거나 처리가 완료된 후 해당 리소스를 정리하기 위해 수행해야 하는 단계가 포함된다.
Tasklet의 구성 예시는 다음과 같다.
public class FileDeletingTasklet implements Tasklet, InitializingBean {
private Resource directory;
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
File dir = directory.getFile();
Assert.state(dir.isDirectory(), "The resource must be a directory");
File[] files = dir.listFiles();
for (int i = 0; i < files.length; i++) {
boolean deleted = files[i].delete();
if (!deleted) {
throw new UnexpectedJobExecutionException("Could not delete file " +
files[i].getPath());
}
}
return RepeatStatus.FINISHED;
}
public void setDirectoryResource(Resource directory) {
this.directory = directory;
}
public void afterPropertiesSet() throws Exception {
Assert.state(directory != null, "Directory must be set");
}
}
주어진 디렉토리 내의 모든 파일을 삭제하는 Tasklet의 예시이다.
각 execute의 호출은 트랜잭션으로 래핑된다.
또한 execute은 RepeatStatus를 리턴하는데, 이 상태값에 따라 계속 반복해서 작업을 수행할지, Step을 종료할지 판단한다.
RepeatStatus의 종류는 2가지이다
Tasklet 기반 Step을 구성할 때 반드시 Tasklet 구현체를 작성해야 하는건 아니다. TaskletAdapter는 기존에 존재하는 메서드를 Tasklet으로 래핑하여 사용할 수 있게 도와준다. 예를 들어, 기존 DAO 메서드를 Tasklet으로 사용하려는 경우 함수를 재사용할 수 있게 해준다.
TaksletAdapter를 사용하여 Takslet을 구성하는 방법은 다음과 같다.
@Bean
public Tasklet taskletAdapter() {
MethodInvokingTaskletAdapter adapter = new MethodInvokingTaskletAdapter();
adapter.setTargetObject(myService);
adapter.setTargetMethod("performTask");
return adapter;
}
Step을 구성할 때, 다양한 흐름이 있다.
각 흐름을 제어하기 위한 설정 방법을 알아보자.
가장 간단한 순차적인 흐름 시나리오이다.
순차적인 흐름 시나리오는 .next()를 사용하여 구성할 수 있다.
@Bean
public Job job(JobRepository jobRepository, Step stepA, Step stepB, Step stepC) {
return new JobBuilder("job", jobRepository)
.start(stepA)
.next(stepB)
.next(stepC)
.build();
}
위 설정은 stepA → stepB → stepC 순서로 실행되는 순차적 흐름 설정이다.
분기적인 흐름을 구성할 수도 있다.
위는 성공 여부에 따라 StepB, StepC로의 흐름을 나타내는 흐름도이다. 분기적인 흐름을 구성하는건 다음과 같이 하면 된다.
@Bean
public Job job(JobRepository jobRepository, Step stepA, Step stepB, Step stepC) {
return new JobBuilder("job", jobRepository)
.start(stepA)
.on("*").to(stepB)
.from(stepA).on("FAILED").to(stepC)
.end()
.build();
}
on() 방식은 간단한 패턴 매칭 방식을 사용하여 Step의 실행 결과인 ExitStatus와 매치한다.
패턴에는 특수 문자 두 개만 허용한다.
예를 들어, c*t 는 cat과 count와 일치하고, c?t는 cat과 일치하지만, count와는 일치하지 않는다.
주의할 점
BatchStatus 와 ExitStatus 의 차이점을 정확하게 이해하는 것이 중요하다. 이는 바로 뒤에서 설명하겠다.SpringBatch에서 조건 분기를 구현할 때 혼동을 피하고 의도한 흐름을 정확하게 구현하기 위해 위 두 개념을 반드시 이해하고 구분해야 한다.
| 구분 | BatchStatus | ExitStatus |
|---|---|---|
| 의미 | Step/Job의 실행 상태 (프레임워크 내부에서 사용) | Step/Job이 종료될 때 반환되는 사용자 정의 가능한 값 |
| 타입 | Enum (COMPLETED, FAILED, 등) | 문자열 ("COMPLETED", "FAILED", "SKIPPED", 등) |
| 용도 | Spring Batch 내부 로직/상태 추적 | 조건 분기 및 흐름 제어에 사용 |
| 커스터마이징 | 거의 불가 | 가능 (stepExecution.setExitStatus(new ExitStatus("SKIPPED"))) |
조건 분기 on(), to()는 ExitStatus 기준이다. BatchStatus 기준이 아니다. 예를 들어, ExitStatus가 FAILED 여도, BatchStatus가 COMPLETED일 수 있다는 것을 유의하자!
비즈니스 요구사항에 따라 Job의 흐름을 중단하거나 종료할 수 있다.
Job의 흐름을 중단하는 방법
예시는 다음과 같다.
@Bean
public Job job(JobRepository jobRepository, Step step1, Step step2, Step step3) {
return new JobBuilder("job", jobRepository)
.start(step1)
.next(step2)
.on("FAILED").end()
.from(step2).on("*").to(step3)
.end()
.build();
}
위 설정은 Step2가 실패했을 시 종료하는 설정이다.
@Bean
public Job job(JobRepository jobRepository, Step step1, Step step2, Step step3) {
return new JobBuilder("job", jobRepository)
.start(step1)
.next(step2).on("FAILED").fail()
.from(step2).on("*").to(step3)
.end()
.build();
}
@Bean
public Job job(JobRepository jobRepository, Step step1, Step step2) {
return new JobBuilder("job", jobRepository)
.start(step1)
.on("COMPLETED").stopAndRestart(step2)
.end()
.build();
}
각 전이 요소는 Job의 BatchStatus에 영향을 미치며, 이를 통해 Job의 재시작 가능 여부나 후속 처리 로직을 제어할 수 있다.
프로그래밍을 사용해 동적으로 분기를 결정할 수 있다.
프로그래밍을 사용해 동적으로 분기를 설정하는 방법은 다음과 같다.
public class MyDecider implements JobExecutionDecider {
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
String status;
if (someCondition()) {
status = "FAILED";
}
else {
status = "COMPLETED";
}
return new FlowExecutionStatus(status);
}
}
JobExecutionDecider 를 구현하고, 이를 다음과 같이 설정할 수 있다.
@Bean
public Job job(JobRepository jobRepository, MyDecider decider, Step step1, Step step2, Step step3) {
return new JobBuilder("job", jobRepository)
.start(step1)
.next(decider).on("FAILED").to(step2)
.from(decider).on("COMPLETED").to(step3)
.end()
.build();
}
SpringBatch는 작업을 병렬 흐름으로 구성하는 방법도 지원한다.
@Bean
public Flow flow1(Step step1, Step step2) {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1)
.next(step2)
.build();
}
@Bean
public Flow flow2(Step step3) {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3)
.build();
}
@Bean
public Job job(JobRepository jobRepository, Flow flow1, Flow flow2, Step step4) {
return new JobBuilder("job", jobRepository)
.start(flow1)
.split(new SimpleAsyncTaskExecutor())
.add(flow2)
.next(step4)
.end()
.build();
}
처음보면 좀 복잡하긴 한데, 흐름을 말로 설명해보면 다음과 같다
flow1 실행
→ step1 → step2 (순차 실행)
동시에 flow2도 실행
→ step3
☑ 병렬 처리는 .split()과 SimpleAsyncTaskExecutor로 구현됨
→ flow1과 flow2는 병렬로 동시에 실행됨
두 Flow가 모두 완료된 후, step4 실행
병렬 실행 최적화가 필요한 경우 유용할 것 같다.
좀 길었는데, Step을 구성하는 방법을 공부해봤다.
아직까진 어려운건 없는데, 내용이 매우 많아서 단순히 글만 보는걸로는 제대로 학습했는지 잘 모르겠다. 얼른 공식문서 다 보고, 실제로 배치 시스템을 구성하고 싶다.
다음 포스팅은 ItemReader, ItemWriter 설명으로 돌아오겠다.