[Java 8] 14. Parallel Streams

seony·2023년 5월 7일

java8

목록 보기
14/16

🌱 Parallel Stream 이란?

  • 데이터를 여러 파트로 나누어 병렬적으로 처리한 후 합쳐 결과값을 만들어 낸다.

🌱 어떻게 Parallel Stream을 만들까?

  • Java 8에서는 스트림(Stream)을 병렬적으로 처리하는 두 가지 방법을 제공합니다.
  • 바로 stream().parallel() 메서드를 호출하거나 parallelStream() 메서드를 호출하는 방법입니다.

❓stream().parallel와 parallelStream()의 차이❓

stream().parallel() 메서드는 일반적인 Stream에서 parallel Stream으로 변환하는 메서드입니다. 이 메서드를 호출하면 기존의 Stream 객체를 Parallel Stream으로 변환하여 반환합니다.

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
Stream<Integer> parallelStream = numbers.stream().parallel();

parallelStream() 메서드는 컬렉션(Collection) 인터페이스에 추가된 메서드로, 해당 컬렉션의 요소를 직접적으로 Parallel Stream으로 변환하는 메서드입니다.

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
Stream<Integer> parallelStream = numbers.parallelStream();

결과

  • 두 가지 방법 모두 동일한 결과를 반환하지만, parallelStream() 메서드를 사용하는 것이 더 간단하고 직관적입니다.
  • 또한, parallelStream() 메서드는 Collection 인터페이스의 메서드로 제공되므로, 해당 컬렉션에서 바로 Parallel Stream으로 변환할 수 있습니다.

🌱 Parallel Stream은 어떻게 동작할까?

Java7의 Fork/Join 프레임워크

  • Java Parallel Stream은 기본적으로 Fork/Join 프레임워크를 사용하여 병렬 처리를 구현합니다.
  • Fork/Join 프레임워크는
      1. 하나의 작업을 작은 단위로 분할한 다음,
      1. 분할된 작업을 병렬적으로 실행하는 방식으로 동작합니다.

❓몇 개의 스레드가 생성될까❓

  • Parallel Stream은 Fork/Join 프레임워크에서 사용하는 기본적인 스레드 풀을 활용합니다.

  • 이 스레드 풀은 기본적으로 현재 실행 중인 CPU의 코어 수에 따라 동적으로 스레드를 생성하고, 작업을 분할하여 실행합니다.

    • ex) 4 코어를 가진 CPU에서 Parallel Stream을 실행하면, 최대 4개의 스레드가 생성
  • 스레드 개수는 시스템의 CPU 개수, 사용 가능한 메모리 양, 처리할 작업의 종류와 크기 등에 따라 달라질 수 있습니다. 또한, 스레드 풀의 크기는 System.setProperty() 메서드를 사용하여 변경할 수도 있습니다.

결과

  • Parallel Stream에서 생성되는 스레드의 개수는 실행 환경에 따라 동적으로 결정되며, 고정된 값이 아닙니다. 이를 통해 효율적인 병렬 처리를 구현할 수 있습니다.

🌱 Parallel Stream이 항상 빠르고 좋을까?

Parallel Stream은 멀티코어 CPU를 활용하여 요소를 병렬 처리하기 때문에, 일반적으로 처리 속도가 빠릅니다. 하지만, 항상 그렇지는 않습니다.

Parallel Stream이 성능 향상을 보장하기 위해서는 다음과 같은 조건을 만족해야 합니다.

  • 데이터 크기가 충분히 크다.
  • 데이터 처리 비용이 높다.
  • 병렬 실행에 따른 부가 비용이 적다.

만약 처리할 데이터의 크기가 작거나, 처리 비용이 낮다면, Parallel Stream이 오히려 성능 저하를 가져올 수 있습니다. 또한, 병렬 실행에 따른 부가 비용이 큰 경우(예: 스레드 간 동기화 비용 등)에도 성능 저하가 발생할 수 있습니다.

1. Parallel이 속도가 더 느린 예시

import java.util.function.Supplier;
import java.util.stream.IntStream;

public class ParallelStreamExample {

    public static long checkPerformanceResult(Supplier<Integer> supplier,int numberOfTimes) {

        long startTime = System.currentTimeMillis();
        for (int i = 0; i < numberOfTimes; i++) {
            supplier.get();
        }
        long endTime = System.currentTimeMillis();
        return endTime - startTime;
    }

    // Supplier
    public static int  sumSequentialStream() {
        return IntStream.rangeClosed(1, 100000)
                .sum();
    }

    // Supplier
    public static int sumParallelStream() {
        return IntStream.rangeClosed(1, 100000)
                .parallel() // split the data in to multiple parts
                .sum();
    }

    public static void main(String[] args) {

        System.out.println(Runtime.getRuntime().availableProcessors());

        System.out.println("Sequential Stream Result : " +
                checkPerformanceResult(ParallelStreamExample::sumSequentialStream, 20));
        System.out.println("Parallel Stream Result : " +
                checkPerformanceResult(ParallelStreamExample::sumParallelStream, 20));
    }
}

결과

10
Sequential Stream Result : 6
Parallel Stream Result : 9
  • 오히려 Sequential Stream이 빠른 것을 확인할 수 있습니다.

2. parallel 사용시 값이 제대로 나오지 않는 예시

package com.learn.java.parallelstream;

import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

public class SumClient {

    public static void main(String[] args) {
        Sum sum = new Sum();

        IntStream.rangeClosed(1, 1000)
                .parallel() // 값이 다르게 나옴
                .forEach(sum::performSum); // 500500

        System.out.println(sum.getTotal());

        List<Integer> list = Arrays.asList(1, 2, 3, 4);

    }
}

결과

474867
  • 원래 값은 500500이 나와야합니다.
  • 하지만 parallel()을 사용하면 Sum 클래스의 공유 변수 total에 동시에 접근하기 때문에 매번 값이 다르게 나옵니다.

0개의 댓글