코드 수준에서의 Spark RDD의 실제 연산 흐름 분석 (2)

Minseop Jeong·2022년 8월 14일
1
post-custom-banner

이전 편에서 RDD의 실제 연산과 클러스터 리소스 매니징에 관한 내용을 Spark cluster의 master node에 존재하는 driver 수준에서 파악하였다. 이어서 worker node의 executor에서는 어떻게 task를 처리하는지 알아 보도록 하겠다.

ExecutorBackend

receive

executorEndPoint를 통해 전송된 serialized task는 ExecutorBackend 에서 receive한다. deserialize 한 후 Executor에서 launchTask를 호출한다.

override def receive: PartialFunction[Any, Unit] = {
  ...
  case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        val taskDesc = TaskDescription.decode(data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        taskResources(taskDesc.taskId) = taskDesc.resources
        executor.launchTask(this, taskDesc)
      }

Executor

launchTask

launchTask에서는 task를 run할 객체인 TaskRunner type의 tr을 생성하고, 해당 thread를 실행한다.

def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
    val taskId = taskDescription.taskId
    val tr = createTaskRunner(context, taskDescription)
    runningTasks.put(taskId, tr)
    val killMark = killMarks.get(taskId)
    if (killMark != null) {
      tr.kill(killMark._1, killMark._2)
      killMarks.remove(taskId)
    }
    threadPool.execute(tr)
  }

TaskRunner

TaskRunner 객체에는 taskId, taskName, 실행할 task 객체 등이 정의 되어 있고, 실제로 task를 실행 시키는 (Runnable interface의 run을 override 한) run 함수가 정의돼 있다.

run

run 함수에서는 task를 실행한다. task 실행 도중 JVM process의 GC time, CPU time, task duration 등의 metric을 측정한다.

