Sprint icons created by Freepik - Flaticon
Stream의 parallel() API를 사용하면 쉽게 병렬 작업을 구현할 수 있습니다. 병렬 스트림은 내부적으로 common ForkJoinPool을 사용하고 디폴트로 CPU 코어 개수만큼의 쓰레드를 생성합니다. CompletableFuture을 사용해서 병렬 작업을 직접 실행시키는 것과 비교해 보면 Join에 신경쓰지 않아도 되기 때문에 코드가 더 단순해 집니다.
예를 들어 다음과 같이 약 1초의 실행시간을 가지는 CPU 코어 개수 만큼의 태스크를 병렬 스트림으로 실행하면 중간연산(mapToObj)이 병렬로 약 1초 안에 동시에 실행되고 Join된 이후 최종 연상(reduce)이 완료되는 것을 확인할 수 있습니다.
@Test
@DisplayName("병렬스트림은 병렬로 중간연산을 수행하고 Join 이후 최종연산을 수행합니다.")
void runParallelAndJoinIntermediateOperation() {
// Given: 시스템의 CPU 코어 개수만큼의 작업 준비
int nTasks = Runtime.getRuntime().availableProcessors();
log.info("The number of CPU Core : {}", nTasks);
StopWatch stopWatch = new StopWatch();
stopWatch.start();
// When: CPU 코어 개수 만큼의 병렬 작업(1초 실행 시간 모의)을 수행
Integer result = IntStream.rangeClosed(1, nTasks)
.parallel()
.mapToObj(i -> {
log.info("index : {}", i);
SleepUtils.sleep(1000);
return i;
})
.reduce(0, Integer::sum);
stopWatch.stop();
log.info("Total time : {} ms", stopWatch.getTotalTimeMillis());
// Then: 1초 안에 병렬로 모든 작업이 완료됩니다.
assertTrue(stopWatch.getTotalTimeMillis() > 1000);
assertTrue(stopWatch.getTotalTimeMillis() < 1100);
assertEquals(sumOfSequence(nTasks), result);
}
위 테스트 코드의 실행 결과는 다음과 같습니다. ForJoinPool commonPool에서 쓰레드 7개가 할당되어 각 작업이 병렬로 실행되고 1개의 작업은 main 쓰레드에 의해 처리되는 것을 볼 수 있습니다. 전체 실행 시간은 1.065초 입니다.
11:52:33.596 [main] INFO tech.java.stream.ParallelTest - The number of CPU Core : 8
11:52:33.618 [main] INFO tech.java.stream.ParallelTest - index : 6
11:52:33.619 [ForkJoinPool.commonPool-worker-1] INFO tech.java.stream.ParallelTest - index : 3
11:52:33.633 [ForkJoinPool.commonPool-worker-2] INFO tech.java.stream.ParallelTest - index : 8
11:52:33.634 [ForkJoinPool.commonPool-worker-5] INFO tech.java.stream.ParallelTest - index : 5
11:52:33.634 [ForkJoinPool.commonPool-worker-4] INFO tech.java.stream.ParallelTest - index : 2
11:52:33.635 [ForkJoinPool.commonPool-worker-3] INFO tech.java.stream.ParallelTest - index : 4
11:52:33.640 [ForkJoinPool.commonPool-worker-6] INFO tech.java.stream.ParallelTest - index : 1
11:52:33.640 [ForkJoinPool.commonPool-worker-7] INFO tech.java.stream.ParallelTest - index : 7
11:52:34.675 [main] INFO tech.java.stream.ParallelTest - Total time : 1065 ms
반면에 일반적인 스트림으로 위 코드를 바꾸어 실행해 보면 중간연산(mapToObj)이 순차적으로 실행되어 전체 작업에 각 작업에 소요되는 시간의 합 만큼(약 8초)의 시간이 소요되는 것을 확인할 수 있습니다.
@Test
@DisplayName("일반스트림은 차례대로 중간연산을 수행한 후 최종연산을 수행합니다.")
void runSequentialIntermediateOperation() {
// Given: 시스템의 CPU 코어 개수만큼의 작업 준비
int nTasks = Runtime.getRuntime().availableProcessors();
log.info("The number of CPU Core : {}", nTasks);
StopWatch stopWatch = new StopWatch();
stopWatch.start();
// When: CPU 코어 개수 만큼의 작업(1초 실행 시간 모의)을 순차적으로 수행
Integer result = IntStream.rangeClosed(1, nTasks)
.mapToObj(i -> {
log.info("index : {}", i);
SleepUtils.sleep(1000);
return i;
})
.reduce(0, Integer::sum);
stopWatch.stop();
log.info("Total time : {} ms", stopWatch.getTotalTimeMillis());
// Then: 각 작업 실행 시간의 합 만큼의 시간이 소요됨
assertTrue(stopWatch.getTotalTimeMillis() > nTasks * 1000);
assertTrue(stopWatch.getTotalTimeMillis() < nTasks * 1000 + 100);
assertEquals(sumOfSequence(nTasks), result);
}
출력 결과는 다음과 같습니다.
11:51:13.081 [main] INFO tech.java.stream.ParallelTest - The number of CPU Core : 8
11:51:13.098 [main] INFO tech.java.stream.ParallelTest - index : 1
11:51:14.115 [main] INFO tech.java.stream.ParallelTest - index : 2
11:51:15.120 [main] INFO tech.java.stream.ParallelTest - index : 3
11:51:16.134 [main] INFO tech.java.stream.ParallelTest - index : 4
11:51:17.135 [main] INFO tech.java.stream.ParallelTest - index : 5
11:51:18.145 [main] INFO tech.java.stream.ParallelTest - index : 6
11:51:19.146 [main] INFO tech.java.stream.ParallelTest - index : 7
11:51:20.149 [main] INFO tech.java.stream.ParallelTest - index : 8
11:51:21.156 [main] INFO tech.java.stream.ParallelTest - Total time : 8060 ms