원래는 태스크에서 드라이버 노드의 변수를 사용할 때 클로저 함수 내부에서 단순하게 참조하는 방법을 사용한다.
브로드캐스트 변수는 변하지 않는 값을 클러스터에서 효율적으로 공유하는 방법 제공
my_collection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
words= spark.sparkContext.parallelize(my_collection,2)
#아래 구조체를 스파크에 브로드캐스트할 수 있음
supplementalData = {"Spark": 1000, "Definitive":200, "Big": -300, "simple":100}
suppBroadcast = spark.sparkContext.broadcast(supplementalData)
#value 메서드로 브로드캐스트된 값 참조
suppBroadcast.value
Out[4]: {'Spark': 1000,
'Definitive': 200,
'Big': -300,
'simple': 100}
#브로드캐스트된 데이터를 사용해 RDD를 변환할 수 있음
words.map(lambda word: (word, suppBroadcast.value.get(word,0)))\
.sortBy(lambda wordPair: wordPair[1]).collect()
Out[6]: [('Big', -300),
('The', 0),
('Guide', 0),
(':', 0),
('Data', 0),
('Processing', 0),
('Made', 0),
('Simple', 0),
('Definitive', 200),
('Spark', 1000)]
트랜스포메이션 내부의 다양한 값을 갱신하는 데 사용
어큐뮬레이터는 스파크 클러스터에서 로우 단위로 안전하게 값을 갱신할 수 있는 변경 가능한 변수를 제공함
병렬 처리 과정에서 효율적으로 사용할 수 있음
어큐뮬레이터 값은 액션을 처리하는 과정에서만 갱신
각 태스크에서 어큐뮬레이터를 한 번만 갱신하도록 제어
이름이 지정된 어큐뮬레이터만 결과가 스파크 UI에 표시됨
path='/FileStore/tables/bin/2010-summary.parquet'
flights = spark.read.parquet(path)
#출발지나 도착지가 중국인 항공편의 수를 구하는 어큐뮬레이터 생성
accChina = spark.sparkContext.accumulator(0)
#어큐뮬레이터 값 조회
accChina.value
Out[12]: 0
def accChinaFunc(flight_row):
destination = flight_row['DEST_COUNTRY_NAME']
origin = flight_row['ORIGIN_COUNTRY_NAME']
if destination == 'China' or origin == 'China':
accChina.add(flight_row['count'])
#foreach 메서드(액션)를 사용해 매 로우마다 위 함수 적용하기
flights.foreach(lambda flight_row: accChinaFunc(flight_row))
accChina.value
Out[15]: 953
from pyspark.accumulators import AccumulatorParam
class EvenAccumulator(AccumulatorParam):
def zero(self, value):
return value
def addInPlace(self,value1, value2):
return value1 + value2
evenAcc = spark.sparkContext.accumulator(0,EvenAccumulator())
evenAcc.value
Out[108]: 0
flights.take(5)
Out[60]: [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=1), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=264), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=69), Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=24), Row(DEST_COUNTRY_NAME='Equatorial Guinea', ORIGIN_COUNTRY_NAME='United States', count=1)]
def add(x):
if x['count']%2==0:
evenAcc.add(x['count'])
flights.foreach(lambda row: add(row))
evenAcc.value
Out[110]: 31390
class StringAccumulator(AccumulatorParam):
def zero(self, s):
return s
def addInPlace(self, s1, s2):
return s1 + s2
accumulator = sc.accumulator("", StringAccumulator())
accumulator.value
Out[89]: ''
def add_s(x):
global accumulator
accumulator += x
sc.parallelize(["a", "b", "c"]).foreach(add_s)
accumulator.value
Out[92]: 'bca'