[모던 자바 인 액션] - 병렬 데이터 처리와 성능

김성혁·2022년 5월 9일
0

모던 자바 인 액션

목록 보기
7/7

자바 7이 등장하기 전에는 데이터 컬렉션을 병렬로 처리하기가 어려웠다.

데이터를 서브파트로 분할하고, 분할된 서브파트를 각각의 스레드로 할당하고, 레이스 컨디션이 발생하지 않도록 동기화 문제에도 신경써야 했고, 마지막으로 부분 결과를 합치는 일련의 과정들을 거쳐야 했다.

자바 7은 더 쉽게 병렬화를 수행하면서 에러를 최소화할 수 있도록 포크/조인 프레임워크 기능을 제공

📝 스트림으로 데이터 컬렉션 관련 동작을 얼마나 쉽게 병렬로 실행할 수 있는지

포크/조인 프레임워크와 내부적인 병렬 스트림 처리는 어떤 관계가 있는지

병렬 스트림이 요소를 여러 청크로 분할하는 방법


👨🏻‍💻 병렬 스트림

각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림

숫자 n을 인수로 받아서 1부터 n까지의 모든 숫자의 합계를 반환

// 반복형
public static long iterativeSum(long n) {
	long result = 0;
	for(long i = 1L; i <= n; i++) {
		result += i;
	}
	return result;
}

// 순차 스트림
public static long sequentialSum(long n) {
	return Stream.iterate(1L, i -> i + 1) // 무한 자연수 스트림 생성
               .limit(n) // n 개 이하로 제한
               .reduce(0L, Long::sum); // 모든 숫자를 더하는 스트림 리듀싱 연산
}

// 병렬 스트림
public static long parallelSum(long n) {
	return Stream.iterate(1L, i -> i + 1)
               .limit(n)
               .parallel()
               .reduce(0L, Long::sum);
}

리듀싱 연산을 여러 청크에 병렬로 수행

리듀싱 연산으로 생성된 부분 결과를 다시 리듀싱 연산으로 합쳐서 전체 스트림의 리듀싱 결과를 도출

  • 내부적으로 parallel을 호출하면 연산이 병렬로 수행해야 함을 의미하는 불리언 플래그가 설정
  • parallel, sequential 이 두 메서드를 이용해 어떤 연산을 병렬로 실행하고 어떤 연산을 순차로 실행할지 제어 가능
  • 두 메서드 중 최종적으로 호출된 메서드가 전체 파이프라인에 영향을 미친다.

병렬 스트림에서 사용하는 스레드 풀 설정

  • 병렬 스트림은 내부적으로 ForkJoinPool을 사용
    • 기본적으로 ForkJoinPool은 프로세서 수, 즉 Runtime.getRunTime().availableProcessors()가 반환하는 값에 상응하는 스레드를 갖는다.

스트림 성능 측정

  • 자바 마이크로벤치마크 하니스(JMH)
@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime) // 벤치마크 대상 메서드를 실행하는 데 걸린 평균 시간 측정
@OutputTimeUnit(TimeUnit.MILLISECONDS) // 벤치마크 결과를 밀리초 단위로 출력
@Fork(value = 2, jvmArgs = {"-Xms4G", "-Xmx4G"})
public class Sample {
    private static final long N = 10_000_000L;

    @Benchmark
    public long iterativeSum() {
        long result = 0;
        for (long i = 1L; i <= N; i++) {
            result += i;
        }
        return result;
    }

    @Benchmark
    public long sequentialSum() {
        return Stream.iterate(1L, i -> i + 1) // 무한 자연수 스트림 생성
                     .limit(N) // n 개 이하로 제한
                     .reduce(0L, Long::sum); // 모든 숫자를 더하는 스트림 리듀싱 연산
    }

    @Benchmark
    public long parallelSum() {
        return Stream.iterate(1L, i -> i + 1)
                     .limit(N)
                     .parallel()
                     .reduce(0L, Long::sum);
    }

    @TearDown(Level.Invocation) // 매 번 벤치마크를 실행한 다음에도 가비지 컬렉터 동작 시도
    public void tearDown() {
        System.gc();
    }
}
Benchmark             Mode  Cnt    Score   Error  Units
Sample.iterativeSum   avgt         3.182          ms/op
Sample.parallelSum    avgt       101.162          ms/op
Sample.sequentialSum  avgt        74.647          ms/op
  • iterate 연산은 이전 연산의 결과에 따라 다음 함수의 입력이 달라지기 때문에 청크로 분할하기가 어렵다. (순차적)
    • 더 본질적으로 리듀싱 과정을 시작하는 시점에 전체 숫자 리스트가 준비되지 않았기 때문에
    • 병렬을 처리하도록 지시했지만 결국 순차처리 방식과 크게 다른 점이 없으므로 스레드를 할당하는 오버헤드만 증가

