CHAPTER 2. 스파크 간단히 살펴보기
2.1 스파크의 기본 아키텍처
- 컴퓨터 클러스터 : 여러대의 컴퓨터의 자원을 모아 하나의 컴퓨터처럼 사용하는 것
- 스파크 : 클러스터의 데이터 처리 작업을 관리, 조율할 수 있는 프레임워크
- 사용자는 클러스터 매니저에 스파크 애플리케이션을 제출한다. 이를 받은 클러스터 매니저는 애플리케이션 실행에 필요한 자원을 할당한다.
2.1.1 스파크 애플리케이션
- 스파크 애플리케이션은 드라이버 프로세스(driver)와 다수의 익스큐터(executor) 프로세스로 구성된다.
- 드라이버 프로세스
- 클러스터 노드 중 하나에서 실행
- main()함수 실행
- 스파크 애플리케이션 정보의 유지 관리
- 사용자 프로그램이나 입력에 대한 응답
- 사용자 프로그램(Job)을 task 단위로 변환하여 Executor로 전달
- 전반적인 익스큐터 프로세스의 작업과 관련된 분석, 배포, 스케줄링 역할 수행
- 스파크의 언어 API를 통해 다양한 언어로 실행 가능
- 애플리케이션의 수명 주기 동인 관련 정보를 모두 유지
- 익스큐터
- 드라이버 프로세스가 할당한 작업을 수행
- 드라이버가 할당한 코드를 실행하고 진행 상황을 다시 드라이버 노드에 보고
- 클러스터 매니저
- 물리적 머신을 관리
- 스파크 애플리케이션에 사용 가능한 자원 파악 및 할당
- 종류
- 스파크 스탠드얼론(standalone) 클러스터 매니저
- 하둡 YARN
- 메소스(Mesos)
2.1.2 Spark Application 실행과정(흐름)
- 사용자가 클러스터 매니저에 스파크 애플리케이션을 제출
- Spark Driver가 main()을 실행하여 Spark Context 생성
- Spark Context가 클러스터 매니저와 연결
- Spark Driver가 클러스터 매니저로부터 Executor애플리케이션 실행에 필요한 자원을 요청
- Spark Context는 작업 내용을 task 단위로 분할하여 Executor에 보냄
- 각 Executor는 작업을 수행하고 결과를 저장하고 다시 드라이버에 보고
2.2 스파크의 다양한 언어 API
다양한 프로그래밍 언어로 스파크 코드 실행 가능
- 스칼라 : 스파크의 기본언어. 스파크는 스칼라로 개발됨
- 자바
- 파이썬 : 스칼라가 지원하는 거의 모든 구조 지원
- SQL : 스파크는 ANSI SQL:2003 표준 중 일부 지원
- R : 스파크 코어에 호함된 Spark R, 커뮤니티 기반 패키지인 sparklyr 두개의 라이브러리 사용
스파크는 파이썬이나 R로 작성한 코드를 익스큐터에 전달하기 전에 JVM에서 실행할 수 있는 코드로 변환
2.3 스파크 API
- 저수준의 비구조적 API
- 분산 데이터 처리를 위한 RDD
- RDD는 대량의 데이터를 요소로 가지는 분산 컬렉션
- 분산형 공유 변수를 배포하고 다루기위한 API
- 고수준의 구조적 API
- 비정형 로그파일, 반정형 CSV파일 파케이(Parquet)파일을 처리함.
- 분산 컬렉션 API
- Dataset, DataFrame, SQL 테이블과 뷰
2.4 스파크 시작하기
- 대화형 모드인 경우 SparkSession이 자동으로 생성된다.
- standalone application으로 스파크를 시작하면 사용자 애플리케이션 코드에서 SparkSession 객체를 직접 생성해야 한다.
2.5 SparkSession
- 스파크 애플리케이션은 SparkSession이라 불리는 드라이버 프로세스로 제어
- SparkSession 인스턴스는 사용자가 정의한 처리 명령을 클러스터에서 실행
- 하나의 SparkSession은 하나의 스파크 애플리케이션에 대응한다.
0~999까지의 1000개의 로우로 구성된 DataFrame 생성
myRange = spark.range(1000).toDF("number")
각 로우에는 0~999까지의 값이 할당되어 있습니다.
이 숫자들은 분산 컬렉션을 나타냅니다.
클러스터 모드에서 예제를 실행하면 숫자 범위의 각 부분이 서로 다른 익스큐터에 할당되고 이것이 스파크의 DataFrame입니다.
2.6 DataFrame
- 대표적인 구조적 API
- DataFrame : 데이터를 로우와 컬럼으로 단순하게 표현
- Schema : 컬럼과 컬럼의 타입으로 정의한 목록
- 스파크의 DataFrame은 수천대의 컴퓨터에 분산되어 있음.
- 한 대의 컴퓨터에 저장하기에는 데이터가 크거나 계산에 너무 오랜 시간이 걸림
2.6.1 파티션
-
스파크는 모든 익스큐터가 병렬로 작업을 수행할 수 있도록 파티션이라 불리는 chunk 단위로 데이터를 분할한다.
-
파티션은 클러스터의 물리적 머신에 존재하는 row의 집합
-
DataFrame의 파티션은 실행 중에 데이터가 컴퓨터 클러스터에서 물리적으로 분산되는 방식을 나타낸다.
-
파티션이 하나라면 수천개의 익스큐터가 있더라도 병렬성은 1이 된다. 또한 수백개의 파티션이 있더라도 익스큐터가 하나밖에 없다면 병렬성은 1이 된다.
2.7 트랜스포메이션
-
스파크의 핵심 데이터 구조는 불변성(immutable)을 가진다.
-
데이터 구조를 변경하려면 트랜스포메이션을 사용해야 한다.
-
액션을 호출해야만 실제 트랜스포메이션이 수행됨
-
비즈니스 로직을 표현하는 핵심 개념
-
유형
-
좁은 의존성
(narrow dependency)
- 각 입력 파티션이 하나의 출력 파티션에만 영향을 미침 (1:1)
- 파이프라이닝을 자동 수행
- DataFrame에 여러 필터를 지정하는 경우 모든 작업이 메모리에서 발생
-
넓은 의존성(wide dependency)
- 하나의 입력 파티션이 여러 출력 파티션에 영향을 미침 (1:N)
- 셔플(스파크가 클러스에서 파티션을 교환하는 작업)을 자동 수행
- 셔플의 결과를 디스크에 저장
DataFrame에서 짝수를 찾는 트랜스포메이션
divisBy2 = myRange.where("number % 2 = 0")
결과는 출력되지 않는 추상적인 트랜스포메이션만 지정
2.7.1 지연 연산
- 연산 그래프를 처리하기 직전까지 기다리는 동작 방식
- 연산 명령이 내려진 즉시 데이터를 수정하지 않고 원시 데이터에 적용할 트랜스포메이션의 실행 계획을 생성한다. 스파크 코드를 실행하는 마지막 순간까지 대기하다가 원형 DataFrame 트랜스포메이션을 간결한 물리적 실행 계획으로 컴파일
- 스파크는 이 과정을 거치며 전체 데이터 흐름을 최적화함
- 조건절 푸시다운(predicate pushdown)
2.8 액션
- 실제 연산을 수행하려면 액션 명령을 내려야 함
- 액션은 일련의 트랜스포메이션으로부터 결과를 계산하도록 지시하는 명령
- 액션의 유형
- 콘솔에서 데이터를 보는 액션
- 각 언어로 된 네이티브 객체에 데이터를 모으는 액션
- 출력 데이터소스에 저장하는 액션
- 액션을 지정하면 Spark Job이 시작된다.
- Spark Job은 좁은 트랜스포메이션을 수행한 후 넓은 트랜스포메이션을 수행한 후 각 언어에 적합한 네이티브 객체에 결과를 모읍니다.
DataFrame의 전체 레코드 수를 반환하는 액션
divisBy2.count()
2.9 스파크UI
- http://localhost:4040
- 스파크 잡의 진행 상황을 모니터링할 때 사용
- 스파크 잡의 상태, 환경 설정, 클러스터 상태 등의 정보를 확인할 수 있음
- 스파크 잡을 튜닝하거나 디버깅할 때 매우 유용
2.10 종합 예제
데이터는 SparkSession의 DataFrameReader 클래스를 사용해서 읽는다. 이때 특정 파일의 포맷과 몇 가지 옵션을 함께 설정한다.
read 메서드 - 스키마추론을 활용하여 데이터를 읽기
flightData2015 = spark.read.option("inferSchema", "true")\
.option("header", "true")\
.csv("path")
flightData2015.show()
스키마 추론(Schema Interence) : 스파크는 스키마 정보를 얻기 위해 데이터를 조금 읽는다. 그리고 해당 로우의 데이터 타입을 스파크 데이터 타입에 맞게 분석한다.
Take액션 호출 - Array형태로 n개 확인, head명령과 동일
flightData2015.take(3)
sort() 트랜스포메이션 사용, 실행계획 확인
flightData2015.sort("count").explain()
- sort 메소드는 트랜스포메이션이기 때문에 호출 시 데이터에 아무런 변화도 일어나지 않는다.
- read 메소드는 액션이 호출되기 전까지 데이터를 읽지 않는다.
- DataFrame 객체에 explain 메서드를 호출하면 DataFrame의 계보(lineage)나 스파크의 쿼리 실행 계획을 확인할 수 있다.
- 실행 계획은 위에서 아래로 읽는다.
- 최종 결과는 가장 위에, 데이터 소스는 가장 아래에 있다.
- 실행 계획은 트랜스포메이션의 지향성 비순환 그래프,DAG(Directed Acyclic Graph)이며, 액션이 호출되면 결과를 만들어낸다. DAG의 각 단계는 불변성을 가진 신규 DataFrame을 생성한다.
- 트랜스포메이션의 논리적 실행 계획은 DataFrame의 계보를 정의한다. 스파크는 계보를 통해 입력 데이터에 수행한 연산을 전체 파티션에서 어떻게 재연산하는지 알 수 있다.
2.10.1 DataFrame과 SQL
DataFrame과 테이블에 쿼리 수행하고 실행계획 비교하기
flightData2015.createOrReplaceTempView("flightData2015")
sqlWay = spark.sql("SELECT DEST_COUNTRY_NAME, COUNT(1) FROM flightData2015 GROUP BY DEST_COUNTRY_NAME")
dataFrameWay = flightData2015\ .groupBy("DEST_COUNTRY_NAME")\ .count()
sqlWay.explain()
dataFrameWay.explain()
- DataFrame과 테이블이 동일한 기본 실행 계획으로 컴파일 된다.
- 사용자가 SQL이나 DataFrame으로 비즈니스 로직을 표현하면 스파크에서 실제 코드를 실행하기 전에 그 로직을 기본 실행 계획으로 컴파일한다.
- createOrReplateTempView 메소드를 호출하면 모든 DataFrame을 테이블이나 뷰로 만들 수 있다.
- 스파크 SQL을 사용하면 모든 DataFrame을 테이블이나 뷰(임시 테이블)로 등록한 후 SQL 쿼리를 사용할 수 있다.
- DataFrame에 쿼리를 수행하면 새로운 DataFrame이 반환된다.
max함수
from pyspark.sql.functions import max
flightData2015.select(max("count")).take(1)
- max함수는 DataFrame의 특정 컬럼값을 스캔하면서 이전 최댓값보다 큰 값을 찾는다.
- max함수는 필터링을 수행해 단일 로우를 결과로 반환하는 트랜스포메이션이다.
flightData2015.createOrReplaceTempView("flightData2015")
maxSql = spark.sql("""SELECT DEST_COUNTRY_NAME, sum(count) as destination_total FROM flightData2015 GROUP BY DEST_COUNTRY_NAME ORDER BY sum(count) DESC LIMIT 5""") maxSql.show()
flightData2015.groupBy("DEST_COUNTRY_NAME").sum("count").withColumnRenamed("sum(count)", "destination_total").sort(desc("destination_total")).limit(5).show()
- 실행계획은 트랜스포메이션의 지향성 비순환 그래프(DAG)이며 액션이 호출되면 결과를 만들어냅니다. 각 단계는 불변성을 가진 신규 DataFrame을 생성합니다.
- 데이터를 읽는다. 다만, 스파크는 해당 DataFrame이나 자신의 원본 DataFrame에 액션이 호출되기 전까지 데이터를 읽지 않습니다.
- 데이터를 그룹화한다. groupBy메서드가 호출되면 최종적으로 그룹화된 DataFrame을 지칭하는 이름을 가진 RealationGroupedDataset을 반환한다.
- 집계 유형을 지청하기 위해 컬럼 표현식이나 컬럼명을 인수로 사용하는 sum메서드 사용, 새로운 스키마 정보를 가지는 DataFrame을 생성
- 컬럼명 변경 withColumnRenamed 메서드에 원본 컬럼명과 신규 컬럼명을 인수로 저장
- 데이터를 정렬한다.
- 역순으로 정렬하기 위해 desc함수를 임포트한다.
- limit메서드로 반환 결과의 수를 제한한다. 결과 DataFrame의 상위 5개 로우를 반환한다.
- 액션을 수행한다. DataFrame의 결과를 모으는 프로세스를 시작한다. 처리가 끝나면 리스트나 배열을 반환한다.
참고 스파크 완벽 가이드 (Spark The Definitive Guide)