CHAPTER 2. 스파크 간단히 살펴보기

ack·2021년 6월 21일
0

Spark

목록 보기
2/6
post-thumbnail

CHAPTER 2. 스파크 간단히 살펴보기

2.1 스파크의 기본 아키텍처

  • 컴퓨터 클러스터 : 여러대의 컴퓨터의 자원을 모아 하나의 컴퓨터처럼 사용하는 것
  • 스파크 : 클러스터의 데이터 처리 작업을 관리, 조율할 수 있는 프레임워크
  • 사용자는 클러스터 매니저에 스파크 애플리케이션을 제출한다. 이를 받은 클러스터 매니저는 애플리케이션 실행에 필요한 자원을 할당한다.

2.1.1 스파크 애플리케이션

  • 스파크 애플리케이션은 드라이버 프로세스(driver)와 다수의 익스큐터(executor) 프로세스로 구성된다.
  • 드라이버 프로세스
    • 클러스터 노드 중 하나에서 실행
    • main()함수 실행
    • 스파크 애플리케이션 정보의 유지 관리
    • 사용자 프로그램이나 입력에 대한 응답
    • 사용자 프로그램(Job)을 task 단위로 변환하여 Executor로 전달
    • 전반적인 익스큐터 프로세스의 작업과 관련된 분석, 배포, 스케줄링 역할 수행
    • 스파크의 언어 API를 통해 다양한 언어로 실행 가능
    • 애플리케이션의 수명 주기 동인 관련 정보를 모두 유지
  • 익스큐터
    • 드라이버 프로세스가 할당한 작업을 수행
    • 드라이버가 할당한 코드를 실행하고 진행 상황을 다시 드라이버 노드에 보고
  • 클러스터 매니저
    • 물리적 머신을 관리
    • 스파크 애플리케이션에 사용 가능한 자원 파악 및 할당
    • 종류
      • 스파크 스탠드얼론(standalone) 클러스터 매니저
      • 하둡 YARN
      • 메소스(Mesos)

2.1.2 Spark Application 실행과정(흐름)

  1. 사용자가 클러스터 매니저에 스파크 애플리케이션을 제출
  2. Spark Driver가 main()을 실행하여 Spark Context 생성
  3. Spark Context가 클러스터 매니저와 연결
  4. Spark Driver가 클러스터 매니저로부터 Executor애플리케이션 실행에 필요한 자원을 요청
  5. Spark Context는 작업 내용을 task 단위로 분할하여 Executor에 보냄
  6. 각 Executor는 작업을 수행하고 결과를 저장하고 다시 드라이버에 보고

2.2 스파크의 다양한 언어 API

다양한 프로그래밍 언어로 스파크 코드 실행 가능

  • 스칼라 : 스파크의 기본언어. 스파크는 스칼라로 개발됨
  • 자바
  • 파이썬 : 스칼라가 지원하는 거의 모든 구조 지원
  • SQL : 스파크는 ANSI SQL:2003 표준 중 일부 지원
  • R : 스파크 코어에 호함된 Spark R, 커뮤니티 기반 패키지인 sparklyr 두개의 라이브러리 사용

스파크는 파이썬이나 R로 작성한 코드를 익스큐터에 전달하기 전에 JVM에서 실행할 수 있는 코드로 변환

2.3 스파크 API

  • 저수준의 비구조적 API
    • 분산 데이터 처리를 위한 RDD
      • RDD는 대량의 데이터를 요소로 가지는 분산 컬렉션
    • 분산형 공유 변수를 배포하고 다루기위한 API
  • 고수준의 구조적 API
    • 비정형 로그파일, 반정형 CSV파일 파케이(Parquet)파일을 처리함. 
    • 분산 컬렉션 API 
      • Dataset, DataFrame, SQL 테이블과 뷰

2.4 스파크 시작하기

  • 대화형 모드인 경우 SparkSession이 자동으로 생성된다.
  • standalone application으로 스파크를 시작하면 사용자 애플리케이션 코드에서 SparkSession 객체를 직접 생성해야 한다.

2.5 SparkSession

  • 스파크 애플리케이션은 SparkSession이라 불리는 드라이버 프로세스로 제어
  • SparkSession 인스턴스는 사용자가 정의한 처리 명령을 클러스터에서 실행
  • 하나의 SparkSession은 하나의 스파크 애플리케이션에 대응한다.

0~999까지의 1000개의 로우로 구성된 DataFrame 생성

myRange = spark.range(1000).toDF("number")

각 로우에는 0~999까지의 값이 할당되어 있습니다.
이 숫자들은 분산 컬렉션을 나타냅니다.
클러스터 모드에서 예제를 실행하면 숫자 범위의 각 부분이 서로 다른 익스큐터에 할당되고 이것이 스파크의 DataFrame입니다.

