[모던 자바 인 액션] 병렬 데이터 처리와 성능

신명철·2022년 12월 29일
1

모던 자바 인 액션

목록 보기
3/3

병렬 스트림

컬렉션에 parallelStream을 호출하면 병렬 스트림이 생성된다. 병렬 스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림을 말한다. parrallel()이나 sequential()로도 병렬 혹은 순차 스트림으로 중간에 변경이 가능하지만, 이는 내부적으로 병렬 혹은 순차 여부를 나타내는 boolean 값만 변경해주는데 의미가 있다.

전체적인 동작의 병렬 혹은 순차 여부는 마지막에 호출되는 스트림 동작 방식에 의해 결정된다. 다음 코드들을 통해 이해해보자.

stream.parallel()
	.filter(...)
	.sequential()    
    .map(...)
    .parallel()
    .reduce();
  • 마지막에 호출된 메서드는 parallel()이므로 이 코드는 병렬로 실행된다.

스트림 성능 측정

private static void streamSequential() {
	getTime(t -> {
		return t.reduce(0L, Long::sum);
	}, Stream.iterate(1L, i -> i + 1).limit(N), "streamSequential()");
}

private static void streamParallel() {
	getTime(t -> {
		return t.reduce(0L, Long::sum);
	}, Stream.iterate(1L, i -> i + 1).limit(N).parallel(), "streamParallel()");
}

private static void longStreamSequential() {
	getTime(t -> {
		return t.reduce(0L, Long::sum);
	}, LongStream.rangeClosed(1, N), "longStreamSequential()");
}

private static void longStreamParallel() {
	getTime(t -> {
		return t.reduce(0L, Long::sum);
	}, LongStream.rangeClosed(1, N).parallel(), "longStreamParallel()");
}

private static <T, R> void getTime(Function<T, R> function, T t, String msg) {
	System.out.print(msg + " = [");
	long startTime = System.currentTimeMillis();
	function.apply(t);
	long endTIme = System.currentTimeMillis();
	System.out.println(endTIme - startTime + "ms]");
}
streamParallel() = [604ms]
streamSequential() = [192ms]
longStreamParallel() = [33ms]
longStreamSequential() = [32ms]

위 Stream의 결과를 보면 알 수 있듯, 항상 병렬 처리가 더 빠른 것만은 아니다. Parralel보다 Sequential이 더 빠른 성능을 보인 점은, 주어진 Stream.iterate가 병렬 처리를 위해 청크 단위로 분할하기가 어렵기 때문이다.

반면 LongStream 을 사용한 stream이 훨씬 빠른 성능을 보였는데, 그 이유는 박싱과 언박싱 오버헤드가 사라졌기 때문이다. (LongStream은 primitive type인 long을 직접 사용한다.) 그리고 rangeClosed는 쉽게 청크로 분할할 수 있는 숫자 범위를 생산해주기 때문에 병렬 처리를 위한 독립적인 청크를 효과적으로 생성할 수 있었기 때문이다.

병렬 스트림 효과적으로 사용하기

  • 박싱을 주의하라. 오토박싱과 언박싱은 성능을 크게 저하시킬 수 있다. 특화된 스트림(LongStream, IntStream, DoubleStream) 사용을 고려하자.
  • 순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 존재한다. limit, findFirst 같이 요소의 순서에 의존하는 연산을 수행하기 위해서는 비싼 비용을 치러야만 한다. 반면 findAny는 요소의 순서와 상관없으므로 성능이 좋다.
  • 소량의 데이터에서는 병렬화 과정에서 발생하는 부가 비용에 의해 성능이 더 안좋을 수 있다.
  • 스트림을 구성하는 자료구조가 적절한지 확인하라. ArrayListLinkedList보다 더 효율적인데, 그 이유는 LinkedList 같은 경우, 분할하기 위해 모든 요소를 탐색해야만 하기 때문이다.

ForkJoin 프레임워크

  • Fork/Join 프레임워크는 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할하고, 서브 태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계됐다.
  • Fork/Join 프레임워크에서는 서브 태스크를 스레드 풀(ForkJoinPool)의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현한다.

포크 조인 프레임워크는 작업 훔치기 라는 기법을 통해 모든 스레드를 거의 공정하게 분할하고 각각의 스레드가 유휴 상태에 빠지지 않게 다른 스레드의 큐에 있는 작업을 훔쳐와서 작업을 수행한다. 모든 태스크가 작업을 끝날 때 까지 이 과정을 반복한다.

포크 조인 프레임워크를 사용하기 위해서는 이미 구현되어 있는 추상클래스 RecursiveActionRecursiveTask를 사용하면 된다.

  • RecursiveAction : 작업의 결과를 반환하지 않는다.
  • RecursiveTask : 작업의 결과를 반환한다.

