
layout: post
title: "모던 자바 인 액션 스터디 - chapter7(2)"
date: 2022-04-25T00:00:00-00:00
author: sangyeop
categories: Sproutt-2nd
병렬화 할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 뒤 작은 작업들의 결과를 합쳐서 전체 결과를 만들도록 설계되어있다. 포크/조인 프레임워크에서는 작은 작업들을 스레드 풀(Fork Join Pool)의 작업자 스레드에 분살 할당하는 ExecutorService 인터페이스를 구현한다.
RecursiveTask<R>의 서브클래스를 구현해야 스레드 풀을 이용할 수 있다. RecursiveTask를 정의하기 위해서는 추상 메서드인 compute를 구현해야 한다. 대부분 아래와 같은 의사코드 형식을 따른다
if (태스크가 충분이 작거나 더 이상 분할할 수 없으면) {
순차적을 태스크 계산
} else {
태스크를 두 서브 태스크로 분할
태스크가 다시 서브 태스크로 분활되도록 이 메서드를 재귀적으로 호출
모든 서브태스크의 연산이 완료될 때까지 기다림
각 서브태스크의 결과를 합침
}
포크/조인 프레임워크를 이용한 병렬 합계 수행 코드
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
private final long[] numbers; // 더할 숫자 배열
private final int start; // 이 서브 태스크에서 처리할 배열의 초기위치와 최종 위치
private final int end;
private static final long THRESHOLD = 10_000; // 이 값 이하의 서브태스크는 더 이상 분할 X
public ForkJoinSumCalculator(long[] numbers) { // 메인 태스크를 생성할 때 사용할 공개 생성자
this(numbers, 0, numbers.length);
}
private ForkJoinSumCalculator(long[] numbers, int start, int end) { // 메인 태스크의 서브태스크를 재귀적으로 만들때 사용할 비공개 생성자
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
if(length <= THRESHOLD) {
return computeSequentially();
}
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2); // 배열 첫번째 절반 더하도록 서브태스크 생성
leftTask.fork(); // ForkJoinPool의 다른 스레드로 새로 생성한 태스크를 비동기로 실행
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);
Long rightResult = rightTask.compute(); // 두번째 서브태스크를 동기 실행한다. 이때 추가 분할이 일어날 수 있다.
Long leftResult = leftTask.join(); // 첫번째 태스크의 결과를 읽거나 아직 결과가 없으면 기다린다.
return leftResult + rightResult; // 두 결과를 조합하여 최종 결과를 반환한다.
}
private long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
}
ForkJoinPool은 일반적으로 필요한 곳에서 언제든 가져다 쓸 수 있도록 한번만 인스턴스화 해서 정적 필드에 싱글톤으로 저장한다. fork() 를 호출해서 인수가 없는 디폴트 생성자를 이용했는데, 이는 JVM에서 이용할 수 있는 모든 프로세서가 자유롭게 풀에 접근할 수 있음을 의미한다.
ForkJoinSumCalculator 실행
ForkJoinSumCalculator를 ForkJoinPool로 전달하면 풀의 스레드가 compute()메서드를 호출한다. 이후 위의 의사코드처럼 부분 결과를 합쳐서 최종 결과를 계산한다.

