해당 게시물은 [스파크 완벽 가이드] 책을 기반으로 작성되었습니다.
해당 게시글에서 알아볼 주제는 다음과 같다.
스파크 애플리케이션과 관련된 고수준 컴포넌트를 알아보자.
스파크 드라이버
스파크 드라이버는 스파크 애플리케이션의 '운전자 역할'을 하는 프로세스다. 드라이버는 스파크 애플리케이션의 실행을 제어하 스파크 클러스터(익스큐터의 상태와 태스크)의 모든 상태 정보를 유지한다. 또한 물리적 컴퓨팅 자원 확보와 익스큐터 실행을 위해 클러스터 매니저와 통신할 수 있어야 한다.
요약하자면 스파크 드라이버는 물리적 머신의 프로세스이며 클러스터에서 실행 중인 애플리케이션의 상태를 유지한다.
스파크 익스큐터
스파크 익스큐터는 스파크 드라이버가 할당한 태스크를 수행하는 프로세스이다. 익스큐터는 드라이버가 할당한 태스크를 받아 실행하고 태스크의 상태와 결과(성공 또는 실패)를 드라이버에 보고한다. 모든 스파크 애플리케이션은 개별 익스큐터 프로세스를 사용한다.
클러스터 매니저
스파크 드라이버와 익스큐터를 허공에 띄울 수는 없으므로 클러스터 매니저가 필요하다. 클러스터 매니저는 스파크 애플리케이션을 실행할 클러스터 머신을 유지한다. 클러스터 매니저는 '드라이버'와 '워커'라는 개념을 가지고 있으며 이 때문에 혼란스러울 수도 있다. 가장 큰 차이점은 프로세스가 아닌 물리적인 머신에 연결되는 개념이라는 점이다.
다음은 기본적인 클러스터 구성을 나타낸 그림이다. 그림 왼쪽에 있는 머신은 클러스터 매니저의 드라이버 노드다. 원은 개별 워커 노드를 실행하고 관리하는 데몬 프로세스다. 그림에서 스파크 애플리케이션은 아직 실행되지 않았다. 표시된 원들은 클러스터 매니저의 프로세스일 뿐이다.

스파크 애플리케이션을 실제로 실행할 때가 되면 우리는 클러스터 매니저에 자원 할당을 요청한다. 사용자 애플리케이션의 설정에 따라 스파크 드라이버를 실행할 자원을 포함해 요청하거나 스파크 애플리케이션 실행을 위한 익스큐터 자원을 요청할 수도 있다. 스파크 애플리케이션의 실행 과정에서 클러스터 매니저는 애플리케이션이 실행되는 머신을 관리한다.
스파크가 지원하는 클러스터 매니저는 다음과 같다.
지금까지 애플리케이션의 기본 컴포넌트를 알아보았다. 이제 애플리케이션을 실행할 때 처음으로 선택해야 하는 '실행 모드'를 알아보겠다.
실행 모드는 애플리케이션을 실행할 때 요청한 자원의 물리적인 위치를 결정한다. 선택할 수 있는 실행 모드는 다음과 같다.
위 그림을 기준으로 각 실행 모드를 자세히 알아보겠다. 이어지는 그림에서 실선으로 그려진 직사각형은 스파크 드라이버 프로세스를 나타내며 점선으로 그려진 직사각형은 익스큐터 프로세스를 나타낸다.
가장 흔하게 사용되는 스파크 애플리케이션 실행 방식은 클러스터 모드다. 클러스터 모드를 사용할 때 거쳐야 하는 과정은 다음과 같다.
즉, 클러스터 매니저는 모든 스파크 애플리케이션과 관련된 프로세스를 유지하는 역할을 한다.
다음 그림은 하나의 워커 노드에 스파크 드라이버를 할당하고 다른 워커 노드에 익스큐터를 할당하는 모습을 나타낸다.
왼쪽의 큰 직사각형은 클러스터 매니저의 드라이버 노드이며, 오른쪽의 큰 직사각형은 클러스터 매니저의 워커 노드이다. 원들은 개별 워커 노드를 실행하고 관리하는 데몬 프로세스이다. 위에서 언급한 대로 실선으로 그려진 작은 직사각형은 드라이버 프로세스를 의미하며, 점선으로 그려진 직사각형은 익스큐터 프로세스를 의미한다.

