[데이터 엔지니어링 데브코스 2기] TIL-17주차 대용량 데이터 훈련 대비 Spark, SparkML 실습 (3)

이재호·2024년 2월 7일
0

1. skew join 최적화.


Dynamically optimizing skew join.

  • Skew 파티션으로 인한 성능 문제를 해결하기 위함.
  • 한두 개의 오래 걸리는 테스크들로 인한 전체 Job/Stage 종료 지연 방지.
  • 먼저 skew 파티션의 존재 여부 파악 -> skew 파티션을 작게 나눔 -> 상대 조인 파티션을 중복하여 만들고 조인 수행.
  • spark.sql.adaptive.skewJoin.skewedPartitionFactor (기본값: 5)
  • spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (기본값: 256MB)
  • 위 두 조건으로 스큐 파티션 여부를 판단함. (파티션 크다/작다 기준)
  • spark.sql.adaptive.skewJoin.enabled (디폴트: 트루)

실습.

  • pyspark --driver-memory 2g --executor-memory 2g
  • spark.conf.set("spark.sql.adaptive.skewJoin.enabled", False)
df_f = spark.sql("""
SELECT date, sum(quantity * price) AS total_sales FROM sales s
JOIN items i ON s.item_id = i.id
GROUP BY 1
ORDER BY 2 DESC;
""")
  • df_f.count() : lazy 실행이기에 메모리로 불러오기 위해 키운트 호출.
  • Web UI의 Active Stage를 보면 kill와 199/200 처럼 테스크가 실행 중인 것을 확인할 수가 있는데, 이는 데이터 1개의 테스크에 skew가 있으며 재실행되고 있다는 것을 의미함.
  • 만약 계속해서 잡 실행이 실패하면, executor-memory를 늘려주거나, AQE를 활용하는 방법이 있음.
  • spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True) : aqe 사용.
df_t = spark.sql("""
SELECT date, sum(quantity * price) AS total_sales FROM sales s
JOIN items i ON s.item_id = i.id
GROUP BY 1
ORDER BY 2 DESC;
""")
  • df_t.count()
  • enabled된 경우, 실행 속도가 4배나 빠른 것을 확인 가능. Details에서 특정 파티션의 skewed and coalesced 등과 같은 것도 확인 가능.

SQL 정리.

CREATE TABLE items
USING parquet
AS
SELECT id, 
CAST(rand() * 1000 AS INT) <!-- AS price # 0~999의 값을 갖는 price 칼럼 -->
FROM RANGE(30000000); <!-- 총 3천만 개의 레코드 생성 -->

CREATE TABLE sales
USING parquet
AS
SELECT CASE WHEN rand() < 0.8 THEN 100 ELSE CAST(rand() * 30000000 AS INT) END AS item_id, CAST(rand() * 100 AS INT) AS quantity, <!-- 일부러 80%는 100이라는 값을 갖도록 skew 생성. -->
DATE_ADD(current_date(), - CAST(rand() * 360 AS INT)) AS date FROM RANGE(1000000000); <!-- 10억 개의 레코드 -->

<!-- shuffling 후에 coalescing하는 예시. (파티션 수 줄이기) -->
SELECT date, sum(quantity) AS q 
FROM sales
GROUP BY 1
ORDER BY 2 DESC;

<!-- 조인 전략. 동적으로 리플래닝하여 통계정보를 알고 브로드캐스트 조인함. -->
SELECT date, sum(quantity * price) AS total_sales 
FROM sales s
JOIN items i ON s.item_id = i.id
WHERE price < 10
GROUP BY 1 ORDER BY 2 DESC;

<!-- skew join 예시. 드라이버와 엑시큐터 메모리가 더 필요했음. -->
SELECT date, sum(quantity * price) AS total_sales 
FROM sales s
JOIN items i ON s.item_id = i.id
GROUP BY 1
ORDER BY 2 DESC;

2. Salting을 사용한 Data Skew 처리


AQE가 나오기 전 data skew 처리 방법: salting.

