Java Stream 병렬 처리 - 1 (순차 스트림을 병렬 스트림으로)

MOOZZANG·2021년 4월 10일
1

모던 자바 인 액션

목록 보기
1/1
post-thumbnail

자바8에서 등장한 스트림 인터페이스를 통해 더이상 Data Collection을 subList 등 Sub Collection으로 나눠서 병렬 처리하지 않아도 됩니다.

아래는 Dishes 리스트를 3개의 subList로 쪼개서 칼로리 X2 병렬처리 하는 소스 입니다.

	// 우선 데이터를 각각 3개의 서브 파트로 분배합니다.
	List<Dish> subDishes1 = menu.subList(0, 3);
        List<Dish> subDishes2 = menu.subList(3, 6);
        List<Dish> subDishes3 = menu.subList(6, 9);
        // 분할된 데이터 서브파트는 각각의 쓰레드로 할당되어 병렬처리 됩니다.
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        threadPool.submit(() -> {
            for (Dish dish : subDishes1) {
                dish.setCalories(dish.getCalories() * 2);
            }
        });
        threadPool.submit(() -> {
            for (Dish dish : subDishes2) {
                dish.setCalories(dish.getCalories() * 2);
            }
        });
        threadPool.submit(() -> {
            for (Dish dish : subDishes3) {
                dish.setCalories(dish.getCalories() * 2);
            }
        });
        ## 어떤 경우에는 데이터 합치기 작업도 필요합니다!!

중복되는 코드로 인해 매우 아쉽네요;;
또한 위의 예제에서 subList는 기존 List에 대한 View를 제공하기 때문에 데이터를 합칠 필요는 없지만,
때에 따라서는 마지막에 Data 합치기 작업도 필요합니다.

이런 불편한 작업을 Collection의 parallelStream 메서드를 통해 병렬 스트림(parallel stream)을 생성 및 사용해서 해결해보겠습니다!

병렬 스트림을 이용하면 모든 멀티코어 프로세스가 각각의 청크(chunk)를 처리하도록 할당할 수 있습니다.
(Java의 jvm은 os의 병렬처리 시스템을 사용합니다.)

전통적인 자바에서의 1부터 n까지의 합 구하기

    // 1부터 n까지 더하는 반복문 기반 메서드
    public long iterativeSum(long n) {
        long result = 0;
        for (long i = 1L; i <= n; i++) {
            result += i;
        }
        return result;
    }

n이 커진다면 이 연산을 병렬로 처리하는 것이 좋습니다.
다만 결과 변수는 어떻게 동기화할지, 몇 개의 스레드를 사용해야 할지 등에 대한 설정을 직접 구현해야 합니다.

간편한 함수형 리듀싱 연산(stream)에 병렬 처리를 더해주기

    public long parallelSum(long n) {
        return Stream.iterate(1L, i -> i + 1) // 무한 자연수 스트림 생성
                .limit(n) // 스트림 연산을 n개 이하로 제한
                .parallel() // 순차 스트림을 병렬 스트림으로 변황
                .reduce(0L, Long::sum); // 모든 숫자를 sum 하는 리듀싱 연산
    }

위의 병렬 스트림 reducing 로직은 청크 단위로 병렬 처리 됩니다.

순차 스트림에 parallel을 호출해도 스트림 자체에는 변화가 없습니다.
내부적으로 parallel을 호출하면 이후 연산이 병렬로 수행해야 함을 의미하는 Boolean Flag가 설정됩니다.
반대로 Sequential로 병렬 스트림을 순차 스트림으로 바꿀 수도 있어요.

여기선 parallel과 sequential 두 메서드 중 최종적으로 호출된 메서드가 전체 파이프라인에 영향을 미칩니다.

	menu.stream()
                .parallel()
                .filter(...)
                .sequential()
                .map(...)
                .parallel()
                .reduce();

마지막 호출이 parallel이므로 파이프라인은 전체적으로 병렬로 실행되겠군요.

#여기서 병렬 작업을 위한 쓰레드는 어디서 생성되는 것일까요??

병렬 스트림은 내부적으로 ForkJoinPool 기반이며, Runtime.getRuntime().availableProcessors() (가용 프로세서 갯수)에서 리턴된 값으로 쓰레드풀을 구성합니다.

자세한 내용은 다음 챕터에서!!

profile
느리지만 꾸준히 성장하는 개발자

0개의 댓글