override def run(): Unit = {
      ...
      var taskStartTimeNs: Long = 0
      var taskStartCpu: Long = 0
      startGCTime = computeTotalGcTime()
      var taskStarted: Boolean = false

      try {
        ...
        // Run the actual task and measure its runtime.
        taskStartTimeNs = System.nanoTime()
        taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
          threadMXBean.getCurrentThreadCpuTime
        } else 0L
        var threwException = true
        val value = Utils.tryWithSafeFinally {
          val res = task.run(
            taskAttemptId = taskId,
            attemptNumber = taskDescription.attemptNumber,
            metricsSystem = env.metricsSystem,
            cpus = taskDescription.cpus,
            resources = taskDescription.resources,
            plugins = plugins)
          threwException = false
          res
        }
			  ...
        val resultSer = env.serializer.newInstance()
        val beforeSerializationNs = System.nanoTime()
        val valueBytes = resultSer.serialize(value)
        val afterSerializationNs = System.nanoTime()

Task

run

Task의 run 함수에서는 runTask 함수를 호출한다. runTask 함수는 ShuffleMapTask와 ResultTask에서 override 하여 구현 하고 있다.

final def run(
      taskAttemptId: Long,
      attemptNumber: Int,
      metricsSystem: MetricsSystem,
      cpus: Int,
      resources: Map[String, ResourceInformation],
      plugins: Option[PluginContainer]): T = {
    ...
    try {
      runTask(context)
    } catch {
    ...

ResultTask

Spark는 action이 호출 되야 실제로 연산을 수행하며(lazy-evaluation), action 호출 돼서 연산되는 작업을 Job이라고 칭한다. Job 내에서도 shuffleMap transformation이 호출돼서 구분된 ShuffleMapStage와 action이 호출 된 ResultStage로 구분된다. Job은 lineage에 따라 precede한 Stage 부터 수행 돼서 최종적으로 ResultStage가 수행 된다. 여기서는 ResultTask의 runTask 구현을 정리 해보았다.

runTask

runTask에서는 task가 실제로 수행된다. ResultTask의 인자로 받은 taskBinary를 deserialize 하고, rddfunc를 리턴 받는다. 여기서 rdd는 Stage DAG에서 가장 bottom에 위치한, 즉 action이 직접적으로 호출된 RDD 이고, func는 호출된 action에 정의된 lambda function이다. func의 type은 (TaskContext, Iterator[T]) => U이다. 실제 RDD partition은 Scala Iterator 형태로 존재하며, 각 transformation 들은 Iterator를 다른 형태의 Iterator로 변형 해 주는 것이다. action에 정의된 func 가 최종적으로 변형된 Iterator로 부터 결과를 도출한다. RDD에 iterator라는 함수가 정의돼 있고, iterator는 parent partition으로부터 새로운 Iterator를 리턴한다.

override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTimeNs = System.nanoTime()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    func(context, rdd.iterator(partition, context))
  }

RDD

iterator

iterator 함수는 storageLevel에 따라 연산을 수행한다. storageLevelStorageLevel.NONE이 아니라면 persist 함수가 호출됐었고, memroy나 disk에 존재할수도 있는 것이다. 물론 처음 생성되는 파티션이라 아직 캐시되지 않았을 수 있고, 캐시됐었지만 evict 돼서 현재는 존재하지 않을 수도 있다. persist가 호출됐던 RDD의 파티션이 현재 캐시에 존재하는지는 blockManager에서 관리를 한다. 캐시가 돼있지 않거나, storageLevelStorageLevel.NONE이면 computeOrReadCheckpoint를 호출한다.

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      getOrCompute(split, context)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

computeOrReadCheckpoint

저장된 스냅샷을 사용해 checkpoint RDD 이후 나머지 lineage를 다시 계산하도록 하는 것이 checkpointing 이다. checkpointing을 실행하면 RDD의 모든 데이터를 디스크에 저장한다. RDD persist와 다른 점은 데이터만 저장하는 것이 아닌, lineage 까지 다 저장한다. checkpointing 후에는 저장한 RDD를 다시 계산할 필요가 없으므로 해당 RDD 의존 관계와 parent RDD 정보를 삭제한다. lineage 상에서 checkpoint된 RDD가 존재한다면, DAG에서 가장 인접한 parent RDD에서 iterator를 재귀적으로 호출한다. 아니라면 compute를 호출한다.

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
  {
    if (isCheckpointedAndMaterialized) {
      firstParent[T].iterator(split, context)
    } else {
      compute(split, context)
    }
  }

compute

compute의 실제 구현은 구현된 여러 RDD들마다 다르게 돼있다.

def compute(split: Partition, context: TaskContext): Iterator[T]

MapPartitionsRDD

대표적 transformation 중 하나인 map의 결과로 생성되는 RDD이다.

compute

compute에서는 parent RDD의 iterator를 호출한다. parent RDD의 iterator 결과를 map 함수의 인자로 받은 f: (TaskContext, Int, Iterator[T]) => Iterator[U] 를 사용하여 새로운 Iterator를 생성해서 리턴한다.

override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

HadoopRDD

대부분의 Spark job의 시작은 input file을 읽는 것이다. input file을 읽으면서 생성하는 RDD 중 대표적으로 HadoopRDD 가 있다.

compute

transformation이 Iterator를 새로운 Iterator로 변환하는 과정이라면 최초의 Iterator는 어떻게 생성되는지가 궁금하다. HadoopRDD에서는 Hadoop의 recordReader를 사용해서 인풋파일을 읽어오는 Iterator를 생성해서 리턴한다.

override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
    val iter = new NextIterator[(K, V)] {
    ...
    reader =
        try {
          inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
        }
    ...
    override def getNext(): (K, V) = {
        try {
          finished = !reader.next(key, value)
        }

Conclusion

두 편에 걸쳐서 Spark applicartion 코드에서 부터 실제로 Job, Stage, Task가 어떻게 worker node로 분배되고, 재귀적으로 처리되는지를 살펴보았다. Spark 실험을 위한 코드 수정을 할 때 파악했던 내용을 정리한 것인데, 각 클래스 별로 디테일 하게 보면 분산처리 과정 및 block managing 방식을 파악하기 좋다. 기회가 된다면 block managing 관련 내용도 정리해보겠다.

Reference

profile
Data Engineer
post-custom-banner

1개의 댓글

comment-user-thumbnail
2023년 3월 25일

잘봤습니다. 자세한 정리 감사합니다!

답글 달기