DevCourse TIL Day3 Week17 - sparkml

김태준·2023년 7월 26일
0

Data Enginnering DevCourse

목록 보기
79/93
post-thumbnail

AQE가 제공해주는 Skew join에 대해 학습을 진행하고 파티션에 대해 알아보고자 한다.

✅ Dynamic optimizing skew joins

skew 파티션이 존재한다면 성능 문제가 발생하게 된다. 이를 해결하기 위해 BROADCAST JOIN을 사용했는데, 만일 2개의 테이블에 BROADCAST JOIN을 하려면, 두 테이블 중 하나는 작아야 한다고 학습하였다. 그러나 만약 2개의 테이블이 모두 크다면 BROADCAT JOIN은 사용할 수 없게 되는데, 어떻게 해결할 수 있는가 ?

-> skew 파티션으로 1,2개의 태스크가 오래걸리는 경우 해결하는 방법.
-> AQE 해법

  • 먼저 SKEW 파티션 존재여부 파악
  • 다음, Skew 파티션을 여러 파티션으로 작게 나누어 상대 조인 파티션을 중복해 만들어 JOIN 수행

🎈 dynamic optimizing skew joins 동작방식

1. leaf stage 실행
2. skew 파티션 찾고 셔플이후 skew reader stage에서 다수의 파티션으로 재분배
3. 조인 대상이 되는 반대션 파티션은 다수의 부분 파티션으로 중복 생성

🎈 salting

AQE 등장 이전, data skew 처리 방식으로 skew 파티션을 처리하기 위한 테크닉 중 하나.

  • 랜덤 필드 생성 후 그 기준으로 파티션을 새로 만들어 처리 (aggregation 처리의 경우 효과적일 수 있고 join의 경우에는 그리 효과적이지 않음)
  • 사실 상 JOIN 시 salting 테크닉을 일반화한 것이 AQE skew JOIN 처리방식.

< 예제 코드 >

SELECT date, sum(quantity * price) AS total_sales
FROM (
	SELECT *, FLOOR(RAND()*20) AS salt
    FROM sales
) s
JOIN (
	SELECT *, EXPLODE(ARRAY(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19)) AS salt 
    FROM items
) i ON s.item_id = i.id and s.salt = i.salt
GROUP BY 1
ORDER BY 2 DESC;

이어, 어제 학습한 repartition에서 나온 spill을 알아보고자 한다.

🎈 Spill

  • 파티션 크기가 너무 커 메모리가 부족한 경우 여분을 디스크에 쓰는 것. (spil 발생 시 실행시간 늘어나고, oom 발생 가능성 올라감.)
    spill이 발생하는 경우는 다음과 같다.
  • skew 파티션 대상 aggregation / join
  • 굉장히 큰 explode 작업 / 큰 파티션인 경우

spill의 종류는 메모리 / 디스크로 나뉜다.

  • 메모리 : 디스크로 spill된 데이터가 메모리에 있을 때의 크기를 의미하며 deserialized 형태라 크기가 보통 8~10배 정도 더 크다.
  • 디스크 : 메모리에서 spill된 데이터가 디스크에서 차지하는 크기를 의미하며 serialized 형태라 보통 크기가 훨씬 더 작다.
    -> 결국 spill (메모리 / 디스크) 모두 같은 데이터를 가르키는데 위치가 메모리에 있는가, 디스크에 있는가의 차이로 크기와 종류가 나누어지게 된다.

✅ Partition

아래 3종류의 파티션이 핵심.
1. 입력 데이터 로드 시 파티션 수와 크기

  • 기본적으로 파티션의 최대 크기에 의해 결정되고, 결국 해당 데이터가 어떻게 저장되었는지와 연관성이 매우 높다. (데이터 파일 형태, 입력 데이터 크기, 리소스 매니저에게 요청한 CPU 수 등)
    -> maxSplitBytes (단위)를 결정한 다음 각 파일에 대해 단위로 분할하여 file chunk 생성한 다음 chunk들로부터 파티션을 생성하는 방식
    -> bytesPerCore : (데이터파일들 전체 크기 + 파일수 * OpenCostInBytes) / default.parallelism
    -> maxSplitBytes : Min(maxPartitionBytes, Max(bytesPerCore, OpenCostInBytes))
    -> 코어별 처리할 크기가 너무 큰 경우 maxPartitionBytes로 제약
    -> 데이터 크기가 적당한 경우 코어 수 만큼 파티션 생성
    -> 데이터 크기가 너무 작다면 openCostInBytes로 설정한 크기로 파티션 생성
  1. 셔플링 후 만들어지는 파티션 수와 크기
  • spark.sql.shufflepartitions
    -> AQE를 반드시 사용
  1. 데이터를 최종적으로 저장할 때 파티션 수와 크기
  • 기본 / bucketBy / partitionBy 로 3가지 방식 존재
  • 파일크기를 레코드 수로도 제어 가능

3-1) 기본

  • 각 파티션이 하나의 파일로 쓰여지고 repartition, coalesce를 적절히 사용할 것, parquet포맷 사용

3-2) bucketBy

  • 데이터를 자주 사용하는 컬럼 기준으로 미리 저장해두고 활용 (bucket 정보가 metastore에 저장되고 spark compiler가 이를 활용)
  • bucket 수 * 파티션 수 만큼의 파일이 생성 (다시 읽을 경우 원래 파티션 수만큼 재구성)
  • 데이터 특성을 잘 아는 경우, 특정 ID 기준 나누어 테이블로 저장. (bucket 수와 키를 지정할 必)

3-3) partitionBy

  • 굉장히 큰 로그파일을 데이터 생성 시간을 기준으로 데이터 읽기를 많이하는경우. 파티션 키를 잘 지정하여 효율적으로 처리 가능
profile
To be a DataScientist

0개의 댓글