join()메서드를 호출하면 태스크가 생산하는 결과가 준비될 때까지 호출자를 블록시킨다. 따라서 두 서브태스크가 모두 시작된 다음에 join()을 호출해야 한다. 그렇지 않으면 각각의 서브태스크가 다른 태스크가 끝나길 기다리는 일이 발생하며 원래의 순차 알고리즘보다 느리고 복잡한 프로그램이 된다.블록이란? : 어떤 요청이 발생하고 완료될 때까지 모든 일을 중단한 상태로 대기하는 것을 블로킹 방식이라고 한다. 즉 결과가 준비될 때까지 다른 작업을 중단하고 하염없이 기다리는 것을 의미한다.
RecursiveTask 내에서는 순차 코드에서 병렬 계산을 시작할 떄를 제외하고는 ForkJoinPool의 invoke() 메서드를 사용하지 말아야한다. 대신에 compute()나 fork() 는 직접 호출할 수 있다.fork() 메서드를 호출해서 ForkJoinPool의 일정을 조절할 수 있다. 왼쪽과 오른쪽 모두 fork()를 호출하는 것이 효율적일 것 같지만, 한쪽에서만 fork()를 호출하고 나머지 한쪽은 compute()를 호출하는 것이 더 효율적이다. 그래야만 두 서브태스크의 한 태스크에서는 같은 스레드를 재사용할 수 있으므로 풀에서 불필요한 태스크를 할당하는 오버헤드를 피할 수 있다.포크/조인 프레임워크에서는 작업 훔치기라는 기법으로 주어진 서브태스크를 더 분할할 것인지 결정할 기준을 정하는데 도움을 준다. 작업 훔치기를 통해서 ForkJoinPool의 모든 스레드를 거의 공정하게 분할한다.
따라서 태스크 크기를 작게 나누어야 스레드간 작업 부하를 비슷한 수준으로 유지할 수 있다.
Spliterator는 Iterator 처럼 소스 요소 탐색 기능을 제공한다는 점은 같지만, 병렬화에 특화되어 있다는 점에서 차이점이 있다. Spliterator는 다음과 같은 여러 메서드를 정의한다.
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimateSize();
int characteristics();
}
tryAdavance
요소를 하나씩 순차적으로 소비하면서 탐색할 요소가 남아있다면 true를 반환한다.
trySplit()
Spliterator의 일부 요소를 분할해서 두 번째 Spliterator를 만든다
estimateSize()
탐색해야 할 요소의 수에 대한 값을 반환
trySplit() 첫번째 호출로 인해 두 개의 Spliterator가 된다trySplit() 를 호출하면 4개의 Spliterator가 된다trySplit()의 결과가 null이 될 때까지 이 과정을 반복한다.Spliterator 특성
characteristics()
Spliterator 자쳉의 특성 집합을 포함하는 int를 반환한다. Spliterator를 이용하는 프로그램은 이들 특성을 참고해서 Spliterator를 더 잘 제어하고 최적화 할 수 있다.
Spliterator의 특성
ORDERED
요소에 정해진 순서가 있으므로 탐색, 분할 할때 이 순서에 따라야 한다.
DISTINCT
x, y 두 요소를 방문했을때 x.equals(y)는 항상 false를 반환한다.
SORTED
탐색된 요소는 미리 정의된 정렬 순서를 따른다.
SIZED
estimatedSize()는 정확한 값을 반환한다.NON-NULL
탐색하는 모든 요소는 null이 아니다.
IMMUTABLE
불변이다. 요소를 탐색하는 동안 요소를 추가하거나, 삭제하거나, 변경할 수 없다.
CONCURRENT
동기화 없이
Spliterator의 소스를 여러 스레드에서 동시에 고칠 수 있다.SUBSIZED
분할되는 모든
Spliterator까지 모두 SIZED 특성을 갖는다.
반복형으로 단어 수를 세는 메서드
public class Main {
public static void main(String[] args) {
System.out.println(countWordsIteratively("채상엽 채상엽 채상엽"));
}
public static int countWordsIteratively(String s) {
int counter = 0;
boolean lastSpace = true;
for (char c : s.toCharArray()) {
if (Character.isWhitespace(c)) {
lastSpace = true;
} else {
if(lastSpace) counter++;
lastSpace = false;
}
}
return counter;
}
}
3
함수형으로 단어 수를 세는 메서드 재구현하기
반복형 대신 함수형을 사용하면 직접 스레드를 동기화하지 않고 병렬 스트림으로 작업을 병렬화할 수 있다. 먼저 String을 스트림으로 바꾸어주어야 한다.
Stream<Character> stream = IntStream.range(0, SENTENCE.length())
.mapToObj(SENTENCE::charAt);
public class WordCounter {
private final int counter;
private final boolean lastSpace;
public WordCounter(int counter, boolean lastSpace) {
this.counter = counter;
this.lastSpace = lastSpace;
}
public WordCounter accumulator(Character c) {
if (Character.isWhitespace(c)) {
return lastSpace ? this : new WordCounter(counter, true);
} else {
return lastSpace ? new WordCounter(counter + 1, false) : this;
}
}
public WordCounter combine(WordCounter wordCounter) {
return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace);
}
public int getCounter() {
return counter;
}
}
public class Main {
private static final String SENTENCE = "채채채 상상상 엽엽엽";
public static void main(String[] args) {
Stream<Character> stream = IntStream.range(0, SENTENCE.length())
.mapToObj(SENTENCE::charAt);
System.out.println(countWords(stream));
}
private static int countWords(Stream<Character> stream) {
WordCounter wordCounter = stream.reduce(new WordCounter(0, true),
WordCounter::accumulator,
WordCounter::combine);
return wordCounter.getCounter();
}
}
3
스트림을 탐색하면서 새로운 문자를 찾을 때마다 accumulate() 메서드를 호출한다. countWordsIteratively()에서와 같이 새로운 비공백 문자를 탐색하고 마지막 문자가 공백이면 counter를 증가시킨다. 그리고 combine()은 WordCounter 내부의 counter 값을 서로 합치는 역할을 한다.
WordCounter 병렬로 수행하기
아래 처럼 병렬 스트림으로 처리하려고 하면 원하는 답이 나오지 않는다.
System.out.println(countWords(stream.parallel()));
그 이유는 원래 문자열의 임의의 위치에서 둘로 분할하다 보니, 예상치 못하게 하나의 단어를 둘로 나누어서 계산하는 경우가 발생할 수 있기 때문이다. 이를 해결하기 위해서는 문자열의 임의의 위치에서 분할하는 것이 아닌, 단어가 끝나는 위치에서만 분할이 이루어지도록 함으로써 문제를 해결할 수 있다.
public class WordCounterSpliterator implements Spliterator<Character> {
private final String string;
private int currentChar = 0;
public WordCounterSpliterator(String string) {
this.string = string;
}
@Override
public boolean tryAdvance(Consumer<? super Character> action) {
action.accept(string.charAt(currentChar++)); // 현재 문자 소비
return currentChar < string.length(); // 소비할 문자가 남아 있으면 true 반환
}
@Override
public Spliterator<Character> trySplit() {
int currentSize = string.length() - currentChar;
if (currentSize < 10) {
return null; // 파싱할 문자열이 순차 처리할 수 있을 만큼 충분히 작아졌으면 null 반환
}
// 파싱할 문자열의 중간을 분할 위치로 설정
for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) {
// 다음 공백이 나올떄까지 분할 위치를 뒤로 이동시킨다
if (Character.isWhitespace(string.charAt(splitPos))) {
// 처음부터 분할 위치까지 문자열을 파싱할 새로운 WordCounterSpliterator 생성
Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos));
currentChar = splitPos; // WordCounterSpliterator의 시작위치를 분할위치로 설정
return spliterator; // 공백 찾고 문자열 분리 했으므로 반복 종료
}
}
return null;
}
@Override
public long estimateSize() {
return string.length() - currentChar;
}
@Override
public int characteristics() {
return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
}
}