1-1.
List<Foo> foos = Flux.fromIterable(barList) // 리스트의 요소를 하나씩 발행하는 Flux를 생성
.parallel() // 이를 병렬 처리를 위한 ParallelFlux로 변환
.runOn(Schedulers.parallel()) // parallel 연산자 이후의 연산을 실행할 스케줄러를 지정
.flatMap(b -> method(b)) // 요소마다 method를 실행한 결과를 Flux로 만들음
.sequential() // 다시 순차적으로 처리하기 위한 Flux로 변환
.collectList() // 결과값을 리스트로 수집
.block(); // 결과를 동기적으로 받음
1-2. 타임아웃 적용
CountDownLatch countDownLatch = new CountDownLatch(1);
List<Foo> result = new ArrayList<>();
List<Foo> foos = Flux.fromIterable(barList)
.parallel()
.runOn(Schedulers.parallel())
.flatMap(b -> method(b))
.sequential()
.doFinally((signal) -> {
countDownLatch.countDown();
})
.subscribe(results::add);
try {
countDownLatch.await(20000L, TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
log.error("ERROR!!!");
}
배치 로직 안에서 위와 같이 병렬 처리를 할 수도 있고, 스프링 배치 설정에 taskExecutor로 SimpleAsyncTaskExecutor를 지정해서 병렬 처리를 시도할 수 있다.
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.start(splitFlow())
.next(step4())
.build() //builds FlowJobBuilder instance
.build(); //builds Job instance
}
@Bean
public Flow splitFlow() {
return new FlowBuilder<SimpleFlow>("splitFlow")
.split(taskExecutor())
.add(flow1(), flow2())
.build();
}
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
}
@Bean
public TaskExecutor taskExecutor(){
return new SimpleAsyncTaskExecutor("spring_batch");
}