[모던 인 자바 액션] 병렬 데이터 처리와 성능

이주오·2021년 8월 23일
0

도서

목록 보기
7/15

이번 주제 키워드

  • 병렬 스트림으로 데이터 병렬 처리하기
  • 병렬 스트림의 성능 분석
  • 포크/조인 프레임워크
  • Spliterator로 스트림 데이터 쪼개기

병렬 스트림

  • 스트림을 이용하면 순차 스트림을 병렬 스트림으로 자연스럽게 바꿀 수 있다.
  • 컬렉션에 parallelStream을 호출하면 병렬 스트림이 생성된다.
  • 병렬 스트림이란, 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다.
  • 따라서 병렬 스트림을 이용하면 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당할 수 있다.
  • 1부터 n부터까지의 합을 구하는 코드 : 일반 스트림
public static long sequentialSum(long n) {
    return Stream.iterate(1L, i -> i + 1)   //  무한 자연수 스트림 생성
                 .limit(n)                  //  n개 이하로 제한
                 .reduce(0L, Long::sum);    //  모든 숫자를 더하는 스트림 리듀싱 연산
}
  • 1부터 n부터까지의 합을 구하는 코드 : 전통적인 자바 코드
public static long iterativeSum(long n) {
    long result = 0;
    for (long i = 1L; i<= n; i++) {
        result += i;
    }
    return result;
}

n이 엄청나게 커진다면 병렬로 처리하는 것이 좋을 것이다. 어떻게 할까??

1. 순차 스트림을 병렬 스트림으로 변환하기

public static long parallelSum(long n) {
    return Stream.iterate(1L, i -> i + 1)
                 .limit(n)
                 .parallel()  //  스트림을 병렬 스트림으로 변환
                 .reduce(0L, Long::sum);
}

  • 순차스트림에 parallel 메서드를 호출하면 리듀싱 연산이 병렬로 처리된다.
    • 하지만 스트림 자체에는 아무 변화도 없다.
    • 내부적으로 불리언 플래그가 설정될 뿐
  • 이전 코드와 다른 점은 스트림이 여러 청크로 분할되어 있다는 것이다.
    • 따라서 리듀싱 연산을 여러 청크에 병렬로 수행할 수 있다.
  • 마지막 리듀싱 연산을 통해 생성된 부분 결과를 다시 리듀싱 연산으로 합쳐 전체 스트림의 리듀싱 결과를 도출한다.
  • parallel과 sequential 메서드를 통해 어떤 연산을 병렬로 실행할지, 순차로 실행할지 제어할 수 있다.
    • parallel과 sequential 두 메서드 중 최종적으로 호출된 메서드가 전체 파이프라인에 영향을 미친다.
    • 아래 코드는 parallel이 마지막 호출되었으므로 위 파이프라인은 병렬로 실행된다.
stream.parallel()
      .filter(...)
      .sequential()
      .map(...)
      .parallel()
      .reduce();

스트림 성능 측정

  • 과연 어느것이 더 빠를까??
    • 순차스트림
    • 병렬스트림
    • 전통적인 for loop
  • JMH 라이브러릴 이용해 성능 측정!
    • https://github.com/openjdk/jmh
    • https://www.baeldung.com/java-microbenchmark-harness
    • JMH는 OpenJDK에서 개발한 성능 측정 툴이다.
    • 특정 메소드의 성능을 측정하는 식으로 사용할 수 있고 실제 테스트하기전 워밍업 과정과 실제 측정 과정을 수행하는데 각 과정의 실행 수를 제어할 수 있고, 측정 후 결과로 나오는 시간의 단위를 지정하는 기능도 제공한다.
  • 보통 for loop - 순차 스트림 - 병렬 스트림 순
    • 왜일까??
    • iterate가 박싱된 객체를 생성하므로 이를 다시 언박싱하는 과정이 필요했다.
    • 반복 작업은 병렬로 실행될 수 있도록 독립적인 청크로 분할하기 어렵다.
  • 두 번째 이유는 굉장히 큰 문제

