운영용 애플리케이션 실행하기
- spark-submit 명령을 사용하면 대화형 셀에서 개발한 프로그램을 운영용 애플리케이션으로 전환
- spark-submit 명령은 애플리케이션 코드를 클러스터에 전송해 실행시키는 역할을 함
- 참고
Dataset: 타입 안정성을 제공하는 구조적 API
- DataSet은 자바와 스칼라의 정적 데이터 타입을 지원함
- 타입 안정성을 지원하므로 파이썬과 R에서는 사용X
- 타입 안정성을 지원하므로 초기화에 사용한 클래스 대신 다른 클래스를 사용해 접근할 수 없음
- 필요한 경우에 선택적으로 사용 가능
- ex) Dataset으로 처리 후 결과를 DataFrame으로 변환하는 등
구조적 스트리밍
- 참고
- 스파크 2.2 버전에서 안정화된 스트림 처리용 고수준 API
- 구조적 스트리밍은 배치 처리용 코드를 일부 수정하여 스트리밍 처리를 수행하고, 값을 빠르게 얻을 수 있게함
예제: 특정 고객이 대량으로 구매하는 영업 시간 살펴보기
- 구매비용 컬럼을 추가하고
- 고객이 가장 많이 소비한 날 구하기
Case 1: 기본(정적) DataFrame 처리
static_df = spark.read.format('csv').option('header', 'true')\
.option('inferSchema','true').load('FileStore/tables/by-day/*.csv')
display(static_df.limit(10))
InvoiceNo StockCode Description Quantity InvoiceDate UnitPrice CustomerID Country
537226 22811 SET OF 6 T-LIGHTS CACTI 6 2010-12-06T08:34:00.000+0000 2.95 15987.0 United Kingdom
537226 21713 CITRONELLA CANDLE FLOWERPOT 8 2010-12-06T08:34:00.000+0000 2.1 15987.0 United Kingdom
537226 22927 GREEN GIANT GARDEN THERMOMETER 2 2010-12-06T08:34:00.000+0000 5.95 15987.0 United Kingdom
537226 20802 SMALL GLASS SUNDAE DISH CLEAR 6 2010-12-06T08:34:00.000+0000 1.65 15987.0 United Kingdom
537226 22052 VINTAGE CARAVAN GIFT WRAP 25 2010-12-06T08:34:00.000+0000 0.42 15987.0 United Kingdom
537226 22705 WRAP GREEN PEARS 25 2010-12-06T08:34:00.000+0000 0.42 15987.0 United Kingdom
537226 20781 GOLD EAR MUFF HEADPHONES 2 2010-12-06T08:34:00.000+0000 5.49 15987.0 United Kingdom
537226 22310 IVORY KNITTED MUG COSY 6 2010-12-06T08:34:00.000+0000 1.65 15987.0 United Kingdom
537226 22389 PAPERWEIGHT SAVE THE PLANET 6 2010-12-06T08:34:00.000+0000 2.55 15987.0 United Kingdom
537227 22941 CHRISTMAS LIGHTS 10 REINDEER 2 2010-12-06T08:42:00.000+0000 8.5 17677.0 United Kingdom
static_df.createOrReplaceTempView('retail_data')
staticSchema = static_df.schema
from pyspark.sql import functions as F
static_df.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate"
).groupBy(
F.col("CustomerId"), F.window(F.col('InvoiceDate'), "1 day")
).sum('total_cost').show(5)
+----------+--------------------+------------------+
CustomerId| window| sum(total_cost)|
+----------+--------------------+------------------+
13408.0|[2010-12-01 00:00...|1024.6800000000003|
17460.0|[2010-12-01 00:00...| 19.9|
16950.0|[2010-12-07 00:00...| 172.0|
13269.0|[2010-12-05 00:00...| 351.43|
12647.0|[2010-12-05 00:00...| 372.0|
+----------+--------------------+------------------+
only showing top 5 rows
Case 2: 스트리밍 DataFrame 처리
- 가장 큰 차이점: read 메서드 대신 readStream 메서드를 사용
stream_df = spark.readStream.schema(staticSchema)\
.option('maxFilesPerTrigger', 1)\
.format('csv')\
.option('header','true')\
.load('FileStore/tables/by-day/*.csv')
stream_df.isStreaming
Out[7]: True
static_df.isStreaming
Out[8]: False
purchasePerCustomerPerHour=\
stream_df.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate"
).groupBy(
F.col("CustomerId"), F.window(F.col('InvoiceDate'), "1 day")
).sum('total_cost')
purchasePerCustomerPerHour.show(5)
---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last)
<command-389380722962912> in <module>()
1
----> 2 purchasePerCustomerPerHour.show(5)
/databricks/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
377 """
378 if isinstance(truncate, bool) and truncate:
--> 379 print(self._jdf.showString(n, 20, vertical))
380 else:
381 print(self._jdf.showString(n, int(truncate), vertical))
/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
67 e.java_exception.getStackTrace()))
68 if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
70 if s.startswith('org.apache.spark.sql.catalyst.analysis'):
71 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nFileSource[FileStore/tables/by-day/*.csv]'
스트리밍 액션
- 스트리밍 액션은 어딘가에 데이터를 채워 넣어야 하므로 일반적인 정적 액션과는 다른 특성을 가짐
- 본 예제에서 사용할 스트리밍 액션은 트리거가 실행된 다음 데이터를 갱신하게 될 인메모리 테이블에 데이터를 저장함
- 스파크는 이전 집계값보다 더 큰 값이 발생한 경우에만 인메모리 테이블을 갱신함
- 스트림이 시작되면 쿼리 실행 결과가 어떠한 형태로 인메모리 테이블에 기록되는지 확인 가능
purchasePerCustomerPerHour.writeStream.format('memory')\#인메모리 테이블에 저장
.queryName('customer_purchases')\#인메모리에 저장될 테이블명
.outputMode('complete')\#complete: 모든 카운트 수행결과를 테이블에 저장
.start()
Out[15]: <pyspark.sql.streaming.StreamingQuery at 0x7f96a246bac8>
spark.sql('''
select * from customer_purchases
''').show(5)
+----------+--------------------+---------------+
CustomerId| window|sum(total_cost)|
+----------+--------------------+---------------+
13269.0|[2010-12-05 00:00...| 351.43|
16950.0|[2010-12-07 00:00...| 172.0|
14560.0|[2010-12-23 00:00...| -9.95|
13050.0|[2010-12-14 00:00...| 292.42|
17790.0|[2010-12-13 00:00...| 154.8|
+----------+--------------------+---------------+
only showing top 5 rows
purchasePerCustomerPerHour.writeStream.format('console')\#이번엔 콘솔에 결과 출력
.queryName('customer_purchases_2')\
.outputMode('complete')\
.start()
Out[15]: <pyspark.sql.streaming.StreamingQuery at 0x7f96a246bac8>
머신러닝과 고급 분석
- 내장된 머신러닝 알고리즘 라이브러리인 MLlib를 이용해 대규모 머신러닝 수행 가능
- 대용량 데이터를 대상으로 전처리 / 랭글링 / 학습 / 예측 을 할 수 있음
- 구조적 스트리밍에서 예측하고자 할 때도 MLlib에서 학습시킨 다양한 예측 모델 적용 가능
- 스파크는 분류 / 회귀 / 군집화 / 딥러닝과 관련된 정교한 API 제공
예제: k-means로 기본적인 군집화 수행
pre_df = static_df.na.fill(0).withColumn('day_of_week', F.date_format(F.col('InvoiceDate'), 'EEEE')).coalesce(5)
- 파티션 수 조절을 위한 메서드
- coalesce
- repartition
- coalesce는 파티션 수를 줄이기만 가능, repartition은 늘리고 줄이기 모두 가능
- 디폴트의 차이인 것 같고, 파라미터 값으로 조절 가능
- 참고
display(pre_df.limit(10))
InvoiceNo StockCode Description Quantity InvoiceDate UnitPrice CustomerID Country day_of_week
537226 22811 SET OF 6 T-LIGHTS CACTI 6 2010-12-06T08:34:00.000+0000 2.95 15987.0 United Kingdom Monday
537226 21713 CITRONELLA CANDLE FLOWERPOT 8 2010-12-06T08:34:00.000+0000 2.1 15987.0 United Kingdom Monday
537226 22927 GREEN GIANT GARDEN THERMOMETER 2 2010-12-06T08:34:00.000+0000 5.95 15987.0 United Kingdom Monday
537226 20802 SMALL GLASS SUNDAE DISH CLEAR 6 2010-12-06T08:34:00.000+0000 1.65 15987.0 United Kingdom Monday
537226 22052 VINTAGE CARAVAN GIFT WRAP 25 2010-12-06T08:34:00.000+0000 0.42 15987.0 United Kingdom Monday
537226 22705 WRAP GREEN PEARS 25 2010-12-06T08:34:00.000+0000 0.42 15987.0 United Kingdom Monday
537226 20781 GOLD EAR MUFF HEADPHONES 2 2010-12-06T08:34:00.000+0000 5.49 15987.0 United Kingdom Monday
537226 22310 IVORY KNITTED MUG COSY 6 2010-12-06T08:34:00.000+0000 1.65 15987.0 United Kingdom Monday
537226 22389 PAPERWEIGHT SAVE THE PLANET 6 2010-12-06T08:34:00.000+0000 2.55 15987.0 United Kingdom Monday
537227 22941 CHRISTMAS LIGHTS 10 REINDEER 2 2010-12-06T08:42:00.000+0000 8.5 17677.0 United Kingdom Monday
display(pre_df.select('InvoiceNo').describe())
summary InvoiceNo
count 148117
mean 542603.1728328828
stddev 3669.8110037898678
min 536365
max C549162
train/test 셋 분리
train_df = pre_df.where('InvoiceNo < 542603')
test_df = pre_df.where('InvoiceNo >= 542603')
train_df.count()
Out[61]: 74198
test_df.count()
Out[62]: 71191
- TrainValidationSplit이나 CrossValidator API로도 분리 가능(이건 나중에 다룸)
전처리
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer().setInputCol('day_of_week').setOutputCol('day_of_week_index')
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder().setInputCol('day_of_week_index').setOutputCol('day_of_week_encoded')
from pyspark.ml import Pipeline
trans_pipeline = Pipeline().setStages([indexer, encoder, vector_asb])
- 스파크의 모든 머신러닝 알고리즘은 수치형 벡터 타입을 입력으로 사용
from pyspark.ml.feature import VectorAssembler
vector_asb = VectorAssembler().setInputCols(['UnitPrice', 'Quantity', 'day_of_week_encoded']).setOutputCol('features')
파이프라인 생성
fit_pipeline = trans_pipeline.fit(train_df)
trans_train = fit_pipeline.transform(train_df).cache()
display(trans_train.limit(5))
InvoiceNo StockCode Description Quantity InvoiceDate UnitPrice CustomerID Country day_of_week day_of_week_index day_of_week_encoded features
537226 22811 SET OF 6 T-LIGHTS CACTI 6 2010-12-06T08:34:00.000+0000 2.95 15987.0 United Kingdom 2010-12-06 0.0 List(0, 42, List(0), List(1.0)) List(0, 44, List(0, 1, 2), List(2.95, 6.0, 1.0))
537226 21713 CITRONELLA CANDLE FLOWERPOT 8 2010-12-06T08:34:00.000+0000 2.1 15987.0 United Kingdom 2010-12-06 0.0 List(0, 42, List(0), List(1.0)) List(0, 44, List(0, 1, 2), List(2.1, 8.0, 1.0))
537226 22927 GREEN GIANT GARDEN THERMOMETER 2 2010-12-06T08:34:00.000+0000 5.95 15987.0 United Kingdom 2010-12-06 0.0 List(0, 42, List(0), List(1.0)) List(0, 44, List(0, 1, 2), List(5.95, 2.0, 1.0))
537226 20802 SMALL GLASS SUNDAE DISH CLEAR 6 2010-12-06T08:34:00.000+0000 1.65 15987.0 United Kingdom 2010-12-06 0.0 List(0, 42, List(0), List(1.0)) List(0, 44, List(0, 1, 2), List(1.65, 6.0, 1.0))
537226 22052 VINTAGE CARAVAN GIFT WRAP 25 2010-12-06T08:34:00.000+0000 0.42 15987.0 United Kingdom 2010-12-06 0.0 List(0, 42, List(0), List(1.0)) List(0, 44, List(0, 1, 2), List(0.42, 25.0, 1.0))
- 모델의 하이퍼파라미터를 튜닝할 때, 캐싱을 사용하면 중간 변환된 데이터셋의 복사본을 메모리에 저장하므로 빠르게 반복적으로 수행 가능
모델 적용
- MLlib의 DataFrame API에서 제공하는 모든 알고리즘은 항상 두 가지 유형
- [알고리즘명]: 학습 전 명칭
- [알고리즘명]+Model: 학습 후 명칭
from pyspark.ml.clustering import KMeans,KMeansModel
kmeans= KMeans().setK(10).setSeed(1)
model = kmeans.fit(trans_train)
model.computeCost(trans_train)
Out[89]: 37504492.819086105
- 군집 비용을 말하며, 각 군집 중심점과의 제곱거리의 합을 의미
저수준 API
- 스파크의 거의 모든 기능은 RDD 기반으로 만들어짐
- DataFrame 연산도 RDD 기반으로 만들어졌고, 매우 효율적인 분산 처리를 위해 저수준 명령으로 컴파일됨
- RDD를 이용하면 파티션과 같은 물리적 실행 특성을 결정할 수 있다는 장점이 있음
- 구조적 API보다 더 세밀한 제어 가능(근데 대부분은 구조적 API를 사용하는게 좋음)
- 메모리에 저장된 원시 데이터를 병렬처리하는 데 RDD 사용
- 비정형 데이터나 정제되지 않은 원시 데이터를 처리해야 한다면 RDD를 사용해야함
from pyspark.sql import Row
spark.sparkContext.parallelize([Row(1), Row(2), Row(3)]).toDF()
Out[91]: DataFrame[_1: bigint]
SparkR
- 스파크를 R언어로 사용하기 위한 기능
- 파이썬 API와 유사