CHAPTER 3. 스파크 기능 둘러보기

ack·2021년 6월 22일
0

Spark

목록 보기
3/6
post-thumbnail
  • 스파크의 구조
    • 저수준 API
    • 구조적 API : 스파크의 에코시스템의 방대한 기능과 라이브러리의 바탕
    • 표준 라이브러리
      • 스파크 라이브러리 : 그래프 분석, 머신 러닝, 스트리밍 등 다양한 작업 지원

3.1 운영용 애플리케이션 실행하기

  • spark-submit 명령
    • 대화형 셸에서 개발한 프로그램을 운영용 애플리케이션으로 쉽게 전환
    • 애플리케이션 코드를 클러스터에 전송해 실행시킴 이 애플리케이션은 작업이 종료되거나 에러가 발생할 때까지 실행됨
    • 애플리케이션 실행에 필요한 자원, 실행방식, 옵션 지정가능

3.2 Dataset : 타입 안전성(type-safe)을 제공하는 구조적 API

  • Dataset API
    • 자바와 스칼라의 정적 데이터 타입(정적 타입 코드)를 지원하기 위해 고안된 스파크의 구조적 API
      • 정적 타입 코드 : 컴파일시에 자료형을 결정함
      • 동적 타입 코드 : 실행시 자료형 결정함
    • 안전성 지원
    • 동적 타입인 파이썬과 R에선 사용불가
    • DataFrame의 레코드를 사용자가 자바나 스칼라로 정의한 클래스에 할당하고 자바의 ArrayList, 스칼라의 Seq 객체 등의 고정 타입형 컬렉션을 다룰 수 있는 기능 제공
      • DataFrame : 다양한 데이터 타입의 테이블형 데이터를 보관할 수 있는 로우타입의 객체로 구성된 분산 컬렉션
    • 타입 안전성 지원 : 초기화에 사용한 클래스 대신 다른 클래스를 사용해 접근 불가
    • 다수의 SW 엔지니어가 잘 정의된 인터페이스로 상호작용하는 대규모 애플리케이션 개발에 유용
  • Dataset 클래스
    • 자바 - Dataset, 스칼라 - Dataset[T]로 표시
    • 내부 객체의 데이터 타입(T)을 매개변수로 사용
    • 스파크2.0버전에선 JavaBean패턴과 스칼라의 케이스 클래스 유형으로 정의된 클래스 지원
    • 자동으로 타입T를 분석한 다음 Dataset의 표 형식 데이터에 적합한 스키마를 생성하기 때문에 타입을 제한적으로 사용할 수밖에 없음
  • Dataset API의 장점
    • 필요한 경우에 선택적 사용 가능
      • 데이터 타입 정의 후 map, fliter 함수 사용 가능
      • 스파크 처리를 마치고 결과를 DataFrame으로 자동 변환해 반환함
      • 스파크가 제공하는 여러 함수를 이용해 추가 처리 작업 가능
        • 타입 안전성을 보장하는 코드에서 저수준 API를 사용 가능
        • 고수준 API의 SQL을 사용해 빠른 분석 가능
    • collect, take 메서드를 호출하면 DataFrame을 구성하는 row타입의 객체가 아닌 Dataset에 매개변수로 지정한 타입의 객체를 반환
      • 코드 변경없이 타입 안전성을 보장
      • 로컬이나 분산 클러스터 환경에서 데이터를 안전하게 다룰 수 있음

3.3 구조적 스트리밍

  • 안정화된 스트림 처리용 고수준 API
  • 구조적 API로 개발된 배치 모드의 연산을 스트리밍 방식으로 실행 가능
  • 지연시간 감소, 증분 처리 가능
  • 배치 처리용 코드를 일부 수정하여 스트리밍 처리를 수행하고 값을 빠르게 얻을 수 있음
  • 프로토타입을 배치 잡으로 개발한 다음 스트리밍 잡으로 변환할 수 있음

정적 DataFrame 버전의 코드

