Spark 완벽 가이드 ch14. 분산형 공유 변수

Q·2023년 1월 25일
0

Spark 완벽 가이드

목록 보기
15/24
  • 스파크의 저수준 API에는 RDD 인터페이스 외에도 두 번재 유형인 분산형 공유 변수가 있다.
  • 분산형 공유 변수 종류
    • 브로드캐스트 변수
    • 어큐뮬레이터
  • 어큐뮬레이터를 사용하면 모든 태스크의 데이터를 공유 결과에 추가할 수 있음
  • 브로드캐스트 변수를 사용하면 모든 워커 노드에 큰 값을 저장하므로 재전송 없이 많은 액션에서 재사용 가능

브로드캐스트 변수

  • 원래는 태스크에서 드라이버 노드의 변수를 사용할 때 클로저 함수 내부에서 단순하게 참조하는 방법을 사용한다.

    • 하지만 룩업 테이블이나 머신러닝 모델 같은 큰 변수를 사용하는 경우 비효율적
    • 왜냐면 해당 변수를 사용할 때 워커 노드에서 태스크당 한 번씩 역직렬화가 일어나기 때문
    • 그리고 여러 스파크 액션과 잡에서 동일한 변수를 사용하면 잡을 실행할 때마다 워커로 큰 변수를 재전송해야함
  • 브로드캐스트 변수는 변하지 않는 값을 클러스터에서 효율적으로 공유하는 방법 제공

    • 모든 태스크마다 직렬화하지 않고 클러스터의 모든 머신에 캐시하는 불변성 공유 변수임
    • 액션을 실행할 때 클러스터의 모든 노드에 지연 처리 방식으로 복제됨

참고

my_collection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
words= spark.sparkContext.parallelize(my_collection,2)
#아래 구조체를 스파크에 브로드캐스트할 수 있음
supplementalData = {"Spark": 1000, "Definitive":200, "Big": -300, "simple":100}
suppBroadcast = spark.sparkContext.broadcast(supplementalData)
#value 메서드로 브로드캐스트된 값 참조
suppBroadcast.value

Out[4]: {'Spark': 1000, 
'Definitive': 200, 
'Big': -300, 
'simple': 100}
#브로드캐스트된 데이터를 사용해 RDD를 변환할 수 있음
words.map(lambda word: (word, suppBroadcast.value.get(word,0)))\
.sortBy(lambda wordPair: wordPair[1]).collect()
Out[6]: [('Big', -300), 
('The', 0), 
('Guide', 0), 
(':', 0), 
('Data', 0), 
('Processing', 0), 
('Made', 0), 
('Simple', 0), 
('Definitive', 200), 
('Spark', 1000)]
  • 브로드캐스트 변수에 큰 크기의 데이터를 사용하는 경우라면 전체 태스크에서 데이터를 직렬화하는 데 발생하는 부하가 매우 커질 수 있음
  • RDD말고 UDF나 Dataset에서도 사용할 수 있고, 동일한 효과를 얻을 수 있음

어큐뮬레이터

  • 트랜스포메이션 내부의 다양한 값을 갱신하는 데 사용

  • 어큐뮬레이터는 스파크 클러스터에서 로우 단위로 안전하게 값을 갱신할 수 있는 변경 가능한 변수를 제공함

  • 병렬 처리 과정에서 효율적으로 사용할 수 있음

    • 카운터나 합계를 구하는 용도로 사용 가능
  • 어큐뮬레이터 값은 액션을 처리하는 과정에서만 갱신

  • 각 태스크에서 어큐뮬레이터를 한 번만 갱신하도록 제어

    • 재시작한 태스크는 갱신 못함
  • 이름이 지정된 어큐뮬레이터만 결과가 스파크 UI에 표시됨

참고

path='/FileStore/tables/bin/2010-summary.parquet'
flights = spark.read.parquet(path)
#출발지나 도착지가 중국인 항공편의 수를 구하는 어큐뮬레이터 생성
accChina = spark.sparkContext.accumulator(0)
#어큐뮬레이터 값 조회
accChina.value
Out[12]: 0
def accChinaFunc(flight_row):
  destination = flight_row['DEST_COUNTRY_NAME']
  origin = flight_row['ORIGIN_COUNTRY_NAME']
  
  if destination == 'China' or origin == 'China':
    accChina.add(flight_row['count'])
#foreach 메서드(액션)를 사용해 매 로우마다 위 함수 적용하기
flights.foreach(lambda flight_row: accChinaFunc(flight_row))
accChina.value
Out[15]: 953

사용자 정의 어큐뮬레이터

  • 직접 어큐뮬레이터를 정의하고자 한다면, AccumulatorV2 클래스(스칼라) 또는 AccumulatorParam(파이썬)을 상속받아야함
from pyspark.accumulators import AccumulatorParam
class EvenAccumulator(AccumulatorParam):    
  def zero(self, value):
    return value
  
  def addInPlace(self,value1, value2):
    return value1 + value2
evenAcc = spark.sparkContext.accumulator(0,EvenAccumulator())
evenAcc.value
Out[108]: 0
flights.take(5)
Out[60]: [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=1), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=264), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=69), Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=24), Row(DEST_COUNTRY_NAME='Equatorial Guinea', ORIGIN_COUNTRY_NAME='United States', count=1)]
def add(x):
    if x['count']%2==0:
      evenAcc.add(x['count'])

flights.foreach(lambda row: add(row))
evenAcc.value
Out[110]: 31390
class StringAccumulator(AccumulatorParam):
    def zero(self, s):
        return s
    def addInPlace(self, s1, s2):
        return s1 + s2

accumulator = sc.accumulator("", StringAccumulator())
accumulator.value
Out[89]: ''
def add_s(x): 
    global accumulator
    accumulator += x

sc.parallelize(["a", "b", "c"]).foreach(add_s)
accumulator.value
Out[92]: 'bca'
profile
Data Engineer

0개의 댓글