2.6 DataFrame

  • 대표적인 구조적 API
  • DataFrame : 데이터를 로우와 컬럼으로 단순하게 표현
  • Schema : 컬럼과 컬럼의 타입으로 정의한 목록
  • 스파크의 DataFrame은 수천대의 컴퓨터에 분산되어 있음.
    • 한 대의 컴퓨터에 저장하기에는 데이터가 크거나 계산에 너무 오랜 시간이 걸림

2.6.1 파티션

  • 스파크는 모든 익스큐터가 병렬로 작업을 수행할 수 있도록 파티션이라 불리는 chunk 단위로 데이터를 분할한다.

  • 파티션은 클러스터의 물리적 머신에 존재하는 row의 집합

  • DataFrame의 파티션은 실행 중에 데이터가 컴퓨터 클러스터에서 물리적으로 분산되는 방식을 나타낸다.

  • 파티션이 하나라면 수천개의 익스큐터가 있더라도 병렬성은 1이 된다. 또한 수백개의 파티션이 있더라도 익스큐터가 하나밖에 없다면 병렬성은 1이 된다.

2.7 트랜스포메이션

  • 스파크의 핵심 데이터 구조는 불변성(immutable)을 가진다.

  • 데이터 구조를 변경하려면 트랜스포메이션을 사용해야 한다.

  • 액션을 호출해야만 실제 트랜스포메이션이 수행됨

  • 비즈니스 로직을 표현하는 핵심 개념

  • 유형

  • 좁은 의존성

    (narrow dependency)

    • 각 입력 파티션이 하나의 출력 파티션에만 영향을 미침 (1:1)
    • 파이프라이닝을 자동 수행
    • DataFrame에 여러 필터를 지정하는 경우 모든 작업이 메모리에서 발생
  • 넓은 의존성(wide dependency)

    • 하나의 입력 파티션이 여러 출력 파티션에 영향을 미침 (1:N)
    • 셔플(스파크가 클러스에서 파티션을 교환하는 작업)을 자동 수행
    • 셔플의 결과를 디스크에 저장

DataFrame에서 짝수를 찾는 트랜스포메이션

divisBy2 = myRange.where("number % 2 = 0")

결과는 출력되지 않는 추상적인 트랜스포메이션만 지정

2.7.1 지연 연산

  • 연산 그래프를 처리하기 직전까지 기다리는 동작 방식
  • 연산 명령이 내려진 즉시 데이터를 수정하지 않고 원시 데이터에 적용할 트랜스포메이션의 실행 계획을 생성한다. 스파크 코드를 실행하는 마지막 순간까지 대기하다가 원형 DataFrame 트랜스포메이션을 간결한 물리적 실행 계획으로 컴파일
  • 스파크는 이 과정을 거치며 전체 데이터 흐름을 최적화함
  • 조건절 푸시다운(predicate pushdown)

2.8 액션

  • 실제 연산을 수행하려면 액션 명령을 내려야 함
  • 액션은 일련의 트랜스포메이션으로부터 결과를 계산하도록 지시하는 명령
  • 액션의 유형
    • 콘솔에서 데이터를 보는 액션
    • 각 언어로 된 네이티브 객체에 데이터를 모으는 액션
    • 출력 데이터소스에 저장하는 액션
  • 액션을 지정하면 Spark Job이 시작된다.
    • Spark Job은 좁은 트랜스포메이션을 수행한 후 넓은 트랜스포메이션을 수행한 후 각 언어에 적합한 네이티브 객체에 결과를 모읍니다.

DataFrame의 전체 레코드 수를 반환하는 액션

divisBy2.count()

2.9 스파크UI

  • http://localhost:4040
  • 스파크 잡의 진행 상황을 모니터링할 때 사용
  • 스파크 잡의 상태, 환경 설정, 클러스터 상태 등의 정보를 확인할 수 있음
  • 스파크 잡을 튜닝하거나 디버깅할 때 매우 유용

2.10 종합 예제

데이터는 SparkSession의 DataFrameReader 클래스를 사용해서 읽는다. 이때 특정 파일의 포맷과 몇 가지 옵션을 함께 설정한다.

read 메서드 - 스키마추론을 활용하여 데이터를 읽기

flightData2015 = spark.read.option("inferSchema", "true")\ #스키마 추론 
	.option("header", "true")\ #첫 로우를 헤더로 지정 
    .csv("path") 

flightData2015.show()

스키마 추론(Schema Interence) : 스파크는 스키마 정보를 얻기 위해 데이터를 조금 읽는다. 그리고 해당 로우의 데이터 타입을 스파크 데이터 타입에 맞게 분석한다.

Take액션 호출 - Array형태로 n개 확인, head명령과 동일

flightData2015.take(3)

sort() 트랜스포메이션 사용, 실행계획 확인

