Client Mode와 Cluster Mode를 정리하다보니
Driver에 대해 궁금한 점들이 생겼습니다.
Driver는 누가 실행시키고 Cluster Manager에게 어떤 것을 전달하는 걸까요?
spark-submit 명령에 따른 실행 흐름을 따라가보며 Driver의 정체에 대해 살펴 보겠습니다.
Java/Scala를 기준으로 진행하겠습니다.
// my-spark-app.jar
public class MySparkApp {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("Java Spark App Example")
.getOrCreate();
spark.range(1, 10).toDF("number").show();
spark.stop();
}
}
# bash shell
$SPARK_HOME/bin/spark-submit \
--class MySparkApp \
--master spark://host:7077 \ # standalone, yarn, k8s etc..
--jars lib/mylib.jar \
my-spark-app.jar input.txt
이렇게 위 예제를 만들고 이대로 spark-submit을 수행하면,
SparkSumbit.scala 코드가 실행되게 됩니다.
SparkSumbit.scala의 동작 방식을 살펴보겠습니다.
// main
private[spark] object InProcessSparkSubmit {
def main(args: Array[String]): Unit = {
val submit = new SparkSubmit()
submit.doSubmit(args)
}
}
main 함수를 통해 SparkSubmit 객체가 만들어지고 doSubmit이 실행됩니다.
이어서 쭉쭉 따라가보겠습니다.
// doSubmit -> submit -> doRunMain -> runMain
def doSubmit(args: Array[String]): Unit = {
val appArgs = parseArguments(args) // spark-submit 명령 시 설정한 옵션들
val sparkConf = appArgs.toSparkConf() // 파싱된 정보를 기반으로 객체 생성
...
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog, sparkConf) // 해당 케이스를 선택
case SparkSubmitAction.KILL => kill(appArgs, sparkConf)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs, sparkConf)
case SparkSubmitAction.PRINT_VERSION => printVersion()
}
...
@tailrec //꼬리 재귀
private def submit(args: SparkSubmitArguments, uninitLog: Boolean, sparkConf: SparkConf): Unit = {
def doRunMain(): Unit = {
...
runMain(args, uninitLog) // appArgs의 인자들을 기반으로 실행
...
}
}
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
// childArgs : Application 실행시 넘겨줄 인자들 (jar 뒤에 붙은 인자들)
// -> ex) "input.txt"
// childClasspath : JAR 파일 경로 ( Application 코드 및 의존성 라이브러리 )
// -> ex) "lib/mylib.jar"
// sparkConf : submit시 설정한 옵션들
// -> ex) "spark.master" -> "spark://host:7077"
// ChildMainClass : main class 즉, spark를 실행하는 class를 지칭
// -> ex) MySparkApp
...
// JVM에 로딩하기 위한 ClassLoader ( jar파일과 main class를 로딩 )
// -> Spark와 사용자 Application 간의 의존성 충돌을 방지
val loader = getSubmitClassLoader(sparkConf)
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
// Spark 실행의 주체인 main class를 동적 로딩하기 위함
var mainClass: Class[_] = null
try {
// 현재 쓰레드의 ClassLoader를 통해 class 로딩
mainClass = Utils.classForName(childMainClass)
} catch {
...
}
// mainClass 내에서 SparkApplication 인터페이스가 구현이 되어있는지 확인
// Cluster Mode의 YARN, K8s, Stanalone은 자체적인 SparkApplication이 존재함
// 즉, Cluster Mode, Client Mode 인 경우로 구분됨
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
// Cluster Mode
// SparkApplication 기반의 인스턴스를 생성
mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
// Client Mode
// 사용자의 코드를 JavaMainApplication으로 감싸 SparkApplication처럼 실행할 수 있게 만듬
new JavaMainApplication(mainClass)
}
...
// JVM을 실행
app.start(childArgs.toArray, sparkConf)
정리를 해보자면,
spark-submit은 Spark 명령을 수행하기 위해 JVM을 만들고
그 JVM에서 해당 명령들을 수행하는 것입니다.
JVM을 사용하는 이유는 Spark의 핵심 코드 베이스는 Scala
로 작성되어있고
그 Scala
는 Java
런타임 위에서 동작하기 때문입니다.
( Pyspark, SparkR도 마찬가지로 JVM을 통해 동작합니다. )
또한, 이 JVM이 바로 Spark의 Driver입니다.
이전에 "Driver는 Spark Application의 전체적인 실행 흐름을 제어하는 프로세스" 라고 언급했습니다.
app.start
를 통해 실행된 JVM이 바로 Spark Application의 실질적인 수행을 담당합니다.
그러므로 이 JVM이 Driver인 것이죠.
app.start
에 대해 조금 더 살펴보겠습니다.
위 코드에서 if문을 통해 mainClass 내에 SparkApplication이 구현되어 있는 가를 구분한다고 주석을 달아두었습니다.
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
// Cluster Mode
mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
// Client Mode
new JavaMainApplication(mainClass)
}
이 부분이 그 코드입니다. 이 코드를 보면 알 수 있듯이
Cluster와 Client 두 가지 경우로 구분이 됩니다.
if
SparkApplication이 구현되어 있는 경우
-> Cluster Mode
-> 자체적으로 구현된 Application의 인스턴스를 생성
else
SparkApplication 구현되어 있지 않은 경우
-> Client Mode
-> JavaMainApplication을 생성
JavaMainApplication를 살펴보겠습니다.
출처 : SparkApplication.Scala
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = {
// 전달받은 class로부터 main 함수를 가져옵니다.
val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
...
// SparkConf에 들어있는 옵션들(key-value)을 Property에 등록해
// JVM에서 바로 접근 가능하게 만듭니다.
val sysProps = conf.getAll.toMap
sysProps.foreach { case (k, v) =>
sys.props(k) = v
}
// main 함수에 필요한 args 인자를 넘겨주고 실행
// null은 main()이 static 메소드이므로 인스턴스가 필요 없음
mainMethod.invoke(null, args)
}
}
결론적으로, 처음에 작성한 MySparkApp의 main 함수를 실행한다는 것이죠.
과정을 요약해보면 이렇게 정리할 수 있을 것 같습니다.
처음 main 함수에 작성된 Spark 명령들을 수행하기 위해
(my-spark-app.jar
->MySparkApp
->main
)spark-submit이라는 프로세스를 거쳐
-> JVM(Driver)를 생성하고
-> 이 JVM 내에서 main함수를 실행해서 SparkContext를 생성한다.
kubernetes를 Cluster Manager로 사용했다고 가정하겠습니다.
이 경우 KubernetesClientApplication이 사용됩니다.
출처 : KubernetesClientApplication.scala
private[spark] class KubernetesClientApplication extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = {
// 인자들을 Parsing해서 구조화된 형태로 변환
val parsedArguments = ClientArguments.fromCommandLineArgs(args)
run(parsedArguments, conf)
}
private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {
// 해당 SparkApp을 지칭할 고유의 App ID를 생성
val kubernetesAppId = KubernetesConf.getKubernetesAppId()
// Spark 설정, 리소스 등을 포함하여
// Driver 실행용 KubernetesConf 객체 생성
val kubernetesConf = KubernetesConf.createDriverConf(
sparkConf,
kubernetesAppId,
clientArguments.mainAppResource,
clientArguments.mainClass,
clientArguments.driverArgs,
clientArguments.proxyUser)
// K8s 클러스터 API 서버 주소
val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
// Pod Log 출력용
val watcher = new LoggingPodStatusWatcherImpl(kubernetesConf)
// tryWithResource : 종료시 자동으로 close()를 호출해 자원 누수 방지
// SparkKubernetesClientFactory : Spark와 Kubernetes API가 통신할 수 있는 Client를 생성
Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
master,
Some(kubernetesConf.namespace),
KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
SparkKubernetesClientFactory.ClientType.Submission,
sparkConf,
None)) { kubernetesClient =>
// Spark에서 K8s API를 사용할 수 있게하는 Connection
val client = new Client(
kubernetesConf,
new KubernetesDriverBuilder(), // Driver Pod의 사양 구성
kubernetesClient,
watcher)
// Driver Pod를 실행 => main class 실행
client.run()
// 추가로 spark.driver.* 설정에 따라 리소스 요청
// spark.executor.instances만큼 Executor Pod도 요청
}
}
}
즉, Kubernetes 클러스터에 Driver Pod를 만들고
그 Pod 안에서 MySparkApp의 main 함수를 실행하여 SparkContext를 생성합니다.
( + Client를 통해 Spark에서 Kubernetes API 사용 가능 )
별도의 컨테이너 환경인 Pod를 생성하여 그 내부에서 실행할 뿐,
실행 내용 자체는 JavaMainApplication과 거의 다를게 없다고 볼 수 있습니다.
그럼 이제 처음 의문으로 다시 돌아오겠습니다.
Driver는 누가 실행시키는 것일까?
spark-submit에 의해 JVM이 만들어지게 되므로,
Trigger(누가)는 local 환경에서 시킨 것이고,
실행의 주체(어디서)는
Client Mode는 local 환경 그대로,
Cluster Mode는 Cluster Manager라고 볼 수 있겠네요.
Cluster Manager에게 어떤 것을 전달하는 걸까요?
kubernetesConf
이 부분에 들어가는 인자를 살펴보면 되겠네요.
sparkConf
: Spark의 설정 값 ( executor.memory, driver.cores 등 )
kubernetesAppId
: Spark App을 구분하는 고유한 식별자
clientArguments.mainAppResource
: main() 함수에 필요한 JAR 및 라이브러리 등의 경로
clientArguments.mainClass
: Driver 컨테이너에서 실행할 main Class ("MySparkApp")
clientArguments.driverArgs
: main 함수에 넘겨줄 인자들
( spark-submit ... input.csv output.csv 와 같은 설정 값이 아닌 추가적인 인자들 )
clientArguments.proxyUser
: 보안을 위한 사용자 식별용