[프로그래머스] 데브코스 데이터엔지니어링 TIL Day 78

주재민·2024년 2월 7일
0
post-thumbnail

📖 학습주제

대용량 데이터 훈련 대비 Spark, SparkML 실습 (3)


Dynamically optimizing skew joins

Dynamically optimizing skew joins

왜 필요한가?

  • Skew 파티션으로 인한 성능 문제를 해결하기 위함
    - 한 두개의 오래 걸리는 태스크들로 인한 전체 Job/Stage 종료 지연
    - 이 때 disk spill이 발생한다면 더 느려지게 됨

AQE의 해법

  • 먼저 skew 파티션의 존재 여부 파악
  • 다음으로 skew 파티션을 작게 나누고
  • 다음으로 상대 조인 파티션을 중복하여 만들고 조인 수행

Dynamically optimizing skew joins 동작방식

example

Dynamically optimizing skew joins 환경 변수들 (3.3.1)

환경 변수 이름기본값설명
spark.sql.adaptive.skewJoin.enabledTrue이 값과 spark.sql.adaptive.enabled가 true인 경우, Sort Merge Join 수행시 skew된 파티션들을 먼저 크기를 줄이고 난 다음에 조인 수행
spark.sql.adaptive.skewJoin.skewedPartitionFactor5파티션의 크기가 중간 파티션 크기보다 5배 이상크고 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes의 값보다 크면 skew 파티션으로 간주
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes256MB파티션의 크기가 이 값보다 크고 중간 파티션 크기보다 5배 이상 크면 skew 파티션으로 간주. 이 값은 spark.sql.adaptive.advisoryPartitionSizeInBytes보다 커야함.

Salting을 통한 Data Skew 처리

파티션 관련 환경 설정 변수들 (3.3.1)

환경 변수 이름기본값설명
spark.sql.adaptive.enabled TrueSpark 3.2부터 기본으로 적용되기 시작 (처음 도입은 1.6)
spark.sql.shuffle.partitions200Shuffle 후 만들어질 파티션의 기본 수
spark.sql.files.maxPartitionBytes128MB파일 기반 데이터 소스(Parquet, JSON, ORC 등등)에서 데이터프레임을 만들 때 한 파티션의 최대 크기
spark.sql.files.openCostInBytes4MB파일 기반 데이터소스에서만 의미가 있으며 입력 데이터를 읽기 위해 몇 개의 파티션을 사용할지 결정하는데 사용됨. 대부분의 경우 별 의미 없는 변수이지만 기본적으로 코어별 입력 데이터 크기를 계산했을 때 이 변수의 값보다 작은 경우 파티션의 크기는 이 크기로 맞춰서 만들어짐
참고 : https://stackoverflow.com/questions/70985235/what-is-opencostinbytes
spark.sql.files.minPartitionNumDefault parallelism파일 기반 데이터소스에서만 의미가 있으며 보장되지는 않지만 파일을 읽어들일 때 최소 몇 개의 파티션을 사용할지 결정하는 변수. 설정되어 있지 않다면 기본값은 spark.default.parallelism과 동일
spark.default.parallelism 256MBRDD를 직접 조작하는 경우 의미가 있는 변수로 shuffling 후 파티션의 수를 결정. 단순하게 이야기하자면 클러스터 내 총 코어의 수에 해당.

spark.sql.shuffle.partitions

  • 클러스터 자원과 처리 데이터의 크기를 고려해서 잡마다 바꿔서 설정
  • 큰 데이터를 처리하는 거라면 클러스터 전체 코어의 수로 설정
  • AQE를 사용하는 관점에서는 조금더 크게 잡는 것이 좋음

Salting

  • Skew Partition을 처리하기 위한 테크닉
    - AQE의 등장으로 인해 그렇게 많이 쓰이지 않음
        ◦ 단 AQE만으로 skew partition 이슈가 사라지지 않는다면 사용이 필요할 수도 있음
  • 랜덤 필드를 만들고 그 기준으로 파티션을 새로 만들어서 처리
    - Aggregation 처리의 경우에는 효과적일 수 있음
    - Join의 경우에는 그리 효과적이지 않음
        ◦ AQE에 의존하는 것이 좋음
        ◦ 사실상 Join시 사용 Salting 테크닉을 일반화한 것이 AQE의 skew JOIN 처리방식

example

Aggregation에 사용

Skew JOIN에 사용-1

Skew JOIN에 사용-2

