Spark 완벽 가이드 ch3. 스파크 기능 둘러보기

Q·2023년 1월 9일
0

Spark 완벽 가이드

목록 보기
4/24

운영용 애플리케이션 실행하기

  • spark-submit 명령을 사용하면 대화형 셀에서 개발한 프로그램을 운영용 애플리케이션으로 전환
    • spark-submit 명령은 애플리케이션 코드를 클러스터에 전송해 실행시키는 역할을 함
    • 참고

Dataset: 타입 안정성을 제공하는 구조적 API

  • DataSet은 자바와 스칼라의 정적 데이터 타입을 지원함
    • 타입 안정성을 지원하므로 파이썬과 R에서는 사용X
  • 타입 안정성을 지원하므로 초기화에 사용한 클래스 대신 다른 클래스를 사용해 접근할 수 없음
    • 대규모 애플리케이션을 개발하는데 유용
  • 필요한 경우에 선택적으로 사용 가능
    • ex) Dataset으로 처리 후 결과를 DataFrame으로 변환하는 등

구조적 스트리밍

  • 참고
  • 스파크 2.2 버전에서 안정화된 스트림 처리용 고수준 API
  • 구조적 스트리밍은 배치 처리용 코드를 일부 수정하여 스트리밍 처리를 수행하고, 값을 빠르게 얻을 수 있게함

예제: 특정 고객이 대량으로 구매하는 영업 시간 살펴보기

  • 구매비용 컬럼을 추가하고
  • 고객이 가장 많이 소비한 날 구하기

Case 1: 기본(정적) DataFrame 처리

