[모던 자바인 액션] chpt.7 병렬 데이터 처리와 성능 (2)

sameul__choi·2022년 4월 27일
0

[모던 자바인 액션]

목록 보기
10/11
post-thumbnail

이번 chpt에서는 지난 chpt에 이어 포크/조인 프레임워크와 spliterator 인터페이스에 대해 알아보는 내용을 다룰 것이다.

00 Fork/Join Framework

포크/조인 프레임워크는 병렬화할 수 있는 작업을 재귀적으로 작게 분할하여 서브태스크 각각의 결과를 합쳐 전체 결과를 만들도록 설계되어있다.

포크/조인 프레임 워크에서는 서브태스크를 스레드풀의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현한다.

00.1 Recursive Task 활용

  • 스레드 풀을 이용하기 위해 RecursiveTask<R>의 서브 클래스를 만든다.
  • 여기서 R은 병렬화도니 태스크가 생성하는 결과 형식이다.
    - 결과가 없다면 RecursiveAction 형식이다.
  • RecursiveTask를 정의하기 위해선 compute 라는 추상 메서드를 구현 해야한다.
protected abstract R compute();

compute(): task를 subtask로 분할하는 로직 + 분할할수 없을 때 개별 서브태스크의 결과를 생산할 알고리즘 정의

if(task가 충분히 작거나 더 이상 분할할 수 없을 떄){
	순차적으로 task 계산
} else {
	task를 두 subtask로 분할
    task가 다시 subtask로 분할되도록 이 메서드를 재귀적으로 호출
    모든 subtask의 연산이 완료될 때 까지 대기
    각 subtask의 결과를 합침
}
  • 이 알고리즘은 분할정복 알고리즘의 병렬화 버전이다.

예시로, 범위의 숫자를 더하는 문제를 구현하며 사용방법을 확인하자

package hello.servlet.web.springmvc.v2;

public class ForkJoinSumCalculator extends java.util.concurrent.RecursiveTask<Long>{
	private final long [] numbers;
	private final int start;
	private final int end;
	private final long THRESHOLD = 10_000;
	
	//main task 생성시 public 생성자
	public ForkJoinSumCalculator(long[] numbers){
		this(numbers, 0, numbers.length);
	}
	
	//recursive subtask 생성시 non public 생성자
	private ForkJoinSumCalculator(long[] numbers, int start, int end){
		this.numbers = numbers;
		this.start = start;
		this.end = end;
	}
	
	@Override
	protected Long compute(){
		int length = end - start; // task에서 더할 배열의 길이
		if (length <= THRESHOLD){
			return computeSequentially(); // 기준값보다 작으면 순차적으로 결과를 계산
		}
		ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2);
		leftTask.fork(); // ForkJoinPool의 다른 스레드로 새로 생성한 태스크를 비동기로 실행
		ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end);
		
		Long rightResult = rightTask.compute(); // 두 번째 서브태스크를 동기 실행, 추가 분할 일어날 수 있음.
		Long leftResult = leftTask.join(); // 첫 번째 서브태스크의 결과를 읽거나 아직 없으면 기다린다.
		
		return leftResult + rightResult;
	}
	
	//분할 더 이상 안될 때 서브태스크 결과 계산해주는 알고리즘.
	private Long computeSequentially(){
		long sum = 0;
		for (int i = start; i < end; i++) {
			sum += numbers[i];
		}
		return sum;
	}
}

다음 코드처럼 ForkJoinSumCalculator의 생성자로 원하는 수의 배열을 넘겨줄 수도 있다.

public static long forkJoinSum(long n){
		long[] numbers = LongStream.rangeClosed(1, n).toArray();
		ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
		return new ForkJoinPool().invoke(task);
	}

위 메서드는 LongStream으로 n까지의 자연수를 포함하는 배열을 생성 한 후 ForkJoinSumCalculator의 생성자로 전달하여 ForkJoinTask를 만들었다. 마지막으로 생성한 태스크를 새로운 ForkJoinPoolinvoke 메서드로 전달했다. ForkJoinPool에서 실행되는 마지막 invoke 메서드의 반환값은 ForkJoinSumCalculator에서 정의한 태스크의 결과가 된다.

