Spring Batch에서는 일반적으로 단일 스레드로 배치 작업을 수행합니다. 즉, 순차적으로 작업을 실행해주게 됩니다. 하지만 여러 작업들을 병렬적으로 처리하여 빠르게 처리하기 위해 Spring Batch에서는 여러 Multi Thread 모델을 제공해주고 있습니다.
AsyncItemProcessor / AsycnItemWriter
Multi Thread Step
Parallel Step
Partition
Remote Chunking
List<Future<?>>
으로 반환그리고 의존성 추가가 필요합니다.
implementation 'org.springframework.batch:spring-batch-integration'
@Configuration
@RequiredArgsConstructor
public class HelloJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
private final EntityManagerFactory entityManagerFactory;
private int chunkSize = 10;
@Bean
public Job helloJob() {
return jobBuilderFactory.get("job")
.start(step1())
.incrementer(new RunIdIncrementer())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step")
.<Customer, Future<Customer2>>chunk(chunkSize) // Future 타입
.reader(customItemReader())
.processor(customAsyncItemProcessor())
.writer(customAsyncItemWriter())
.build();
}
@Bean
public ItemReader<? extends Customer> customItemReader() {
return new JpaPagingItemReaderBuilder<Customer>()
.name("customItemReader")
.pageSize(chunkSize)
.entityManagerFactory(entityManagerFactory)
.queryString("select c from Customer c order by c.id")
.build();
}
@Bean
public AsyncItemProcessor<Customer, Customer2> customAsyncItemProcessor() {
AsyncItemProcessor<Customer, Customer2> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(customItemProcessor()); // customItemProcessor 로 작업 위임
asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor()); // taskExecutor 세팅
return asyncItemProcessor;
}
@Bean
public ItemProcessor<Customer, Customer2> customItemProcessor() {
return new ItemProcessor<Customer, Customer2>() {
@Override
public Customer2 process(Customer item) throws Exception {
return new Customer2(item.getName().toUpperCase(), item.getAge());
}
};
}
@Bean
public AsyncItemWriter<Customer2> customAsyncItemWriter() {
AsyncItemWriter<Customer2> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(customItemWriter()); // customItemWriter로 작업 위임
return asyncItemWriter;
}
@Bean
public ItemWriter<Customer2> customItemWriter() {
return new JdbcBatchItemWriterBuilder<Customer2>()
.dataSource(dataSource)
.sql("insert into customer2 values (:id, :age, :name)")
.beanMapped()
.build();
}
}
별도의 작업없이 기존의 ItemProcessor와 ItemWriter에 AsyncItemProcessor와 AsyncItemWriter를 적용하여 각각 setDelegate를 통해 작업을 위임해주면 됩니다.
saveState(false)
의 설정을 추가해주어야 합니다. @Bean(name = JOB_NAME+"taskPool")
public TaskExecutor executor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // (1)
executor.setCorePoolSize(poolSize);
executor.setMaxPoolSize(poolSize);
executor.setThreadNamePrefix("multi-thread-");
executor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE); // (2)
executor.initialize();
return executor;
}
@Bean(name = JOB_NAME)
public Job job() {
return jobBuilderFactory.get(JOB_NAME)
.start(step())
.preventRestart() //실패한 Job인 경우 재실행 가능 막기
.build();
}
@Bean(name = JOB_NAME +"_step")
@JobScope
public Step step() {
return stepBuilderFactory.get(JOB_NAME +"_step")
.<Product, ProductBackup>chunk(chunkSize)
.reader(reader(null))
.processor(processor())
.writer(writer())
.taskExecutor(executor()) // (3)
.throttleLimit(poolSize) // (4)
.build();
}
@Bean(name = JOB_NAME +"_reader")
@StepScope
public JpaPagingItemReader<Product> reader(@Value("#{jobParameters[createDate]}") String createDate) {
Map<String, Object> params = new HashMap<>();
params.put("createDate", LocalDate.parse(createDate, DateTimeFormatter.ofPattern("yyyy-MM-dd")));
return new JpaPagingItemReaderBuilder<Product>()
.name(JOB_NAME +"_reader")
.entityManagerFactory(entityManagerFactory)
.pageSize(chunkSize)
.queryString("SELECT p FROM Product p WHERE p.createDate =:createDate")
.parameterValues(params)
.saveState(false) // (5)
.build();
}
private ItemProcessor<Product, ProductBackup> processor() {
return ProductBackup::new;
}
@Bean(name = JOB_NAME +"_writer")
@StepScope
public JpaItemWriter<ProductBackup> writer() {
return new JpaItemWriterBuilder<ProductBackup>()
.entityManagerFactory(entityManagerFactory)
.build();
}
만약 chunksize=1, poolsize = 2이고 읽어와야할 데이터가 10개라면, 작업을 수행하는데 2개의 스레드가 사용되고 각 스레드는 1개의 데이터씩 읽어오기 때문에 총 5번의 (Read,Proccess,Write) 작업이 수행됩니다.
PagingItemReader의 구현체는 모두 Thread-Safe하여 멀티 스레드 작업을 안정적으로 수행해줄수 있습니다. 하지만 Thread-Safe하지 않는 ItemReader를 사용한다면, 중복된 데이터를 읽어오고 원치않은 결과가 발생할 수 있습니다.
(CursorItemReader 등..)
이렇게 Thread-Safe를 지원해주지 않는 ItemReader에서는 SynchronizedItemStreamReader를 통해 ItemReader를 한번 감싸주기만 한다면 Read작업이 synchronized메소드에 감싸져 호출되기 때문에 동기화된 읽기가 가능하게 됩니다.
@Bean
public Job job(){
return jobBuilderFactory.get("job")
.start(splitFlow()) //(1)
.next(step4())
.build() //(4)
.build(); //(5)
}
@Bean
public Flow splitFlow(){
return new FlowBuilder<SimpleFlow>("splitFlow")
.split(taskExecutor()) //(2)
.add(flow1(), flow2()) //(3)
.build();
}
@Bean
public Flow flow1(){
return
}
@Bean
public Flow flow2(){
}
@Bean Step step4(){
}
@Bean
public TaskExecutor taskExecutor(){
return new SimpleAsyncTaskExecutor("spring_batch");
}