#정적 데이터셋의 DataFrame 생성
staticDataFrame = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(path)
staticDataFrame.createOrReplaceTempView("retail_data")

#정적 데이터셋의 스키마 생성
staticSchema = staticDataFrame.schema

#총 구매비용 컬럼을 추가하고 고객이 가장 많이 소비한 날 검색
#window함수 집계시에 시계열 컬럼을 기준으로 각 날짜에 대한 전체 데이터를 가지는 윈도우를 구성, 관련 날짜의 데이터를 그룹화함
staticDataFrame.selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate").groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 Day")).sum("total_cost").show(5)
spark.conf.set("spark.sql.shuffle.partitions", "5")

스트리밍 코드

#readStream 메서드 사용
#maxFilesPerTrigger 옵션 : 한번에 읽을 파일 수 설정
streamingDataFrame = spark.readStream.schema(staticSchema).option("maxFilesPerTrigger", 1).format("csv").option("header", "true").load(path)

#Dataset이 데이터를 연속적으로 전달하는 데이터 소스라면 true 반환
streamingDataFrame.isStreaming

#지연연산이므로 data flow를 실행하기 위해 스트리밍 액션 호출 필요
purchaseByCustomerPerHour = streamingDataFrame.selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate").groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day")).sum("total_cost")

#trigger가 실행된 다음 데이터를 갱신하게 될 인메모리 테이블에 데이터 저장
#스트림 시작되면 쿼리 실행 결과가 어떠한 형태로 인메모리 테이블에 기록되는지 확인가능
purchaseByCustomerPerHour.writeStream.format("memory").queryName("customer_purchases").outputMode("complete").start()

#어떠한 형태로 인메모리 테이블에 기록되는지 확인
spark.sql("SELECT * FROM customer_purchases ORDER BY 'sum(total_cost)' DESC").show(5)

#처리 결과를 콘솔에 출력할 수 있음
purchaseByCustomerPerHour.writeStream.format("console").queryName("customer_purchases_2").outputMode("complete").start()

지연 연산 발생 → 데이터 플로를 실행하기 위해 스트리밍 액션 호출

스트리밍 액션은 어딘가에 데이터를 채워 넣어야 함 , Count메서드 같은 일반적인 정적 액션과는 다른 특성을 지님

스트리밍 액션은 트리거가 실행된 다음 데이터를 갱신하게 될 인메모리 테이블에 데이터 저장

파일마다 트리거 실행

이전 집계값보다 더 큰 값이 발생한 경우에만 인메모리 테이블을 갱신하므로 언제나 가장 큰 값을 얻을 수 있음

3.4 머신러닝과 고급 분석

  • 스파크는 내장 머신러닝 알고리즘 라이브러리인 MLlib를 사용해 대규모 머신러닝을 수행할 수 있음
  • MLlib을 사용하면 대용량 데이터를 대상으로 전처리(preprocessing), 멍잉(munging), 모델학습(model tarining) 및 예측(prediction)을 할 수 있음
  • 구조적 스트리밍에서 예측하고자 할때도 MLlib에서 학습시킨 다양한 예측 모델을 사용할 수 있음
  • 분류, 회귀, 군집화, 딥러닝에 이르기까지 머신러닝과 관련된 정교한 API제공
  • 데이터 전처리에 사용하는 다양한 메서드 제공

원본데이터를 올바른 포맷으로 만드는 트랜스포메이션을 정의하고, 실제로 모델을 학습한 다음 예측을 수행

from pyspark.sql.functions import date_format, col
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans

preppedDataFrame = staticDataFrame.na.fill(0).withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE")).coalesce(5)

#날짜를 기준으로 학습셋과 테스트셋으로 분리
trainDataFrame = preppedDataFrame.where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame.where("InvoiceDate >= '2011-07-01'")

#액션을 호출하여 데이터 분리
trainDataFrame.count()
testDataFrame.count()

#스파크 MLlib는 일반적인 트랜스포메이션을 자동화하는 다양한 트랜스포메이션 제공
#그 중 하나가 StringIndexer

