- 이 장에서 알아볼 주제
- 스파크 애플리케이션의 아키텍처와 컴포넌트
- 스파크 내/외부에서 실행되는 스파크 애플리케이션의 생애주기
- 파이프라이닝과 같은 중요한 저수준 실행 속성
- 스파크 애플리케이션을 실행하는 데 필요한 사항
스파크 애플리케이션의 아키텍처
스파크 드라이버
- 스파크 애플리케이션의 운전자 역할을 하는 프로세스
- 스파크 애플리케이션의 실행을 제어하고
- 스파크 클러스터(익스큐터의 상태와 태스크)의 모든 상태 정보를 유지하고
- 물리적 컴퓨팅 자원 확보와 익스큐터 실행을 위해 클러스터 매니저와 통신할 수 있어야함
스파크 익스큐터
- 스파크 익스큐터는 스파크 드라이버가 할당한 태스크를 수행하는 프로세스
- 드라이버가 할당한 태스크를 받아 실행하고
- 태스크의 상태와 결과(성공 또는 실패)를 드라이버에 보고함
- 모든 스파크 애플리케이션은 개별 익스큐터 프로세스를 사용함
클러스터 매니저
-
스파크 드라이버와 익스큐터를 허공에 띄울 수는 없으므로 클러스터 매니저가 필요
-
클러스터 매니저는 스파크 애플리케이션을 실행할 클러스터 머신을 유지함
-
드라이버(마스터)와 워커라는 개념을 가지고 있음
- 프로세스가 아닌 물리적인 머신에 연결되는 개념
- 원은 개별 워커 노드를 실행하고 관리하는 데몬 프로세스
- 위에 표시된 원들은 클러스터 매니저의 프로세스일 뿐
-
스파크 애플리케이션을 실행할 때가 되면 클러스터 매니저에 자원 할당을 요청함
-
스파크가 지원하는 클러스터 매니저
- 스탠드얼론 클러스터 매니저
- 아파치 메소스
- 하둡 YARN
실행 모드
- 실행 모드는 애플리케이션을 실행할 때 요청한 자원의 물리적인 위치를 결정함
- 선택 가능한 실행 모드
클러스터 모드
- 가장 흔한 모드
- 클러스터 모드를 사용하려면 컴파일된 JAR파일이나 파이썬 스크립트 도는 R스크립트를 클러스터 매니저에 전달
- 클러스터 매니저는 파일은 받은 다음 워커 노드에 드라이버와 익스큐터 프로세스를 실행함
- 클러스터 매니저는 모든 스파크 애플리케이션과 관련된 프로세스를 유지하는 역할을 함
- 위 그림은 하나의 워커 노드에 스파크 드라이버를 할당하고 다른 워커 노드에 익스큐터를 할당한 모습
- 실선으로 그려진 직사각형: 드라이버 프로세스
- 점선으로 그려진 직사각형: 익스큐터 프로세스
클라이언트 모드
- 애플리케이션을 제출한 클라이언트 머신에 스파크 드라이버가 위치한다는 것을 제외하면 클러스터 모드와 비슷
- 스파크 드라이버 프로세스를 유지하며 클러스터 매니저는 익스큐터 프로세스를 유지함
- 스파크 애플리케이션이 클러스터와 무관한 머신에서 동작
- 보통 이런 머신을 게이트웨이 머신 또는 에지 노드라고 부름
- 드라이버는 클러스터 외부의 머신에서 실행되고
- 나머지 워커는 클러스터에 위치
- 실선으로 그려진 직사각형: 드라이버 프로세스
- 점선으로 그려진 직사각형: 익스큐터 프로세스
로컬 모드
- 학습, 테스트 용도로 주로 사용
- 로컬 모드로 설정된 경우 모든 스파크 애플리케이션은 단일 머신에서 실행됨
- 로컬 모드는 애플리케이션의 병렬 처리를 위해 단일 머신의 스레드를 활용
스파크 애플리케이션의 생애주기(스파크 외부)
- 초기화부터 종료까지 스파크 애플리케이션의 생애주기를 단계별로 볼 것임
1단계: 클라이언트 요청
- 두꺼운 화살표 선: 스파크나 스파크 관련 프로세스가 수행하는 통신
- 실선: 클러스터 매니저와의 통신 같은 일반적인 통신
1. 스파크 애플리케이션(컴파일된 JAR나 라이브러리 파일) 제출
- 제출하는 시점에 로컬 머신에서 코드가 실행되어 클러스터 드라이버 노드에 요청함
- 이 과정에서 스파크 드라이버 프로세스의 자원을 함께 요청
2. 클러스터 매니저는 이 요청을 받아들이고 클러스터 노드 중 하나에 드라이버 프로세스를 실행
3. 스파크 잡을 제출한 클라이언트 프로세스는 종료되고 애플리케이션은 클러스터에서 실행
2단계: 시작
- 사용자 코드에는 반드시 스파크 클러스터(드라이버와 익스큐터)를 초기화하는 SparkSession이 포함되어야 함
- SparkSession은 클러스터 매니저와 통신하여 스파크 익스큐터 프로세스의 실행을 요청함
- 사용자는 spark-submit을 실행할 때 사용하는 명령행 인수로 익스큐터 수와 설정값을 지정 가능
1. 클러스터 매니저는 익스큐터 프로세스를 시작하고
2. 결과를 응답 받아 익스큐터의 위치와 관련된 정보를 드라이버 프로세스로 전송
3. 모든 작업이 정상적으로 완료되면 '스파크 클러스터'가 완성
3단계: 실행
1. 드라이버와 워커는 코드를 실행하고 데이터를 이동하는 과정에서 서로 통신
2. 드라이버는 각 워커에 태스크 할당
3. 태스크를 할당받은 워커는 태스크의 상태와 성공/실패 여부를 드라이버에 전송
4단계: 완료
1. 스파크 애플리케이션의 실행이 완료되면 드라이버 프로세스가 성공이나 실패 중 하나의 상태로 종료됨
2. 그 다음 클러스터 매니저는 드라이버가 속한 스파크 클러스터의 모든 익스큐터를 종료시킴
3. 이 시점에 스파크 애플리케이션의 성공/실패 여부를 클러스터 매니저에 요청해 확인 가능
스파크 애플리케이션의 생애주기(스파크 내부)
- 스파크 애플리케이션을 정의하는 실제 '사용자 코드'와 관련된 내용을 다룰 것임
- 스파크 애플리케이션은 하나 이상의 스파크 잡으로 구성됨
- 스레드를 사용해 여러 액션을 병렬로 수행하는 경우가 아니라면 애플리케이션의 스파크 잡은 차례대로 실행됨
SparkSession
- 모든 스파크 애플리케이션은 가장 먼저 SparkSession을 생성함
- 여러 대화형 모드에선 자동으로 생성
- 애플리케이션을 만드는 경우라면 직접 생성
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local').appName('word count').config('spark.some.config.option','some-value').getOrCreate()
SparkContext
- SparkSession의 SparkContext는 스파크 클러스터에 대한 연결을 나타냄
- SparkContext를 이용해 RDD같은 스파크의 저수준 API를 사용할 수 있음
- 또한 브로드캐스트 변수를 생성하고 코드를 실행할 수 있음
- getOrCreate메서드로 직접 SparkContext초기화 가능
논리적 명령
- 스파크 코드는 트랜스포메이션과 액션으로 구성됨
- 사용자는 SQL, 저수준 RDD 처리, 머신러닝 알고리즘 등을 사용해 트랜스포메이션과 액션을 마음대로 구성할 수 있음
- DataFrame과 같은 선언적 명령을 사용하는 방법과 논리적 명령이 물리적 실행 계획으로 어떻게 변환되는지 이해하는 것은 중요
df1= spark.range(2,10000000, 2)
df2= spark.range(2,10000000, 4)
step1= df1.repartition(5)
step12 = df2.repartition(6)
step2=step1.selectExpr('id*5 as id')
step3 = step2.join(step12,['id'])
step4 = step3.selectExpr('sum(id)')
step4.collect()
Out[2]: [Row(sum(id)=2500000000000)]
- 액션을 호출하면 개별 스테이지와 태스크로 이루어진 스파크 잡이 실행됨
step1.explain()
== Physical Plan ==
Exchange RoundRobinPartitioning(5)
+- *(1) Range (2, 10000000, step=2, splits=16)
step12.explain()
== Physical Plan ==
Exchange RoundRobinPartitioning(6)
+- *(1) Range (2, 10000000, step=4, splits=16)
step2.explain()
== Physical Plan ==
*(2) Project [(id
+- Exchange RoundRobinPartitioning(5)
+- *(1) Range (2, 10000000, step=2, splits=16)
step4.explain()
== Physical Plan ==
*(5) HashAggregate(keys=[], functions=[finalmerge_sum(merge sum
+- Exchange SinglePartition
+- *(4) HashAggregate(keys=[], functions=[partial_sum(id
+- *(4) Project [id
+- *(4) SortMergeJoin [id
+- *(2) Project [(id
+- *(1) Range (2, 10000000, step=2, splits=16) +- Sort [id
+- *(3) Range (2, 10000000, step=4, splits=16)
스파크 잡
- 보통 액션 하나당 하나의 스파크 잡이 생성되면 액션은 항상 결과를 반환함
- 스파크 잡은 일련의 스테이지로 나뉨
- 스테이지 수는 셔플 작업(넓은 의존성 트랜스포메이션)이 얼마나 많이 발생하는지에 따라 달라짐
스테이지
-
스파크의 스테이지는 다수의 머신에서 동일한 연산을 수행하는 태스크의 그룹을 나타냄
-
가능한 한 많은 태스크(잡의 트랜스포메이션)를 동일한 스테이지로 묶으려 노력함
-
셔플 작업이 일어난 후에는 반드시 새로운 스테이지를 시작
- 최종 결과를 계산하기 위해 스테이지 실행 순서를 계속 추적해야함
-
위 잡의 스테이지 구성
-
Stage 23: 16/16
-
Stage 24: 16/16
- DataFrame 생성을 위해 사용한 range명령을 수행한 단계
- range명령으로 df를 생성하면 기본적으로 16개의 파티션을 생성함
-
Stage 25: 5/5
- 파티션 재분배 단계(df1의 파티션 16 -> 5개)
- df2(step12)은 이후에 트랜스포메이션이 없어서 생략된건가?
- 데이터 셔플링으로 파티션 수를 변경함
- DataFrame은 스테이지의 태스크 수와 같은 수의 파티션으로 재분배됨
-
Stage 26: 200/200
-
Stage 27: 1/1
- 조인(셔플) 수행 단계
- 스파크 SQL은 spark.sql.suffle.partition 속성의 기본값이 200임
- 그래서 스파크 잡이 실행되는 도중에 셔플을 수행하면 200개의 셔플 파티션이 기본적으로 생성
-
경험적으로 보았을 때 클러스터의 익스큐터 수보다 파티션 수를 더 크게 지정하는 것이 좋음
- 대신 로컬 머신에서는 병렬로 처리할 수 있는 태스크 수가 제한적이므로 파티션 수를 작게 설정해야함
태스크
세부 실행 과정
- 스테이지와 태스크의 중요한 특성
- map 연산 후 다른 map 연산이 이어진다면 함께 실행할 수 있도록 스테이지와 태스크를 자동으로 연결
- 스파크는 모든 셔플을 작업할 때 데이터를 안정적인 저장소(디스크 같은)에 저장하므로 여러 잡에서 재사용 가능
1. 스테이지와 태스크를 자동 연결: 파이프라이닝
-
스파크를 '인메모리 컴퓨팅 도구'로 만들어주는 핵심 요소 중 하나는 메모리나 디스크에 데이터를 쓰기 전에 최대한 많은 단계를 수행한다는 점
-
스파크의 주요 최적화 기법 중 하나는 RDD나 RDD보다 더 아래에서 발생하는 파이프라이닝 기법
-
파이프라이닝 기법은 노드 간의 데이터 이동 없이 각 노드가 데이터를 직접 공급할 수 있는 연산만 모아 태스크의 단일 스테이지로 만듦
- ex) map, filter map 순으로 수행되는 RDD기반의 프로그램이 있다면,
- 먼저 입력 레코드를 읽어 map으로 전달한 다음 -> filter -> 마지막 map 함수로 전달해 처리하는 과정을 태스크의 단일 스테이지로 만듦
-
파이프라이닝으로 구성된 연산 작업은 단계별로 메모리나 디스크에 중간 결과를 저장하는 방식보다 훨씬 더 처리 속도가 빠름
-
스파크 런타임에서 파이프라이닝이 자동 수행되므로 눈에 보이진 않음
- 스파크 UI나 로그 파일로 애플리케이션을 확인해보면 다수의 RDD 또는 DataFrame 연산이 하나의 스테이지로 파이프라이닝되어 있음을 알 수 있음
2. 안정적인 저장소에 데이터 저장: 셔플 결과 저장
- 만약 잡이 실패한 경우, 셔플 파일을 디스크에 저장했으므로 소스 스테이지가 아닌 해당 스테이지부터 처리 가능
- 즉, 소스 태스크를 재실행할 필요 없이 실패한 리듀스 태스크부터 다시 시작 가능
- 단, 이미 셔플된 데이터를 이용해 새로운 잡을 실행하면 소스와 관련된 셔플이 다시 실행되지 않음
- 디스크에 이미 기록되어 있는 셔플 파일을 다시 사용할 수 있다고 판단하기 때문에 이전 스테이지를 처리하지 않음
- 이러한 상황은 스파크 UI와 로그 파일에서 'skipped'라고 표시