실시간 빅데이터 처리를 위한 Spark & Flink Online 6) Shuffling & Partitioning

Bradley·2022년 4월 12일
0

Shuffling

  • 그룹핑시 데이터를 한 노드에서 다른 노드로 옮길때
  • 성능을 (많이) 저하시킨다

Shuffle을 일으킬 수 있는 작업들

  • Join, leftOuterJoin, rightOuterJoin
  • GroupByKey
  • ReduceByKey
  • ComebineByKey
  • Distinct
  • Intersection
  • Repartition
  • Coalesce

    데이터를 합치거나 하나로 줄이는 작업들

Shuffle은 언제 일어날까?

Shuffle은 이럴때 발생한다

  • 결과로 나오는 RDD가 원본 RDD의 다른 요소를 참조하거나
  • 다른 RDD를 참조할때

Partitioner를 이용한 성능 최적화

파티셔닝을 이용해서 셔플링을 최소화 할 수 있음
안좋은 예) groupByKeys + Reduce
파티션에서 Key별로 데이터를 수집 후 Reduce를 진행
좋은 예) ReduceByKey
각각의 파티션에서 Reduce를 거친 후 groupBy를 하기 때문에 파티션에 담기는 데이터의 수가 훨씬 적음

코드 예제

GroupByKey vs ReduceByKey

# reduceByKey
(textRDD
.flatMap(lambda line: line.split())
,map(lambda word: (word, 1))
.reduceByKey(lambda a, b: a + b))

# groupByKey
(textRDD
.flatMap(lambda line: line.split())
.map(lambda word: (word, 1))
.groupByKey()
.map(lambda (w, counts): (w, sum(counts))))

노드를 통해 전달되는 데이터의 양이 많아 out of disk 문제 발생 가능

Shuffle을 최소화 하려면

  • 미리 파티션을 만들어 두고 캐싱 후 reduceByKey 실행
  • 미리 파티션을 만들어 두고 캐싱 후 join 실행
  • 둘 다 파티션과 캐싱을 조합해서 최대한 로컬 환경에서 연산이 실행되도록 하는 방식

셔플을 최소화해서 10배의 성능 향상이 가능하다!

Partition은 어떻게 결정될까?

데이터가 어느 노드/파티션에 들어가는지는 어떻게 결정될까?
Partition의 목적: 데이터를 최대한 균일하게 퍼트리고 쿼리가 같이 되는 데이터를 최대한 옆에 두어 검색 성능을 향상시키는 것
Partition의 특징

  • RDD는 쪼개져서 여러 파티션에 저장됨
  • 하나의 파티션은 하나의 노드 (서버)에
  • 하나의 노드는 여러개의 파티션을 가질 수 있음
  • 파티션의 크기와 배치는 자유롭게 설정가능하며 성능에 큰 영향을 미침
  • Key-Value RDD를 사용할때만 의미가 있다

스파크의 파티셔닝 == 일반 프로그래밍에서 자료구조를 선택하는 것

Partition의 종류

Hash Partitioning

  • 데이터를 여러 파티션에 균일하게 분배하는 방식

Hash Partitioning의 잘못된 사용

  • [극단적인 예] 2개의 파티션이 있는 상황에서:
    • 짝수의 Key만 있는 데이터셋에 Hash 함수가 (x%2) 라면?
    • 한쪽 파티션만 사용하게 될 것

Partition 1:[2,4,6,8,10, ...]
Partition 2:[]

이러한 경우를 Data skew 문제라고 칭함
데이터가 한쪽으로 쏠리는 문제라고하며, 분산 환경의 Software라면 겪을 수 밖에 없는 문제라고 한다
기본적으로는 node간 데이터 처리량이 달라 다수의 node들이 소수의 node의 작업이 끝나기를 기다리는 문제라고 한다

Range Partitioning

  • 순서가 있는, 정렬된 파티셔닝
    • 키의 순서에 따라
    • 키의 집함의 순서에 따라

서비스의 쿼리 패턴이 날짜 위주면 일별 Range Partition 고려

Memory & Disk Partition

디스크에서 파티션하기

사용자가 지정한 파티션을 가지는 RDD를 생성하는 함수

  • partitionBy()
  • 파티션을 만든 후엔 persist() 하지 않으면:
    • 다음 연산에 불릴때마다 반복하게된다!(셔플링이 반복적으로 일어난다)
>>> pairs = sc.parallelize([1,2,3,4,2,4,1]).map(lambda x: (x, x))
>>> pairs.collect()
[(1,1),(2,2),(3,3),(4,4),(2,2),(4,4),(1,1)]
>>> pairs.partitionBy(2).glom().collect()
[[(2,2),(4,4),(2,2),(4,4)], [(1,1),(3,3),(1,1)]]

glom() : 파티션까지 다 보여주는 Transformation (파티션 형상 확인)

>>> pairs.partitionBy(2, lambda x: x%2).glom().collect()
[[(2,2),(4,4),(2,2),(4,4)], [(1,1),(3,3),(1,1)]]

메모리에서 파티션하기

  • repartition()
  • coalesce()
    Repartition과 Coalesce 둘 다 파티션의 갯수를 조절하는데 사용
  • 둘다 shuffling을 동반하여 매우 비싼 작업
  • Repartition: 파티션의 크기를 줄이거나 늘리는데 사용됨
  • Coalesce: 파티션의 크기를 줄이는데 사용됨

연산중 파티션을 만드는 작업들

아래 함수들은 연산중에 새로운 파티션을 만들 수 있다

  • Join (leftOuterJoin, rightOuterJoin)
  • groupByKey
  • reduceByKey
  • foldByKey
  • partitionBy
  • Sort
  • mapValues (parent)
  • flatMapValues (parent)
  • filter (parent)
  • 등등

map vs mapValues

  • map vs mapValues
  • flatMap vs faltMapValues
  • map과 flatMap은 Key의 변형이 가능하기 때문

Summary

  • Shuffling
  • Partitioning
profile
2022년부턴 후회없이

0개의 댓글

관련 채용 정보