병렬 스트림(Parallel Stream)을 활용한 손쉬운 성능 향상

주싱·2023년 4월 26일
0

더 나은 도구

목록 보기
9/13
post-custom-banner

Sprint icons created by Freepik - Flaticon

Stream의 parallel() API를 사용하면 쉽게 병렬 작업을 구현할 수 있습니다. 병렬 스트림은 내부적으로 common ForkJoinPool을 사용하고 디폴트로 CPU 코어 개수만큼의 쓰레드를 생성합니다. CompletableFuture을 사용해서 병렬 작업을 직접 실행시키는 것과 비교해 보면 Join에 신경쓰지 않아도 되기 때문에 코드가 더 단순해 집니다.

병렬 스트림 예제

예를 들어 다음과 같이 약 1초의 실행시간을 가지는 CPU 코어 개수 만큼의 태스크를 병렬 스트림으로 실행하면 중간연산(mapToObj)이 병렬로 약 1초 안에 동시에 실행되고 Join된 이후 최종 연상(reduce)이 완료되는 것을 확인할 수 있습니다.

@Test
@DisplayName("병렬스트림은 병렬로 중간연산을 수행하고 Join 이후 최종연산을 수행합니다.")
void runParallelAndJoinIntermediateOperation() {
    // Given: 시스템의 CPU 코어 개수만큼의 작업 준비
    int nTasks = Runtime.getRuntime().availableProcessors();
    log.info("The number of CPU Core : {}", nTasks);

    StopWatch stopWatch = new StopWatch();
    stopWatch.start();

    // When: CPU 코어 개수 만큼의 병렬 작업(1초 실행 시간 모의)을 수행
    Integer result = IntStream.rangeClosed(1, nTasks)
            .parallel()
            .mapToObj(i -> {
                log.info("index : {}", i);
                SleepUtils.sleep(1000);
                return i;
            })
            .reduce(0, Integer::sum);
    stopWatch.stop();
    log.info("Total time : {} ms", stopWatch.getTotalTimeMillis());

    // Then: 1초 안에 병렬로 모든 작업이 완료됩니다.
    assertTrue(stopWatch.getTotalTimeMillis() > 1000);
    assertTrue(stopWatch.getTotalTimeMillis() < 1100);
    assertEquals(sumOfSequence(nTasks), result);
}

위 테스트 코드의 실행 결과는 다음과 같습니다. ForJoinPool commonPool에서 쓰레드 7개가 할당되어 각 작업이 병렬로 실행되고 1개의 작업은 main 쓰레드에 의해 처리되는 것을 볼 수 있습니다. 전체 실행 시간은 1.065초 입니다.

11:52:33.596 [main] INFO tech.java.stream.ParallelTest - The number of CPU Core : 8
11:52:33.618 [main] INFO tech.java.stream.ParallelTest - index : 6
11:52:33.619 [ForkJoinPool.commonPool-worker-1] INFO tech.java.stream.ParallelTest - index : 3
11:52:33.633 [ForkJoinPool.commonPool-worker-2] INFO tech.java.stream.ParallelTest - index : 8
11:52:33.634 [ForkJoinPool.commonPool-worker-5] INFO tech.java.stream.ParallelTest - index : 5
11:52:33.634 [ForkJoinPool.commonPool-worker-4] INFO tech.java.stream.ParallelTest - index : 2
11:52:33.635 [ForkJoinPool.commonPool-worker-3] INFO tech.java.stream.ParallelTest - index : 4
11:52:33.640 [ForkJoinPool.commonPool-worker-6] INFO tech.java.stream.ParallelTest - index : 1
11:52:33.640 [ForkJoinPool.commonPool-worker-7] INFO tech.java.stream.ParallelTest - index : 7
11:52:34.675 [main] INFO tech.java.stream.ParallelTest - Total time : 1065 ms

일반 스트림 예제

반면에 일반적인 스트림으로 위 코드를 바꾸어 실행해 보면 중간연산(mapToObj)이 순차적으로 실행되어 전체 작업에 각 작업에 소요되는 시간의 합 만큼(약 8초)의 시간이 소요되는 것을 확인할 수 있습니다.

@Test
@DisplayName("일반스트림은 차례대로 중간연산을 수행한 후 최종연산을 수행합니다.")
void runSequentialIntermediateOperation() {
    // Given: 시스템의 CPU 코어 개수만큼의 작업 준비
    int nTasks = Runtime.getRuntime().availableProcessors();
    log.info("The number of CPU Core : {}", nTasks);

    StopWatch stopWatch = new StopWatch();
    stopWatch.start();

    // When: CPU 코어 개수 만큼의 작업(1초 실행 시간 모의)을 순차적으로 수행
    Integer result = IntStream.rangeClosed(1, nTasks)
            .mapToObj(i -> {
                log.info("index : {}", i);
                SleepUtils.sleep(1000);
                return i;
            })
            .reduce(0, Integer::sum);
    stopWatch.stop();
    log.info("Total time : {} ms", stopWatch.getTotalTimeMillis());

    // Then: 각 작업 실행 시간의 합 만큼의 시간이 소요됨
    assertTrue(stopWatch.getTotalTimeMillis() > nTasks * 1000);
    assertTrue(stopWatch.getTotalTimeMillis() < nTasks * 1000 + 100);
    assertEquals(sumOfSequence(nTasks), result);
}

출력 결과는 다음과 같습니다.

11:51:13.081 [main] INFO tech.java.stream.ParallelTest - The number of CPU Core : 8
11:51:13.098 [main] INFO tech.java.stream.ParallelTest - index : 1
11:51:14.115 [main] INFO tech.java.stream.ParallelTest - index : 2
11:51:15.120 [main] INFO tech.java.stream.ParallelTest - index : 3
11:51:16.134 [main] INFO tech.java.stream.ParallelTest - index : 4
11:51:17.135 [main] INFO tech.java.stream.ParallelTest - index : 5
11:51:18.145 [main] INFO tech.java.stream.ParallelTest - index : 6
11:51:19.146 [main] INFO tech.java.stream.ParallelTest - index : 7
11:51:20.149 [main] INFO tech.java.stream.ParallelTest - index : 8
11:51:21.156 [main] INFO tech.java.stream.ParallelTest - Total time : 8060 ms
profile
소프트웨어 엔지니어, 일상
post-custom-banner

0개의 댓글