더 특화된 메서드 사용

  • LongStream.rangeClose
    • 기본형 long 사용 → 박싱과 언박싱 오버헤드 제거
    • 숫자 범위를 생산
    @Benchmark
    public long rangedSum() {
        return LongStream.rangeClosed(1, N)
                         .reduce(0L, Long::sum);
    }

    @Benchmark
    public long parallelRangedSum() {
        return LongStream.rangeClosed(1, N)
                         .parallel()
                         .reduce(0L, Long::sum);
    }
Benchmark                 Mode  Cnt    Score   Error  Units
Sample.iterativeSum       avgt         3.578          ms/op
Sample.parallelRangedSum  avgt         0.474          ms/op
Sample.parallelSum        avgt       110.169          ms/op
Sample.rangedSum          avgt         3.972          ms/op
Sample.sequentialSum      avgt        87.143          ms/op

올바른 자료구조를 선택해야 최적의 성능을 발휘할 수 있다. 그리고 멀티코어 간의 데이터 이동의 비용은 비싸기 때문에 코어 간에 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 병렬로 다른 코어에서 수행하는 것이 바람직

병렬 스트림의 올바른 사용법

  • 병렬 스트림의 올바른 동작을 위해서는 공유된 가변 상태를 피해야 한다.

병렬 스트림 효과적으로 사용하기

  • 적절한 벤치마크로 직접 성능을 측정하라
  • 자동 박싱과 언박싱은 성능을 크게 저하시키는 요소이기 때문에 박싱을 주의하라
  • 순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 존재
    • 요소에 순서에 의존하는 findFirst 보다는 요소의 순서와 상관없이 연산하는 findAny를
    • 스트림 N개 요소가 있을 때 요소의 순서가 상관없다면 비정렬된 스트림에 limit를 호출하는 것이 더 효율적
  • 스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하라. 하나의 요소를 처리하는데 드는 비용이 높아진다는 것은 병렬 스트림으로 성능을 개선할 수 있는 가능성이 있음
  • 소량의 데이터에서는 병렬 스트림이 도움 되지 않는다.
  • 스트림을 구성하는 자료구조가 적절한지를 확인하라
  • 스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다.
  • 최종 연산의 병합 과정 비용을 살펴봐라
  • 마지막으로 병렬 스트림이 수행되는 내부 인프라구조도 확인하라.

👨🏻‍💻 포크/조인 프레임워크

포크/조인 프레임워크란?

  • 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브태스크 각각의 결과를 합쳐서 전체 결과를 만듬
  • 서브태스크를 스레드 풀(ForkJoinPool)의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현

RecursiveTask 활용

  • 스레드 풀을 이용하려면 RecursiveTask의 서브클래스를 만들어야 한다. R은 병렬화된 태스크가 생성하는 결과 형식을 의미하며, 결과가 없는 Void 형태일 경우 RecursiveAction을 이용

  • RecurisiveTask를 정의하려면 추상 메서드 compute를 구현해야 함

    // 주요 산술 연산이 해당 태스크에 의해 수행됨.
    protected abstract void compute();
    • compute 메서드는 태스크를 서브태스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브태스크의 결과를 생성할 알고리즘을 정의
      if (태스크가 충분히 작거나 더 이상 분할할 수 없으면) {
      	순차적으로 태스크 계산
      } else {
      	태스크를 두 서브태스크로 분할
      	태스크가 다시 서브태스크로 분할되도록 이 메서드를 재귀적으로 호출함
      	모든 서브태스크의 연산이 완료될 때까지 기다림
      	각 서브태스크의 결과를 합침
      }
  • 포크 조인 과정

    • 각각의 서브태스크의 크기가 충분히 작아질 때까지 태스크를 재귀적으로 포크함
    • 모든 서브태스크를 병렬로 수행
    • 부분 결과를 조합
  • 범위의 숫자를 더하는 문제

    public class ForkJoinSumCalculator extends RecursiveTask<Long> {
        private final long[] numbers;
        private final int start;
        private final int end;
        private static final long THRESHOLD = 10_000; 
    
        public ForkJoinSumCalculator(long[] numbers) {
            this(numbers, 0, numbers.length);
        }
    
        private ForkJoinSumCalculator(long[] numbers, int start, int end) {
            this.numbers = numbers;
            this.start = start;
            this.end = end;
        }
    
        @Override
        protected Long compute() {
            int length = end - start;
            if(length <= THRESHOLD) {
                return computeSequentially();
            }
            ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2);
            leftTask.fork();
            ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end);
            Long rightResult = rightTask.compute();
            Long leftResult = leftTask.join();
            return leftResult + rightResult;
        }
    
        private long computeSequentially() {
            long sum = 0;
            for(int i = start; i < end; i++) {
                sum += numbers[i];
            }
            return sum;
        }
    }
    public static long forkJoinSum(long n) {
    	long[] numbers = LongStream.rangeClosed(1, n).toArray();
    	ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
    	return new ForkJoinPool().invoke(task);
    }