- iterate 연산은 이전 연산의 결과에 따라 다음 함수의 입력이 달라지기 때문에 청크로 분할이 어렵다.
- 위와 같은 상황에서는 병렬 리듀싱 연산이 수행되지 않는다.
- 리듀싱 과정을 시작하는 시점에 전체 숫자 리스트가 준비되지 않았으므로 스트림을 병렬로 처리할 수 있도록 청크로 분할할 수가 없기 때문이다.
- iterate같은 경우는 스트림이 병렬로 처리되도록 지시했고 각각의 합계가 다른 thread에서 수행되었음에도 불구하고 순차처리 방식으로 처리되기 때문에 thread를 할당하는 오버헤드만 증가하게 될 뿐이다.
- 따라서 iterate와 같은 병렬과는 거리가 먼 방식을 사용하면 오히려 프로그램의 성능이 더 나빠질 수도 있다.

더 특화된 메서드 사용

  • LongStream.rangeClosed라는 메서드를 활용할 수 있다. 이는 iterate에 비해 아래와 같은 장점이 있다.
    • 기본형 long을 직접 사용하므로 박싱과 언박싱 오버헤드가 사라진다.
    • 쉽게 청크로 분할할 수 있는 숫자 범위를 생산한다. 예를 들어, 1 ~ 20의 숫자 범위를 각각 1 ~ 5, 6 ~ 10, 11 ~ 15, 16 ~ 20 범위의 숫자로 분할할 수 있다.
LongStream.rangeClosed(1, N)
          .parallel()
          .reduce(0L, Long:sum);
  • LongStream.rangeClosed를 활용하면 실질적으로 리듀싱 연산이 병렬로 수행된다.
  • 올바른 자료구조를 선택해야 병렬 실행도 최적의 성능을 발휘할 수 있다.

병렬 스트림의 올바른 사용법

  • 공유된 상태를 바꾸는 알고리즘을 사용할 때 병렬 스트림을 사용하면 문제가 발생한다.
public long sideEffectParalleSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
    return accumulator.total;
}

public class Accumulator {
    public long total = 0;
    public void add(long value) { total += value; }
}
  • 10번 수행한 결과
    • 올바른 결과값조차 나오지 않는다.
    • 레이스 컨디션이 일어나기 때문

병렬 스트림 효과적으로 사용하기

  • 확신이 서지 않을 때는 직접 측정해서 사용하라.
    • 병렬 스트림이 순차 스트림보다 항상 성능이 좋은 것이 아니기 때문에 모를 때는 직접 성능 체크해보는 것이 정확하다.
  • 박싱을 주의해서 사용하라.
    • 오토박싱/언박싱은 성능을 크게 저하시킬 수 있는 요소다. 기본형 특화 스트림(ex. IntStream, LongStream, DoubleStream)을 활용하여 박싱 동작을 피할 수 있다.
  • 순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있음을 주의하라.
    • limit이나 findFirst같이 요소의 순서에 의존하는 연산을 병렬 스트림에 활용하게 되면 비싼 비용을 치뤄야 한다.
  • 스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하라.
    • 처리해야할 요소 수가 N이고 하나의 요소를 처리하는데 드는 비용이 Q라고 하면
    • 전체 스트림 파이프라인 처리 비용을 N*Q로 예상할 수 있다.
    • Q가 높아진다는 것은 병렬 스트림으로 성능 개선의 가능성이 있다는 것을 의미
  • 소량의 데이터에서는 병렬 스트림이 도움 되지 않는다.
    • 소량의 데이터를 처리하는 상황에서는 병렬화 과정에서 생기는 부가 비용을 상쇄할 수 있을 만큼의 이득을 얻지 못한다.
  • 스트림을 구성하는 자료구조가 올바른지 확인하라.
    • 예를 들면, ArrayList가 LinkedList보다 효율적으로 분할할 수 있다.
    • LinkedList는 분할하려면 모든 요소를 탐색해야 하지만 ArrayList는 요소를 탐색하지 않고도 리스트를 분할할 수 있다.
  • 스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다.
    • 예를 들어, SIZED 스트림은 정확히 같은 크기의 두 스트림으로 분할되므로 효과적으로 스트림을 병렬처리 할 수 있다
    • 반면, 필터 연산은 스트림의 길이를 예측할 수 없으므로 효과적으로 병렬 처리 할 수 있을지 알 수 없게 된다.
  • 최종 연산의 병합 과정(ex. Collector의 combiner 메서드) 비용을 살펴봐라.
    • 병합 과정의 비용이 비싸다면, 병렬 스트림으로 얻은 성능의 이익이 서브스트림의 부분 결과를 합치는 과정에서 상쇄될 수 있다.