SELECT date, sum(quantity * price) AS total_sales
FROM (
 SELECT *, CASE WHEN item_id = 100 THEN FLOOR(RAND()*20) ELSE 1 END AS salt
 FROM sales
) s
JOIN (
 SELECT *, EXPLODE(ARRAY(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19)) AS salt
 FROM items
 WHERE id = 100
 UNION
 SELECT *, 1 AS salt
 FROM items
 WHERE id <> 100
) i ON s.item_id = i.id and s.salt = i.salt
GROUP BY 1
ORDER BY 2 DESC

Spill

  • 파티션의 크기가 너무 커서 메모리가 부족한 경우 그 여분을 디스크에 쓰는 것
    - Spill이 발생하면 실행시간이 늘어나고 OOM이 발생할 가능성이 올라감
  • Spill이 발생하는 경우 (즉 메모리가 부족해지는 경우)
    - Skew Partition 대상 Aggregation
    - Skew Partition 대상 JOIN
    - 굉장히 큰 explode 작업
    - 큰 파티션(spark.sql.files.maxPartitionBytes)이라면 위의 작업시 spill 가능성이 더 높아짐

Spill의 종류

Spill (memory)

  • 디스크로 spill된 데이터가 메모리에 있을 때의 크기
  • Deserialized 형태라 크기가 보통 8-10배 정도 더 큼

Spill (disk)

  • 메모리에서 spill된 데이터가 디스크에서 차지하는 크기
  • Serialized 형태라 보통 크기가 훨씬 더 작음

All about Partitions

Partition의 종류 (Life cycle of Partitions)

  • 입력 데이터를 로드할 때 파티션 수와 크기
  • 셔플링 후 만들어지는 파티션 수와 크기
  • 데이터를 최종적으로 저장할 때 파티션 수와 크기

파티션의 크기는 128MB ~ 1GB가 좋음

입력 데이터를 로드할 때 파티션 수와 크기

  • 기본적으로는 파티션의 최대 크기에 의해 결정
    - spark.sql.files.maxpartitionbytes (128MB)
  • 해당 데이터가 어떻게 저장되었는지와 연관이 많이 됨
    - 파일포맷이 무엇이고 압축되었는지? 압축되었다면 무슨 알고리즘으로?
        ◦ Splittable한가 -> 한 큰 파일을 다수의 파티션으로 나눠 로드할 수 있는가?
    - 기타 관련 Spark 환경변수들

입력 데이터 파티션 수와 크기를 결정해주는 변수들

bucketBy로 저장된 데이터를 읽는 경우

  • Bucket의 수와 Bucket 기준 컬럼들과 정렬 기준 컬럼들

읽어들이는 데이터 파일이 splittable한지?

  • PARQUET/AVRO등이 좋은 이유가 됨: 항상 splittable
  • JSON/CSV등의 경우 한 레코드가 multi-line이라면 splittable하지 않음
    - Single line이라도 압축시 bzip2를 사용해야만 splittable

입력 데이터의 전체 크기 (모든 파일크기의 합)

  • 입력 데이터를 구성하는 파일의 수

내가 리소스 매니저에게 요청한 CPU의 수

  • executor의 수 x executor 별 CPU 수

입력 데이터 파티션 수와 크기 결정 방식

  1. 먼저 아래 공식으로 maxSplitBytes를 결정
  • bytesPerCore = (데이타파일들 전체크기+파일수*OpenCostInBytes) / default.parallelism
  • maxSplitBytes = Min(maxPartitionBytes, bytesPerCore)
                       = Min(maxPartitionBytes, Max(bytesPerCore, OpenCostInBytes))
  1. 입력 데이터를 구성하는 각 파일에 대해 다음을 진행
  • Splittable하다면 maxSplitBytes 단위로 분할하여 File Chunk 생성
  • Splittable하지 않거나 크기가 maxSplitBytes보다 작다면 하나의 File chunk 생성
  1. 다음으로 위에서 만들어진 File Chunk들로부터 파티션 생성
  • 기본적으로 한 파티션은 하나 혹은 그 이상의 File Chunk들로 구성
  • 한 파티션에 다음 File Chunk의 크기 + openCostInBytes를 더했을 때 이 값이 maxSplitBytes를 넘어가지 않을 때까지 계속해서 머지함 (파일들이 하나의 파티션으로 패킹됨)

Spark Scala 코드 : maxSplitBytes

Spark Scala 코드 : 파티션 생성 코드

