종합예제
flightDate2015 = spark.read.option('inferSchema', 'true').option('header', 'true').csv('/FileStore/tables/flight-data/csv/2015_summary.csv')
- 스칼라와 파이썬에서 사용하는 DataFrame은 불특정 다수의 row와 column을 가짐
- row의 수를 알 수 없는 이유는, 데이터를 읽는 과정이 지연 연산 형태의 트랜스포메이션이기 때문
- 스파크는 각 column의 데이터 타입을 추론하기 위해 적은 양의 데이터를 읽음
flightDate2015.take(3)
Out[2]: [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]
실행 계획 확인하기
- explain메서드를 호출하면 DataFrame의 계보(lineage)나 스파크의 쿼리 실행 계획을 확인할 수 있다.
flightDate2015.sort('count').explain()
== Physical Plan == Sort [count
- 최종 결과는 가장 위에
- 데이터 소스는 가장 아래에
- 각 줄의 첫 번째 키워드(Sort, Exchange, FileScan)를 보면됨
실행 계획 시작하기
- 액션을 호출함
- 스파크는 셔플 수행 시 기본적으로 200개의 셔플 파티션을 생성함
spark.conf.set("spark.sql.shuffle.partitions", "5")
flightDate2015.sort('count').take(2)
Out[4]: [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1), Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]
- 사용자는 물리적 데이터를 직접 다루진 않음
- 대신, 위처럼 셔플 파티션 파라미터와 같은 속성으로 물리적 실행 특성을 제어할 수 있음
DataFrame과 SQL
- 스파크는 언어에 상관없이 같은 방식으로 트랜스포메이션을 실행할 수 있다.
- 사용자가 SQL이나 DataFrame으로 비즈니스 로직을 표현하면, 스파크가 실제 코드를 실행하기 전에 그 로직을 기본 실행 계획으로 컴파일함
- 스파크 SQL을 사용하면, 모든 DataFrame을 테이블이나 뷰로 등록 후 SQL쿼리 사용
flightDate2015.createOrReplaceTempView('flight_data_2015')
두 가지 실행 계획 비교
- 위에서 뷰로 등록했으니 SQL로 데이터를 조회할 수 있음
- 새로운 DataFrame을 반환하는 spark.sql메서드로 SQL쿼리 실행
- spark는 SparkSession의 변수
- DataFrame에 쿼리를 수행하면 새로운 DataFrame을 반환(변형시키는게 아님)
ex1)
sqlWay = spark.sql('''
select dest_country_name, count(1)
from flight_data_2015
group by dest_country_name
''')
sqlWay.explain()
== Physical Plan == *(2) HashAggregate(keys=[dest_country_name
dataFrameWay = flightDate2015.groupby('dest_country_name').count()
dataFrameWay.explain()
== Physical Plan == *(2) HashAggregate(keys=[dest_country_name#26], functions=[finalmerge_count(merge count#58L) AS count(1)#53L]) +- Exchange hashpartitioning(dest_country_name#26, 5) +- *(1) HashAggregate(keys=[dest_country_name#26], functions=[partial_count(1) AS count#58L]) +- *(1) FileScan csv [DEST_COUNTRY_NAME#26] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/flight-data/csv/2015_summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
ex2)
- max함수는 DataFrame의 특정 컬럼 값을 스캔하면서 이전 최댓값보다 더 큰 값을 찾음
- max함수는 필터링을 수행해 단일 row를 결과로 반환하는 트랜스포메이션
spark.sql('select max(count) from flight_data_2015').take(1)
Out[10]: [Row(max(count)=370002)]
from pyspark.sql import functions as F
flightDate2015.select(F.max('count')).take(1)
Out[11]: [Row(max(count)=370002)]
ex3)
maxSql = spark.sql('''
select dest_country_name, sum(count) as destination_total
from flight_data_2015
group by dest_country_name
order by sum(count) desc
limit 5
''')
maxSql.show()
+-----------------+-----------------+
dest_country_name|destination_total|
+-----------------+-----------------+
United States| 411352|
Canada| 8399|
Mexico| 7140|
United Kingdom| 2025|
Japan| 1548|
+-----------------+-----------------+
flightDate2015\
.groupBy('dest_country_name')\
.sum('count')\
.withColumnRenamed('sum(count)','destination_total')\
.sort(F.desc('destination_total'))\
.limit(5)\
.show()
+-----------------+-----------------+
dest_country_name|destination_total|
+-----------------+-----------------+
United States| 411352|
Canada| 8399|
Mexico| 7140|
United Kingdom| 2025|
Japan| 1548|
+-----------------+-----------------+
flightDate2015\
.groupBy('dest_country_name')\
.sum('count')\
.withColumnRenamed('sum(count)','destination_total')\
.sort(F.desc('destination_total'))\
.limit(5)\
.explain()
== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[destination_total
output=[dest_country_name
+- *(2) HashAggregate(keys=[dest_country_name
+- Exchange hashpartitioning(dest_country_name
+- *(1) HashAggregate(keys=[dest_country_name
+- *(1) FileScan csv [DEST_COUNTRY_NAME
- 총 7단계가 있음
- read
- groupby
- sum
- columnrename
- sort
- limit
- collect
- 위 단계의 순서는 최적화로 인해 다를 수 있음
- 그리고 각 단계는 불변성을 가진 신규 DataFrame을 생성함