Spark의 Driver 파헤치기

석형원·2025년 5월 23일
0

spark

목록 보기
3/4

Client Mode와 Cluster Mode를 정리하다보니
Driver에 대해 궁금한 점들이 생겼습니다.

Driver는 누가 실행시키고 Cluster Manager에게 어떤 것을 전달하는 걸까요?

spark-submit 명령에 따른 실행 흐름을 따라가보며 Driver의 정체에 대해 살펴 보겠습니다.

Java/Scala를 기준으로 진행하겠습니다.

// my-spark-app.jar
public class MySparkApp {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Java Spark App Example")
                .getOrCreate();

        spark.range(1, 10).toDF("number").show();

        spark.stop();
    }
}
# bash shell
$SPARK_HOME/bin/spark-submit \
  --class MySparkApp \
  --master spark://host:7077 \ # standalone, yarn, k8s etc..
  --jars lib/mylib.jar \
  my-spark-app.jar input.txt

이렇게 위 예제를 만들고 이대로 spark-submit을 수행하면,
SparkSumbit.scala 코드가 실행되게 됩니다.

SparkSumbit.scala의 동작 방식을 살펴보겠습니다.

// main
private[spark] object InProcessSparkSubmit {

  def main(args: Array[String]): Unit = {
    val submit = new SparkSubmit()
    submit.doSubmit(args)
  }
  
}

main 함수를 통해 SparkSubmit 객체가 만들어지고 doSubmit이 실행됩니다.

이어서 쭉쭉 따라가보겠습니다.

// doSubmit -> submit -> doRunMain -> runMain
  def doSubmit(args: Array[String]): Unit = {
  	val appArgs = parseArguments(args) // spark-submit 명령 시 설정한 옵션들
    val sparkConf = appArgs.toSparkConf() // 파싱된 정보를 기반으로 객체 생성
  	...
  
  	appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog, sparkConf) // 해당 케이스를 선택
      case SparkSubmitAction.KILL => kill(appArgs, sparkConf)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs, sparkConf)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
    }
    ...
    
    @tailrec //꼬리 재귀
  	private def submit(args: SparkSubmitArguments, uninitLog: Boolean, sparkConf: SparkConf): Unit = {
    	def doRunMain(): Unit = {
        	...
        	runMain(args, uninitLog) // appArgs의 인자들을 기반으로 실행
            ...
        }
  	}
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
    // childArgs : Application 실행시 넘겨줄 인자들 (jar 뒤에 붙은 인자들)
    // -> ex) "input.txt"
    
    // childClasspath : JAR 파일 경로 ( Application 코드 및 의존성 라이브러리 )
    // -> ex) "lib/mylib.jar"
    
    // sparkConf : submit시 설정한 옵션들
   	// -> ex) "spark.master" -> "spark://host:7077"
    
    // ChildMainClass : main class 즉, spark를 실행하는 class를 지칭
    // -> ex) MySparkApp
    
    ...
    
    // JVM에 로딩하기 위한 ClassLoader ( jar파일과 main class를 로딩 )
    // -> Spark와 사용자 Application 간의 의존성 충돌을 방지
    val loader = getSubmitClassLoader(sparkConf)
    for (jar <- childClasspath) {
      	addJarToClasspath(jar, loader)
    }
    
    // Spark 실행의 주체인 main class를 동적 로딩하기 위함
	var mainClass: Class[_] = null

    try {
    	// 현재 쓰레드의 ClassLoader를 통해 class 로딩
    	mainClass = Utils.classForName(childMainClass)
    } catch {
    	...
    }
    
    // mainClass 내에서 SparkApplication 인터페이스가 구현이 되어있는지 확인
    // Cluster Mode의 YARN, K8s, Stanalone은 자체적인 SparkApplication이 존재함
    // 즉, Cluster Mode, Client Mode 인 경우로 구분됨
    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      	// Cluster Mode
        // SparkApplication 기반의 인스턴스를 생성
        mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
    } else {
    	// Client Mode
        // 사용자의 코드를 JavaMainApplication으로 감싸 SparkApplication처럼 실행할 수 있게 만듬
      	new JavaMainApplication(mainClass)
    }
    ...
    
    // JVM을 실행
    app.start(childArgs.toArray, sparkConf)

정리를 해보자면,
spark-submit은 Spark 명령을 수행하기 위해 JVM을 만들고
JVM에서 해당 명령들을 수행하는 것입니다.

JVM을 사용하는 이유는 Spark의 핵심 코드 베이스는 Scala로 작성되어있고
ScalaJava 런타임 위에서 동작하기 때문입니다.
( Pyspark, SparkR도 마찬가지로 JVM을 통해 동작합니다. )

또한, 이 JVM이 바로 Spark의 Driver입니다.

이전에 "Driver는 Spark Application의 전체적인 실행 흐름을 제어하는 프로세스" 라고 언급했습니다.

app.start를 통해 실행된 JVM이 바로 Spark Application의 실질적인 수행을 담당합니다.

그러므로 이 JVM이 Driver인 것이죠.


app.start에 대해 조금 더 살펴보겠습니다.

위 코드에서 if문을 통해 mainClass 내에 SparkApplication이 구현되어 있는 가를 구분한다고 주석을 달아두었습니다.

val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      	// Cluster Mode
        mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
    } else {
    	// Client Mode
      	new JavaMainApplication(mainClass)
    }

