컬렉션에 parallelStream을 호출하면 병렬 스트림이 생성된다. 병렬 스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 덩어리로 분할한 스트림이다. 병렬 스트림을 이용하면 모든 멀티코어 프로세서가 각각의 덩어리를 처리하도록 할당할 수 있다.
parallel() 메서드를 호출하면 기존 함수형 리듀싱 연산이 병렬로 처리된다.
public long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel() // 병렬 스트림으로 변환
.reduce(0L, Long::sum);
}
순차 스트림에 parallel을 호출해도 스트림 자체에는 아무 변화도 일어나지 않는다. 내부적으로 이후 연산이 병렬로 수행되야 함을 의미하는 Boolean 플래그가 설정된다.(병렬 스트림은 내부적으로 ForkJoinPool을 사용한다.)
sequential() 메서드는 반대로 병렬 스트림을 순차 스트림으로 바꿔준다. 만약 스트림연산에 parallel()과 sequential()이 모두 있다면 최종적으로 호출된 연산에 따라 병렬인지 순차인지 결정된다.
병렬화를 한다고해서 순차나 반복 형식에 비해 무조건 성능이 좋다고 볼 수 없다.
// 기본 반복
public long iterativeSum() {
long result = 0;
for (long i = 1L; i <= N; i++) {
result += i;
}
return result;
}
// 순차
public long sequentialSum() {
return Stream.iterate(1L, i -> i + 1).limit(N).reduce(0L, Long::sum);
}
// 병렬
public long parallelSum() {
return Stream.iterate(1L, i -> i + 1).limit(N).parallel().reduce(0L, Long::sum);
}
위 코드들의 성능을 비교해보면 기본 반복 > 순차 스트림 > 병렬 스트림
순으로 성능이 좋다. 그 이유는 아래와 같다.
위의 예시 같은 경우 리듀싱 시작 시점에 전체 숫자 리스트가 준비되지 않았기 때문에 스트림을 분할할 수 없다. 스트림을 분할할 수 없기 때문에 순차 스트림과 같은 방식으로 처리되고 스레드 할당 오버헤드만 증가한다.
따라서 병렬 프로그래밍을 오용한다면 성능이 오히려 더 나빠진다.
병렬 스트림을 사용하여 성능이 올라가는 케이스를 보자.
public long parallelRangedSum() {
return LongStream.rangeClosed(1, N).parallel().reduce(0L, Long::sum);
}
이 경우 LongStream이 기본형을 사용하므로 박싱-언박싱이 일어나지 않고, rangeClosed는 범위가 주어지므로 각 범위를 덩어리로 나눌 수 있다.
병렬화를 이용하려면 스트림을 재귀적으로 분할해야하고, 각 서브스트림을 서로 다른 스레드의 리듀싱 연산으로 할당하고, 그 결과를 하나의 값으로 합쳐야한다.
멀티코어 간 데이터이동은 비싸므로 코어 간 데이터 전송 시간보다 오래 걸리는 작업만 병렬로 다른 코어에서 수행하도록 한다.
상태 공유에 따른 부작용 피하기(공유된 가변 상태 피하기)
병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계됨.
포크 조인 프레임워크에서는 ExecutorService 인터페이스(서브태스크를 스레드 풀의 작업자 스레드에 분산 할당)가 구현되어 있다.
ForkJoinPool이 ExecutorService 구현체인 AbstractExecutorService를 상속받고있고, RecursiveTask가 ForkJoinTask를 상속받고 있다.
분할 후 정복 알고리즘의 병렬화버전. 재귀적으로 각 서브태스크 크기가 충분히 작아질 때까지 포크(분할)하고 모든 서브태스크를 병렬로 수행한 후, 부분 결과를 조합한다.
일반적으로 애플리케이션에서는 둘 이상의 ForkJoinPool을 사용하지 않는다. 한 번만 인스턴스화해서 정적 필드에 싱글턴으로 저장한다.
ForkJoinPool의 모든 스레드를 거의 공정하게 분할. 각 스레드는 자신에게 할당된 태스크를 포함하는 이중 연결 리스트를 참조하면서 작업이 끝날 때마다 큐의 헤드에서 다른 태스크를 가져와서 작업 처리.
한 스레드가 할 일이 떨어진 경우, 다른 스레드 큐의 꼬리에서 작업을 훔쳐온다.(모든 태스크가 끝날때까지)
자동으로 스트림을 분할하는 기법. 탐색하려는 데이터를 포함하는 스트림을 어떻게 병렬화할 것인지 정의. Iterator와 같이 소스의 요소 탐색 기능을 제공한다. 하지만 Spliterator는 병렬 작업에 특화되어있다.
public interface Spliterator<T> { // T: 탐색하는 요소 형식
boolean tryAdvance(Consumer<? super T> action); // 요소를 하나씩 순차적으로 소비하면서 탐색해야 할 요소가 남아있으면 참을 반환.(일반적인 Iterator와 같음)
Spliterator<T> trySplit(); // 일부 요소를 분할해서 두 번째 Spliterator를 만든다. null을 반환할때까지 반복
long estimateSize(); // 탐색해야 할 요소 수 정보 제공
int characteristics(); // Spliterator의 특성 정의
}
characteristics()을 사용해서 정의. Spliterator 자체의 특성 집합을 포함하는 int 반환.
x.equals(y)
는 항상 false 반환