flightData2015.sort("count").explain()
  • sort 메소드는 트랜스포메이션이기 때문에 호출 시 데이터에 아무런 변화도 일어나지 않는다.
  • read 메소드는 액션이 호출되기 전까지 데이터를 읽지 않는다.
  • DataFrame 객체에 explain 메서드를 호출하면 DataFrame의 계보(lineage)나 스파크의 쿼리 실행 계획을 확인할 수 있다.
    • 실행 계획은 위에서 아래로 읽는다.
    • 최종 결과는 가장 위에, 데이터 소스는 가장 아래에 있다.
    • 실행 계획은 트랜스포메이션의 지향성 비순환 그래프,DAG(Directed Acyclic Graph)이며, 액션이 호출되면 결과를 만들어낸다. DAG의 각 단계는 불변성을 가진 신규 DataFrame을 생성한다.
  • 트랜스포메이션의 논리적 실행 계획은 DataFrame의 계보를 정의한다. 스파크는 계보를 통해 입력 데이터에 수행한 연산을 전체 파티션에서 어떻게 재연산하는지 알 수 있다.

2.10.1 DataFrame과 SQL

DataFrame과 테이블에 쿼리 수행하고 실행계획 비교하기

#DataFrame을 테이블이나 뷰로 만들기 
flightData2015.createOrReplaceTempView("flightData2015") 

#SQL로 데이터 조회하기 
sqlWay = spark.sql("SELECT DEST_COUNTRY_NAME, COUNT(1) FROM flightData2015 GROUP BY DEST_COUNTRY_NAME") 
#DataFrame에 쿼리 수행하기 
dataFrameWay = flightData2015\ .groupBy("DEST_COUNTRY_NAME")\ .count() 

#둘의 실행계획 비교 
sqlWay.explain() 
dataFrameWay.explain()
  • DataFrame과 테이블이 동일한 기본 실행 계획으로 컴파일 된다.
  • 사용자가 SQL이나 DataFrame으로 비즈니스 로직을 표현하면 스파크에서 실제 코드를 실행하기 전에 그 로직을 기본 실행 계획으로 컴파일한다.
  • createOrReplateTempView 메소드를 호출하면 모든 DataFrame을 테이블이나 뷰로 만들 수 있다.
  • 스파크 SQL을 사용하면 모든 DataFrame을 테이블이나 뷰(임시 테이블)로 등록한 후 SQL 쿼리를 사용할 수 있다.
  • DataFrame에 쿼리를 수행하면 새로운 DataFrame이 반환된다.

max함수

from pyspark.sql.functions import max 

flightData2015.select(max("count")).take(1)
  • max함수는 DataFrame의 특정 컬럼값을 스캔하면서 이전 최댓값보다 큰 값을 찾는다.
  • max함수는 필터링을 수행해 단일 로우를 결과로 반환하는 트랜스포메이션이다.
flightData2015.createOrReplaceTempView("flightData2015") 

maxSql = spark.sql("""SELECT DEST_COUNTRY_NAME, sum(count) as destination_total FROM flightData2015 GROUP BY DEST_COUNTRY_NAME ORDER BY sum(count) DESC LIMIT 5""") maxSql.show() 

flightData2015.groupBy("DEST_COUNTRY_NAME").sum("count").withColumnRenamed("sum(count)", "destination_total").sort(desc("destination_total")).limit(5).show()
  • 실행계획은 트랜스포메이션의 지향성 비순환 그래프(DAG)이며 액션이 호출되면 결과를 만들어냅니다. 각 단계는 불변성을 가진 신규 DataFrame을 생성합니다.
  1. 데이터를 읽는다. 다만, 스파크는 해당 DataFrame이나 자신의 원본 DataFrame에 액션이 호출되기 전까지 데이터를 읽지 않습니다.
  2. 데이터를 그룹화한다. groupBy메서드가 호출되면 최종적으로 그룹화된 DataFrame을 지칭하는 이름을 가진 RealationGroupedDataset을 반환한다.
  3. 집계 유형을 지청하기 위해 컬럼 표현식이나 컬럼명을 인수로 사용하는 sum메서드 사용, 새로운 스키마 정보를 가지는 DataFrame을 생성
  4. 컬럼명 변경 withColumnRenamed 메서드에 원본 컬럼명과 신규 컬럼명을 인수로 저장
  5. 데이터를 정렬한다.
  6. 역순으로 정렬하기 위해 desc함수를 임포트한다.
  7. limit메서드로 반환 결과의 수를 제한한다. 결과 DataFrame의 상위 5개 로우를 반환한다.
  8. 액션을 수행한다. DataFrame의 결과를 모으는 프로세스를 시작한다. 처리가 끝나면 리스트나 배열을 반환한다.
참고 스파크 완벽 가이드 (Spark The Definitive Guide)
profile
아자 (*•̀ᴗ•́*)و

0개의 댓글