환경 변수.

  • spark.sql.adaptive.enabled (True)
  • spark.sql.shuffle.partitions (200)
  • spark.sql.files.maxPartitionBytes (128MB)
  • spark.sql.files.openCostInBytes (4MB)
  • spark.sql.files.minPartitionNum (default parallelism)
  • spark.default.parallelism

spark.sql.shuffle.partitions.

  • 클러스터 자원과 처리 데이터의 크기를 고려하여 잡마다 바꿔서 설정.
  • 큰 데이터를 처리하는 거살면 클러스터 전체 코어의 수로 설정.
  • AQE를 사용하는 관점에서는 조금 더 크게 잡는 것이 좋음.

Salting.

  • Skew Partition을 처리하기 위한 테크닉.
  • AQE의 등장 이후로는 잘 사용되지 않음.
  • 랜덤 필드를 만들고 그 기준으로 파티션을 새로 만들어서 처리함.

Spill이란,

  • 파티션의 크기가 너무 커서 메모리가 부족한 경우 그 여분을 디스크에 쓰는 것.
  • Spill이 발생하는 경우 (메모리가 부족해지는 경우)
    .
    • Skew Partition 대상 Aggregation.
    • Skew Partition 대상 JOIN.
    • 굉장히 큰 explode 작업.
    • 큰 파티션이라면 위의 작업 시 spill 가능성이 더 높아짐.
  • 종류)
    .
    • spill (memory) : 디스크로 spill된 데이터가 메모리에 있을 때의 크기. (압축 해제된 상태라 용량이 더 큼.)
    • spill (disk) : 메모리에서 spill된 데이터가 디스크에서 차지하는 크기. (압축된 상태라 용량이 더 작음.)

요약.

  • Repartition과 Coalesce를 사용한 파티션 크기와 분포 조절.
  • AQE (Adaptive Query Execution) 소개와 데모. (최적화 케이스: 1.셔플링 후 coalesce로 파티션 크기 축소 2.동적 JOIN(브로드캐스트 조인) 3.Skew 조인)
  • Salting을 통한 Data Skew 처리와 Spill

3. 모든 파티션


세 종류의 파티션.

  • 입력 데이터를 로드할 때의 파티션.
  • 셔플링 후 만들어지는 파티션.
  • 데이터를 최종적으로 저장할 때의 파티션.
  • 파티션의 크기는 128MB ~ 1GB가 좋음.

입력 데이터를 로드할 때 파티션의 수와 크기.

  • 기본적으로 파티션의 최대 크기(spark.sql.files.maxPartitionBytes)에 의해 결정.
  • 결국 해당 데이터가 어떻게 저장되는 지와 연관이 많이 됨.
    -
    • 파일 포맷과 압축 여부 등.
    • 기타 Spark 환경 변수.

입력 데이터 파티션 수와 크기를 결정하는 변수들.

  • bucketBy로 저장된 데이터를 읽는 경우 (버킷의 수와 버킷 기준 칼럼과 정렬 기준 칼럼)
  • 읽어들이는 데이터 파일이 splitable한지. (parquet/avro: 항상 splitable, json/csv: 경우에 따라 다름)
  • 입력 데이터의 전체 크기 (입력 데이터를 구성하는 파일의 수).
  • 내가 리소스 매니저에게 요청한 CPU의 수.
  • spark.sql.files.maxPartitionBytes (128MB)
  • spark.sql.files.openCostInBytes (4MB)
  • spark.sql.files.minPartitionNum
  • spark.default.parallelism

절차.

  • bytesPerCore = (데이터파일들 전체 크기 + 파일 수*OpenCostInBytes) / default.parallelism.
  • maxSplitBytes = Min(maxPartitionBytes, bytesPerCore)
  • Splitable하다면 maxSplitBytes 단위로 분할하여 File Chunk 생성. (else 크기가 maxSplitBytes보다 작다면 하나의 File Chunk로 생성.)
  • 위에서 만들어진 File Chunk들로부터 파티션 생성.
  • 한 파티션은 하나 혹은 그 이상의 File Chunk들로 구성됨.
  • 정리) maxSplitBytes 결정 -> File Chunk 생성 -> File Chunk로부터 파티션 생성.

