
병렬 스트림을 이해하기 위해서는 먼저 Fork Join Pattern을 이해해야 한다.
작업을 단일 스레드로 처리하면 스레드가 하나밖에 없기 때문에 작업 시간이 늘어난다.
멀티 스레드로 작업을 나누어 실행하면 작업 시간을 단축할 수 있다.
이 때, 스레드 풀을 사용하여 값을 반환하는 스레드를 실행하면
작업을 나누고(Fork), 각 스레드를 실행하고(Execute),
결과를 받아 합치는(Join) 형태가 완성되는데 이 패턴을 Fork Join Pattern이라고 한다.

Java는 Fork Join Pattern을 쉽게 사용할 수 있는 Fork Join Framework를 제공한다.
ForkJoinPool forkJoinPool = new ForkJoinPool(n); // ** 기본값 : CPU 코어 개수**
// 사용
pool.invoke(new Task()); // ** 호출한 스레드도 분할된 작업 수행에 참여 **
ForkJoinTask<R> future = pool.submit(new Task()); // ** 호출한 스레드는 작업 수행에서 제외 **
R result = future.get();
// 자원 정리
pool.close();
ForkJoinPool은 Fork Join 작업을 수행하는 ExecutorService 스레드 풀의 한 종류이다.
생성할 스레드의 수를 설정할 수 있고 생략하면 CPU 코어 개수대로 스레드가 생성된다.
invoke() 메서드는 호출한 스레드도 분할된 작업의 수행에 참여하면서
다른 스레드에게 작업 수행을 명령하는 메서드이다.
만약 호출하는 스레드를 작업 수행에서 제외하고 싶다면,
submit() 메서드를 호출하여 큐에 넣고 결과를 반환받아야 한다.
사용 후에는 명시적으로 자원을 정리해주어야 한다.
public class Task extends RecursiveTask<T> {
..
@Override
protected T compute() {
..
dividedTask1.fork(); // ** 분할한 작업을 다른 스레드가 실행 가능 **
dividedTask1.compute(); // ** 분할한 작업을 자기 자신 스레드가 실행 **
dividedTask1.join(); // ** 분할 작업 수행 요청을 대기하고 결과를 반환 받는 메서드 **
..
}
}
ForkJoinTask는 Future을 구현한 Fork Join 작업의 추상 클래스이다.
결과를 반환하는 RecursiveTask<T>와
결과를 반환하지 않는(void) RecursiveAction으로 나누어진다.
compute() 메서드를 오버라이딩하여 작업 로직을 구현할 수 있다.
보통 Threshold(임계값)를 설정하여 작업을 분할하지 않고 처리할 크기를 설정한다.
처리할 작업의 크기가 Threshold 이상일 때,
fork() 메서드를 호출하면 내부적으로 분할한 작업을 다른 스레드가 실행할 수 있다.
분할한 작업을 스스로 수행하려면 compute() 메서드를 재귀적으로 호출한다.
join() 메서드를 호출하면 분할 작업을 수행중인 스레드가 결과를 반환할 때까지 대기한다.
그렇다면 작업의 크기인 Threshold는 어떻게 설정하는 것이 좋을까?
결론적으로는 성능 테스트를 통해 Threshold를 설정할 수 밖에 없다.
하드웨어 특성에 따라 작업을 처리하는 환경이 다르기 때문이다.
하드웨어적인 특성 외에도 작업의 상태를 고려해야 한다.
작업이 단순한 경우 오히려 작업 수행보다 작업 분할에 필요한 오버헤드가 더 커질 수 있다.
작업 처리 시간이 불균일한 상황이 발생하는 경우도 적절하게 대응해야 하며
너무 많은 작업 분할은 스레드 관리 비용이 발생한다는 점도 고려해야 한다.
보통 작업의 수는 스레드 수의 4 ~ 10배로 설정한다.

ForkJoinPool의 스레드는 각각 작업 Queue를 보유하고 있다.
스레드는 처리할 작업이 없으면 다른 스레드의 작업을 훔쳐와서(steal) 수행할 수 있다.
fork()는 분할한 작업을 자기 자신 스레드 큐에 쌓아놓게 되는데,
쌓아 놓은 작업을 다른 스레드가 훔쳐가 작업을 수행하여 분할 작업이 이루어진다.
작업을 보관할 때에는 위에서, 작업을 훔쳐갈 때에는 아래에서 가져가
경합을 덜 발생시키는 구조로 동작한다.

