이번 주제 키워드
- 병렬 스트림으로 데이터 병렬 처리하기
- 병렬 스트림의 성능 분석
- 포크/조인 프레임워크
- Spliterator로 스트림 데이터 쪼개기
병렬 스트림
- 스트림을 이용하면 순차 스트림을 병렬 스트림으로 자연스럽게 바꿀 수 있다.
- 컬렉션에
parallelStream
을 호출하면 병렬 스트림
이 생성된다.
- 병렬 스트림이란,
각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림
이다.
- 따라서 병렬 스트림을 이용하면 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당할 수 있다.
- 1부터 n부터까지의 합을 구하는 코드 : 일반 스트림
public static long sequentialSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.reduce(0L, Long::sum);
}
- 1부터 n부터까지의 합을 구하는 코드 : 전통적인 자바 코드
public static long iterativeSum(long n) {
long result = 0;
for (long i = 1L; i<= n; i++) {
result += i;
}
return result;
}
n이 엄청나게 커진다면 병렬로 처리하는 것이 좋을 것이다. 어떻게 할까??
1. 순차 스트림을 병렬 스트림으로 변환하기
public static long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel()
.reduce(0L, Long::sum);
}
- 순차스트림에 parallel 메서드를 호출하면 리듀싱 연산이 병렬로 처리된다.
- 하지만 스트림 자체에는 아무 변화도 없다.
- 내부적으로 불리언 플래그가 설정될 뿐
- 이전 코드와 다른 점은 스트림이 여러 청크로 분할되어 있다는 것이다.
- 따라서 리듀싱 연산을 여러 청크에 병렬로 수행할 수 있다.
- 마지막 리듀싱 연산을 통해 생성된 부분 결과를
다시 리듀싱 연산
으로 합쳐 전체 스트림의 리듀싱 결과를 도출한다.
- parallel과 sequential 메서드를 통해 어떤 연산을 병렬로 실행할지, 순차로 실행할지 제어할 수 있다.
- parallel과 sequential 두 메서드 중 최종적으로 호출된 메서드가 전체 파이프라인에 영향을 미친다.
- 아래 코드는 parallel이 마지막 호출되었으므로 위 파이프라인은 병렬로 실행된다.
stream.parallel()
.filter(...)
.sequential()
.map(...)
.parallel()
.reduce();
스트림 성능 측정
- 과연 어느것이 더 빠를까??
- 순차스트림
- 병렬스트림
- 전통적인 for loop
JMH
라이브러릴 이용해 성능 측정!
- 보통 for loop - 순차 스트림 - 병렬 스트림 순
- 왜일까??
- iterate가 박싱된 객체를 생성하므로 이를 다시 언박싱하는 과정이 필요했다.
반복 작업
은 병렬로 실행될 수 있도록 독립적인 청크로 분할하기 어렵다.
- 두 번째 이유는 굉장히 큰 문제
- iterate 연산은 이전 연산의 결과에 따라 다음 함수의 입력이 달라지기 때문에 청크로 분할이 어렵다.
- 위와 같은 상황에서는 병렬 리듀싱 연산이 수행되지 않는다.
- 리듀싱 과정을 시작하는 시점에 전체 숫자 리스트가 준비되지 않았으므로 스트림을 병렬로 처리할 수 있도록 청크로 분할할 수가 없기 때문이다.
- iterate같은 경우는 스트림이 병렬로 처리되도록 지시했고 각각의 합계가 다른 thread에서 수행되었음에도 불구하고 순차처리 방식으로 처리되기 때문에 thread를 할당하는 오버헤드만 증가하게 될 뿐이다.
- 따라서 iterate와 같은 병렬과는 거리가 먼 방식을 사용하면 오히려 프로그램의 성능이 더 나빠질 수도 있다.
더 특화된 메서드 사용
- LongStream.rangeClosed라는 메서드를 활용할 수 있다. 이는 iterate에 비해 아래와 같은 장점이 있다.
- 기본형 long을 직접 사용하므로 박싱과 언박싱 오버헤드가 사라진다.
- 쉽게 청크로 분할할 수 있는 숫자 범위를 생산한다. 예를 들어, 1 ~ 20의 숫자 범위를 각각 1 ~ 5, 6 ~ 10, 11 ~ 15, 16 ~ 20 범위의 숫자로 분할할 수 있다.
LongStream.rangeClosed(1, N)
.parallel()
.reduce(0L, Long:sum);
- LongStream.rangeClosed를 활용하면 실질적으로 리듀싱 연산이 병렬로 수행된다.
- 올바른 자료구조를 선택해야 병렬 실행도 최적의 성능을 발휘할 수 있다.
병렬 스트림의 올바른 사용법
- 공유된 상태를 바꾸는 알고리즘을 사용할 때 병렬 스트림을 사용하면 문제가 발생한다.
public long sideEffectParalleSum(long n) {
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
return accumulator.total;
}
public class Accumulator {
public long total = 0;
public void add(long value) { total += value; }
}
- 10번 수행한 결과
- 올바른 결과값조차 나오지 않는다.
- 레이스 컨디션이 일어나기 때문
병렬 스트림 효과적으로 사용하기
- 확신이 서지 않을 때는 직접 측정해서 사용하라.
- 병렬 스트림이 순차 스트림보다 항상 성능이 좋은 것이 아니기 때문에 모를 때는 직접 성능 체크해보는 것이 정확하다.
- 박싱을 주의해서 사용하라.
- 오토박싱/언박싱은 성능을 크게 저하시킬 수 있는 요소다. 기본형 특화 스트림(ex. IntStream, LongStream, DoubleStream)을 활용하여 박싱 동작을 피할 수 있다.
- 순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있음을 주의하라.
- limit이나 findFirst같이 요소의
순서에 의존
하는 연산을 병렬 스트림에 활용하게 되면 비싼 비용을 치뤄야 한다.
- 스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하라.
- 처리해야할 요소 수가 N이고 하나의 요소를 처리하는데 드는 비용이 Q라고 하면
- 전체 스트림 파이프라인 처리 비용을 N*Q로 예상할 수 있다.
- Q가 높아진다는 것은 병렬 스트림으로 성능 개선의 가능성이 있다는 것을 의미
- 소량의 데이터에서는 병렬 스트림이 도움 되지 않는다.
- 소량의 데이터를 처리하는 상황에서는 병렬화 과정에서 생기는 부가 비용을 상쇄할 수 있을 만큼의 이득을 얻지 못한다.
- 스트림을 구성하는 자료구조가 올바른지 확인하라.
- 예를 들면, ArrayList가 LinkedList보다 효율적으로 분할할 수 있다.
- LinkedList는 분할하려면 모든 요소를 탐색해야 하지만 ArrayList는 요소를 탐색하지 않고도 리스트를 분할할 수 있다.
- 스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다.
- 예를 들어, SIZED 스트림은 정확히 같은 크기의 두 스트림으로 분할되므로 효과적으로 스트림을 병렬처리 할 수 있다
- 반면, 필터 연산은 스트림의 길이를 예측할 수 없으므로 효과적으로 병렬 처리 할 수 있을지 알 수 없게 된다.
- 최종 연산의 병합 과정(ex. Collector의 combiner 메서드) 비용을 살펴봐라.
- 병합 과정의 비용이 비싸다면, 병렬 스트림으로 얻은 성능의 이익이 서브스트림의 부분 결과를 합치는 과정에서 상쇄될 수 있다.
포크/조인 프레임워크
병렬 스트림이 수행되는 내부 인프라구조는 자바7에서 추가된 포크/조인 프레임워크로 병렬 스트림이 처리된다.
- 포크/조인 프레임워크는 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계되었다.
- 포크/조인 프레임워크에서는 서브태스크를
스레드 풀
(ForkJoinPool)의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현한다.
- RecursiveAction 또는 RecursiveTask 추상 클래스를 상속받아서 구현
- RecursiveAction: 반환값이 없을 때
- RecursiveTask: 반환값이 있을 떄
RecursiveTask 활용
- 스레드 풀을 이용하려면 RecursiveTask의 서브클래스를 만들어야 한다.
- RecursiveTask를 정의하려면 추상 메서드 compute를 구현해야 한다.
protected abstract R compute();
- compute 메서드는 태스크를 서브태스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브태스크의 결과를 생산할 알고리즘을 정의한다.
- compute 메서드 의사코드
if (태스크가 충분히 작거나 더 이상 분할할 수 없으면) {
순차적으로 태스크 계산
} else {
태스크를 두 서브태스크로 분할
태스크가 다시 서브태스크로 분할되도록 이 메서드를 재귀적으로 호출함
모든 서브태스크의 연산이 완료될 때까지 기다림
각 서브태스크의 결과를 합침
}
- 재귀적인 태스크 분할 과정
- divide-and-conquer 알고리즘의 병렬화 버전이다.
fork & join으로 합 구하기
var pool = new ForkJoinPool();
var task = new SumTask(1, 10);
var resut = pool.invoke(task);
System.out.println(resut);
class SumTask extends RecursiveTask<Long> {
private final long start;
private final long end;
private static final long THRESHOLD = 10;
public SumTask(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long length = end - start + 1;
if (length <= THRESHOLD) {
return sum();
}
long mid = (start + end) / 2;
SumTask leftSumTask = new SumTask(start, mid);
SumTask rightSumTask = new SumTask(mid + 1, end);
leftSumTask.fork();
Long rightResult = rightSumTask.compute();
Long leftResult = leftSumTask.join();
return rightResult + leftResult;
}
private Long sum() {
long ret = 0L;
for (long i = start; i < end; i++) {
ret += i;
}
return ret;
}
}
- ForkJoinPool은 fork & join 프레임웤에서 제공하는 쓰레드 풀로, 지정된 수의 쓰레드를 생성해서 미리 만들어 놓고 반복해서 재사용할 수 있게 한다.
fork()와 join()
- compute()는 작업을 나누고, fork()는 작업을 큐에 넣는다. (반복)
- join()으로 작업의 결과를 합친다. (반복)
- fork와 join의 차이점
- 비동기 메서드는 메서드를 호출만 할 뿐, 그 결과를 기다리지 않는다.
- 내부적으로는 다른 쓰레드에게 작업을 수행하도록 지시만 하고 결과를 기다리지 않고 돌아오는 것
작업 훔치기
- fork()가 호출되어 작업 큐에 추가된 작업 역시, compute()에 의해 더 이상 나눌 수 없을 때까지 반복해서 나뉘고, 자신의 작업 큐가 비어있는 쓰레드는 다른 쓰레드의 작업 큐에서 작업을 가져와서 수행한다.
- 이것을 작업 훔쳐오기라고 하며, 이 과정은 모두 쓰레드풀에 의해 자동적으로 이루어진다.
포크/조인 프레임워크를 제대로 사용하는 방법
- join 메서드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될 때 까지 호출자를 블록시킨다. 따라서 두 서브태스크가 모두 시작된 다음에 join을 호출해야 한다. 그렇지 않으면 각각의 서브태스크가 다른 태스크가 끝나길 기다리게 되면서 순차 알고리즘보다 느리고 복잡한 프로그램이 될 수 있다.
- RecursiveTask 내에서는 ForkJoinPool의 invoke 메서드 대신 compute나 fork 메서드를 호출한다. 순차 코드에서 병렬 계산을 시작할 때만 invoke를 사용한다.
- 두 서브태스크에서 메서드를 호출할 때는 fork와 compute를 각각 호출하는 것이 효율적이다. 그러면 두 서브태스크의 한 태스크에는 같은 스레드를 재사용할 수 있으므로 풀에서 불필요한 태스크를 할당하는 오버헤드를 피할 수 있다.
- 즉 compute()는 새 스레드를 사용하지 않기 때문
- 포크/조인 프레임워크를 이용하는 병렬 계산은 디버깅이 어렵다.
- 멀티코어에서 포크/조인 프레임워크를 사용하는 것이 순차처리보다 무조건 빠른 것은 아니다.
병렬 처리로 성능을 개선하려면 태스크를 여러 독립적인 서브태스크로 분할할 수 있어야 한다.
그렇다면 스트림은 어떻게 분할 로직을 개발하지 않고도 자동으로 스트림을 분할할까?? 바로 Spliterator 기법을 이용
Spliterator
- Spliterator는
분할할 수 있는 반복자
라는 의미다.
- Iterator 처럼 소스의 요소 탐색 기능을 제공하지만 병렬 작업에 특화돼있다.
- 자바 8은 컬렉션 프레임워크에 포함된 모든 자료구조에 사용할 수 있는 디폴트 Spliterator 구현을 제공한다.
- 컬렉션은 spliterator라는 메서드를 제공하는 Spliterator 인터페이스를 구현한다.
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimateSize();
int characteristics();
}
분할 과정
- 스트림을 여러 스트림으로 분할하는 과정은 재귀적으로 일어난다.
- 1단계: 첫 번째 Spliterator에 trySplit을 호출하면 두 번째 Spliterator가 생성된다.
- 2단계: 두개의 Spliterator에 trySplit을 다시 호출하면 4개의 Spliterator가 생성된다.
- 이처럼 trySplit의 결과가 null이 될 때 까지 이 과정을 반복한다.
- 3단계: trySplit이 null을 반환했다는 것은 더 이상 자료구조를 분할할 수 없음을 의미
- 4단계: Spliterator에 호출한 모든 trySplit의 결과가 null이면 재귀 분할 과정이 종료된다.
Spliterator의 특성
- Spliterator의 characteristics 추상 메서드는 Spliterator 자체의 특성 집합을 int 타입으로 반환한다.
- Spliterator 특성
정리
- 내부 반복을 이용하여 다른 스레드를 이용하지 않고도 스트림을 병렬로 처리할 수 있다.
- 병렬 처리 성능이 무조건 빠른 것이 아니기 때문에 성능 측정을 해보는 것이 좋다.
- 병렬 스트림은 처리해야할 데이터가 아주 많거나 각 요소를 처리하는데 오랜 시간이 걸릴 때 성능을 높일 수 있다.
- 기본형 특화 스트림을 이용하는 것이 병렬 처리보다 더욱 성능을 높일 수 있는 방법이다.
- 스트림의 병렬처리는 포크/조인 프레임워크 이용하여 병렬화할 수 있는 태스크를 작은 태스크로 분할한 후, 분할된 태스크를 각각의 스레드로 실행하며 서브태스크 각각의 결과를 합쳐서 최종 결과를 생산한다.
- Spliterator는 탐색하려는 데이터를 포함하는 스트림을 어떻게 병렬화 할 것인지를 정의한다.
참고 출처