이번 chpt에서는 지난 chpt에 이어 포크/조인 프레임워크와 spliterator 인터페이스에 대해 알아보는 내용을 다룰 것이다.
포크/조인 프레임워크는 병렬화할 수 있는 작업을 재귀적으로 작게 분할하여 서브태스크 각각의 결과를 합쳐 전체 결과를 만들도록 설계되어있다.
포크/조인 프레임 워크에서는 서브태스크를 스레드풀의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현한다.
RecursiveTask<R>
의 서브 클래스를 만든다.R
은 병렬화도니 태스크가 생성하는 결과 형식이다.RecursiveAction
형식이다.RecursiveTask
를 정의하기 위해선 compute
라는 추상 메서드를 구현 해야한다.protected abstract R compute();
compute()
: task를 subtask로 분할하는 로직 + 분할할수 없을 때 개별 서브태스크의 결과를 생산할 알고리즘 정의
if(task가 충분히 작거나 더 이상 분할할 수 없을 떄){
순차적으로 task 계산
} else {
task를 두 subtask로 분할
task가 다시 subtask로 분할되도록 이 메서드를 재귀적으로 호출
모든 subtask의 연산이 완료될 때 까지 대기
각 subtask의 결과를 합침
}
예시로, 범위의 숫자를 더하는 문제를 구현하며 사용방법을 확인하자
package hello.servlet.web.springmvc.v2;
public class ForkJoinSumCalculator extends java.util.concurrent.RecursiveTask<Long>{
private final long [] numbers;
private final int start;
private final int end;
private final long THRESHOLD = 10_000;
//main task 생성시 public 생성자
public ForkJoinSumCalculator(long[] numbers){
this(numbers, 0, numbers.length);
}
//recursive subtask 생성시 non public 생성자
private ForkJoinSumCalculator(long[] numbers, int start, int end){
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute(){
int length = end - start; // task에서 더할 배열의 길이
if (length <= THRESHOLD){
return computeSequentially(); // 기준값보다 작으면 순차적으로 결과를 계산
}
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2);
leftTask.fork(); // ForkJoinPool의 다른 스레드로 새로 생성한 태스크를 비동기로 실행
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end);
Long rightResult = rightTask.compute(); // 두 번째 서브태스크를 동기 실행, 추가 분할 일어날 수 있음.
Long leftResult = leftTask.join(); // 첫 번째 서브태스크의 결과를 읽거나 아직 없으면 기다린다.
return leftResult + rightResult;
}
//분할 더 이상 안될 때 서브태스크 결과 계산해주는 알고리즘.
private Long computeSequentially(){
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
}
다음 코드처럼 ForkJoinSumCalculator의 생성자로 원하는 수의 배열을 넘겨줄 수도 있다.
public static long forkJoinSum(long n){
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
위 메서드는 LongStream
으로 n까지의 자연수를 포함하는 배열을 생성 한 후 ForkJoinSumCalculator
의 생성자로 전달하여 ForkJoinTask
를 만들었다. 마지막으로 생성한 태스크를 새로운 ForkJoinPool
의 invoke
메서드로 전달했다. ForkJoinPool
에서 실행되는 마지막 invoke
메서드의 반환값은 ForkJoinSumCalculator
에서 정의한 태스크의 결과가 된다.
일반적으로 애플리케이션에서는 둘 이상의 ForkJoinPool
을 사용하지 않는다. 한 번만 인스턴스화해서 정적필드에 싱글턴으로 저장한다.
실행해보면 병렬 스트림을 이용할 때 보다 성능이 더 나빠졌다. 그 이유는 ForkJoinSumCalculator
태스크에서 사용할 수 있도록 전체 스트림을 long[]
으로 변환했기 때문이다.
쉽게 사용할 수 있지만 주의를 기울여야한다. 다음은 포크/조인 프레임워크를 효과적으로 사용하는 방법이다.
RecursiveTask
내에서는 invoke()
사용 금지 (순차 병렬 계산 시작시에만 사용)compute
나 fork
를 직접 호출한다.fork
메서드를 호출하여 ForkJoinPool
일정 조절할 수 있다. fork
, fork
가 아니라 fork
, compute
를 호출하는 것이 효과적이다.compute
를 호출하므로 스택 트레이스 무용지물 됨포크/조인 프레임워크에서는 작업 훔치기(work stealing)라는 기법을 사용한다. 각각의 스레드는 자신에게 할당된 태스크를 포함하는 이중 연결 리스트(doubley linked list)를 참조하면서 작업이 끝날 때마다 큐의 헤드에서 다른 태스크를 가져와서 작업을 처리한다. 이때 한 스레드는 다른 스레드보다 자신에게 할당된 태스크를 더 빨리 처리할 수 있는데, 할 일이 없어진 스레드는 유휴 상태로 바뀌는 것이 아니라 다른 스레드의 큐의 꼬리에서 작업을 훔쳐온다. 모든 태스크가 작업을 끝낼 때까지 이 과정을 반복한다. 따라서 태스크의 크기를 작게 나누어야 작업자 스레드 간의 작업 부하를 비슷한 수준으로 유지할 수 있다.
자바 8에서 Spliterator라는 새로운 인터페이스 제공. Spliterator는 분할할 수 있는 반복자(spliatable iterator)라는 의미다. 자바 8은 컬렉션 프레임워크에 포함된 모든 자료구조에서 사용할 수 있는 디폴드 Spliterator 구현을 제공한다.
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimateSize();
int characteristics();
}
T
: 탐색하는 요소의 형식tryAdvance()
: Spliterator의 요소를 하나씩 순차 소비하며 탐색할 요소가 남아 있으면 참을 반환trySplit()
: Spliterator의 일부 요소를 분할하여 두 번째 Spliterator를 생성estimateSize()
: 탐색해야 할 요소 수 정보 제공characteristics()
: Spliter의 특성을 의미한다.특성 | 의미 |
---|---|
ORDERED | 리스트처럼 요소에 정해진 순서가 있으므로 요소를 탐색하고 분할할 때 이 순서에 유의해야한다 |
DISTINCT | x, y 두 요소를 방문했을 때 x.equals(y)는 항상 false를 반환한다. |
SORTED | 탐색된 요소는 미리 정의된 정렬 순서를 따른다 |
SIZED | 크기가 알려진 소스로 Spliterator를 생성했으므로 estimateSize는 정확한 값을 반환한다. |
NONNULL | 탐색하는 모든 요소는 null이 아니다 |
IMMUTABLE | 이 Spliterator의 소스는 불변이다. 즉, 요소를 탐색하는 동안 요소를 추가하거나, 삭제, 수정이 불가하다. |
CONCURRENT | 동기화 없이 Spliterator의 소스를 여러 스레드에서 동시에 고칠 수 있다. |
SUBSIZED | Spliterator 그리고 분할되는 모든 Spliterator는 SIZED 특성을 갖는다. |
(1) 첫 번째 Spliterator에 trySplit를 호출하면 두 번째 Spliterator가 생성된다.
(2) 두 개의 Spliterator에 다시 trySplit를 호출하면 네 개의 Spliterator가 생성된다.
(3) trySplit의 결과가 null이 될 때까지 반복한다.
(4) 재귀 분할 과정이 종료된다.
class WordCounterSpliterator implements Spliterator<Character> {
private final String string;
private int currentChar = 0;
public WordCounterSpliterator(String string) {
this.string = string;
}
@Override
public boolean tryAdvance(Consumer<? super Character> action) {
// 현재 문자를 소비한다.
action.accept(string.charAt(currentChar++));
//소비할 문자가 남아있으면 true를 반환한다.
return currentChar < string.length();
}
@Override
public Spliterator<Character> trySplit() {
int currentSize = string.length() - currentChar;
// 파싱할 문자열을 순차 처리할 수 있을 만큼 충분히 작아졌음을 알리는 null을 반환
if (currentSize < 10) {
return null;
}
for (int splitPos = currentSize / 2 + currentChar;
// 파싱할 문자열의 중간을 분할 위치로 설정
splitPos < string.length(); splitPos++) {
// 다음 공백이 나올 때까지 분할 위치를 뒤로 이동
if (Character.isWhitespace(string.charAt(splitPos))) {
// 처음부터 분할 위치까지 문자열을 파싱할 새로운 WordCounterSpliterator 생성
Spliterator<Character> spliterator =
new WordCounterSpliterator(string.substring(currentChar,
splitPos));
// 이 WordCounterSpliterator의 시작 위치를 분할 위치로 설정
currentChar = splitPos;
// 공백을 찾았고 문자열을 분리했으므로 루프를 종료
return spliterator;
}
}
return null;
}
@Override
public long estimateSize() {
return string.length() - currentChar;
}
@Override
public int characteristics() {
return ORDERED + SIZED + SUBSIZED + NON-NULL + IMMUTABLE;
}
}