ForkJoinSumCalculator 실행

  • ForkJoinSumCalculator를 ForkJoinPool로 전달하면 풀의 스레드가 ForkJoinSumCalculator의 compute 메서드를 실행하면서 작업을 수행

포크/조인 프레임워크를 제대로 사용하는 방법

  • join 메서드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될 때까지 호출자를 블록시킨다. 따라서 두 서브태스크가 모두 시작된 다음에 join을 호출해야 한다.
  • RecursiveTask 내에서는 ForkJoinPool의 invoke 메서드를 사용하지 말아야 한다. 대신 compute나 fork 메서드를 직접 호출할 수 있다. 순차 코드에서 병렬 계산을 시작할 때만 invoke를 사용
  • 서브태스크에 fork 메서드를 호출해서 ForkJoinPool의 일정을 조절할 수 있다. 왼쪽 작업과 오른쪽 작업 모두에 fork 메서드를 호출하는 것이 자연스러울 것 같지만 한쪽 작업에는 fork를 호출하는 것보다 compute를 호출하는 것이 효율적이다. 그러면 두 서브태스크의 한 태스크에는 같은 스레드를 재사용할 수 있으므로 풀에서 불필요한 태스크를 할당하는 오버헤드를 피할 수 있다.
  • 포크/조인 프레임워크를 이용하는 병렬 계산은 디버깅하기 어렵다. 보통 IDE로 디버깅할 때 스택 트레이스로 문제가 일어난 과정을 쉽게 확인할 수 있는데, 포크/조인 프레임워크에서는 fork라 불리는 다른 스레드에서 compute를 호출하므로 스택 트레이스가 도움이 되지 않는다.
  • 병렬 스트림에서 살펴본 것처럼 멀티코어에 포크/조인 프레임워크를 사용하는 것이 순차 처리보다 무조건 빠를 거라는 생각은 버려야 한다. 병렬 처리로 성능을 개선하려면 테스크를 여러 독립적인 서브태스크로 분할할 수 있어야 한다. 각 서브태스크의 실행시간은 새로운 태스크를 포킹하는 데 드는 시간보다 길어야 한다. 예를 들어 I/O를 한 서비태스크에 할당하고 다른 서브태스크에서는 계산을 실행, 즉 I/O와 계산을 병렬로 실행할 수 있다. 또한 순차 버전과 병렬 버전의 성능을 비교할 때는 다른 요소도 고려해야 한다. 다른 자바 코드와 마찬가지로 JIT 컴파일러에 의해 최적화되려면 몇 차례의 ‘준비 과정' 또는 실행 과정을 거쳐야 한다. 따라서 성능을 측정할 때는 지금까지 살펴본 하니스에서 그랬던 것처럼 여러 번 프로그램을 실행한 결과를 측정해야 한다. 또한 컴파일러 최적화는 병렬 버전보다는 순차 버전에 집중될 수 있다는 사실도 기억하자

작업 훔치기(작업을 효율적으로 분할 하는 방법)

  • 할일이 없어진 스레드는 유휴 상태로 바뀌는 것이 아니라 다른 스레드 큐의 꼬리에서 작업을 훔쳐온다. 모든 태스크가 작업을 끝낼 때까지, 즉 모든 큐가 빌 때까지 이 과정을 반복

👨🏻‍💻 Spliterator 인터페이스 (분할할 수 있는 반복자)

  • 자동으로 스트림을 분할하는 기법
  • 자바 8은 컬렉션 프레임워크에 포함된 모든 자료구조에 사용할 수 있는 디폴트 Spliterator 구현을 제공(spliterator() 메서드)
public interface Spliterator<T> {
	boolean tryAdvance(Consumer<? super T> action);
	Spliterator<T> trySplit();
	long estimateSize();
	int characteristics();
  • T는 Spliterator에서 탐색하는 요소의 형식
  • tryAdvance : Spliterator의 요소를 하나씩 순차적으로 소비하면서 탐색해야 할 요소가 남아있으면 참을 반환
  • trySplit : Spliterator의 일부 요소(자신이 반환한 요소)를 분할해서 두 번째 Spliterator를 생성하는 메서드
  • estimateSize : 탐색해야 할 요소 수 정보를 제공
  • characteristics : Spliterator와 그 요소의 특성을 반환

분할 과정

