Dynamically optimizing skew join.
- Skew 파티션으로 인한 성능 문제를 해결하기 위함.
- 한두 개의 오래 걸리는 테스크들로 인한 전체 Job/Stage 종료 지연 방지.
- 먼저 skew 파티션의 존재 여부 파악 -> skew 파티션을 작게 나눔 -> 상대 조인 파티션을 중복하여 만들고 조인 수행.
- spark.sql.adaptive.skewJoin.skewedPartitionFactor (기본값: 5)
- spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (기본값: 256MB)
- 위 두 조건으로 스큐 파티션 여부를 판단함. (파티션 크다/작다 기준)
- spark.sql.adaptive.skewJoin.enabled (디폴트: 트루)
실습.
pyspark --driver-memory 2g --executor-memory 2g
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", False)
df_f = spark.sql("""
SELECT date, sum(quantity * price) AS total_sales FROM sales s
JOIN items i ON s.item_id = i.id
GROUP BY 1
ORDER BY 2 DESC;
""")
df_f.count()
: lazy 실행이기에 메모리로 불러오기 위해 키운트 호출.spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
: aqe 사용.df_t = spark.sql("""
SELECT date, sum(quantity * price) AS total_sales FROM sales s
JOIN items i ON s.item_id = i.id
GROUP BY 1
ORDER BY 2 DESC;
""")
df_t.count()
SQL 정리.
CREATE TABLE items
USING parquet
AS
SELECT id,
CAST(rand() * 1000 AS INT) <!-- AS price # 0~999의 값을 갖는 price 칼럼 -->
FROM RANGE(30000000); <!-- 총 3천만 개의 레코드 생성 -->
CREATE TABLE sales
USING parquet
AS
SELECT CASE WHEN rand() < 0.8 THEN 100 ELSE CAST(rand() * 30000000 AS INT) END AS item_id, CAST(rand() * 100 AS INT) AS quantity, <!-- 일부러 80%는 100이라는 값을 갖도록 skew 생성. -->
DATE_ADD(current_date(), - CAST(rand() * 360 AS INT)) AS date FROM RANGE(1000000000); <!-- 10억 개의 레코드 -->
<!-- shuffling 후에 coalescing하는 예시. (파티션 수 줄이기) -->
SELECT date, sum(quantity) AS q
FROM sales
GROUP BY 1
ORDER BY 2 DESC;
<!-- 조인 전략. 동적으로 리플래닝하여 통계정보를 알고 브로드캐스트 조인함. -->
SELECT date, sum(quantity * price) AS total_sales
FROM sales s
JOIN items i ON s.item_id = i.id
WHERE price < 10
GROUP BY 1 ORDER BY 2 DESC;
<!-- skew join 예시. 드라이버와 엑시큐터 메모리가 더 필요했음. -->
SELECT date, sum(quantity * price) AS total_sales
FROM sales s
JOIN items i ON s.item_id = i.id
GROUP BY 1
ORDER BY 2 DESC;
AQE가 나오기 전 data skew 처리 방법: salting.
환경 변수.
- spark.sql.adaptive.enabled (True)
- spark.sql.shuffle.partitions (200)
- spark.sql.files.maxPartitionBytes (128MB)
- spark.sql.files.openCostInBytes (4MB)
- spark.sql.files.minPartitionNum (default parallelism)
- spark.default.parallelism
spark.sql.shuffle.partitions.
- 클러스터 자원과 처리 데이터의 크기를 고려하여 잡마다 바꿔서 설정.
- 큰 데이터를 처리하는 거살면 클러스터 전체 코어의 수로 설정.
- AQE를 사용하는 관점에서는 조금 더 크게 잡는 것이 좋음.
Salting.
- Skew Partition을 처리하기 위한 테크닉.
- AQE의 등장 이후로는 잘 사용되지 않음.
- 랜덤 필드를 만들고 그 기준으로 파티션을 새로 만들어서 처리함.
Spill이란,
- 파티션의 크기가 너무 커서 메모리가 부족한 경우 그 여분을 디스크에 쓰는 것.
- Spill이 발생하는 경우 (메모리가 부족해지는 경우)
.
- Skew Partition 대상 Aggregation.
- Skew Partition 대상 JOIN.
- 굉장히 큰 explode 작업.
- 큰 파티션이라면 위의 작업 시 spill 가능성이 더 높아짐.
- 종류)
.
- spill (memory) : 디스크로 spill된 데이터가 메모리에 있을 때의 크기. (압축 해제된 상태라 용량이 더 큼.)
- spill (disk) : 메모리에서 spill된 데이터가 디스크에서 차지하는 크기. (압축된 상태라 용량이 더 작음.)
요약.
- Repartition과 Coalesce를 사용한 파티션 크기와 분포 조절.
- AQE (Adaptive Query Execution) 소개와 데모. (최적화 케이스: 1.셔플링 후 coalesce로 파티션 크기 축소 2.동적 JOIN(브로드캐스트 조인) 3.Skew 조인)
- Salting을 통한 Data Skew 처리와 Spill
세 종류의 파티션.
- 입력 데이터를 로드할 때의 파티션.
- 셔플링 후 만들어지는 파티션.
- 데이터를 최종적으로 저장할 때의 파티션.
- 파티션의 크기는 128MB ~ 1GB가 좋음.
입력 데이터를 로드할 때 파티션의 수와 크기.
- 기본적으로 파티션의 최대 크기(spark.sql.files.maxPartitionBytes)에 의해 결정.
- 결국 해당 데이터가 어떻게 저장되는 지와 연관이 많이 됨.
-
- 파일 포맷과 압축 여부 등.
- 기타 Spark 환경 변수.
입력 데이터 파티션 수와 크기를 결정하는 변수들.
- bucketBy로 저장된 데이터를 읽는 경우 (버킷의 수와 버킷 기준 칼럼과 정렬 기준 칼럼)
- 읽어들이는 데이터 파일이 splitable한지. (parquet/avro: 항상 splitable, json/csv: 경우에 따라 다름)
- 입력 데이터의 전체 크기 (입력 데이터를 구성하는 파일의 수).
- 내가 리소스 매니저에게 요청한 CPU의 수.
- spark.sql.files.maxPartitionBytes (128MB)
- spark.sql.files.openCostInBytes (4MB)
- spark.sql.files.minPartitionNum
- spark.default.parallelism
절차.
- bytesPerCore = (데이터파일들 전체 크기 + 파일 수*OpenCostInBytes) / default.parallelism.
- maxSplitBytes = Min(maxPartitionBytes, bytesPerCore)
- Splitable하다면 maxSplitBytes 단위로 분할하여 File Chunk 생성. (else 크기가 maxSplitBytes보다 작다면 하나의 File Chunk로 생성.)
- 위에서 만들어진 File Chunk들로부터 파티션 생성.
- 한 파티션은 하나 혹은 그 이상의 File Chunk들로 구성됨.
- 정리) maxSplitBytes 결정 -> File Chunk 생성 -> File Chunk로부터 파티션 생성.
입력 데이터 파티션 수와 크기 설정 예제 1.
- 파일 수 : 50
- 파일 포맷 : parquet
- 파일 크기 : 65MB
- Spakr App에 할당된 코어 수 : 10
- spark.sql.files.maxPartitionBytes : 128MB
- spark.default.parallelism : 10
- spark.sql.files.openCostInBytes : 4MB
- bytesPerCore = (5065MB + 504MB) / 10 = 345MB.
- maxSplitBytes = Min(128MB, 345MB) = 128MB.
- 최종 파티션 수 : 50. (50개의 파일 청크 생성 -> 128MB에는 (65MB+4MB)가 하나만 들어가기에 각 파일 청크당 하나의 파티션 생성.)
입력 데이터 파티션 수와 크기 설정 예제 2.
- 파일 수 : 50
- 파일 포맷 : parquet
- 파일 크기 : 63MB (아마 압축된 크기일 것.)
- Spakr App에 할당된 코어 수 : 10
- spark.sql.files.maxPartitionBytes : 128MB
- spark.default.parallelism : 10
- spark.sql.files.openCostInBytes : 4MB
- bytesPerCore = (5063MB + 504MB) / 10 = 335MB.
- maxSplitBytes = Min(128MB, 335MB) = 128MB.
- 최종 파티션 수 : 50. (50개의 파일 청크 생성 -> 128MB에는 (63MB+4MB)가 하나만 들어가기에 각 파일 청크당 하나의 파티션 생성.)
입력 데이터 파티션 수와 크기 설정 예제 3.
- 파일 수 : 50
- 파일 포맷 : parquet
- 파일 크기 : 40MB
- Spakr App에 할당된 코어 수 : 10
- spark.sql.files.maxPartitionBytes : 128MB
- spark.default.parallelism : 10
- spark.sql.files.openCostInBytes : 4MB
- bytesPerCore = (5040MB + 504MB) / 10 = 220MB.
- maxSplitBytes = Min(128MB, 220MB) = 128MB.
- 최종 파티션 수 : 17. (50개의 파일 청크 생성 -> 128MB에는 (40MB+4MB+40MB+4MB+40MB)가 들어갈 수 있음.)
입력 데이터 파티션 수와 크기 설정 예제 4.
- 파일 수 : 10
- 파일 포맷 : csv (splitable하지 않은 gzip으로 압축.)
- 파일 크기 : 335MB
- Spakr App에 할당된 코어 수 : 10
- spark.sql.files.maxPartitionBytes : 128MB
- spark.default.parallelism : 10
- spark.sql.files.openCostInBytes : 4MB
- bytesPerCore = (10335MB + 104MB) / 10 = 339MB.
- maxSplitBytes = Min(128MB, 339MB) = 128MB.
- 최종 파티션 수 : 10. (10개의 파일 청크 생성 -> 128MB에는 (335MB + 4MB)가 1개 이상 들어갈 수 없음.)
- CSV나 JSON 파일의 경우 splitable하지 않게 압축된 경우가 많으므로 되도록 parquet/avro 활용.
입력 데이터 파티션 수와 크기 설정 예제 5.
- 파일 수 : 10
- 파일 포맷 : csv (splitable한 bzip2로 압축)
- 파일 크기 : 311MB
- Spakr App에 할당된 코어 수 : 10
- spark.sql.files.maxPartitionBytes : 128MB
- spark.default.parallelism : 10
- spark.sql.files.openCostInBytes : 4MB
- bytesPerCore = (10311MB + 104MB) / 10 = 315MB.
- maxSplitBytes = Min(128MB, 315MB) = 128MB.
- 최종 파티션 수 : 25. (103(128짜리 20개, 55짜리 10개)개의 파일 청크 생성 -> 128에는 그대로, 55는 2 + 4MB*2 하여 118짜리로.)
입력 데이터 파티션 수와 크기 설정 예제 6.
- 파일 수 : 100
- 파일 포맷 : csv (splitable하지 않은 gzip으로 압축.)
- 파일 크기 : 10MB
- Spakr App에 할당된 코어 수 : 10
- spark.sql.files.maxPartitionBytes : 128MB
- spark.default.parallelism : 10
- spark.sql.files.openCostInBytes : 4MB
- bytesPerCore = (10010MB + 1004MB) / 10 = 140MB.
- maxSplitBytes = Min(128MB, 140MB) = 128MB.
- 최종 파티션 수 : 12. (100개의 파일 청크 생성 -> 128MB에는 (14MB*9)가 들어갈 수 있음.)
입력 데이터 파티션 수와 크기 결정 방식 요약.
- bytesPerCore -> maxSplitBytes -> File Chunk -> Partitioning.
- 코어별로 처리해야 할 크기가 너무 크다면 maxPartitionBytes로 제약.
- 데이터의 크기가 적당하면 코어 수만큼의 파티션을 생성.
- 데이터의 크기가 너무 작으면 openCostInBytes로 설정한 크기로 파티션 생성.
Bucketing 소개.
- 데이터를 자주 사용되는 칼럼 기준으로 미리 저장해 두고 활용.
- 다양한 최적화 가능.
- 버킷 정보가 metastore에 저장되고 Spark Complier는 이것을 활용함.
- 버킷의 수 * 파티션의 수만큼의 파일이 만들어 짐.
- 다시 읽어들일 때 원래 파티션의 수만큼으로 재구성됨.
- 버켓팅 키를 기반으로 작업 시 셔플이 없어짐.
spark = SparkSession.builder...
df1 = spark.range(1, 10 ** 6)
print(df1.rdd.getNumPartitions())
df1.write.format('parquet').mode('overwrite').bucketBy(
100, 'id').saveAsTable('spark_bucket_table1')
df2 = spark.range(1 10 ** 5)
print(df2.rdd.getNumPartitions())
df2.write.format('parquet').mode('overwrite').bucketBy(
100, 'id').saveAsTable('spark_bucket_table2')
> small file 이슈.
- 64MB 파일을 하나 읽는 것 vs. 64Byte의 파일을 백만 개 읽는 것.
- 파일 시스템 접근 관련 오버헤드 발생.
- 읽어들이면서 파티션의 수를 줄일 수 있지만 오버헤드가 큼.
> 데이터를 저장할 때 파티션 수와 크기.
- 기본 방식.
- bucketBy.
- partitionBy.
- 파일 크기를 레코드 수로도 제어 가능. (spark.sql.files.maxRecordsPerFile)
> 기본 방식.
- 각 파티션이 하나의 파일로 쓰여짐.
- saveAsTable vs. save
- 적당한 크기와 수의 파티션을 찾는 것이 중요.
- Repartition 혹은 coalesce를 적절히 사용.
- parquet 포맷 사용.
> bucketBy 방식.
- 데이터 특성을 잘 아는 경우 특정 ID를 기준으로 나눠서 테이블로 저장.
- 다음에 재사용 시 이를 로딩하여 반복 처리 시 시간 단축.
- 버킷 수와 ID 지정.
- df.write.mode('overwrite').bucketBy(3, key).saveAsTable(table)
- metastore에 정보가 같이 저장됨.
> partitionBy 방식.
- 굉장히 큰 로그 파일을 데이터 생성 시간 기반으로 데이터 읽기를 많이 한다면?
-
- 데이터 자체를 연도-월-일의 폴더 구조로 저장.
- 이를 통해 데이터 읽기 과정을 최적화.
- 데이터 관리의 유용함.
- 하지만 Partition Key를 잘못 선택하면 엄청나게 많은 파일들이 생성됨.
- partitioning할 키를 지정해야 함.
-
- df.write.mode('overwrite').partitionBy('order_month').saveAsTable('order')
- df.write.mode('overwrite').partitionBy('year', 'month', 'day').saveAsTable('appl_stock')
> buckeyBy & partitionBy.
- df.write.mode('overwrite').partitionBy('dept').bucketBy(5, 'employeeId')