📖 학습주제
대용량 데이터 훈련 대비 Spark, SparkML 실습 (3)
Dynamically optimizing skew joins
Dynamically optimizing skew joins
왜 필요한가?
- Skew 파티션으로 인한 성능 문제를 해결하기 위함
- 한 두개의 오래 걸리는 태스크들로 인한 전체 Job/Stage 종료 지연
- 이 때 disk spill이 발생한다면 더 느려지게 됨
AQE의 해법
- 먼저 skew 파티션의 존재 여부 파악
- 다음으로 skew 파티션을 작게 나누고
- 다음으로 상대 조인 파티션을 중복하여 만들고 조인 수행
Dynamically optimizing skew joins 동작방식

example


Dynamically optimizing skew joins 환경 변수들 (3.3.1)
| 환경 변수 이름 | 기본값 | 설명 |
|---|
spark.sql.adaptive.skewJoin.enabled | True | 이 값과 spark.sql.adaptive.enabled가 true인 경우, Sort Merge Join 수행시 skew된 파티션들을 먼저 크기를 줄이고 난 다음에 조인 수행 |
spark.sql.adaptive.skewJoin.skewedPartitionFactor | 5 | 파티션의 크기가 중간 파티션 크기보다 5배 이상크고 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes의 값보다 크면 skew 파티션으로 간주 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 256MB | 파티션의 크기가 이 값보다 크고 중간 파티션 크기보다 5배 이상 크면 skew 파티션으로 간주. 이 값은 spark.sql.adaptive.advisoryPartitionSizeInBytes보다 커야함. |
Salting을 통한 Data Skew 처리
파티션 관련 환경 설정 변수들 (3.3.1)
| 환경 변수 이름 | 기본값 | 설명 |
|---|
spark.sql.adaptive.enabled | True | Spark 3.2부터 기본으로 적용되기 시작 (처음 도입은 1.6) |
spark.sql.shuffle.partitions | 200 | Shuffle 후 만들어질 파티션의 기본 수 |
spark.sql.files.maxPartitionBytes | 128MB | 파일 기반 데이터 소스(Parquet, JSON, ORC 등등)에서 데이터프레임을 만들 때 한 파티션의 최대 크기 |
spark.sql.files.openCostInBytes | 4MB | 파일 기반 데이터소스에서만 의미가 있으며 입력 데이터를 읽기 위해 몇 개의 파티션을 사용할지 결정하는데 사용됨. 대부분의 경우 별 의미 없는 변수이지만 기본적으로 코어별 입력 데이터 크기를 계산했을 때 이 변수의 값보다 작은 경우 파티션의 크기는 이 크기로 맞춰서 만들어짐 참고 : https://stackoverflow.com/questions/70985235/what-is-opencostinbytes |
spark.sql.files.minPartitionNum | Default parallelism | 파일 기반 데이터소스에서만 의미가 있으며 보장되지는 않지만 파일을 읽어들일 때 최소 몇 개의 파티션을 사용할지 결정하는 변수. 설정되어 있지 않다면 기본값은 spark.default.parallelism과 동일 |
spark.default.parallelism | 256MB | RDD를 직접 조작하는 경우 의미가 있는 변수로 shuffling 후 파티션의 수를 결정. 단순하게 이야기하자면 클러스터 내 총 코어의 수에 해당. |
spark.sql.shuffle.partitions
- 클러스터 자원과 처리 데이터의 크기를 고려해서 잡마다 바꿔서 설정
- 큰 데이터를 처리하는 거라면 클러스터 전체 코어의 수로 설정
- AQE를 사용하는 관점에서는 조금더 크게 잡는 것이 좋음
Salting
- Skew Partition을 처리하기 위한 테크닉
- AQE의 등장으로 인해 그렇게 많이 쓰이지 않음
◦ 단 AQE만으로 skew partition 이슈가 사라지지 않는다면 사용이 필요할 수도 있음
- 랜덤 필드를 만들고 그 기준으로 파티션을 새로 만들어서 처리
- Aggregation 처리의 경우에는 효과적일 수 있음
- Join의 경우에는 그리 효과적이지 않음
◦ AQE에 의존하는 것이 좋음
◦ 사실상 Join시 사용 Salting 테크닉을 일반화한 것이 AQE의 skew JOIN 처리방식
example
Aggregation에 사용

Skew JOIN에 사용-1

