정적 DataFrame 버전의 코드
#정적 데이터셋의 DataFrame 생성
staticDataFrame = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(path)
staticDataFrame.createOrReplaceTempView("retail_data")
#정적 데이터셋의 스키마 생성
staticSchema = staticDataFrame.schema
#총 구매비용 컬럼을 추가하고 고객이 가장 많이 소비한 날 검색
#window함수 집계시에 시계열 컬럼을 기준으로 각 날짜에 대한 전체 데이터를 가지는 윈도우를 구성, 관련 날짜의 데이터를 그룹화함
staticDataFrame.selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate").groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 Day")).sum("total_cost").show(5)
spark.conf.set("spark.sql.shuffle.partitions", "5")
스트리밍 코드
#readStream 메서드 사용
#maxFilesPerTrigger 옵션 : 한번에 읽을 파일 수 설정
streamingDataFrame = spark.readStream.schema(staticSchema).option("maxFilesPerTrigger", 1).format("csv").option("header", "true").load(path)
#Dataset이 데이터를 연속적으로 전달하는 데이터 소스라면 true 반환
streamingDataFrame.isStreaming
#지연연산이므로 data flow를 실행하기 위해 스트리밍 액션 호출 필요
purchaseByCustomerPerHour = streamingDataFrame.selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate").groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day")).sum("total_cost")
#trigger가 실행된 다음 데이터를 갱신하게 될 인메모리 테이블에 데이터 저장
#스트림 시작되면 쿼리 실행 결과가 어떠한 형태로 인메모리 테이블에 기록되는지 확인가능
purchaseByCustomerPerHour.writeStream.format("memory").queryName("customer_purchases").outputMode("complete").start()
#어떠한 형태로 인메모리 테이블에 기록되는지 확인
spark.sql("SELECT * FROM customer_purchases ORDER BY 'sum(total_cost)' DESC").show(5)
#처리 결과를 콘솔에 출력할 수 있음
purchaseByCustomerPerHour.writeStream.format("console").queryName("customer_purchases_2").outputMode("complete").start()
지연 연산 발생 → 데이터 플로를 실행하기 위해 스트리밍 액션 호출
스트리밍 액션은 어딘가에 데이터를 채워 넣어야 함 , Count메서드 같은 일반적인 정적 액션과는 다른 특성을 지님
스트리밍 액션은 트리거가 실행된 다음 데이터를 갱신하게 될 인메모리 테이블에 데이터 저장
파일마다 트리거 실행
이전 집계값보다 더 큰 값이 발생한 경우에만 인메모리 테이블을 갱신하므로 언제나 가장 큰 값을 얻을 수 있음
원본데이터를 올바른 포맷으로 만드는 트랜스포메이션을 정의하고, 실제로 모델을 학습한 다음 예측을 수행
from pyspark.sql.functions import date_format, col
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
preppedDataFrame = staticDataFrame.na.fill(0).withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE")).coalesce(5)
#날짜를 기준으로 학습셋과 테스트셋으로 분리
trainDataFrame = preppedDataFrame.where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame.where("InvoiceDate >= '2011-07-01'")
#액션을 호출하여 데이터 분리
trainDataFrame.count()
testDataFrame.count()
#스파크 MLlib는 일반적인 트랜스포메이션을 자동화하는 다양한 트랜스포메이션 제공
#그 중 하나가 StringIndexer
#요일을 수치형으로 반환 (토요일-6, 월요일-1)
indexer = StringIndexer().setInputCol("day_of_week").setOutputCol("day_of_week_index")
#자체 컬럼으로 인코딩 특정 요일이 해당 요일인지 아닌지 boolean타입으로 나타낼 수 있음
encoder = OneHotEncoder().setInputCol("day_of_week_index").setOutputCol("day_of_week_encoded")
# encoder의 결과는 벡터 타입을 구성할 컬럼중 하나로 사용됨
#스파크의 모든 머신러닝 알고리즘은 수치형 벡터 타입을 입력으로 사용
#가격, 수량, 특정날짜의 요일을
vectorAssembler = VectorAssembler().setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"]).setOutputCol("features")
#나중에 입력값으로 들어올 데이터가 같은 프로세스를 거쳐 변환되도록 파이프라인을 설정
transformationPipeline = Pipeline().setStages([indexer, encoder, vectorAssembler])
#변환자를 데이터셋에 적합(fit) 시킴
#학습을 위한 맞춤 파이프라인 생성
fittedPipeline = transformationPipeline.fit(trainDataFrame)
#생성된 파이프라인을 사용해서 일관되고 반복적인 방식으로 모든 데이터 변환
transformedTraining = fittedPipeline.transform(trainDataFrame)
#하이퍼 파라미터 튜닝값 적용
transformedTraining.cache()
##머신러닝 모델 학습과정
#첫번째 아직학습되지 않은 모델을 초기화
#두번쨰 해당 모델을 학습시킴
#MLlib의 DataFrame API에서 제공하는 모든 알고리즘은 항상 두가지 유형으로 구성되어 있음
#학습전 알고리즘 : Algorithm
#학습후 알고리즘 : AlgorithmModel
#KMean, KMeanModel이 해당
#모델 인스턴스 생성
kmeans=KMeans().setK(20).setSeed(1)
#학습
kmModel = kmeans.fit(transformedTraining)
transformedTest = fittedPipeline.transform(testDataFrame)
#학습데이터셋에 대한 비용을 계산
kmModel.computeCost(transformedTest)
간단한 숫자를 이용해 병렬화해 RDD를 생성하고 DF로 변환하는 예제
from pyspark.sql import Row
#숫자를 병렬화해 RDD를 생성하고 DF로 변환
spark.sparkContext.parallelize([Row(1), Row(2), Row(3)]).toDF()
스파크의 다양한 프로젝트와 패키지를 spark-packagese.org 저장소에서 확인할 수 있음