일반적으로 애플리케이션에서는 둘 이상의 ForkJoinPool을 사용하지 않는다. 한 번만 인스턴스화해서 정적필드에 싱글턴으로 저장한다.

실행해보면 병렬 스트림을 이용할 때 보다 성능이 더 나빠졌다. 그 이유는 ForkJoinSumCalculator 태스크에서 사용할 수 있도록 전체 스트림을 long[]으로 변환했기 때문이다.

00.2 fork/join 프레임워크를 제대로 사용하려면

쉽게 사용할 수 있지만 주의를 기울여야한다. 다음은 포크/조인 프레임워크를 효과적으로 사용하는 방법이다.

  • join 메서드를 태스크에 호출하면 태스크가 생산하는 결과가 준비 될 때까지 blcok
    - 두 서브태스크가 모두 시작된 다음에 join 호출, 그렇지 않으면 다른 태스크가 끝나기를 기다리면서 순차 알고리즘보다 느리고 복잡해진다.
  • RecursiveTask 내에서는 invoke() 사용 금지 (순차 병렬 계산 시작시에만 사용)
    - 대신 computefork를 직접 호출한다.
  • 서브테스크에서 fork 메서드를 호출하여 ForkJoinPool 일정 조절할 수 있다. fork, fork가 아니라 fork, compute를 호출하는 것이 효과적이다.
    - 두 서브태스크의 한 태스크에는 같은 스레드를 재사용할 수 있게 되어 불필요한 태스크 할당 오버헤드를 피할 수 있다.
  • 포크/조인 프레임워크 사용하면 디버깅 어려움
    - 다른 스레드에서 compute를 호출하므로 스택 트레이스 무용지물 됨
  • 무조건 순차처리 보다 빠를 것이란 생각을 버려라 아래의 조건이 충족 되어야 성능이 개선된다.
    - 테스크를 여러 독립적인 테스크로 분할할 수 있어야함.
    - 서브태스크의 실행시간이 새로운 태스크를 포킹하는 시간보다 길어야 함.
    - 컴파일러 최적화는 병렬보다 순차버전에 집중될 수 있음을 알고 사용.

00.3 작업 훔치기

포크/조인 프레임워크에서는 작업 훔치기(work stealing)라는 기법을 사용한다. 각각의 스레드는 자신에게 할당된 태스크를 포함하는 이중 연결 리스트(doubley linked list)를 참조하면서 작업이 끝날 때마다 큐의 헤드에서 다른 태스크를 가져와서 작업을 처리한다. 이때 한 스레드는 다른 스레드보다 자신에게 할당된 태스크를 더 빨리 처리할 수 있는데, 할 일이 없어진 스레드는 유휴 상태로 바뀌는 것이 아니라 다른 스레드의 큐의 꼬리에서 작업을 훔쳐온다. 모든 태스크가 작업을 끝낼 때까지 이 과정을 반복한다. 따라서 태스크의 크기를 작게 나누어야 작업자 스레드 간의 작업 부하를 비슷한 수준으로 유지할 수 있다.

01 Spliterator 인터페이스

자바 8에서 Spliterator라는 새로운 인터페이스 제공. Spliterator는 분할할 수 있는 반복자(spliatable iterator)라는 의미다. 자바 8은 컬렉션 프레임워크에 포함된 모든 자료구조에서 사용할 수 있는 디폴드 Spliterator 구현을 제공한다.

public interface Spliterator<T> {
	boolean tryAdvance(Consumer<? super T> action);
	Spliterator<T> trySplit();
	long estimateSize();
	int characteristics();
}
  • T : 탐색하는 요소의 형식
  • tryAdvance() : Spliterator의 요소를 하나씩 순차 소비하며 탐색할 요소가 남아 있으면 참을 반환
  • trySplit() : Spliterator의 일부 요소를 분할하여 두 번째 Spliterator를 생성
  • estimateSize() : 탐색해야 할 요소 수 정보 제공
  • characteristics() : Spliter의 특성을 의미한다.