포크/조인 프레임워크

병렬 스트림이 수행되는 내부 인프라구조는 자바7에서 추가된 포크/조인 프레임워크로 병렬 스트림이 처리된다.

  • 포크/조인 프레임워크는 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계되었다.
  • 포크/조인 프레임워크에서는 서브태스크를 스레드 풀(ForkJoinPool)의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현한다.
  • RecursiveAction 또는 RecursiveTask 추상 클래스를 상속받아서 구현
    • RecursiveAction: 반환값이 없을 때
    • RecursiveTask: 반환값이 있을 떄

RecursiveTask 활용

  • 스레드 풀을 이용하려면 RecursiveTask의 서브클래스를 만들어야 한다.
  • RecursiveTask를 정의하려면 추상 메서드 compute를 구현해야 한다.
  • protected abstract R compute();
  • compute 메서드는 태스크를 서브태스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브태스크의 결과를 생산할 알고리즘을 정의한다.
  • compute 메서드 의사코드
if (태스크가 충분히 작거나 더 이상 분할할 수 없으면) {
    순차적으로 태스크 계산
} else {
    태스크를 두 서브태스크로 분할
    태스크가 다시 서브태스크로 분할되도록 이 메서드를 재귀적으로 호출함
    모든 서브태스크의 연산이 완료될 때까지 기다림
    각 서브태스크의 결과를 합침
}
  • 재귀적인 태스크 분할 과정
    • divide-and-conquer 알고리즘의 병렬화 버전이다.

fork & join으로 합 구하기

var pool = new ForkJoinPool(); // 쓰레드풀 생성
var task = new SumTask(1, 10); // task 생성
var resut = pool.invoke(task); // invoke() 호출로 작업 시작
System.out.println(resut);
// fork join을 이용하여 간단한 합 구하기
class SumTask extends RecursiveTask<Long> {
    private final long start;
    private final long end;
    private static final long THRESHOLD = 10;

    public SumTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long length = end - start + 1;
        if (length <= THRESHOLD) {
            return sum();
        }

        long mid = (start + end) / 2;
        SumTask leftSumTask = new SumTask(start, mid);
        SumTask rightSumTask = new SumTask(mid + 1, end);
        leftSumTask.fork(); // 스레드풀의 다른 스레드로 태스크를 비동기로 실행
				Long rightResult = rightSumTask.compute(); // 두번째 서브태스크 동기 실행
				Long leftResult = leftSumTask.join();
        return  rightResult + leftResult;
    }

    private Long sum() {
        long ret = 0L;
        for (long i = start; i < end; i++) {
            ret += i;
        }
        return ret;
    }
}
  • ForkJoinPool은 fork & join 프레임웤에서 제공하는 쓰레드 풀로, 지정된 수의 쓰레드를 생성해서 미리 만들어 놓고 반복해서 재사용할 수 있게 한다.

fork()와 join()

  • compute()는 작업을 나누고, fork()는 작업을 큐에 넣는다. (반복)
  • join()으로 작업의 결과를 합친다. (반복)
  • fork와 join의 차이점

  • 비동기 메서드는 메서드를 호출만 할 뿐, 그 결과를 기다리지 않는다.
    • 내부적으로는 다른 쓰레드에게 작업을 수행하도록 지시만 하고 결과를 기다리지 않고 돌아오는 것

작업 훔치기

  • fork()가 호출되어 작업 큐에 추가된 작업 역시, compute()에 의해 더 이상 나눌 수 없을 때까지 반복해서 나뉘고, 자신의 작업 큐가 비어있는 쓰레드는 다른 쓰레드의 작업 큐에서 작업을 가져와서 수행한다.
  • 이것을 작업 훔쳐오기라고 하며, 이 과정은 모두 쓰레드풀에 의해 자동적으로 이루어진다.