Skew JOIN에 사용-2
SELECT date, sum(quantity * price) AS total_sales
FROM (
SELECT *, CASE WHEN item_id = 100 THEN FLOOR(RAND()*20) ELSE 1 END AS salt
FROM sales
) s
JOIN (
SELECT *, EXPLODE(ARRAY(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19)) AS salt
FROM items
WHERE id = 100
UNION
SELECT *, 1 AS salt
FROM items
WHERE id <> 100
) i ON s.item_id = i.id and s.salt = i.salt
GROUP BY 1
ORDER BY 2 DESC
Spill
- 파티션의 크기가 너무 커서 메모리가 부족한 경우 그 여분을 디스크에 쓰는 것
- Spill이 발생하면 실행시간이 늘어나고 OOM이 발생할 가능성이 올라감
- Spill이 발생하는 경우 (즉 메모리가 부족해지는 경우)
- Skew Partition 대상 Aggregation
- Skew Partition 대상 JOIN
- 굉장히 큰 explode 작업
- 큰 파티션(spark.sql.files.maxPartitionBytes)이라면 위의 작업시 spill 가능성이 더 높아짐
Spill의 종류
Spill (memory)
- 디스크로 spill된 데이터가 메모리에 있을 때의 크기
- Deserialized 형태라 크기가 보통 8-10배 정도 더 큼
Spill (disk)
- 메모리에서 spill된 데이터가 디스크에서 차지하는 크기
- Serialized 형태라 보통 크기가 훨씬 더 작음
All about Partitions
Partition의 종류 (Life cycle of Partitions)
- 입력 데이터를 로드할 때 파티션 수와 크기
- 셔플링 후 만들어지는 파티션 수와 크기
- 데이터를 최종적으로 저장할 때 파티션 수와 크기
파티션의 크기는 128MB ~ 1GB가 좋음
입력 데이터를 로드할 때 파티션 수와 크기

- 기본적으로는 파티션의 최대 크기에 의해 결정
- spark.sql.files.maxpartitionbytes (128MB)
- 해당 데이터가 어떻게 저장되었는지와 연관이 많이 됨
- 파일포맷이 무엇이고 압축되었는지? 압축되었다면 무슨 알고리즘으로?
◦ Splittable한가 -> 한 큰 파일을 다수의 파티션으로 나눠 로드할 수 있는가?
- 기타 관련 Spark 환경변수들
입력 데이터 파티션 수와 크기를 결정해주는 변수들
bucketBy로 저장된 데이터를 읽는 경우
- Bucket의 수와 Bucket 기준 컬럼들과 정렬 기준 컬럼들
읽어들이는 데이터 파일이 splittable한지?
- PARQUET/AVRO등이 좋은 이유가 됨: 항상 splittable
- JSON/CSV등의 경우 한 레코드가 multi-line이라면 splittable하지 않음
- Single line이라도 압축시 bzip2를 사용해야만 splittable
입력 데이터의 전체 크기 (모든 파일크기의 합)
내가 리소스 매니저에게 요청한 CPU의 수
- executor의 수 x executor 별 CPU 수
입력 데이터 파티션 수와 크기 결정 방식
- 먼저 아래 공식으로 maxSplitBytes를 결정
- bytesPerCore = (데이타파일들 전체크기+파일수*OpenCostInBytes) / default.parallelism
- maxSplitBytes = Min(maxPartitionBytes, bytesPerCore)
= Min(maxPartitionBytes, Max(bytesPerCore, OpenCostInBytes))
- 입력 데이터를 구성하는 각 파일에 대해 다음을 진행
- Splittable하다면 maxSplitBytes 단위로 분할하여 File Chunk 생성
- Splittable하지 않거나 크기가 maxSplitBytes보다 작다면 하나의 File chunk 생성
- 다음으로 위에서 만들어진 File Chunk들로부터 파티션 생성
- 기본적으로 한 파티션은 하나 혹은 그 이상의 File Chunk들로 구성
- 한 파티션에 다음 File Chunk의 크기 + openCostInBytes를 더했을 때 이 값이 maxSplitBytes를 넘어가지 않을 때까지 계속해서 머지함 (파일들이 하나의 파티션으로 패킹됨)
Spark Scala 코드 : maxSplitBytes

Spark Scala 코드 : 파티션 생성 코드