Spliterator 특성

특성의미
ORDERED리스트처럼 요소에 정해진 순서가 있으므로 요소를 탐색하고 분할할 때 이 순서에 유의해야한다
DISTINCTx, y 두 요소를 방문했을 때 x.equals(y)는 항상 false를 반환한다.
SORTED탐색된 요소는 미리 정의된 정렬 순서를 따른다
SIZED크기가 알려진 소스로 Spliterator를 생성했으므로 estimateSize는 정확한 값을 반환한다.
NONNULL탐색하는 모든 요소는 null이 아니다
IMMUTABLE이 Spliterator의 소스는 불변이다. 즉, 요소를 탐색하는 동안 요소를 추가하거나, 삭제, 수정이 불가하다.
CONCURRENT동기화 없이 Spliterator의 소스를 여러 스레드에서 동시에 고칠 수 있다.
SUBSIZEDSpliterator 그리고 분할되는 모든 Spliterator는 SIZED 특성을 갖는다.

실행과정


(1) 첫 번째 Spliterator에 trySplit를 호출하면 두 번째 Spliterator가 생성된다.

(2) 두 개의 Spliterator에 다시 trySplit를 호출하면 네 개의 Spliterator가 생성된다.

(3) trySplit의 결과가 null이 될 때까지 반복한다.

(4) 재귀 분할 과정이 종료된다.

Custom Spliterator

class WordCounterSpliterator implements Spliterator<Character> {
    private final String string;
    private int currentChar = 0;
    public WordCounterSpliterator(String string) {
        this.string = string;
    }
    @Override
    public boolean tryAdvance(Consumer<? super Character> action) {
        // 현재 문자를 소비한다.
        action.accept(string.charAt(currentChar++));
        //소비할 문자가 남아있으면 true를 반환한다.
        return currentChar < string.length();
    }
    @Override
    public Spliterator<Character> trySplit() {
        int currentSize = string.length() - currentChar;
        // 파싱할 문자열을 순차 처리할 수 있을 만큼 충분히 작아졌음을 알리는 null을 반환
        if (currentSize < 10) {
            return null;
        }
        for (int splitPos = currentSize / 2 + currentChar;
                 // 파싱할 문자열의 중간을 분할 위치로 설정
                 splitPos < string.length(); splitPos++) {
            // 다음 공백이 나올 때까지 분할 위치를 뒤로 이동
            if (Character.isWhitespace(string.charAt(splitPos))) {
                // 처음부터 분할 위치까지 문자열을 파싱할 새로운 WordCounterSpliterator 생성 
                Spliterator<Character> spliterator =
                   new WordCounterSpliterator(string.substring(currentChar,
                                                               splitPos));
                // 이 WordCounterSpliterator의 시작 위치를 분할 위치로 설정
                currentChar = splitPos;
                // 공백을 찾았고 문자열을 분리했으므로 루프를 종료
                return spliterator;
            }
        }
        return null;
    }
    @Override
    public long estimateSize() {
        return string.length() - currentChar;
    }
    @Override
    public int characteristics() {
        return ORDERED + SIZED + SUBSIZED + NON-NULL + IMMUTABLE;
    }
}

02 마치며

  • 내부 반복 이용하면 명시적으로 다른 스레드를 사용하지 않고도 스트림 병렬 처리가능
  • 간단하게 스트림 병렬 처리할 수 있지만 병렬 처리가 무조건 빠른 것은 아니고 동작 방법과 성능이 직관적이지 않아서 병렬 처리를 사용했을 때 성능을 직접 측정해봐야 한다.
  • 병렬 스트림으로 데이터 집합을 병렬 실행할 때 특히 처리해야할 데이터가 아주 많거나 요소를 처리하는 데 오랜 시간이 걸릴 때 성능을 높일 수 있다.

0개의 댓글