Spark 완벽 가이드 ch2. 스파크 간단히 살펴보기(종합예제)

Q·2023년 1월 9일
0

Spark 완벽 가이드

목록 보기
3/24

종합예제

#DataFrame의 스키마 정보를 알아내는 스키마 추론(inferSchema -> true)
#파일의 첫 row를 헤더로 지정(header -> true)
flightDate2015 = spark.read.option('inferSchema', 'true').option('header', 'true').csv('/FileStore/tables/flight-data/csv/2015_summary.csv')
  • 스칼라와 파이썬에서 사용하는 DataFrame은 불특정 다수의 row와 column을 가짐
  • row의 수를 알 수 없는 이유는, 데이터를 읽는 과정이 지연 연산 형태의 트랜스포메이션이기 때문
  • 스파크는 각 column의 데이터 타입을 추론하기 위해 적은 양의 데이터를 읽음
#Dataframe에서 csv파일을 읽어 로컬 배열이나 리스트 형태로 변환
flightDate2015.take(3)
Out[2]: [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

실행 계획 확인하기

  • explain메서드를 호출하면 DataFrame의 계보(lineage)나 스파크의 쿼리 실행 계획을 확인할 수 있다.
flightDate2015.sort('count').explain()
== Physical Plan == Sort [count#28 ASC NULLS FIRST], true, 0 +- Exchange rangepartitioning(count#28 ASC NULLS FIRST, 200) +- *(1) FileScan csv [DEST_COUNTRY_NAME#26,ORIGIN_COUNTRY_NAME#27,count#28] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/flight-data/csv/2015_summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>
  • 최종 결과는 가장 위에
  • 데이터 소스는 가장 아래에
  • 각 줄의 첫 번째 키워드(Sort, Exchange, FileScan)를 보면됨

실행 계획 시작하기

  • 액션을 호출함
  • 스파크는 셔플 수행 시 기본적으로 200개의 셔플 파티션을 생성함
#셔플의 출력 파티션 수를 200 -> 5로 설정
spark.conf.set("spark.sql.shuffle.partitions", "5")
flightDate2015.sort('count').take(2)
Out[4]: [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1), Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]
  • 사용자는 물리적 데이터를 직접 다루진 않음
  • 대신, 위처럼 셔플 파티션 파라미터와 같은 속성으로 물리적 실행 특성을 제어할 수 있음

DataFrame과 SQL

  • 스파크는 언어에 상관없이 같은 방식으로 트랜스포메이션을 실행할 수 있다.
  • 사용자가 SQL이나 DataFrame으로 비즈니스 로직을 표현하면, 스파크가 실제 코드를 실행하기 전에 그 로직을 기본 실행 계획으로 컴파일함
  • 스파크 SQL을 사용하면, 모든 DataFrame을 테이블이나 뷰로 등록 후 SQL쿼리 사용
#createOrReplaceTempView메서드로 모든 DataFrame을 테이블이나 뷰로 만듦
flightDate2015.createOrReplaceTempView('flight_data_2015')

두 가지 실행 계획 비교

  • 위에서 뷰로 등록했으니 SQL로 데이터를 조회할 수 있음
  • 새로운 DataFrame을 반환하는 spark.sql메서드로 SQL쿼리 실행
    • spark는 SparkSession의 변수
    • DataFrame에 쿼리를 수행하면 새로운 DataFrame을 반환(변형시키는게 아님)

ex1)

#방법1
sqlWay = spark.sql('''
select dest_country_name, count(1)
from flight_data_2015
group by dest_country_name
''')
sqlWay.explain()
== Physical Plan == *(2) HashAggregate(keys=[dest_country_name#26], functions=[finalmerge_count(merge count#46L) AS count(1)#41L]) +- Exchange hashpartitioning(dest_country_name#26, 5) +- *(1) HashAggregate(keys=[dest_country_name#26], functions=[partial_count(1) AS count#46L]) +- *(1) FileScan csv [DEST_COUNTRY_NAME#26] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/flight-data/csv/2015_summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
#방법2
dataFrameWay = flightDate2015.groupby('dest_country_name').count()
dataFrameWay.explain()
== Physical Plan == *(2) HashAggregate(keys=[dest_country_name#26], functions=[finalmerge_count(merge count#58L) AS count(1)#53L]) +- Exchange hashpartitioning(dest_country_name#26, 5) +- *(1) HashAggregate(keys=[dest_country_name#26], functions=[partial_count(1) AS count#58L]) +- *(1) FileScan csv [DEST_COUNTRY_NAME#26] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/flight-data/csv/2015_summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
  • 근데 실행 계획을 보면 둘 다 똑같음

ex2)

  • max함수는 DataFrame의 특정 컬럼 값을 스캔하면서 이전 최댓값보다 더 큰 값을 찾음
  • max함수는 필터링을 수행해 단일 row를 결과로 반환하는 트랜스포메이션
#방법1
spark.sql('select max(count) from flight_data_2015').take(1)
Out[10]: [Row(max(count)=370002)]
#방법2
from pyspark.sql import functions as F
flightDate2015.select(F.max('count')).take(1)
Out[11]: [Row(max(count)=370002)]

ex3)

maxSql = spark.sql('''
                   select dest_country_name, sum(count) as destination_total
                   from flight_data_2015
                   group by dest_country_name
                   order by sum(count) desc
                   limit 5
                   ''')
maxSql.show()
+-----------------+-----------------+
dest_country_name|destination_total| 
+-----------------+-----------------+ 
United States| 411352| 
Canada| 8399| 
Mexico| 7140| 
United Kingdom| 2025| 
Japan| 1548| 
+-----------------+-----------------+
flightDate2015\
.groupBy('dest_country_name')\
.sum('count')\
.withColumnRenamed('sum(count)','destination_total')\
.sort(F.desc('destination_total'))\
.limit(5)\
.show()
+-----------------+-----------------+
dest_country_name|destination_total| 
+-----------------+-----------------+ 
United States| 411352| 
Canada| 8399| 
Mexico| 7140| 
United Kingdom| 2025| 
Japan| 1548| 
+-----------------+-----------------+
flightDate2015\
.groupBy('dest_country_name')\
.sum('count')\
.withColumnRenamed('sum(count)','destination_total')\
.sort(F.desc('destination_total'))\
.limit(5)\
.explain()
== Physical Plan == 
TakeOrderedAndProject(limit=5, orderBy=[destination_total#126L DESC NULLS LAST], 
output=[dest_country_name#26,destination_total#126L]) 
+- *(2) HashAggregate(keys=[dest_country_name#26], functions=[finalmerge_sum(merge sum#130L) AS sum(cast(count#28 as bigint))#122L])
+- Exchange hashpartitioning(dest_country_name#26, 5) 
+- *(1) HashAggregate(keys=[dest_country_name#26], functions=[partial_sum(cast(count#28 as bigint)) AS sum#130L]) 
+- *(1) FileScan csv [DEST_COUNTRY_NAME#26,count#28] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/flight-data/csv/2015_summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>
  • 총 7단계가 있음
    • read
    • groupby
    • sum
    • columnrename
    • sort
    • limit
    • collect
  • 위 단계의 순서는 최적화로 인해 다를 수 있음
  • 그리고 각 단계는 불변성을 가진 신규 DataFrame을 생성함
profile
Data Engineer

0개의 댓글