static_df = spark.read.format('csv').option('header', 'true')\
.option('inferSchema','true').load('FileStore/tables/by-day/*.csv')
display(static_df.limit(10))
InvoiceNo	StockCode	Description	Quantity	InvoiceDate	UnitPrice	CustomerID	Country
537226	22811	SET OF 6 T-LIGHTS CACTI	6	2010-12-06T08:34:00.000+0000	2.95	15987.0	United Kingdom
537226	21713	CITRONELLA CANDLE FLOWERPOT	8	2010-12-06T08:34:00.000+0000	2.1	15987.0	United Kingdom
537226	22927	GREEN GIANT GARDEN THERMOMETER	2	2010-12-06T08:34:00.000+0000	5.95	15987.0	United Kingdom
537226	20802	SMALL GLASS SUNDAE DISH CLEAR	6	2010-12-06T08:34:00.000+0000	1.65	15987.0	United Kingdom
537226	22052	VINTAGE CARAVAN GIFT WRAP	25	2010-12-06T08:34:00.000+0000	0.42	15987.0	United Kingdom
537226	22705	WRAP GREEN PEARS	25	2010-12-06T08:34:00.000+0000	0.42	15987.0	United Kingdom
537226	20781	GOLD EAR MUFF HEADPHONES	2	2010-12-06T08:34:00.000+0000	5.49	15987.0	United Kingdom
537226	22310	IVORY KNITTED MUG COSY	6	2010-12-06T08:34:00.000+0000	1.65	15987.0	United Kingdom
537226	22389	PAPERWEIGHT SAVE THE PLANET	6	2010-12-06T08:34:00.000+0000	2.55	15987.0	United Kingdom
537227	22941	CHRISTMAS LIGHTS 10 REINDEER	2	2010-12-06T08:42:00.000+0000	8.5	17677.0	United Kingdom
static_df.createOrReplaceTempView('retail_data')
staticSchema = static_df.schema
from pyspark.sql import functions as F
static_df.selectExpr(
  "CustomerId",
  "(UnitPrice * Quantity) as total_cost",
  "InvoiceDate"
).groupBy(
  F.col("CustomerId"), F.window(F.col('InvoiceDate'), "1 day")
).sum('total_cost').show(5)
+----------+--------------------+------------------+ 
CustomerId| window| sum(total_cost)| 
+----------+--------------------+------------------+ 
13408.0|[2010-12-01 00:00...|1024.6800000000003| 
17460.0|[2010-12-01 00:00...| 19.9| 
16950.0|[2010-12-07 00:00...| 172.0| 
13269.0|[2010-12-05 00:00...| 351.43| 
12647.0|[2010-12-05 00:00...| 372.0| 
+----------+--------------------+------------------+ 
only showing top 5 rows

Case 2: 스트리밍 DataFrame 처리

  • 가장 큰 차이점: read 메서드 대신 readStream 메서드를 사용
#read -> readStream
stream_df = spark.readStream.schema(staticSchema)\
.option('maxFilesPerTrigger', 1)\
.format('csv')\
.option('header','true')\
.load('FileStore/tables/by-day/*.csv')
#dataframe이 스트리밍 유형인지 확인
stream_df.isStreaming
Out[7]: True
static_df.isStreaming
Out[8]: False
#위와 동일한 로직 적용  -> 이것도 역시 지연 연산이므로 스트리밍 액션을 호출해야함
purchasePerCustomerPerHour=\
stream_df.selectExpr(
  "CustomerId",
  "(UnitPrice * Quantity) as total_cost",
  "InvoiceDate"
).groupBy(
  F.col("CustomerId"), F.window(F.col('InvoiceDate'), "1 day")
).sum('total_cost')
#스트리밍 DataFrame은 show() 안먹힘
purchasePerCustomerPerHour.show(5)
--------------------------------------------------------------------------- 
AnalysisException Traceback (most recent call last) 
<command-389380722962912> in <module>() 
	1 #스트리밍 DataFrame은 show() 안먹힘 
----> 2 purchasePerCustomerPerHour.show(5) 

/databricks/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical) 
	377 """ 
	378 if isinstance(truncate, bool) and truncate: 
--> 379 print(self._jdf.showString(n, 20, vertical)) 
	380 else: 
	381 print(self._jdf.showString(n, int(truncate), vertical)) 

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args) 
	1255 answer = self.gateway_client.send_command(command) 
	1256 return_value = get_return_value( 
-> 1257 answer, self.gateway_client, self.target_id, self.name) 
	1258 
	1259 for temp_arg in temp_args: 

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 
	67 e.java_exception.getStackTrace())) 
	68 if s.startswith('org.apache.spark.sql.AnalysisException: '): 
---> 69 raise AnalysisException(s.split(': ', 1)[1], stackTrace) 
	70 if s.startswith('org.apache.spark.sql.catalyst.analysis'): 
	71 raise AnalysisException(s.split(': ', 1)[1], stackTrace) 

AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nFileSource[FileStore/tables/by-day/*.csv]'
스트리밍 액션
  • 스트리밍 액션은 어딘가에 데이터를 채워 넣어야 하므로 일반적인 정적 액션과는 다른 특성을 가짐
  • 본 예제에서 사용할 스트리밍 액션은 트리거가 실행된 다음 데이터를 갱신하게 될 인메모리 테이블에 데이터를 저장함
    • 스파크는 이전 집계값보다 더 큰 값이 발생한 경우에만 인메모리 테이블을 갱신
  • 스트림이 시작되면 쿼리 실행 결과가 어떠한 형태로 인메모리 테이블에 기록되는지 확인 가능
purchasePerCustomerPerHour.writeStream.format('memory')\#인메모리 테이블에 저장
.queryName('customer_purchases')\#인메모리에 저장될 테이블명
.outputMode('complete')\#complete: 모든 카운트 수행결과를 테이블에 저장
.start()
Out[15]: <pyspark.sql.streaming.StreamingQuery at 0x7f96a246bac8>
#인메모리 테이블에 어떻게 기록되는지 찍어보기 
spark.sql('''
select * from customer_purchases
''').show(5)
+----------+--------------------+---------------+ 
CustomerId| window|sum(total_cost)| 
+----------+--------------------+---------------+ 
13269.0|[2010-12-05 00:00...| 351.43| 
16950.0|[2010-12-07 00:00...| 172.0| 
14560.0|[2010-12-23 00:00...| -9.95| 
13050.0|[2010-12-14 00:00...| 292.42| 
17790.0|[2010-12-13 00:00...| 154.8| 
+----------+--------------------+---------------+ 
only showing top 5 rows
purchasePerCustomerPerHour.writeStream.format('console')\#이번엔 콘솔에 결과 출력
.queryName('customer_purchases_2')\
.outputMode('complete')\
.start()
Out[15]: <pyspark.sql.streaming.StreamingQuery at 0x7f96a246bac8>

머신러닝과 고급 분석

  • 내장된 머신러닝 알고리즘 라이브러리인 MLlib를 이용해 대규모 머신러닝 수행 가능
  • 대용량 데이터를 대상으로 전처리 / 랭글링 / 학습 / 예측 을 할 수 있음
    • 구조적 스트리밍에서 예측하고자 할 때도 MLlib에서 학습시킨 다양한 예측 모델 적용 가능
  • 스파크는 분류 / 회귀 / 군집화 / 딥러닝과 관련된 정교한 API 제공

예제: k-means로 기본적인 군집화 수행

pre_df = static_df.na.fill(0).withColumn('day_of_week', F.date_format(F.col('InvoiceDate'), 'EEEE')).coalesce(5)

  • 파티션 수 조절을 위한 메서드
    • coalesce
    • repartition
    • coalesce는 파티션 수를 줄이기만 가능, repartition은 늘리고 줄이기 모두 가능
      • 디폴트의 차이인 것 같고, 파라미터 값으로 조절 가능
    • 참고
display(pre_df.limit(10))
InvoiceNo	StockCode	Description	Quantity	InvoiceDate	UnitPrice	CustomerID	Country	day_of_week
537226	22811	SET OF 6 T-LIGHTS CACTI	6	2010-12-06T08:34:00.000+0000	2.95	15987.0	United Kingdom	Monday
537226	21713	CITRONELLA CANDLE FLOWERPOT	8	2010-12-06T08:34:00.000+0000	2.1	15987.0	United Kingdom	Monday
537226	22927	GREEN GIANT GARDEN THERMOMETER	2	2010-12-06T08:34:00.000+0000	5.95	15987.0	United Kingdom	Monday
537226	20802	SMALL GLASS SUNDAE DISH CLEAR	6	2010-12-06T08:34:00.000+0000	1.65	15987.0	United Kingdom	Monday
537226	22052	VINTAGE CARAVAN GIFT WRAP	25	2010-12-06T08:34:00.000+0000	0.42	15987.0	United Kingdom	Monday
537226	22705	WRAP GREEN PEARS	25	2010-12-06T08:34:00.000+0000	0.42	15987.0	United Kingdom	Monday
537226	20781	GOLD EAR MUFF HEADPHONES	2	2010-12-06T08:34:00.000+0000	5.49	15987.0	United Kingdom	Monday
537226	22310	IVORY KNITTED MUG COSY	6	2010-12-06T08:34:00.000+0000	1.65	15987.0	United Kingdom	Monday
537226	22389	PAPERWEIGHT SAVE THE PLANET	6	2010-12-06T08:34:00.000+0000	2.55	15987.0	United Kingdom	Monday
537227	22941	CHRISTMAS LIGHTS 10 REINDEER	2	2010-12-06T08:42:00.000+0000	8.5	17677.0	United Kingdom	Monday
display(pre_df.select('InvoiceNo').describe())
summary	InvoiceNo
count	148117
mean	542603.1728328828
stddev	3669.8110037898678
min	536365
max	C549162

train/test 셋 분리

train_df = pre_df.where('InvoiceNo < 542603')
test_df = pre_df.where('InvoiceNo >= 542603')
train_df.count()
Out[61]: 74198
test_df.count()
Out[62]: 71191
  • TrainValidationSplit이나 CrossValidator API로도 분리 가능(이건 나중에 다룸)

전처리

#StringIndexer로 각 요일을 수치형으로 변환
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer().setInputCol('day_of_week').setOutputCol('day_of_week_index')
#OneHotEncoder로 index -> binary vector로 변환
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder().setInputCol('day_of_week_index').setOutputCol('day_of_week_encoded')
from pyspark.ml import Pipeline
trans_pipeline = Pipeline().setStages([indexer, encoder, vector_asb])
  • 스파크의 모든 머신러닝 알고리즘은 수치형 벡터 타입을 입력으로 사용
from pyspark.ml.feature import VectorAssembler
vector_asb = VectorAssembler().setInputCols(['UnitPrice', 'Quantity', 'day_of_week_encoded']).setOutputCol('features')

파이프라인 생성

fit_pipeline = trans_pipeline.fit(train_df)
trans_train = fit_pipeline.transform(train_df).cache()
display(trans_train.limit(5))
InvoiceNo	StockCode	Description	Quantity	InvoiceDate	UnitPrice	CustomerID	Country	day_of_week	day_of_week_index	day_of_week_encoded	features
537226	22811	SET OF 6 T-LIGHTS CACTI	6	2010-12-06T08:34:00.000+0000	2.95	15987.0	United Kingdom	2010-12-06	0.0	List(0, 42, List(0), List(1.0))	List(0, 44, List(0, 1, 2), List(2.95, 6.0, 1.0))
537226	21713	CITRONELLA CANDLE FLOWERPOT	8	2010-12-06T08:34:00.000+0000	2.1	15987.0	United Kingdom	2010-12-06	0.0	List(0, 42, List(0), List(1.0))	List(0, 44, List(0, 1, 2), List(2.1, 8.0, 1.0))
537226	22927	GREEN GIANT GARDEN THERMOMETER	2	2010-12-06T08:34:00.000+0000	5.95	15987.0	United Kingdom	2010-12-06	0.0	List(0, 42, List(0), List(1.0))	List(0, 44, List(0, 1, 2), List(5.95, 2.0, 1.0))
537226	20802	SMALL GLASS SUNDAE DISH CLEAR	6	2010-12-06T08:34:00.000+0000	1.65	15987.0	United Kingdom	2010-12-06	0.0	List(0, 42, List(0), List(1.0))	List(0, 44, List(0, 1, 2), List(1.65, 6.0, 1.0))
537226	22052	VINTAGE CARAVAN GIFT WRAP	25	2010-12-06T08:34:00.000+0000	0.42	15987.0	United Kingdom	2010-12-06	0.0	List(0, 42, List(0), List(1.0))	List(0, 44, List(0, 1, 2), List(0.42, 25.0, 1.0))
  • 모델의 하이퍼파라미터를 튜닝할 때, 캐싱을 사용하면 중간 변환된 데이터셋의 복사본을 메모리에 저장하므로 빠르게 반복적으로 수행 가능

모델 적용

  • MLlib의 DataFrame API에서 제공하는 모든 알고리즘은 항상 두 가지 유형
    • [알고리즘명]: 학습 전 명칭
    • [알고리즘명]+Model: 학습 후 명칭
from pyspark.ml.clustering import KMeans,KMeansModel
kmeans= KMeans().setK(10).setSeed(1)
model = kmeans.fit(trans_train)
#학습 데이터셋에 대한 비용 계산
model.computeCost(trans_train)
Out[89]: 37504492.819086105
  • 군집 비용을 말하며, 각 군집 중심점과의 제곱거리의 합을 의미

저수준 API

  • 스파크의 거의 모든 기능은 RDD 기반으로 만들어짐
    • DataFrame 연산도 RDD 기반으로 만들어졌고, 매우 효율적인 분산 처리를 위해 저수준 명령으로 컴파일됨
  • RDD를 이용하면 파티션과 같은 물리적 실행 특성을 결정할 수 있다는 장점이 있음
    • 구조적 API보다 더 세밀한 제어 가능(근데 대부분은 구조적 API를 사용하는게 좋음)
    • 메모리에 저장된 원시 데이터를 병렬처리하는 데 RDD 사용
    • 비정형 데이터나 정제되지 않은 원시 데이터를 처리해야 한다면 RDD를 사용해야함
from pyspark.sql import Row
spark.sparkContext.parallelize([Row(1), Row(2), Row(3)]).toDF()
Out[91]: DataFrame[_1: bigint]

SparkR

  • 스파크를 R언어로 사용하기 위한 기능
  • 파이썬 API와 유사
profile
Data Engineer

0개의 댓글