#요일을 수치형으로 반환 (토요일-6, 월요일-1)
indexer = StringIndexer().setInputCol("day_of_week").setOutputCol("day_of_week_index")

#자체 컬럼으로 인코딩 특정 요일이 해당 요일인지 아닌지 boolean타입으로 나타낼 수 있음
encoder = OneHotEncoder().setInputCol("day_of_week_index").setOutputCol("day_of_week_encoded")

# encoder의 결과는 벡터 타입을 구성할 컬럼중 하나로 사용됨
#스파크의 모든 머신러닝 알고리즘은 수치형 벡터 타입을 입력으로 사용
#가격, 수량, 특정날짜의 요일을 
vectorAssembler = VectorAssembler().setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"]).setOutputCol("features")

#나중에 입력값으로 들어올 데이터가 같은 프로세스를 거쳐 변환되도록 파이프라인을 설정
transformationPipeline = Pipeline().setStages([indexer, encoder, vectorAssembler])

#변환자를 데이터셋에 적합(fit) 시킴
#학습을 위한 맞춤 파이프라인 생성
fittedPipeline = transformationPipeline.fit(trainDataFrame)

#생성된 파이프라인을 사용해서 일관되고 반복적인 방식으로 모든 데이터 변환
transformedTraining = fittedPipeline.transform(trainDataFrame)

#하이퍼 파라미터 튜닝값 적용
transformedTraining.cache()

##머신러닝 모델 학습과정
#첫번째 아직학습되지 않은 모델을 초기화
#두번쨰 해당 모델을 학습시킴
#MLlib의 DataFrame API에서 제공하는 모든 알고리즘은 항상 두가지 유형으로 구성되어 있음
#학습전 알고리즘 : Algorithm
#학습후 알고리즘 : AlgorithmModel
#KMean, KMeanModel이 해당

#모델 인스턴스 생성
kmeans=KMeans().setK(20).setSeed(1)

#학습
kmModel = kmeans.fit(transformedTraining)

transformedTest = fittedPipeline.transform(testDataFrame)

#학습데이터셋에 대한 비용을 계산
kmModel.computeCost(transformedTest)

3.5 저수준 API

  • RDD를 통해 자바와 파이썬 객체를 다루는데 필요한 저수준 API 제공
  • 스파크의 거의 모든 기능은 RDD기반으로 만들어짐
  • DataFrame연산도 RDD를 기반으로 만들어졌으며 효율적인 분산 처리를 위해 저수준 명렁으로 컴파일
  • 원시 데이터를 읽거나 다루는 용도로 RDD사용할 수 있으나 구조적 API를 사용하는 것이 더 좋음
  • RDD를 이용해 파티션과 같은 물리적 실행 특성을 결정할 수 있으므로 DataFrame보다 더 세밀한 제어 가능
  • 드라이버 시스템의 메모리에 저장된 원시 데이터를 병렬처리하는데 RDD를 사용

간단한 숫자를 이용해 병렬화해 RDD를 생성하고 DF로 변환하는 예제

from pyspark.sql import Row

#숫자를 병렬화해 RDD를 생성하고 DF로 변환
spark.sparkContext.parallelize([Row(1), Row(2), Row(3)]).toDF()

3.6 SparkR

  • 스파크를 R언어로 사용하기 위한 기능
  • 파이썬 대신 R구문을 사용한다
  • R사용자는 magrittr의 pipe연산자같은 R라이브러리를 사용해 스파크 트랜스 포메이션 과정을 R과 유사하게 만들 수 있음
  • 정교한 플로팅 작업을 지원하는 ggplot같은 라이브러리 사용 가능

3.7 스파크의 에코시스템과 패키지

스파크의 다양한 프로젝트와 패키지를 spark-packagese.org 저장소에서 확인할 수 있음

참고 스파크 완벽 가이드 (Spark The Definitive Guide)
profile
아자 (*•̀ᴗ•́*)و

0개의 댓글