입력 데이터 파티션 수와 크기 설정 예제 1.

  • 파일 수 : 50
  • 파일 포맷 : parquet
  • 파일 크기 : 65MB
  • Spakr App에 할당된 코어 수 : 10
  • spark.sql.files.maxPartitionBytes : 128MB
  • spark.default.parallelism : 10
  • spark.sql.files.openCostInBytes : 4MB
  1. bytesPerCore = (5065MB + 504MB) / 10 = 345MB.
  2. maxSplitBytes = Min(128MB, 345MB) = 128MB.
  3. 최종 파티션 수 : 50. (50개의 파일 청크 생성 -> 128MB에는 (65MB+4MB)가 하나만 들어가기에 각 파일 청크당 하나의 파티션 생성.)

입력 데이터 파티션 수와 크기 설정 예제 2.

  • 파일 수 : 50
  • 파일 포맷 : parquet
  • 파일 크기 : 63MB (아마 압축된 크기일 것.)
  • Spakr App에 할당된 코어 수 : 10
  • spark.sql.files.maxPartitionBytes : 128MB
  • spark.default.parallelism : 10
  • spark.sql.files.openCostInBytes : 4MB
  1. bytesPerCore = (5063MB + 504MB) / 10 = 335MB.
  2. maxSplitBytes = Min(128MB, 335MB) = 128MB.
  3. 최종 파티션 수 : 50. (50개의 파일 청크 생성 -> 128MB에는 (63MB+4MB)가 하나만 들어가기에 각 파일 청크당 하나의 파티션 생성.)

입력 데이터 파티션 수와 크기 설정 예제 3.

  • 파일 수 : 50
  • 파일 포맷 : parquet
  • 파일 크기 : 40MB
  • Spakr App에 할당된 코어 수 : 10
  • spark.sql.files.maxPartitionBytes : 128MB
  • spark.default.parallelism : 10
  • spark.sql.files.openCostInBytes : 4MB
  1. bytesPerCore = (5040MB + 504MB) / 10 = 220MB.
  2. maxSplitBytes = Min(128MB, 220MB) = 128MB.
  3. 최종 파티션 수 : 17. (50개의 파일 청크 생성 -> 128MB에는 (40MB+4MB+40MB+4MB+40MB)가 들어갈 수 있음.)

입력 데이터 파티션 수와 크기 설정 예제 4.

  • 파일 수 : 10
  • 파일 포맷 : csv (splitable하지 않은 gzip으로 압축.)
  • 파일 크기 : 335MB
  • Spakr App에 할당된 코어 수 : 10
  • spark.sql.files.maxPartitionBytes : 128MB
  • spark.default.parallelism : 10
  • spark.sql.files.openCostInBytes : 4MB
  1. bytesPerCore = (10335MB + 104MB) / 10 = 339MB.
  2. maxSplitBytes = Min(128MB, 339MB) = 128MB.
  3. 최종 파티션 수 : 10. (10개의 파일 청크 생성 -> 128MB에는 (335MB + 4MB)가 1개 이상 들어갈 수 없음.)
  • CSV나 JSON 파일의 경우 splitable하지 않게 압축된 경우가 많으므로 되도록 parquet/avro 활용.

입력 데이터 파티션 수와 크기 설정 예제 5.

  • 파일 수 : 10
  • 파일 포맷 : csv (splitable한 bzip2로 압축)
  • 파일 크기 : 311MB
  • Spakr App에 할당된 코어 수 : 10
  • spark.sql.files.maxPartitionBytes : 128MB
  • spark.default.parallelism : 10
  • spark.sql.files.openCostInBytes : 4MB
  1. bytesPerCore = (10311MB + 104MB) / 10 = 315MB.
  2. maxSplitBytes = Min(128MB, 315MB) = 128MB.
  3. 최종 파티션 수 : 25. (103(128짜리 20개, 55짜리 10개)개의 파일 청크 생성 -> 128에는 그대로, 55는 2 + 4MB*2 하여 118짜리로.)

