path = '/FileStore/tables/bin/2010-summary.parquet'
df = spark.read.parquet(path)
display(df.limit(10))
DEST_COUNTRY_NAME | ORIGIN_COUNTRY_NAME | count |
---|---|---|
United States | Romania | 1 |
United States | Ireland | 264 |
United States | India | 69 |
Egypt | United States | 24 |
Equatorial Guinea | United States | 1 |
United States | Singapore | 25 |
United States | Grenada | 54 |
Costa Rica | United States | 477 |
Senegal | United States | 29 |
United States | Marshall Islands | 44 |
df.repartition(2).groupby('dest_country_name').count().explain()
== Physical Plan ==
*(3) HashAggregate(keys=[dest_country_name#330], functions=[finalmerge_count(merge count#348L) AS count(1)#343L])
+- Exchange hashpartitioning(dest_country_name#330, 200)
+- *(2) HashAggregate(keys=[dest_country_name#330], functions=[partial_count(1) AS count#348L])
+- Exchange RoundRobinPartitioning(2)
+- *(1) FileScan parquet [DEST_COUNTRY_NAME#330] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/FileStore/tables/bin/2010-summary.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
df.repartition(2).groupby('dest_country_name').count().collect()
Out[4]: [Row(dest_country_name='Russia', count=1),
Row(dest_country_name='Anguilla', count=1),
Row(dest_country_name='Paraguay', count=1),
Row(dest_country_name='Senegal', count=1),
Row(dest_country_name='Sweden', count=1),
Row(dest_country_name='Kiribati', count=1),
Row(dest_country_name='Guyana', count=1),
Row(dest_country_name='Philippines', count=1), Row(dest_country_name='Singapore', count=1),
Row(dest_country_name='Malaysia', count=1),
Row(dest_country_name='Fiji', count=1),
Row(dest_country_name='Turkey', count=1),
Row(dest_country_name='Germany', count=1), Row(dest_country_name='Afghanistan', count=1),
Row(dest_country_name='Jordan', count=1),
Row(dest_country_name='Palau', count=1),
Row(dest_country_name='France', count=1),
Row(dest_country_name='Turks and Caicos Islands', count=1), Row(dest_country_name='Greece', count=1),
Row(dest_country_name='Taiwan', count=1),
Row(dest_country_name='British Virgin Islands', count=1), Row(dest_country_name='Dominica', count=1),
Row(dest_country_name='Equatorial Guinea', count=1), Row(dest_country_name='Slovakia', count=1),
Row(dest_country_name='Argentina', count=1),
Row(dest_country_name='Angola', count=1),
Row(dest_country_name='Belgium', count=1),
Row(dest_country_name='Qatar', count=1),
Row(dest_country_name='Ecuador', count=1),
Row(dest_country_name='Finland', count=1),
Row(dest_country_name='Ghana', count=1),
Row(dest_country_name='Nicaragua', count=1),
Row(dest_country_name='Peru', count=1),
Row(dest_country_name='China', count=1),
Row(dest_country_name='India', count=1),
Row(dest_country_name='Curacao', count=1),
Row(dest_country_name='United States', count=131), Row(dest_country_name='Malta', count=1),
Row(dest_country_name='Kuwait', count=1),
Row(dest_country_name='Marshall Islands', count=1), Row(dest_country_name='Chile', count=1),
Row(dest_country_name='Martinique', count=1),
Row(dest_country_name='Cayman Islands', count=1), Row(dest_country_name='Nigeria', count=1),
Row(dest_country_name='Bolivia', count=1),
Row(dest_country_name='Italy', count=1),
Row(dest_country_name='Suriname', count=1),
Row(dest_country_name='Netherlands Antilles', count=1), Row(dest_country_name='Norway', count=1),
Row(dest_country_name='Spain', count=1),
Row(dest_country_name='Cuba', count=1),
Row(dest_country_name='Guadeloupe', count=1),
Row(dest_country_name='Denmark', count=1),
Row(dest_country_name='Barbados', count=1),
Row(dest_country_name='Ireland', count=1),
Row(dest_country_name='Thailand', count=1),
Row(dest_country_name='Morocco', count=1),
Row(dest_country_name='Panama', count=1),
Row(dest_country_name='Cape Verde', count=1),
Row(dest_country_name='Hong Kong', count=1),
Row(dest_country_name='Ukraine', count=1),
Row(dest_country_name='Venezuela', count=1),
Row(dest_country_name='Israel', count=1),
Row(dest_country_name='Saint Barthelemy', count=1), Row(dest_country_name='Iceland', count=1),
Row(dest_country_name='Saint Kitts and Nevis', count=1),
Row(dest_country_name='French Polynesia', count=1),
Row(dest_country_name='South Korea', count=1),
Row(dest_country_name='Cyprus', count=1),
Row(dest_country_name='Bonaire, Sint Eustatius, and Saba', count=1),
Row(dest_country_name='Uruguay', count=1),
Row(dest_country_name='Mexico', count=1),
Row(dest_country_name='Aruba', count=1),
Row(dest_country_name='Georgia', count=1),
Row(dest_country_name='Saint Vincent and the Grenadines', count=1),
Row(dest_country_name='The Bahamas', count=1),
Row(dest_country_name='Guatemala', count=1),
Row(dest_country_name='Azerbaijan', count=1),
Row(dest_country_name='Sint Maarten', count=1),
Row(dest_country_name='Grenada', count=1),
Row(dest_country_name='Federated States of Micronesia', count=1),
Row(dest_country_name='Liberia', count=1),
Row(dest_country_name='Honduras', count=1),
Row(dest_country_name='Trinidad and Tobago', count=1),
Row(dest_country_name='Saudi Arabia', count=1),
Row(dest_country_name='Uganda', count=1),
Row(dest_country_name='French Guiana', count=1),
Row(dest_country_name='Switzerland', count=1),
Row(dest_country_name='Ethiopia', count=1),
Row(dest_country_name='Latvia', count=1),
Row(dest_country_name='Jamaica', count=1),
Row(dest_country_name='United Arab Emirates', count=1),
Row(dest_country_name='Saint Lucia', count=1),
Row(dest_country_name='Canada', count=1),
Row(dest_country_name='Kyrgyzstan', count=1),
Row(dest_country_name='Samoa', count=1),
Row(dest_country_name='Czech Republic', count=1),
Row(dest_country_name='Cook Islands', count=1),
Row(dest_country_name='Brazil', count=1),
Row(dest_country_name='Belize', count=1),
Row(dest_country_name='Antigua and Barbuda', count=1),
Row(dest_country_name='Dominican Republic', count=1),
Row(dest_country_name='Japan', count=1),
Row(dest_country_name='Luxembourg', count=1),
Row(dest_country_name='New Zealand', count=1),
Row(dest_country_name='Greenland', count=1),
Row(dest_country_name='Haiti', count=1),
Row(dest_country_name='Poland', count=1),
Row(dest_country_name='Portugal', count=1),
Row(dest_country_name='Australia', count=1),
Row(dest_country_name='Bulgaria', count=1),
Row(dest_country_name='Austria', count=1),
Row(dest_country_name='Egypt', count=1),
Row(dest_country_name='Costa Rica', count=1),
Row(dest_country_name='Kazakhstan', count=1),
Row(dest_country_name='El Salvador', count=1),
Row(dest_country_name='South Africa', count=1),
Row(dest_country_name='Bermuda', count=1),
Row(dest_country_name='Bahrain', count=1),
Row(dest_country_name='Colombia', count=1),
Row(dest_country_name='Hungary', count=1),
Row(dest_country_name='Pakistan', count=1),
Row(dest_country_name='United Kingdom', count=1),
Row(dest_country_name='Vietnam', count=1),
Row(dest_country_name='Netherlands', count=1)]
집계 연산 전에 파티션 수를 증가시키면 태스크별로 처리할 키 수를 줄일 수 있음
익스큐터의 메모리를 증가시킨 경우 데이터가 많은 키를 처리하는 익스큐터는 여전히 느릴 수 있음
집계 처리가 끝나고 이어서 실행되는 태스크가 느리면 집계 처리된 데이터셋에 불균형 현상이 남아 있음을 의미함
모든 필터와 select 구문이 집계 연산보다 먼저 처리된다면 필요한 데이터만 이용해서 집계 연산 수행 가능
null값을 나타내기 위해 " " 또는 "EMPTY" 같은 값을 대체 값으로 사용하는건 아닌지 확인함
collect_list, collect_set 등은 일치하는 모든 객체를 드라이버에 전송하므로 아주 느리게 동작하므로 성능이 중요하면 사용을 피하자
많은 조인 연산은 다른 조인 타입으로 변경해 최적화(자동 또는 수동 방식)할 수 있음
조인 순서를 변경하면서 잡의 처리 속도가 올라가는지 테스트함
조인을 수행하기 전에 데이터셋을 분할하면 클러스터 노드 간 데이터 이동을 줄일 수 있음
데이터 치우침 현상은 느린 조인을 유발할 수 있음
모든 필터와 select 구문이 조인 연산보다 우선 처리되도록 한다.(구조적 API는 자동 최적화)
조인 대상 테이블 중 하나가 작은 경우 강제로 브로드캐스트하거나(8장 참고) 스파크의 통계 수집 명령을 사용해 테이블 분석
스파크의 투기적 실행(spark.speculation 속성을 true로)을 사용하면 느린 읽기와 쓰기 속도를 개선하는 데 도움이 될 수 있음
스파크 클러스터와 저장소 시스템 간의 네트워크 대역폭이 충분하지 않을 수 있으므로 네트워크 성능에 문제가 없는지 확인함
단일 클러스터에서 스파크 HDFS같은 분산 파일 시스템을 함께 구성하려면 클러스터의 노드마다 스파크와 분산 파일 시스템 모두 동일한 호스트명을 인식하는지 확인함
사용자 코드에서 collect 메서드 같은 연산을 실행해 너무 큰 데이터셋을 드라이버에 전송하려고 시도했을 수 있음
브로드캐스트하기에 너무 큰 데이터를 브로드캐스트 조인에 사용한 경우
장시간 실행되는 애플리케이션은 드라이버에 많은 양의 객체를 생성해 해제하지 못할 수 있음
가능하면 더 많은 데이터를 다룰 수 있도록 드라이버의 가용 메모리를 증가시킴
JVM 메모리 부족 현상은 파이썬 같은 다른 언어를 함께 사용하는 경우에 발생 가능
SQL JDBC 서버와 노트북 환경을 이용해 다른 사용자와 SparkContext를 공유하는 상황이라면 여러 사용자가 동시에 대량의 데이터를 드라이버 메모리로 전송할 수 있는 명령을 막아둬야함
익스큐터의 가용 메모리와 익스큐터 수를 증가시킴
관련 파이썬 설정을 변경해 PySpark 워커의 크기를 증가시킴
익스큐터 로그에 가비지 컬렉션 오류 메시지가 발생했는지 확인함
null값을 정확하게 제어하기 위해 ' ' 또는 'EMPTY' 같은 값을 기본값으로 사용하는 것은 아닌지 확인함
가능하면 UDF 사용을 줄이고 스파크의 구조적 API를 더 많이 사용함
자바의 jmap 도구를 사용해 익스큐터 힙 메모리의 히스토그램을 보고 가장 많은 메모리를 사용하는 클래스를 확인함
키-값 저장소 같이 다른 워크로드를 처리하는 노드에 익스큐터가 위치한다면 스파크 잡을 다른 작업과 분리해야함
비즈니스 로직을 변경하지 않았다면 데이터 포맷이 변경되었을 수 있음 (즉, 이전에 정상 동작했던 코드가 더는 동작하지 않는 상황)
어큐뮬레이터를 사용해 레코드나 특정 데이터 타입의 수를 확인할 수 있음
트랜스포메이션이 실제 유효한 쿼리 실행 계획을 생성하는지 확인함
더 많은 디스크 공간을 확보하면 됨
제한된 용량의 저장소를 가진 클러스터를 사용하는 경우, 데이터 치우침 현상이 발생하면 일부 노드의 저장소 공간이 모두 소진될 수 있음
몇 가지 저장소 설정을 실험함
문제가 되는 머신의 오래된 로그 파일과 셔플 파일을 수동으로 제거함