// Fork Join 공용 풀 접근
ForkJoinPool forkJoinCommonPool = ForkJoinPool.commonPool();
Task task = new Task();
// ** Fork Join 공용 풀 자동 사용 **
task.invoke(); // 호출한 스레드도 작업에 참여
task.fork(); // 호출한 스레드 작업 제외
Fork Join 공용 풀은 자바가 제공하는 Fork Join 전용 풀이다.
시스템 전체에서 단일 인스턴스로 공유하여 사용하기 때문에 자원을 효율적으로 관리할 수 있다.
RecursiveTask 또는 RecursiveAction을 구현한 작업에서 메서드를 호출하면
별도로 생성한 ForkJoinPool에 별도로 전달하지 않고
자바가 제공하는 Fork Join 공용 풀을 자동으로 사용하여 작업을 수행할 수 있다.
invoke() 메서드를 호출하면 호출한 스레드도 분할된 작업 수행에 참여하고
fork() 메서드를 호출하면 호출한 스레드는 작업 수행에서 제외시킨다.
스레드의 수는 메인 스레드의 참여를 고려하여 가용 프로세서 수에서 1을 뺀 값으로 설정된다.
Java에서 관리하는 스레드 풀이기 때문에 사용 후 별도로 자원을 정리할 필요가 없다.

Stream.of().parallel()..
Stream에서는 parallel()를 호출하면 병렬로 작업을 처리할 수 있다.
parallel()은 자동으로 invoke()를 호출하고,
fork()로 작업을 분할하고, join()으로 반환한 결과를 자동으로 합치는 작업을 수행한다.
자동으로 병렬 작업을 수행하여 성능을 향상시켜주는 parallel() 메서드는
언뜻 보기에는 마법같은 만능 메서드로 보이지만 사실 사용에는 명확한 한계가 있는데
바로 CPU 바운드 작업에만 사용해야 한다는 점이다.
parallel() 호출 시 사용하는 Fork Join Pool은
CPU 코어 수에 맞추어 생성된 제한된 개수의 스레드를 공용으로 사용한다.
작업이 밀리면 스레드를 늘리는 것 없이 제한된 스레드만을 사용하기 때문에
작업이 처리될 때까지 계속해서 대기하는 문제가 발생한다.
백엔드의 경우 대부분이 CPU가 대기 상태인 경우가 많은 I/O 바운드 작업인데,
공용 풀을 사용하는 경우 모든 스레드를 계산이 아닌 대기를 위해 소모하게 되고
대기를 위한 대기 시간만 늘어나게 되는 최악의 상황이 펼쳐진다.
이와 같은 이유로 백엔드에서는 parallel() 메서드를 사용하는 경우가 극히 드물다.
따라서 I/O 바운드 작업은 반드시 별도의 스레드 풀을 생성해서 사용하고
공용 풀을 사용할 떄에는 반드시 계산 집약적인 CPU 바운드 작업에만 사용해야 한다.

CompletableFuture.runAsync(Runnable runnable, ExecutorService es) // ** 스레드 풀 지정 **
실무의 멀티스레드 상황에서 CompletableFuture을 종종 사용하게 된다.
CompletableFuture을 사용할 때 별도의 스레드 풀을 지정하지 않으면
바로 이 Fork Join 공용 Pool을 사용하게 되는데,
제한된 스레드의 개수를 사용하는 특징을 알고 있지 못한다면
이로 인해 발생하는 성능 저하의 원인을 찾지 못하고 오랜 시간을 소모할 수 있다.
CompletableFuture을 사용할 때에는 반드시 스레드 풀을 생성해서 지정하도록 하자.

parallel() 메서드를 처음 봤을 때에는 '우와, 사기캐다'라는 생각이 들었지만
명확한 한계를 보고 나서 김 식는 느낌을 받는건 어쩔 수 없는 것 같다.
그렇지만 CPU 바운드 작업에 한하여 유용하게 사용할 수 있으니 참고하도록 하자.
I/O 바운드 작업을 처리하는 백엔드에서는 별도의 스레드 풀을 생성하는 것을 잊지 말고,
특히 실무에서 중요한 CompletableFuture 사용 시 별도의 스레드 풀을 꼭 전달하자.