안녕하세요.
데이터 엔지니어링 & 운영 업무를 하는 중 알게 된 지식이나 의문점들을 시리즈 형식으로 계속해서 작성해나가며
새로 알게 된 점이나 잘 못 알고 있었던 점을 더욱 기억에 남기기 위해 글을 꾸준히 작성 할려고 합니다.
Spark의 경우 Spark 완벽 가이드 책을 많이 참고하여 운영을 하고 있습니다.
반드시 글을 읽어 주실 때 잘 못 말하고 있는 부분은 정정 요청 드립니다.
저의 지식에 큰 도움이 됩니다. :)
Spark 은 분산 병렬처리를 지원하는 빅데이터 수집 & 분석 & 처리 프레임워크 입니다.
여기서 데이터를 분산 병렬처리를 하기 위해 택하는 방식은 많은 데이터를 어떤 기준(Column 등)으로 분할 하여 여러 머신에서 동시적으로 처리함으로 속도를 내는 방식입니다.
데이터를 분할 병렬 처리할 때 상당히 까다로운 점은 보통 통계나 처리할 때 각 분할 데이터 마다가 아니라 모든 데이터에서의 종합 적인 결과를 보고 싶어합니다.
그리고 어떤 조건(where 에 넣을 수 없는 변수나 어떤 메소드) 을 먼저 가지고 있어야 데이터를 처리할 수 있는 경우가 발생하기도 합니다.
이럴 때 우리는 Spark 의 분산형 공유 변수를 활용하여 데이터를 병렬 처리 후 원하는 값을 받아낼 수 있습니다.
아래는 Spark 3.2.3 doc 에서 설명하고 있는 부분을 가져왔습니다.
" Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators. "
Boradcast 변수는 모든 워커 노드에 큰 값을 저장하는 변수로 액션 마다 재전송 없이 사용할 수 있는 변수 입니다.
Driver 에서 Closure 를 별도로 정의 하지 않고 Broadcast 변수에 정의를 하는 것만으로 워커 노드에서 사용할 수 있도록 간단한 기능을 제공하고 있습니다.
Closure 를 사용하면 워커 노드는 테스크당 한 번씩 역직렬화가 일어나기 때문에 비효율 적이라고 합니다.
아래는 Broadcast 변수에 대한 Spark 3.2.3 doc 의 설명 입니다.
" Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost. "
" Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcasted this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important. "
Broadcast 변수를 선언하는 방법은 아래와 같습니다.
//scala
import scala.collection.mutable
val bvHashMap: mutable.Map[String, Int] = new mutable.HashMap[String, Int]()
bvHashMap.put("test_1", 1)
bvHashMap.put("test_2", 1)
val boradCastVal = spark.sparkContext.broadcast(bvHashMap)
boradCastVal.value
# python
bv_dict = dict()
bv_dict["test_1"] = 1
bv_dict["test_2"] = 2
broad_cast_val = spark.sparkContext.broadcast(bv_dict)
broad_cast_val.value
driver 에서 선언한 상수로 사용될 변수들이 executor 들 마다 생성될 때 선언돼 사용됩니다.
이를 활용하여 Partition 별로 같은 작업(disk io) 을 동시에 처리해서 처리 속도를 증진 시킬 경우에는 처리하는 메소드를 Broadcast 하여 보내는 방식 또한 생각해볼만 합니다.
Accumulator 는 각 Executor (Partition)별로 처리한 내역을 Accumulator 에 갱신하여 처리한 결과를 종합시킬 수 있는 분산 공유형 변수 입니다.
이 때 Accumulator 는 Row 단위로 값을 갱신할 수 있고 연산은 결합성과 가환성의 성격을 지닌 덧셈과 곱셈 같은 것들이 어울립니다.
Accumulator 는 사용자 정의를 통해 새로운 Accumulator 를 만들 수 있습니다.
Accumulator 는 액션을 처리하는 과정에서만 갱신되며, 각 테스크에서 한 번만 값을 갱신하도록 제어합니다. 그러나 transformation 에서 테스크나 잡 스테이지를 재처리하는 경우 각 테스크의 갱신 작업이 두 번 이상 적용될 수 있다고 합니다.
Accumulator 의 이름은 선택적으로 지정할 수 있습니다. 이름이 지정된 어큐뮬레이터의 실행 결과는 스파크 UI 에 표시되며, 이름이 지정되지 않은 어큐뮬레이터의 경우 스파크 UI에 표시되지 않습니다.
아래는 Accumulator 에 대한 Spark 3.2.3 doc 의 설명 입니다.
" Accumulators are variables that are only “added” to through an associative and commutative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types. "
" As a user, you can create named or unnamed accumulators. As seen in the image below, a named accumulator (in this instance counter) will display in the web UI for the stage that modifies that accumulator. Spark displays the value for each accumulator modified by a task in the “Tasks” table. "
Accumulator 를 선언하는 방법은 아래와 같습니다.
//scala
//unnamed
import org.apache.spark.util.LongAccumulator
val accumulatorTest = new LongAccumulator
val acc = spark.sparkContext.register(accumulatorTest)
//named1
val accumulatorTest2 = new LongAccumulator
spark.sparkContext.register(accumulatorTest2, "TestAcc")
//named2
val accumulatorTest3 = spark.sparkContext.longAccumulator("TestAcc2")
# Python
accumulator_test = spark.sparkContext.accumulator(0)
스파크에서 제공하는 클래스를 상속하여 사용자 정의 Accumulator 를 구현해낼 수 있습니다.
Scala 의 경우 AccumulatorV2
, Python 의 경우 AccumulatorParam
을 상속받아 사용자 정의 Accumulator 를 만들어 낼 수 있습니다.
AccumulatorV2 는 추상 클래스로 add
, copy
, isZero
, merge
, reset
, value
메소드를 재정의 해줘야 합니다.
AccumulatorV2 의 경우 해당 클래스로 들어올 데이터 타입 In
과 나갈 데이터 타입 Out
을 지정하여 AccumulatorV2[In, Out]
으로 지정하면 됩니다.
//scala
import org.apache.spark.util.AccumulatorV2
class EvenAccumulator extends AccumulatorV2[BigInt, BigInt] {
private var num:BigInt = 0
override def reset(): Unit = {
this.num = 0
}
override def add(intValue: BigInt): Unit = {
if(intValue % 2 == 0) {
this.num += intValue
}
}
override def merge(other: AccumulatorV2[BigInt, BigInt]): Unit = {
this.num += other.value
}
override def value():BigInt = {
this.num
}
override def copy(): AccumulatorV2[BigInt, BigInt] = {
new EvenAccumulator
}
def isZero():Boolean = {
this.num == 0
}
}
Python 의 경우는 AccumulatorParam
을 상속받아 사용자 정의 Accumulator 를 생성해주면 됩니다.
AccumulatorParam 은 zero
와 addInPlace
만 내부에 구현을 해주면 됩니다.
addInPlace 의 경우는 value1 매개변수는 Accumulator 에서 현재 지니고 있는 값이고, value2 의 경우 사용자 정의 Accumulator 로 갱신을 요청하는 값입니다.
# Python
from pyspark.accumulators import AccumulatorParam
class EvenAccumulator(AccumulatorParam):
def zero(self, value):
return value
def addInPlace(self, value1, value2):
if (value2 % 2 == 0) {
return value1 + value2
}
foreachParition 을 활용하여 익스큐터 별로 Partition 을 처리하도록 구성하기 위해서는 Accumulator 와 Broadcast 가 필수라는 생각이 듭니다.
현재 collect 해서 데이터를 수집 & 처리 & 저장하고 있는 모듈을 repartition 후 foreachParition 을 통해 데이터를 처리하는 테스트를 해볼 계획입니다. 이 때 초 당 데이터의 메타데이터를 DB에서 조회하지 않도록 DB 조회 조건 카운터를 Accumulator 로 구현해보려고 합니다.