스트림 병렬화는 주의해서 적용하라
Java8부터 parallel 메서드만 한 번 호출하면 파이프라인을 병렬 실행할 수 있는 스트림을 지원했다. 동시성 프로그래밍을 할 때는 안전성과 응답 가능 상태를 유지하기 위해 애써야 하는데, 병렬 스트림 파이프라인 프로그래밍에서도 다를 바 없다.
public static void main(String[] args) {
primes().map(p -> TWO.pos(p.intValueExact()).subtract(ONE))
.fileter(mersenne -> mersenne.isProbablePrime(50))
.limit(20)
.forEach(System.out::println);
}
static Stream<BigInteger> primes() {
return Stream.iterate(TWO, BigInteger::nextProbablePrime);
}
primes를 뽑아 메르센 소수를 생성하는 프로그램이다. 이 예시에서 속도를 높이고 싶어 parallel()
를 호출한다면, 응답 불가(liveness failure)상태가 된다. 이유는 스트림 라이브러리가 이 파이프라인을 병렬화하는 방법을 찾아내지 못했기 때문이다.
데이터소스가 Stream.iterate
거나 중간 연산으로 limit
를 쓴다면, 파이프라인 병렬화로는 성능 개선을 기대할 수 없다.
스트림의 소스가 ArrayList, HashMap, HashSet, ConcurrentHashMap의 인스턴스거나 배열, int 범위, long 범위일 때 병렬화의 효과가 가장 좋다.
위 자료구조는 아래의 특징을 갖는다.
일을 다수의 스레드에 분배하기 좋다는 특징이 있다. 나누는 작업은 Spliterator가 담당하고, 이 객체는 Stream이나 Iterable의 spliterator의 메서드로 얻어올 수 있다.
이웃한 원소의 참조들이 메모리에 연속해서 저장되어 있다는 뜻이다. 참조 지역성이 낮으면, 스레드는 데이터가 메인 메모리에서 캐시 메모리로 전송되어 오기를 기다리며 대부분 시간을 멍하니 보내게 된다.
참고로, 참조 지역성이 좋은 경우는 배열이다. 배열은 실제 데이터가 연속해서 저장되기 때문이다.
종단 연산에서 수행하는 작업량이 파이프라인 전체 작업에서 상당 비중을 차지하면서 순차적인 연산이라면 파이프라인 병렬 수행의 효과는 제한될 수밖에 없다.
종단 연산 중 병렬화에 가장 적합한 것은 축소다. 축소는 파이프라인에서 만들어진 모든 원소를 하나로 합치는 작업으로, Stream의 reduce 메서드 중 하나, 혹은 min, max, count, sum과 같이 완성된 형태로 제공되는 메서드 중 하나를 선택해 수행한다.
anyMatch, allMatch, noneMatch처럼 조건에 맞으면 바로 반환되는 메서드도 병렬화에 적합하다.
안전 실패는 병렬화한 파이프라인이 사용하는 mappers, filters, 혹은 다른 함수 객체가 명세대로 동작하지 않을 때 벌어질 수 있다.
앞서 병렬화한 메르센 소수 프로그램은 완료되더라도 출력된 소수의 순서가 올바르지 않을 수 있다. 출력 순서를 순차 버전으로 정렬하고 싶다면, 종단 연산 forEach
를 forEachOrdered
로 바꿔주면 된다.
스트림 안의 원소 수와 원소당 수행되는 코드 줄 수를 곱했을 때, 최소 수십만은 되어야 성능 향상을 맛볼 수 있다.
보통은 병렬 스트림 파이프라인도 공통의 포크-조인풀 (= 같은 스레드풀)에서 수행되므로, 잘못된 파이프라인 하나가 시스템의 다른 부분의 성능에까지 악영향을 줄 수 있다.
스트림 파이프라인을 병렬화할 일이 적어졌다고 느껴진다면, 그건 진짜 그렇기 때문이다. 스트림 병렬화가 효과를 보는 경우는 많지 않다.
하지만 조건이 잘 갖춰지면 parallel 메서드 호출 하나로 거의 프로세서 코어수에 비례하는 성능향상을 만끽할 수 있다.
static long pi(long n) {
return LongStream.rangeClosed(2, n)
.mapToObj(BigInterger::valueOf)
.filter(i -> i.isProbablePrime(50))
.count();
}
//병렬화 적용
static long pi(long n) {
return LongStream.rangeClosed(2, n)
.parallel() //적용
.mapToObj(BigInteger::valueOf)
.filter(i -> i.isProbablePrime(50))
.count();
}
위의 예시는 10^8을 계산하는 데 효과를 제대로 발휘하는 예시다.