입력 데이터 파티션 수와 크기 설정 예제 6.

  • 파일 수 : 100
  • 파일 포맷 : csv (splitable하지 않은 gzip으로 압축.)
  • 파일 크기 : 10MB
  • Spakr App에 할당된 코어 수 : 10
  • spark.sql.files.maxPartitionBytes : 128MB
  • spark.default.parallelism : 10
  • spark.sql.files.openCostInBytes : 4MB
  1. bytesPerCore = (10010MB + 1004MB) / 10 = 140MB.
  2. maxSplitBytes = Min(128MB, 140MB) = 128MB.
  3. 최종 파티션 수 : 12. (100개의 파일 청크 생성 -> 128MB에는 (14MB*9)가 들어갈 수 있음.)

입력 데이터 파티션 수와 크기 결정 방식 요약.

  • bytesPerCore -> maxSplitBytes -> File Chunk -> Partitioning.
  • 코어별로 처리해야 할 크기가 너무 크다면 maxPartitionBytes로 제약.
  • 데이터의 크기가 적당하면 코어 수만큼의 파티션을 생성.
  • 데이터의 크기가 너무 작으면 openCostInBytes로 설정한 크기로 파티션 생성.

Bucketing 소개.

  • 데이터를 자주 사용되는 칼럼 기준으로 미리 저장해 두고 활용.
  • 다양한 최적화 가능.
  • 버킷 정보가 metastore에 저장되고 Spark Complier는 이것을 활용함.
  • 버킷의 수 * 파티션의 수만큼의 파일이 만들어 짐.
  • 다시 읽어들일 때 원래 파티션의 수만큼으로 재구성됨.
  • 버켓팅 키를 기반으로 작업 시 셔플이 없어짐.
spark = SparkSession.builder...

df1 = spark.range(1, 10 ** 6)
print(df1.rdd.getNumPartitions())
df1.write.format('parquet').mode('overwrite').bucketBy(
100, 'id').saveAsTable('spark_bucket_table1')

df2 = spark.range(1 10 ** 5)
print(df2.rdd.getNumPartitions())
df2.write.format('parquet').mode('overwrite').bucketBy(
100, 'id').saveAsTable('spark_bucket_table2')


> small file 이슈.
- 64MB 파일을 하나 읽는 것 vs. 64Byte의 파일을 백만 개 읽는 것.
- 파일 시스템 접근 관련 오버헤드 발생.
- 읽어들이면서 파티션의 수를 줄일 수 있지만 오버헤드가 큼.

> 데이터를 저장할 때 파티션 수와 크기.
- 기본 방식.
- bucketBy.
- partitionBy.
- 파일 크기를 레코드 수로도 제어 가능. (spark.sql.files.maxRecordsPerFile)

> 기본 방식.
- 각 파티션이 하나의 파일로 쓰여짐.
- saveAsTable vs. save
- 적당한 크기와 수의 파티션을 찾는 것이 중요.
- Repartition 혹은 coalesce를 적절히 사용.
- parquet 포맷 사용.

> bucketBy 방식.
- 데이터 특성을 잘 아는 경우 특정 ID를 기준으로 나눠서 테이블로 저장.
- 다음에 재사용 시 이를 로딩하여 반복 처리 시 시간 단축.
- 버킷 수와 ID 지정.
- df.write.mode('overwrite').bucketBy(3, key).saveAsTable(table)
- metastore에 정보가 같이 저장됨.

> partitionBy 방식.
- 굉장히 큰 로그 파일을 데이터 생성 시간 기반으로 데이터 읽기를 많이 한다면?
	-
    - 데이터 자체를 연도-월-일의 폴더 구조로 저장.
    - 이를 통해 데이터 읽기 과정을 최적화.
    - 데이터 관리의 유용함.
    - 하지만 Partition Key를 잘못 선택하면 엄청나게 많은 파일들이 생성됨.
- partitioning할 키를 지정해야 함.
	-
    - df.write.mode('overwrite').partitionBy('order_month').saveAsTable('order')
    - df.write.mode('overwrite').partitionBy('year', 'month', 'day').saveAsTable('appl_stock')
    
> buckeyBy & partitionBy.
- df.write.mode('overwrite').partitionBy('dept').bucketBy(5, 'employeeId')
profile
천천히, 그리고 꾸준히.

0개의 댓글