  • trySplit을 호출하면 두 번째 Spliterator 생성
  • 두 개의 Spliterator에 trySplit을 호출하면 네 개의 Spliterator 생성
  • trySplit의 결과가 null이 될 때까지 이 과정을 반복(더 이상 자료구조를 분할할 수 없음을 의미)

Spliterator의 특성

ORDERED리스트처럼 요소에 정해진 순서가 있으므로 Spliterator는 요소를 탐색하고 분할할 때 이 순서에 유의해야 한다.
DISTINCTx, y 두 요소를 방문했을 때 x.equals(y)는 항상 false를 반환
SORTED탐색된 요소는 미리 정의된 정렬 순서를 따른다.
SIZED크기가 알려진 소스(예를 들면 Set)로 Spliterator를 생성했으므로 estimatedSize()는 정확한 값을 반환
NON-NULL탐색하는 모든 요소는 null이 아니다
IMMUTABLE이 Spliterator의 소스는 불변이다. 즉, 요소를 탐색하는 동안 요소를 추가하거나, 삭제하거나, 고칠 수 없다.
CONCURRENT동기화 없이 Spliterator의 소스를 여러 스레드에서 동시에 고칠 수 있다.
SUBSIZED이 Spliterator 그리고 분할되는 모든 Spliterator는 SIZED 특성을 갖는다.

커스텀 Spliterator 구현하기

문자열의 단어 수를 계산하는 단순한 메서드 구현

  • 반복형으로 단어 수를 세는 메서드
    public int countWordsIteratively(String s) {
    	int counter = 0;
    	boolean lastSpace = true;
    	for (char c : s.toCharArray()) {
    		if (Character.isWhitespace(c)) {
    			lastSpace = true;
    		} else {
    			if (lastSpace) counter++;
    			lastSpace = false;
    		}
    	}
    	return counter;
    }
  • 함수형으로 직접 스레드를 동기화하지 않고 병렬 스트림으로 작업을 병렬화
    • 스트림에 리듀싱 연산을 통해 단어의 수를 계산할건데, 지금까지 발견한 단어의 수를 계산한느 int 변수와 마지막 문자가 공백이었는지 여부를 기억하는 Boolean 변수가 필요

      public class WordCounter {
          private final int counter;
          private final boolean lastSpace;
      
          public WordCounter(int counter, boolean lastSpace) {
              this.counter = counter;
              this.lastSpace = lastSpace;
          }
          
          public WordCounter accumulate(Character c) {
              if (Character.isWhitespace(c)) {
                  return lastSpace ? this : new WordCounter(counter, true);
              } else {
                  return lastSpace ? new WordCounter(counter + 1, false) : this;
              }
          }
          
          public WordCounter combine(WordCounter wordCounter) {
              return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace));
          }
          
          public int getCounter() {
              return counter;
          }
      }
    • accumulate : Word의 상태를 어떻게 바꿀 것인지를 정의, 스트림을 탐색하면서 새로운 문자를 찾을 때마다 호출

    • 새로운 비공백 문자를 탐색한 다음에 마지막 문자가 공백이면 counter 증가

    • combine : 문자열 서브 스트림을 처리한 WordCounter의 결과를 합친다.

      private int countWords(Stream<Character> stream) {
      	WordCounter wordCounter = stream.reduce(new WordCounter(0, true),
      																							WordCounter::accumulate,
      																							WordCounter::combine);
      	return wordCounter.getCounter();
      }
  • WordCounter 병렬로 수행하기(Spliterator 이용)
    public class WordCounterSpliterator implements Spliterator<Character> {
        private final String string;
        private int currentChar = 0;
    
        public WordCounterSpliterator(String string) {
            this.string = string;
        }
    
        @Override
        public boolean tryAdvance(Consumer<? super Character> action) {
            action.accept(string.charAt(currentChar++));
            return currentChar < string.length();
        }
    
        @Override
        public Spliterator<Character> trySplit() {
            int currentSize = string.length() - currentChar;
            if(currentSize < 10) {
                return null;
            }
            for(int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) {
                if(Character.isWhitespace(string.charAt(splitPos))) {
                    Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos));
                    currentChar = splitPos;
                    return spliterator;
                }
            }
            return null;
        }
    
        @Override
        public long estimateSize() {
            return string.length() - currentChar;
        }
    
        @Override
        public int characteristics() {
            return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
        }
    }

0개의 댓글