Dandi에서 날씨 API를 호출해 날씨 정보를 DB에 저장하는 Batch를 진행하고 있습니다. ItemProcessor에서 날씨 API 호출이 진행됩니다. 배치에서 1628번의 공공 데이터 포털에 API 호출을 해야하고, 공공 데이터 포털의 API 응답은 그리 빠르지도 않습니다. 따라서, 쿼리 변경을 통한 ItemWriter 성능 개선을 했습니다. 다음으로, Batch 성능을 시스템적으로 개선할 수 있는 방법에 대해 고민하고 있습니다. Spring Batch 성능 개선 방식 중 병렬화와 관련된 아래 3개에 대해서 알아보겠습니다.
Spring Batch 공식문서와 Spring Batch 완벽 가이드 서적을 참고했습니다.
@Bean
public TaskExecutor taskExecutor(){
return new SimpleAsyncTaskExecutor("spring_batch");
}
@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
return this.stepBuilderFactory.get("sampleStep")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.throttleLimit(20)
.build();
}
하나의 Chunk를 하나의 스레드가 담당하는 방식
입니다. 만약, Chunk Size에 따라 3개의 Chunk가 존재한다. 3개의 Batch 작업이 병렬로 이뤄지는 것입니다.
Step을 구성할 때 TaskExecutor를 지정해주면 됩니다.
throttleLimit 값도 함께 증가
시켜줘야합니다. 기본 값은 4입니다.throttleLimit 값은 DB Connection Pool Size를 넘지 않아야
합니다. 예를 들어, 20개의 Chunk를 20개의 스레드가 병렬로 실행하는 상황에서 Pool Size가 10이라면 병목이 발생합니다.Thread Safe한 ItemReader와 ItemWriter를 사용
해야 합니다. 사용하려는 ItemReader가 Thread-safe하지 않다면 SynchronizedItemStreamReader로 감싸거나 직접 동기화를 제어하는 객체를 만들고 내부에서 해당 Reader를 사용하면 됩니다.Spring Batch의 가장 큰 장점은 실패 지점부터의 재시도
입니다. 해당 장점을 누리기 위해 saveState가 true인 상태로 동작하게 됩니다. 하지만, Multi-Threaded Step을 사용하면 해당 장점을 누릴 수 없
습니다.
10개의 스레드가 1000개의 데이터를 100개의 Chunk로 나누어 병렬 동작한다고 가정해보겠습니다. 5번째 스레드가 실패했을 경우에 1~4번의 스레드는 성공했다는 보장이 없습니다. 1~10 순차적이 아닌, 병렬로 동작하기 때문입니다. 따라서, saveState를 false로 지정하여 사용합니다.
실패의 확률이 큰 Job에서는 사용하지 않는 것이 좋을 것이라고 생각합니다. 만약, 1000개의 데이터를 처리하는데 999번째를 처리하다가 실패했을 경우, 처음부터 다시 Job을 실행해야하는 상황이 발생할 수 있기 때문입니다.
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.start(splitFlow())
.next(step4())
.build() //builds FlowJobBuilder instance
.build(); //builds Job instance
}
@Bean
public Flow splitFlow() {
return new FlowBuilder<SimpleFlow>("splitFlow")
.split(taskExecutor())
.add(flow1(), flow2())
.build();
}
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
}
@Bean
public TaskExecutor taskExecutor(){
return new SimpleAsyncTaskExecutor("spring_batch");
}
Multi-Threaded Step이 하나의 step을 병렬로 처리했다면, Parallel Step은 여러 Step을 병렬로 처리합니다. 더 정확하게 이야기하자면 여러 Step을 가질 수 있는 Flow를 병렬로 처리
합니다.
예제에서 flow1, flow2는 병렬적으로 수행됩니다. 그리고 job의 마지막에 step4이 실행되는데 해당 step은 flow1과 flow2가 완료되어야 수행됩니다.
병렬적으로 수행되어도 되는 Step이 있다면 Parallel Steps를 통해 성능을 끌어오는 것도 고려할 수 있습니다.
@Bean
public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {
AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
asyncItemProcessor.setTaskExecutor(taskExecutor); // 스레드풀 지정
asyncItemProcessor.setDelegate(itemProcessor);
return asyncItemProcessor;
}
@Bean
public AsyncItemWriter writer(ItemWriter itemWriter) {
AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
asyncItemWriter.setDelegate(itemWriter);
return asyncItemWriter;
}
기본적으로, AsyncItemProcessor와 AsyncItemWriter는 함께 사용
되어야 합니다.
AsyncItemProcessor & AsyncItemWriter을 사용하기 위해서는 spring-batch-integration 의존성을 주입 해야합니다.
AsyncItemProcessor는 ItemProcessor를 래핑하는 데코레이터입니다. 실질적인 Process 작업은 새로운 스레드가 ItemProcessor를 실행
하고 AsyncItemProcessor 디스패처의 책임
을 지닙니다. 그리고 AsyncItemProcessor는 Future 타입을 반환합니다.
AsyncItemWriter도 ItemWriter의 데코레이터입니다. AsyncItemProcessor에서 전달된 Future 결과 값을 ItemProcessor에게 위임
합니다.
Multi-Threaded Step와 달리 ItemProcessor만 병렬로 동작합니다. 따라서, ItemReader가 병렬이 아니기 때문에, Job이 실패하고 재실행 했을 때, 실패 지점부터 재시작 가능
합니다.