포크/조인 프레임워크를 제대로 사용하는 방법

  • join 메서드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될 때 까지 호출자를 블록시킨다. 따라서 두 서브태스크가 모두 시작된 다음에 join을 호출해야 한다. 그렇지 않으면 각각의 서브태스크가 다른 태스크가 끝나길 기다리게 되면서 순차 알고리즘보다 느리고 복잡한 프로그램이 될 수 있다.
  • RecursiveTask 내에서는 ForkJoinPool의 invoke 메서드 대신 compute나 fork 메서드를 호출한다. 순차 코드에서 병렬 계산을 시작할 때만 invoke를 사용한다.
  • 두 서브태스크에서 메서드를 호출할 때는 fork와 compute를 각각 호출하는 것이 효율적이다. 그러면 두 서브태스크의 한 태스크에는 같은 스레드를 재사용할 수 있으므로 풀에서 불필요한 태스크를 할당하는 오버헤드를 피할 수 있다.
    • 즉 compute()는 새 스레드를 사용하지 않기 때문
  • 포크/조인 프레임워크를 이용하는 병렬 계산은 디버깅이 어렵다.
  • 멀티코어에서 포크/조인 프레임워크를 사용하는 것이 순차처리보다 무조건 빠른 것은 아니다. 병렬 처리로 성능을 개선하려면 태스크를 여러 독립적인 서브태스크로 분할할 수 있어야 한다.

그렇다면 스트림은 어떻게 분할 로직을 개발하지 않고도 자동으로 스트림을 분할할까?? 바로 Spliterator 기법을 이용


Spliterator

  • Spliterator는 분할할 수 있는 반복자라는 의미다.
  • Iterator 처럼 소스의 요소 탐색 기능을 제공하지만 병렬 작업에 특화돼있다.
  • 자바 8은 컬렉션 프레임워크에 포함된 모든 자료구조에 사용할 수 있는 디폴트 Spliterator 구현을 제공한다.
  • 컬렉션은 spliterator라는 메서드를 제공하는 Spliterator 인터페이스를 구현한다.
public interface Spliterator<T> {
    boolean tryAdvance(Consumer<? super T> action); //  Spliterator 의 요소를 하나씩 순차적으로 소비하면서 탐색해야 할 요소가 남아있으면 true를 반환(iterator 동작과 같다)
    Spliterator<T> trySplit();  //  Spliterator 의 일부 요소(자신이 반환한 요소)를 분할해서 두 번째 Spliterator를 생성하는 메서드
    long estimateSize();    //  탐색해야 할 요소 수 정보 제공 메서드
    int characteristics();
}

분할 과정

  • 스트림을 여러 스트림으로 분할하는 과정은 재귀적으로 일어난다.

  • 1단계: 첫 번째 Spliterator에 trySplit을 호출하면 두 번째 Spliterator가 생성된다.
  • 2단계: 두개의 Spliterator에 trySplit을 다시 호출하면 4개의 Spliterator가 생성된다.
    • 이처럼 trySplit의 결과가 null이 될 때 까지 이 과정을 반복한다.
  • 3단계: trySplit이 null을 반환했다는 것은 더 이상 자료구조를 분할할 수 없음을 의미
  • 4단계: Spliterator에 호출한 모든 trySplit의 결과가 null이면 재귀 분할 과정이 종료된다.

Spliterator의 특성

  • Spliterator의 characteristics 추상 메서드는 Spliterator 자체의 특성 집합을 int 타입으로 반환한다.
  • Spliterator 특성


정리

  • 내부 반복을 이용하여 다른 스레드를 이용하지 않고도 스트림을 병렬로 처리할 수 있다.
  • 병렬 처리 성능이 무조건 빠른 것이 아니기 때문에 성능 측정을 해보는 것이 좋다.
  • 병렬 스트림은 처리해야할 데이터가 아주 많거나 각 요소를 처리하는데 오랜 시간이 걸릴 때 성능을 높일 수 있다.
  • 기본형 특화 스트림을 이용하는 것이 병렬 처리보다 더욱 성능을 높일 수 있는 방법이다.
  • 스트림의 병렬처리는 포크/조인 프레임워크 이용하여 병렬화할 수 있는 태스크를 작은 태스크로 분할한 후, 분할된 태스크를 각각의 스레드로 실행하며 서브태스크 각각의 결과를 합쳐서 최종 결과를 생산한다.
  • Spliterator는 탐색하려는 데이터를 포함하는 스트림을 어떻게 병렬화 할 것인지를 정의한다.

참고 출처

profile
동료들이 같이 일하고 싶어하는 백엔드 개발자가 되고자 합니다!

0개의 댓글