요약
- 코어별로 처리해야할 크기가 너무 크다면 maxPartitionBytes로 제약
- 데이터의 크기가 적당하면 코어수 만큼의 파티션를 생성
- 데이터의 크기가 너무 작으면 openCostInBytes로 설정한 크기로 파티션 생성
Bucketing (bucketBy)
- 데이터를 자주 사용되는 컬럼 기준으로 미리 저장해두고 활용
- 다양한 최적화 가능
- 조인 대상 테이블들이 조인 키를 갖고 bucketing된 경우 shuffle free join 가능
- 한쪽만 bucketing되어있는 경우 one-side shuffle free join 가능 (bucket의 크기에 달림)
- Bucket pruning를 통한 최적화 가능
- Shuffle free aggregation
- Bucket 정보가 metastore에 저장되고 Spark Compiler는 이를 활용
- 이 경우 sortBy를 통해 순서를 미리 정해주기도 함
- Spark 테이블로 저장하고 로딩해야지만 이 정보를 이용 가능함
◦ aveAsTable, spark.table()
Bucketing (bucketBy) 저장 방식
- Bucket의 수 x Partition의 수 만큼의 파일이 만들어짐
- e.g.) DataFrame의 Partition 수가 10이고 Bucket의 수가 4라면 40개의 파일 생성
- 다시 읽어들일 때 10개의 Partition으로 읽혀짐
- 다시 읽어들일 때 원래 Partition의 수만큼으로 재구성됨
- Bucketing 키를 기반으로 작업시 셔플이 없어짐
Small Files 신드롬
왜 작은 크기의 많은 파일이 문제가 되는가?
- 64MB의 파일 하나를 읽는 것 vs. 64Byte의 파일 백만개를 읽는 것
- 이 API 콜은 모두 네트웍 RPC 콜
- 파일 시스템 접근 관련 오버헤드
- 파일 하나를 접근하기 위해서 다수의 API 콜이 필요
- 그래서 앞서 openCostInBytes라는 오버헤드가 각 파일마다 부여됨
읽어들이면서 파티션의 수를 줄일 수 있지만 오버헤드가 큼 : 파일로 쓸 때 어느 정도 정리를 해주는 것이 필요
데이터를 저장할 때 파티션 수와 크기
기본
- bucketBy나 partitionBy를 사용하지 않는 경우
- 각 파티션이 하나의 파일로 쓰여짐
- saveAsTable vs. save
- 적당한 크기와 수의 파티션을 찾는 것이 중요
- 작은 크기의 다수의 파티션이 있다면 문제
- 큰 크기의 소수의 파티션도 문제 (splittable하지 않은 포맷으로 저장이 될 경우)
- Repartition 혹은 coalesce를 적절히 사용
- 이 경우 AQE의 Coalescing가 도움이 될 수 있음 (repartition)
- PARQUET 포맷 사용
- Snappy compression 사용
bucketBy
- 데이터 특성을 잘아는 경우 특정 ID를 기준으로 나눠서 테이블로 저장
- 다음부터는 이를 로딩하여 사용함으로써 반복 처리시 시간 단축
◦ Bucket의 수와 기준 ID 지정
- 데이터의 특성을 잘 알고 있는 경우 사용 가능
- bucket의 수와 키를 지정해야함
- df.write.mode("overwrite").bucketBy(3, key).saveAsTable(table)
- sortBy를 써서 순서를 정하기도 함
- 이 정보는 metastore에 같이 저장됨
partitionBy
- 굉장히 큰 로그 파일을 데이터 생성시간 기반으로 데이터 읽기를 많이 한다면?
- 데이터 자체를 연도-월-일의 폴더 구조로 저장
◦ 이를 통해 데이터를 읽기 과정을 최적화 (스캐닝 과정이 줄어들거나 없어짐)
◦ 데이터 관리도 쉬워짐 (Retention Policy 적용시)
- 하지만 Partition key를 잘못 선택하면 엄청나게 많은 파일들이 생성됨!
◦ Cardinality가 높은 컬럼을 키로 사용하면 안됨
- partitioning할 키를 지정해야함
bucketBy & partitionBy
- partitionBy 후에 bucketBy사용
- 필터링 패턴 기준 partitionBy 후 그룹핑/조인 패턴 기준 bucketBy 사용
파일크기를 레코드 수로도 제어 가능
spark.sql.files.maxRecordsPerFile : 기본값은 0이며 레코드수로 제약하지 않는다.