클라이언트 모드는 애플리케이션을 제출한 클라이언트 머신에 스파크 드라이버가 위치한다는 것을 제외하면 클러스터 모드와 비슷하다. 즉, 클라이언트 머신은 스파크 드라이버 프로세스를 유지하며, 클러스터 매니저는 스파크 익스큐터 프로세스를 유지한다.
다음 그림을 보면 스파크 애플리케이션이 클러스터와 무관한 머신에서 동작하는 것을 알 수 있다. 보통은 이런 머신을 게이트 웨이 머신(gateway machine) 또는 에지 노드(edge node)라고 부른다. 다음 그림을 보면 드라이버는 클러스터 외부의 머신에서 실행되며 나머지 워커는 클러스터에 위치하는 것을 알 수 있다.

로컬 모드는 앞서 알아본 두 모드와 상당히 다르다. 로컬 모드로 설정된 경우 모든 스파크 애플리케이션은 단일 머신에서 실행된다. 로컬 모드는 애플리케이션의 병렬 처리를 위해 단일 머신의 스레드를 활용한다.
이 모드는 스파크를 학습하거나 애플리케이션 테스트 그리고 개발 중인 애플리케이션을 반복적으로 실험하는 용도로 주로 사용된다. 그러므로 운영용 애플리케이션을 실행할 때는 로컬 모드 사용을 권장하지 않는다.
스파크 애플리케이션의 생애 주기를 알아보자. spark-submit 명령을 사용해 애플리케이션을 실행하는 예제를 그림과 함께 알아볼 것이다. 하나의 드라이버 노드와 세 개의 워커 노드로 구성된 총 네 대 규모의 클러스터가 이미 실행되고 있다고 가정하자.
위에서 알아본 용어를 사용해 초기화부터 종료까지 스파크 애플리케이션의 생애 주기를 단계별로 알아보자.
이어 나오는 그림에서는 추가로 네트워크 통신을 나타내는 선이 존재한다. 두꺼운 화살표 선은 스파크나 스파크 관련 프로세스가 수행하는 통신을 표현한다. 점선은 클러스터 매니저와의 통신 같은 일반적인 통신을 표현한다.
첫 단계는 스파크 애플리케이션을 제출하는 것이다. 스파크 애플리케이션은 컴파일된 JAR나 라이브러리 파일을 의미한다. 스파크 애플리케이션을 제출하는 시점에 로컬 머신에서 코드가 실행되어 클러스터 드라이버 노드에 요청한다. 이 과정에서 스파크 드라이버 프로세스의 자원을 함께 요청한다. 클러스터 매니저는 이 요청을 받아들이고 클러스터 노드 중 하나에 드라이버 프로세스를 실행한다. 스파크 잡을 제출한 클라이언트 프로세스는 종료되고 애플리케이션은 클러스터에서 실행된다.

좌측 상단의 직사각형에서 중간 직사각형으로 향하는 화살표는 스파크 애플리케이션을 제출함으로써 클러스터 드라이버 노드에 요청하는 것을 의미한다. 중간 직사각형인 클러스터 매니저의 드라이버 노드에서는 요청을 받아들이고, 클러스터 노드 중 하나에 드라이버 프로세스를 실행시킨다. 좌측 상단의 직사각형 안쪽의 직사각형인 클라이언트 프로세스는 스파크 잡을 제출한 뒤 종료되고, 애플리케이션은 클러스터에서 실행된다.
스파크 애플리케이션을 제추랗기 위해 터미널에서 다음과 같은 형태의 명령을 실행한다.
./bin/spark-submit \
--class <main-class>
--master <master-url> \
--deploy-mode cluster \
--conf <key>=<value> \
... # 다른 옵션
<application-jar> \
[application-arguments]
드라이버 프로세스가 클러스터에 배치되었으므로 사용자 코드를 실행할 차례다. 사용자 코드에는 반드시 스파크 클러스터(예: 드라이버와 익스큐터)를 초기화하는 SparkSession이 포함되어야 한다. SparkSession은 클러스터 매니저와 통신(어두운 선)해 스파크 익스큐터 프로세스의 실행을 요청(밝은 선)한다. 사용자는 spark-submit을 실행할 때 사용하는 명령행 인수로 익스큐터 수와 설정값을 지정할 수 있다.