요약

  • 코어별로 처리해야할 크기가 너무 크다면 maxPartitionBytes로 제약
  • 데이터의 크기가 적당하면 코어수 만큼의 파티션를 생성
  • 데이터의 크기가 너무 작으면 openCostInBytes로 설정한 크기로 파티션 생성

Bucketing (bucketBy)

  • 데이터를 자주 사용되는 컬럼 기준으로 미리 저장해두고 활용
  • 다양한 최적화 가능
    - 조인 대상 테이블들이 조인 키를 갖고 bucketing된 경우 shuffle free join 가능
    - 한쪽만 bucketing되어있는 경우 one-side shuffle free join 가능 (bucket의 크기에 달림)
    - Bucket pruning를 통한 최적화 가능
    - Shuffle free aggregation
  • Bucket 정보가 metastore에 저장되고 Spark Compiler는 이를 활용
    - 이 경우 sortBy를 통해 순서를 미리 정해주기도 함
    - Spark 테이블로 저장하고 로딩해야지만 이 정보를 이용 가능함
        ◦ aveAsTable, spark.table()

Bucketing (bucketBy) 저장 방식

  • Bucket의 수 x Partition의 수 만큼의 파일이 만들어짐
    - e.g.) DataFrame의 Partition 수가 10이고 Bucket의 수가 4라면 40개의 파일 생성
    - 다시 읽어들일 때 10개의 Partition으로 읽혀짐
  • 다시 읽어들일 때 원래 Partition의 수만큼으로 재구성됨
  • Bucketing 키를 기반으로 작업시 셔플이 없어짐

Small Files 신드롬

왜 작은 크기의 많은 파일이 문제가 되는가?

  • 64MB의 파일 하나를 읽는 것 vs. 64Byte의 파일 백만개를 읽는 것
  • 이 API 콜은 모두 네트웍 RPC 콜
  • 파일 시스템 접근 관련 오버헤드
    - 파일 하나를 접근하기 위해서 다수의 API 콜이 필요
    - 그래서 앞서 openCostInBytes라는 오버헤드가 각 파일마다 부여됨

읽어들이면서 파티션의 수를 줄일 수 있지만 오버헤드가 큼 : 파일로 쓸 때 어느 정도 정리를 해주는 것이 필요

데이터를 저장할 때 파티션 수와 크기

기본

  • bucketBy나 partitionBy를 사용하지 않는 경우
    - 각 파티션이 하나의 파일로 쓰여짐
    - saveAsTable vs. save
  • 적당한 크기와 수의 파티션을 찾는 것이 중요
    - 작은 크기의 다수의 파티션이 있다면 문제
    - 큰 크기의 소수의 파티션도 문제 (splittable하지 않은 포맷으로 저장이 될 경우)
  • Repartition 혹은 coalesce를 적절히 사용
    - 이 경우 AQE의 Coalescing가 도움이 될 수 있음 (repartition)
  • PARQUET 포맷 사용
    - Snappy compression 사용

bucketBy

  • 데이터 특성을 잘아는 경우 특정 ID를 기준으로 나눠서 테이블로 저장
    - 다음부터는 이를 로딩하여 사용함으로써 반복 처리시 시간 단축
        ◦ Bucket의 수와 기준 ID 지정
    - 데이터의 특성을 잘 알고 있는 경우 사용 가능
  • bucket의 수와 키를 지정해야함
    - df.write.mode("overwrite").bucketBy(3, key).saveAsTable(table)
    - sortBy를 써서 순서를 정하기도 함
    - 이 정보는 metastore에 같이 저장됨

partitionBy

  • 굉장히 큰 로그 파일을 데이터 생성시간 기반으로 데이터 읽기를 많이 한다면?
    - 데이터 자체를 연도-월-일의 폴더 구조로 저장
        ◦ 이를 통해 데이터를 읽기 과정을 최적화 (스캐닝 과정이 줄어들거나 없어짐)
        ◦ 데이터 관리도 쉬워짐 (Retention Policy 적용시)
    - 하지만 Partition key를 잘못 선택하면 엄청나게 많은 파일들이 생성됨!
        ◦ Cardinality가 높은 컬럼을 키로 사용하면 안됨
  • partitioning할 키를 지정해야함

bucketBy & partitionBy

  • partitionBy 후에 bucketBy사용
  • 필터링 패턴 기준 partitionBy 후 그룹핑/조인 패턴 기준 bucketBy 사용

파일크기를 레코드 수로도 제어 가능

  • spark.sql.files.maxRecordsPerFile : 기본값은 0이며 레코드수로 제약하지 않는다.

0개의 댓글