ForkJoin 프레임워크를 이용해 배열의 합을 구해보자.

public class ForkJoinSumCalculator extends RecursiveTask<Long>{

	private final long[] numbers;
	private final int start;
	private final int end;
	public static final long THRESHOLD = 500;
	
	public ForkJoinSumCalculator(long[] numbers) {
		this(0, numbers.length, numbers);
	}

	private ForkJoinSumCalculator(int start, int end, long[] numbers) {
		this.numbers = numbers;
		this.start = start;
		this.end = end;
	}
	
	@Override
	protected Long compute() {
		int length = end - start;
		if(length <= THRESHOLD) {
			return computeSequentially();
		}
		ForkJoinSumCalculator f1 = new ForkJoinSumCalculator(start, start + length / 2, numbers);
		ForkJoinSumCalculator f2 = new ForkJoinSumCalculator(start + length / 2, end, numbers);
		f1.fork();
		Long rightResult = f2.compute();
		Long leftResult = f1.join();
		
		return leftResult + rightResult;
	}
	
	private long computeSequentially() {
		long sum = 0;
		for(int i = start ; i < end ; i++) sum += numbers[i];
		return sum;
	}
}

포크/조인 프레임워크를 제대로 사용하는 방법

  • join 메서드를 태스크에 호출하면 태스크의 결과가 리턴될 때 까지 블록된다. 따라서 join은 두 서브태스크가 모두 시작된 다음에 호출해야 한다. 그렇지 않으면 서브태스크가 다른 태스크가 끝날때까지 기다리는 일이 발생하면서 더 느려질 수 있다.
  • RecursiveTask 내에서는 invoke 메서드를 사용하면 안된다. 대신 computefork 메서드를 직접 호출할 수 있다. 순차 코드에서 병렬 계산을 시작할 때만 invoke를 사용한다.
  • 왼쪽 태스크와 오른쪽 태스크 모두에 fork를 호출하는게 자연스럽지만, 같은 스레드를 재사용하기 위해 한쪽에는 fork, 다른 한쪽에는 compute를 호출하는 것이 스레드 사용 효율면에서 효율적이다.
  • 디버깅하기 어렵다. fork 라 불리는 다른 스레드에서 compute를 호출하기 때문에 스택 트레이스가 도움이 되지 않기 때문이다.

Spliterator 인터페이스

Spliterator분할할 수 있는 반복자라는 의미이다. Iterator 처럼 소스의 요소 탐색 기능을 제공한다는 점은 같지만 Spliterator는 병렬 작업에 특화되어 있다.

public interface Spliterator<T> {
	boolean tryAdvance(Consumer<? super T> action);
    Spliterator<T> trySplit();
	long estimateSize();
    int characteristics();
}
  • tryAdvance : Spliterator의 요소를 하나씩 순차적으로 소비하면서 탐색해야 할 요소가 남아있으면 TRUE을 반환한다.
  • trySplit : Spliterator의 일부 요소를 분할해서 두 번째 Spliterator를 생성한다. 더 이상 분할할 요소가 없어서null이 반환될 때 까지 진행된다.
  • estimateSize : 메서드로 탐색해야 할 요소 수 정보를 제공한다.
  • characteristics : Spliterator 자체의 특성 집합을 포함하는 int를 반환한다.
public class SumSpliterator implements Spliterator<Integer>{

	private final Integer[] numbers;
	private final int THRESHOLD = 10;
	private int index = 0;
	
	public SumSpliterator(Integer[] numbers) {
		this.numbers = numbers;
	}

	@Override
	public boolean tryAdvance(Consumer<? super Integer> action) {
		action.accept(numbers[index++]);
		return index < numbers.length;
	}

	@Override
	public Spliterator<Integer> trySplit() {
		int length = numbers.length - index;
		if(length <= THRESHOLD) {
			return null;
		}
		Integer[] newNumbers = Arrays.copyOfRange(numbers, index, Math.min(numbers.length, index + THRESHOLD));
		this.index = Math.min(numbers.length, index + THRESHOLD);
		return new SumSpliterator(newNumbers);
	}

	@Override
	public long estimateSize() {
		return numbers.length - index;
	}

	@Override
	public int characteristics() {
		return SIZED + NONNULL + IMMUTABLE;
	}
}
public static void main(String[] args) {
	Integer[] list = IntStream.rangeClosed(0, 100).boxed().toArray(Integer[]::new);
	Spliterator<Integer> spliterator = new SumSpliterator(list);
	Stream<Integer> stream = StreamSupport.stream(spliterator, true);
	System.out.println(stream.reduce(0, Integer::sum, Integer::sum)); // reduce(identity, accumulator, combiner)
}
profile
내 머릿속 지우개

0개의 댓글