클러스터 매니저는 익스큐터 프로세스를 시작하고 결과를 응답받아 익스큐터의 위치와 관련된 정보를 드라이버 프로세스로 전송한다. 모든 작업이 정상적으로 완료되면 '스파크 클러스터'가 완성된다.
스파크 애플리케이션 시작 과정을 정리해보면 다음과 같다.
'스파크 클러스터'가 생성되었으므로 다음 그림과 같이 코드를 실행한다. 드라이버와 워커는 코드를 실행하고 데이터를 이동하는 과정에서 서로 통신한다.
드라이버
각 워커에 태스크를 할당한다.
워커
태스크의 상태와 성공/실패 여부를 드라이버에 전달한다.

스파크 애플리케이션의 실행이 완료되면 드라이버 프로세스가 성공이나 실패 중 하나의 상태로 종료된다. 그런 다음 클러스터 매니저는 드라이버가 속한 스파크 클러스터의 모든 익스큐터를 종료시킨다. 이 시점에 스파크 애플리케이션의 성공/실패 여부를 클러스터 매니저에 요청해 확인할 수 있다.

지금까지 클러스터 관점(스파크를 지원하는 인프라 관점)에서 스파크 애플리케이션의 생애주기를 알아보았다.
이번에는 애플리케이션을 실행하면 스파크 내부에서 어떤 일이 발생하는지 알아보자. 여기서는 스파크 애플리케이션을 정의하는 실제 '사용자 코드'와 관련된 이야기를 한다. 스파크 애플리케이션은 하나 이상의 스파크 잡으로 구성된다. 스레드를 사용해 여러 액션을 병렬로 수행하는 경우가 아니라면 애플리케이션의 스파크 잡은 차례대로 실행된다.
SparkSession모든 스파크 애플리케이션은 가장 먼저 SparkSession을 생성해야 한다. 여러 대화형 모드에서는 자동으로 생성되지만, 애플리케이션을 만드는 경우라면 직접 생성해야 한다.
SparkSession을 직접 생성할 때에는 SparkSession의 빌더 메서드를 사용해 생성할 것을 권장한다. 이 방식을 사용하면 스파크와 스파크 SQL 컨텍스트를 안전하게 생성할 수 있다. 그리고 스파크 애플리케이션에서 다수의 라이브러리가 세션을 생성하려는 상황에서 컨텍스트 충돌을 방지할 수 있다.
// 스칼라에서 SparkSession 생성하기
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builer().appName("Databricks Spark Example")
.config("spark.sql.warehouse.dir", "/user/hive/warehouse")
.getOrCreate()
# 파이썬에서 SparkSession 생성하기
from pyspark.sql import SparkSession
spark = SparkSession.builder().master("local").appName("Word Count")\
.config("spark.some.config.option", "some-value")\
.getOrCreate()
SparkSession을 생성하면 스파크 코드를 실행할 수 있다. SparkSession을 사용해 모든 저수준 API, 기존 컨텍스트 그리고 관련 설정 정보에 접근할 수 있다.
SparkSession의 SparkContext는 스파크 클러스터에 대한 연결을 나타낸다. SparkContext를 이용해 RDD같은 스파크의 저수준 API를 사용할 수 있다. 스파크의 과거 버전의 예제나 문서를 보면 SparkContext는 일반적으로 sc 변수를 사용했다. SparkContext로 RDD, 어큐뮬레이터 그리고 브로드캐스트 변수를 생성하고 코드를 실행할 수 있다.
대부분의 경우 SparkSession으로 SparkContext에 접근할 수 있으므로 명시적으로 SparkContext를 초기화할 필요는 없다. 직접 초기화하는 일반적인 방법은 getOrCreatae 메서드를 사용하는 것이다.
// 스칼라 코드
import org.apache.spark.sparkContext
val sc = SparkContext.getOrCreate()
SparkSession이 초기화되었다면 코드를 실행할 차례다. 모든 스파크 코드는 RDD 명령으로 컴파일된다. 따라서 일부 논리적 명령(DataFrame 잡)을 알아보고 어떤 일이 발생하는지 단계별로 알아보겠다.
스파크 코드는 트랜스포메이션과 액션으로 구성된다. 사용자는 SQL, 저수준 RDD 처리, 머신러닝 알고리즘 등을 사용해 트랜스포메이션과 액션을 마음대로 구성할 수 있다. 그러므로 DataFrame과 같은 선언적 명령을 사용하는 방법과 논리적 명령이 물리적 실행 계획으로 어떻게 변환되는지 이해하는 것은 매우 중요하다. 이를 기반으로 스파크가 클러스터에서 동작하는 방식을 이해할 수 있다.
스파크가 사용자 코드를 어떻게 받아들이고 클러스터에 어떻게 명령을 전달하는지 되짚어보자. 그리고 코드 예제를 한 줄씩 살펴보면서 그 안에서 일어나는 작업을 알아보자.
간단한 DataFrame을 이용해 파티션을 재분배하는 잡, 값을 트랜스포메이션하는 잡, 집계 및 최종 결과를 얻어내는 잡 이렇게 세 단계의 잡을 수행한다.
아래 코드는 스파크 2.2 버전을 사용하는 파이썬 코드다. 잡 수가 크게 달라지지는 않겠지만, 물리적 실행 전략을 변화시키는 스파크의 최적화 과정에 따라 잡 수가 달라질 수 있다.
# 파이썬 코드
df1 = spark.range(1, 10000000, 2)
df2 = spark.range(1, 10000000, 4)
step1 = df1.repartition(5)
step12 = df2.repartition(6)
step2 = step1.selectExpr("id * 5 as id")
step3 = step2.join(step12, ["id"])
step4 = step3.selectExpr("sum(id)")
step4.collect() # 결과는 2,500,000,000,000
위 코드 예제를 실행하면 액션으로 하나의 스파크 잡이 완료되는 것을 확인할 수 있다. 물리적 실행 계획에 대한 이해를 높이기 위해 실행 계획을 살펴보자. 실행 계획 정보는 쿼리를 실제로 수행한 다음 스파크 UI의 SQL 탭에서도 확인 가능하다.
step4.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[sum(id#8L)])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=44]
+- HashAggregate(keys=[], functions=[partial_sum(id#8L)])
+- Project [id#8L]
+- SortMergeJoin [id#8L], [id#2L], Inner
:- Sort [id#8L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#8L, 200), ENSURE_REQUIREMENTS, [plan_id=36]
: +- Project [(id#0L * 5) AS id#8L]
: +- Exchange RoundRobinPartitioning(5), REPARTITION_BY_NUM, [plan_id=26]
: +- Range (1, 10000000, step=2, splits=8)
+- Sort [id#2L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#2L, 200), ENSURE_REQUIREMENTS, [plan_id=37]
+- Exchange RoundRobinPartitioning(6), REPARTITION_BY_NUM, [plan_id=29]
+- Range (1, 10000000, step=4, splits=8)
collect같은 액션을 호출하면 개별 스테이지와 태스크로 이루어진 스파크 잡이 실행된다.
보통 액션 하나 당 하나의 스파크 잡이 생성되며 액션은 항상 결과를 반환한다. 스파크 잡은 일련의 스테이지로 나뉘며 스테이지 수는 셔플 작업이 얼마나 많이 발생하는지에 따라 달라진다.
이전 예제의 잡은 다음과 같이 나뉜다.
'어쩌다 저 숫자들이 나온 거지?'라는 의문이 들었을 것이다. 자세히 알아보자.
스파크의 스테이지는 다수의 머신에서 동일한 연산을 수행하는 태스크의 그룹을 나타낸다. 스파크는 가능한 한 많은 태스크(잡의 트랜스포메이션)를 동일한 스테이지로 묶으려 노력한다. 셔플 작업이 일어난 다음에는 반드시 새로운 스테이지를 시작한다. 셔플은 데이터의 물리적 재분배 과정이다. 예를 들어 DataFrame 정렬이나 키별로 적재된 파일 데이터를 그룹화하는 작업과 같다. 파티션을 재분배하는 과정은 데이터를 이동시키는 작업이므로 익스큐터 간의 조정이 필요하다. 스파크는 셔플이 끝난 다음 새로운 스테이지를 시작하며 최종 결과를 계산하기 위해 스테이지 실행 순서를 계속 추적한다.
이전 잡에서 처음 두 스테이지는 DataFrame 생성을 위해 사용한 range 명령을 수행하는 단계다. range 명령을 사용해 DataFrame을 생성하면 기본적으로 8개의 파티션을 생성한다.
다음은 파티션 재분배 단계다. 이 단계에서는 데이터 셔플링으로 파티션 수를 변경한다. 두 개의 DataFrame은 스테이지 3과 4의 태스크 수에 해당하는 5개, 6개의 파티션으로 재분배된다.
스테이지 3과 4는 개별 DataFrame에서 수행된다. 마지막 두 스테이지는 조인(셔플)을 수행한다. 왜 갑자기 태스크 수가 200개로 변했을까? 그 이유는 스파크 SQL 설정 때문이다. spark.sql.shuffle.partitions 속성의 기본값은 200이다. 그러므로 스파크 잡이 실행되는 도중에 셔플을 수행하면 기본적으로 200개의 셔플 파티션을 생성한다. spark.sql.shuffle.partitions 속성을 원하는 값으로 변경할 수 있다. 그러면 셔플을 수행할 때 생성되는 파티션 수가 변경된다.
파티션 수는 매우 중요한 파라미터다. 이 값은 효율적인 실행을 위해 클러스터의 코어 수에 맞추어 설정한다. 설정 방법은 다음과 같다.
spark.conf.set("spark.sql.shuffle.partitions", 50)
여러 요인에 의해 영향을 받을 수 있지만, 경험적으로 보면 클러스터의 익스큐터 수보다 파티션 수를 더 크게 지정하는 것이 좋다. 로컬 머신에서 코드를 실행하는 경우 병렬로 처리할 수 있는 태스크 수가 제한적이므로 이 값을 작게 설정해야 한다. 이 설정은 더 많은 익스큐터 코어를 사용할 수 있는 클러스터 환경을 위한 기본값이다.
최종 스테이지에서는 드라이버로 결과를 전송하기 전에 파티션마다 개별적으로 수행된 결과를 단일 파티션으로 모으는 작업을 수행한다.
스파크의 스테이지는 태스크로 구성된다. 각 태스크는 단일 익스큐터에서 실행할 데이터의 블록과 다수의 트랜스포메이션 조합으로 볼 수 있다. 만약 데이터셋이 거대한 하나의 파티션인 경우 하나의 태스크만 생성된다. 만약 1,000개의 작은 파티션으로 구성되어 있다면 1,000개의 태스크를 만들어 병렬로 실행할 수 있다.
즉, 태스크는 데이터 단위(파티션)에 적용되는 연산 단위를 의미한다. 파티션 수를 늘리면 더 높은 병렬성을 얻을 수 있다. 물론 만병통치약은 아니지만 최적화를 위한 가장 간단한 방법이다.
스파크의 스테이지와 태스크는 알아두면 좋을 만한 중요한 특성을 가지고 있다.
첫 째, 스파크는 map 연산 후 다른 map 연산이 이어진다면 함께 실행할 수 있도록 스테이지와 태스크를 자동으로 연결한다.
둘 째, 스파크는 모든 셔플을 작업할 때 데이터를 안정적인 저장소(예: 디스크)에 저장하므로 여러 잡에서 재사용할 수 있다.
스파크를 인메모리 컴퓨팅 도구로 만들어주는 핵심 요소 중 하나는 맵리듀스와 같은 스파크 이전 기능과 달리 스파크는 메모리나 디스크에 데이터를 쓰기 전에 최대한 많은 단계를 수행한다는 점이다. 스파크가 수행하는 주요 최적화 기법 중 하나는 RDD나 RDD보다 더 아래에서 발생하는 파이프라이닝 기법이다.
파이프라이닝 기법은 노드 간의 데이터 이동 없이 각 노드가 데이터를 직접 공급할 수 있는 연산만 모아 태스크의 단일 스테이지로 만든다.
예를 들어 map, filter, map 순서로 수행되는 RDD 기반의 프로그램을 개발했다면 개별 입력 레코드를 읽어 첫 번째 map으로 전달한 다음 filter하고 마지막 map 함수로 전달해 처리하는 과정을 태스크의 단일 스테이지로 만든다. 따라서 파이프라인으로 구성된 연산 작업은 단계별로 메모리나 디스크에 중간 결과를 기록하는 방식보다 훨씬 더 처리 속도가 빠르다.
select, filter 그리고 select를 수행하는 DataFrame이나 SQL 연산에서도 동일한 파이프라이닝 유형이 적용된다.
스파크 런타임에서 파이프라이닝을 자동으로 수행하기 때문에 애플리케이션을 개발할 때는 눈에 보이지 않는다. 스파크 UI나 로그 파일로 애플리케이션을 확인해보면 다수의 RDD 또는 DataFrame 연산이 하나의 스테이지로 파이프라이닝되어 있음을 알 수 있다.
두 번째 특성은 셔플 결과 저장이다. 스파크가 reduce-by-key 연산같이 노드 간 복제를 유발하는 연산을 실행하면 엔진에서 파이프라이닝을 수행하지 못하므로 네트워크 셔플이 발생한다.
노드 간 복제를 유발하는 연산은 각 키에 대한 입력 데이터를 먼저 여러 노드로부터 복사한다. 항상 데이터 전송이 필요한 '소스' 태스크를 먼저 수행하기 때문이다. 그리고 소스 태스크의 스테이지가 실행되는 동안 셔플 파일을 로컬 디스크에 기록한다. 그런 다음 그룹화나 리듀스를 수행하는 스테이지가 시작된다. 이 스테이지에서는 셔플 파일에서 레코드를 읽어 들인 다음 연산을 수행한다.
예를 들어 특정 범위의 키와 관련된 데이터를 읽고 처리한다. 만약 잡이 실패한 경우 셔플 파일을 디스크에 저장했기 때문에 '소스' 스테이지가 아닌 해당 스테이지부터 처리할 수 있다. 따라서 '소스' 태스크를 재실행할 필요 없이 실패한 리듀스 태스크부터 다시 시작할 수 있다.
셔플 결과를 저장할 때 발생할 수 있는 부작용은 이미 셔플된 데이터를 이용해 새로운 잡을 실행하면 '소스'와 관련된 셔플이 다시 실행되지 않는다는 것이다. 스파크는 다음 스테이지를 실행하는 과정에서 디스크에 이미 기록되어 있는 셔플 파일을 다시 사용할 수 있다고 판단하기 때문에 이전 스테이지를 처리하지 않는다. _스파크 UI와 로그 파일에서 'skipped'라고 표시된 사전 셔플 스테이지(pre-shuffle stage)를 확인할 수 있다. 이러한 자동 최적화 기능은 동일한 데이터를 사용해 여러 잡을 실행하는 워크로드의 시간을 절약할 수 있다.
물론 더 나은 성능을 얻기 위해 DataFrame이나 RDD의 cache 메서드를 사용할 수 있다. 그러면 사용자가 직접 캐싱을 수행할 수 있으며 정확히 어떤 데이터가 어디에 저장되는지 제어할 수 있다. 집계된 데이터에 스파크 액션을 수행한 다음 스파크 UI를 확인해보면 이러한 방식을 쉽게 이해할 수 있다.
이번 게시물에서는 스파크 애플리케이션을 클러스터에서 실행하면 어떤 일이 일어나는지 알아보았다. 즉, 클러스터에서 실제로 실행되는 방식과 스파크 애플리케이션 내부에서 어떤 일이 일어나는지 알 수 있었다.