자바 7이 등장하기 전에는 데이터 컬렉션을 병렬로 처리하기가 어려웠다.
데이터를 서브파트로 분할하고, 분할된 서브파트를 각각의 스레드로 할당하고, 레이스 컨디션이 발생하지 않도록 동기화 문제에도 신경써야 했고, 마지막으로 부분 결과를 합치는 일련의 과정들을 거쳐야 했다.
자바 7은 더 쉽게 병렬화를 수행하면서 에러를 최소화할 수 있도록 포크/조인 프레임워크 기능을 제공
📝 스트림으로 데이터 컬렉션 관련 동작을 얼마나 쉽게 병렬로 실행할 수 있는지포크/조인 프레임워크와 내부적인 병렬 스트림 처리는 어떤 관계가 있는지
병렬 스트림이 요소를 여러 청크로 분할하는 방법
각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림
// 반복형
public static long iterativeSum(long n) {
long result = 0;
for(long i = 1L; i <= n; i++) {
result += i;
}
return result;
}
// 순차 스트림
public static long sequentialSum(long n) {
return Stream.iterate(1L, i -> i + 1) // 무한 자연수 스트림 생성
.limit(n) // n 개 이하로 제한
.reduce(0L, Long::sum); // 모든 숫자를 더하는 스트림 리듀싱 연산
}
// 병렬 스트림
public static long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel()
.reduce(0L, Long::sum);
}
리듀싱 연산을 여러 청크에 병렬로 수행
리듀싱 연산으로 생성된 부분 결과를 다시 리듀싱 연산으로 합쳐서 전체 스트림의 리듀싱 결과를 도출
병렬 스트림에서 사용하는 스레드 풀 설정
@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime) // 벤치마크 대상 메서드를 실행하는 데 걸린 평균 시간 측정
@OutputTimeUnit(TimeUnit.MILLISECONDS) // 벤치마크 결과를 밀리초 단위로 출력
@Fork(value = 2, jvmArgs = {"-Xms4G", "-Xmx4G"})
public class Sample {
private static final long N = 10_000_000L;
@Benchmark
public long iterativeSum() {
long result = 0;
for (long i = 1L; i <= N; i++) {
result += i;
}
return result;
}
@Benchmark
public long sequentialSum() {
return Stream.iterate(1L, i -> i + 1) // 무한 자연수 스트림 생성
.limit(N) // n 개 이하로 제한
.reduce(0L, Long::sum); // 모든 숫자를 더하는 스트림 리듀싱 연산
}
@Benchmark
public long parallelSum() {
return Stream.iterate(1L, i -> i + 1)
.limit(N)
.parallel()
.reduce(0L, Long::sum);
}
@TearDown(Level.Invocation) // 매 번 벤치마크를 실행한 다음에도 가비지 컬렉터 동작 시도
public void tearDown() {
System.gc();
}
}
Benchmark Mode Cnt Score Error Units
Sample.iterativeSum avgt 3.182 ms/op
Sample.parallelSum avgt 101.162 ms/op
Sample.sequentialSum avgt 74.647 ms/op
@Benchmark
public long rangedSum() {
return LongStream.rangeClosed(1, N)
.reduce(0L, Long::sum);
}
@Benchmark
public long parallelRangedSum() {
return LongStream.rangeClosed(1, N)
.parallel()
.reduce(0L, Long::sum);
}
Benchmark Mode Cnt Score Error Units
Sample.iterativeSum avgt 3.578 ms/op
Sample.parallelRangedSum avgt 0.474 ms/op
Sample.parallelSum avgt 110.169 ms/op
Sample.rangedSum avgt 3.972 ms/op
Sample.sequentialSum avgt 87.143 ms/op
올바른 자료구조를 선택해야 최적의 성능을 발휘할 수 있다. 그리고 멀티코어 간의 데이터 이동의 비용은 비싸기 때문에 코어 간에 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 병렬로 다른 코어에서 수행하는 것이 바람직
포크/조인 프레임워크란?
스레드 풀을 이용하려면 RecursiveTask의 서브클래스를 만들어야 한다. R은 병렬화된 태스크가 생성하는 결과 형식을 의미하며, 결과가 없는 Void 형태일 경우 RecursiveAction을 이용
RecurisiveTask를 정의하려면 추상 메서드 compute를 구현해야 함
// 주요 산술 연산이 해당 태스크에 의해 수행됨.
protected abstract void compute();
if (태스크가 충분히 작거나 더 이상 분할할 수 없으면) {
순차적으로 태스크 계산
} else {
태스크를 두 서브태스크로 분할
태스크가 다시 서브태스크로 분할되도록 이 메서드를 재귀적으로 호출함
모든 서브태스크의 연산이 완료될 때까지 기다림
각 서브태스크의 결과를 합침
}
포크 조인 과정
범위의 숫자를 더하는 문제
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
private final long[] numbers;
private final int start;
private final int end;
private static final long THRESHOLD = 10_000;
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
if(length <= THRESHOLD) {
return computeSequentially();
}
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2);
leftTask.fork();
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end);
Long rightResult = rightTask.compute();
Long leftResult = leftTask.join();
return leftResult + rightResult;
}
private long computeSequentially() {
long sum = 0;
for(int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
}
public static long forkJoinSum(long n) {
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimateSize();
int characteristics();
Spliterator의 특성
ORDERED | 리스트처럼 요소에 정해진 순서가 있으므로 Spliterator는 요소를 탐색하고 분할할 때 이 순서에 유의해야 한다. |
---|---|
DISTINCT | x, y 두 요소를 방문했을 때 x.equals(y)는 항상 false를 반환 |
SORTED | 탐색된 요소는 미리 정의된 정렬 순서를 따른다. |
SIZED | 크기가 알려진 소스(예를 들면 Set)로 Spliterator를 생성했으므로 estimatedSize()는 정확한 값을 반환 |
NON-NULL | 탐색하는 모든 요소는 null이 아니다 |
IMMUTABLE | 이 Spliterator의 소스는 불변이다. 즉, 요소를 탐색하는 동안 요소를 추가하거나, 삭제하거나, 고칠 수 없다. |
CONCURRENT | 동기화 없이 Spliterator의 소스를 여러 스레드에서 동시에 고칠 수 있다. |
SUBSIZED | 이 Spliterator 그리고 분할되는 모든 Spliterator는 SIZED 특성을 갖는다. |
문자열의 단어 수를 계산하는 단순한 메서드 구현
public int countWordsIteratively(String s) {
int counter = 0;
boolean lastSpace = true;
for (char c : s.toCharArray()) {
if (Character.isWhitespace(c)) {
lastSpace = true;
} else {
if (lastSpace) counter++;
lastSpace = false;
}
}
return counter;
}
스트림에 리듀싱 연산을 통해 단어의 수를 계산할건데, 지금까지 발견한 단어의 수를 계산한느 int 변수와 마지막 문자가 공백이었는지 여부를 기억하는 Boolean 변수가 필요
public class WordCounter {
private final int counter;
private final boolean lastSpace;
public WordCounter(int counter, boolean lastSpace) {
this.counter = counter;
this.lastSpace = lastSpace;
}
public WordCounter accumulate(Character c) {
if (Character.isWhitespace(c)) {
return lastSpace ? this : new WordCounter(counter, true);
} else {
return lastSpace ? new WordCounter(counter + 1, false) : this;
}
}
public WordCounter combine(WordCounter wordCounter) {
return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace));
}
public int getCounter() {
return counter;
}
}
accumulate : Word의 상태를 어떻게 바꿀 것인지를 정의, 스트림을 탐색하면서 새로운 문자를 찾을 때마다 호출
새로운 비공백 문자를 탐색한 다음에 마지막 문자가 공백이면 counter 증가
combine : 문자열 서브 스트림을 처리한 WordCounter의 결과를 합친다.
private int countWords(Stream<Character> stream) {
WordCounter wordCounter = stream.reduce(new WordCounter(0, true),
WordCounter::accumulate,
WordCounter::combine);
return wordCounter.getCounter();
}
public class WordCounterSpliterator implements Spliterator<Character> {
private final String string;
private int currentChar = 0;
public WordCounterSpliterator(String string) {
this.string = string;
}
@Override
public boolean tryAdvance(Consumer<? super Character> action) {
action.accept(string.charAt(currentChar++));
return currentChar < string.length();
}
@Override
public Spliterator<Character> trySplit() {
int currentSize = string.length() - currentChar;
if(currentSize < 10) {
return null;
}
for(int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) {
if(Character.isWhitespace(string.charAt(splitPos))) {
Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos));
currentChar = splitPos;
return spliterator;
}
}
return null;
}
@Override
public long estimateSize() {
return string.length() - currentChar;
}
@Override
public int characteristics() {
return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
}
}