이 부분이 그 코드입니다. 이 코드를 보면 알 수 있듯이
Cluster와 Client 두 가지 경우로 구분이 됩니다.

  • if SparkApplication이 구현되어 있는 경우
    -> Cluster Mode
    -> 자체적으로 구현된 Application의 인스턴스를 생성

  • else SparkApplication 구현되어 있지 않은 경우
    -> Client Mode
    -> JavaMainApplication을 생성

Client Mode의 경우

JavaMainApplication를 살펴보겠습니다.
출처 : SparkApplication.Scala

private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    // 전달받은 class로부터 main 함수를 가져옵니다.
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    ...

	// SparkConf에 들어있는 옵션들(key-value)을 Property에 등록해 
    // JVM에서 바로 접근 가능하게 만듭니다. 
    val sysProps = conf.getAll.toMap
    sysProps.foreach { case (k, v) =>
      sys.props(k) = v
    }

	// main 함수에 필요한 args 인자를 넘겨주고 실행
    // null은 main()이 static 메소드이므로 인스턴스가 필요 없음
    mainMethod.invoke(null, args)
  }

}

결론적으로, 처음에 작성한 MySparkApp의 main 함수를 실행한다는 것이죠.

과정을 요약해보면 이렇게 정리할 수 있을 것 같습니다.

처음 main 함수에 작성된 Spark 명령들을 수행하기 위해
( my-spark-app.jar -> MySparkApp -> main )

spark-submit이라는 프로세스를 거쳐
-> JVM(Driver)를 생성하고
-> 이 JVM 내에서 main함수를 실행해서 SparkContext를 생성한다.

Cluster Mode의 경우

kubernetes를 Cluster Manager로 사용했다고 가정하겠습니다.
이 경우 KubernetesClientApplication이 사용됩니다.
출처 : KubernetesClientApplication.scala

private[spark] class KubernetesClientApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
  	// 인자들을 Parsing해서 구조화된 형태로 변환
    val parsedArguments = ClientArguments.fromCommandLineArgs(args)
    run(parsedArguments, conf)
  }

  private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {
	// 해당 SparkApp을 지칭할 고유의 App ID를 생성
    val kubernetesAppId = KubernetesConf.getKubernetesAppId()
    // Spark 설정, 리소스 등을 포함하여
    // Driver 실행용 KubernetesConf 객체 생성
    val kubernetesConf = KubernetesConf.createDriverConf(
      sparkConf,
      kubernetesAppId,
      clientArguments.mainAppResource,
      clientArguments.mainClass,
      clientArguments.driverArgs,
      clientArguments.proxyUser)

	// K8s 클러스터 API 서버 주소
    val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
    // Pod Log 출력용
    val watcher = new LoggingPodStatusWatcherImpl(kubernetesConf)

   // tryWithResource : 종료시 자동으로 close()를 호출해 자원 누수 방지
   // SparkKubernetesClientFactory : Spark와 Kubernetes API가 통신할 수 있는 Client를 생성
   Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
      master,
      Some(kubernetesConf.namespace),
      KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
      SparkKubernetesClientFactory.ClientType.Submission,
      sparkConf,
      None)) { kubernetesClient =>
      	// Spark에서 K8s API를 사용할 수 있게하는 Connection
        val client = new Client(
          kubernetesConf,
          new KubernetesDriverBuilder(), // Driver Pod의 사양 구성
          kubernetesClient,
          watcher)
          
        // Driver Pod를 실행 => main class 실행
        client.run()
        // 추가로 spark.driver.* 설정에 따라 리소스 요청
        // spark.executor.instances만큼 Executor Pod도 요청
    }
  }
}

즉, Kubernetes 클러스터에 Driver Pod를 만들고
그 Pod 안에서 MySparkApp의 main 함수를 실행하여 SparkContext를 생성합니다.
( + Client를 통해 Spark에서 Kubernetes API 사용 가능 )

별도의 컨테이너 환경인 Pod를 생성하여 그 내부에서 실행할 뿐,
실행 내용 자체는 JavaMainApplication과 거의 다를게 없다고 볼 수 있습니다.


그럼 이제 처음 의문으로 다시 돌아오겠습니다.

  • Driver는 누가 실행시키는 것일까?
    spark-submit에 의해 JVM이 만들어지게 되므로,
    Trigger(누가)는 local 환경에서 시킨 것이고,

    실행의 주체(어디서)는
    Client Mode는 local 환경 그대로,
    Cluster Mode는 Cluster Manager라고 볼 수 있겠네요.

  • Cluster Manager에게 어떤 것을 전달하는 걸까요?
    kubernetesConf 이 부분에 들어가는 인자를 살펴보면 되겠네요.

    • sparkConf : Spark의 설정 값 ( executor.memory, driver.cores 등 )

    • kubernetesAppId : Spark App을 구분하는 고유한 식별자

    • clientArguments.mainAppResource : main() 함수에 필요한 JAR 및 라이브러리 등의 경로

    • clientArguments.mainClass : Driver 컨테이너에서 실행할 main Class ("MySparkApp")

    • clientArguments.driverArgs : main 함수에 넘겨줄 인자들
      ( spark-submit ... input.csv output.csv 와 같은 설정 값이 아닌 추가적인 인자들 )

    • clientArguments.proxyUser : 보안을 위한 사용자 식별용

Reference

profile
데이터 엔지니어를 꿈꾸는 거북이, 한걸음 한걸음

0개의 댓글