Spark application tuning

Minseop Jeong·2024년 2월 12일
0

Apache Spark 기반의 배치잡 및 데이터 애플리케이션을 개발 하면서 리소스 최적화에 대한 고민을 많이 하게된다. 리소스 최적화를 위해 작업했던 내용들을 정리해보았다.

코드/쿼리 최적화

필요 없는 연산을 줄여서 성능을 높이는 것은 모든 프로그래밍의 기본이다. Spark 도 마찬가지로 불필요한 연산을 줄이기 위한 몇 가지 테크닉들이 있다. 하지만 당연히, 상황에 맞게 적용해야 한다.

coalesce

파티션 개수를 조정 해야할 때 repartition 또는 coalesce 를 사용 한다. 파티션 개수를 줄일 때는 coalesce 를 사용 하는 것이 보통 유리하다. repartition 의 경우는 shuffle 을 수행하여 파티션 단위로 데이터를 재분배한다. 반면 coalesce 는 파티션을 이어 붙여서 타깃 파티션 개수로 조정한다. 따라서 파티션 개수 조정의 비용 측면에서는 coalesce 를 사용 하는 것이 유리하다. 하지만, repartition 은 파티션 단위로 데이터를 재분배 해주기 때문에 특정 파티션에 데이터가 skew 되는 현상을 해소할 수 있다.

broadcast join

두 데이터셋을 조인 할 때 Spark 는 기본적으로 sort-merge join 을 한다. sort-merge join 은 두 개의 데이터셋을 정렬한 후에 조인 조건에 맞는 행들을 찾아 병합하는 방식이다. 대용량의 두 데이터셋을 조인할 때는 효율적으로 동작한다. 하지만, 한 데이터셋의 크기가 다른 하나에 비해 작을 때에는 sort-merge join 보다 broadcast join 이 효율적이다. broadcast join 은 크기가 작은 데이터셋을 broadcast 하여 각 워커노드에 복제한 후, 큰 데이터셋과 비교 후 조인을 수행한다. 이 경우 노드간 통신 비용을 절약할 수 있다. 기본 설정으로도 존재하며, spark.sql.autoBroadcastJoinThreshold 에 따라 10MB 이하의 데이터셋은 자동으로 broadcasting 되고 있다.

persist

In-memory data processing framework 라는 특성을 살려서, 재사용되는 데이터셋은 memory 에 persist 해두는 것이 좋다. shuffle 된 데이터는 spill 되지 않는 이상 메모리에 남아 있으므로, shuffle 이후에 재사용되는 데이터셋에만 persist 하면 된다. 이후 코드에서 더이상 사용되지 않는 데이터셋은 unpersist 를 호출하여 storage memory 사용 대상에서 제거해준다.

filter 후 aggregation, join

aggregation 이나 join 은 shuffle 을 동반하기 때문에 비용이 비싼 작업이다. 이러한 작업들 전에 filter 연산을 먼저 적용하여 최대한 shuffle 되는 데이터의 양을 줄인다.

설정 변경

코드/쿼리를 최적화 하고 나서 Spark configuration 의 변경을 통해 성능을 최적화 할 수 있다.

executor core

executor core 는 executor 당 동시에 수행되는 task 의 개수, JVM process 하나당 thread 개수를 의미한다. 너무 적은 core 개수는 parallelism 이 낮아져 병렬처리에 어려움이 생기고, 너무 많은 core 개수는 파티션 개수에 비해 많음으로 인해 parallelism 을 fully 활용하지 못하거나 리소스가 낭비 될 수 있다.

일반적으로 4~5 개가 적당하다고 알려져 있다. 셔플이 대부분인 작업에 대해서는 core 개수를 4~5 개 보다 더 늘리는게 좋은 성능을 보일 때도 있다.

executor

애플리케이션의 성격에 따라 목표로 하는 duration 을 설정하고, 클러스터 리소스 상황을 고려하여 애플리케이션에 할당할 vCPU 개수를 결정하고, 그에 따라 executor 개수를 결정할 수 있다. 할당할 vCPU 개수와 전체 executor core 개수를 동일하게 하여 executor 개수를 결정할 수 있다.

executor memory

executor memory 는 JVM heap memory 다. 너무 작게 설정할 경우, execution 시에 OOM 이 발생하거나 storage memory 의 부족으로 인한 disk spill 이 발생할 수 있다. 기본적으로 executor 당 reserved memory 로 300MB 를 사용하기 때문에, executor memory 가 작으면 작을수록 reserved memory 로 인한 낭비가 크다. 필요 이상으로 크게 설정할 경우, GC가 시작되는 시점을 지연시켜 후에 일어날 Full GC 로 인한 지연이 매우 길어 질 수 있다.

애플리케이션 성격에 따라 다르지만, executor 당 memory 를 4~5GB 정도로 처음 설정하고, 상황에 맞춰 조금씩 늘리거나 줄이고 있다.

shuffle partition

shuffle 연산으로 생성되는 partition 의 개수이다. spark.sql.shuffle.partitions 설정을 통해 변경할 수 있으며, 기본값은 200 이다. shuffle partition 이 기본적으로 전체 executor core 개수의 배수로 설정이 되야 한다. 그렇지 않으면 노는 core 가 생길 수 있기 때문이다. 최소 전체 core 개수 대비 2~3배 정도의 설정이 좋다. 그 다음은 shuffle partition 의 사이즈를 고려해야 하는데, 일반적으로 100MB ~ 200MB 정도가 적당하다고 한다. 인풋 파일 사이즈가 크지 않아 100MB 를 맞출 수 없다면, 전체 core 개수에 비례하게 partition 개수를 설정해준다.

정적인 spark.sql.shuffle.partitions 설정은 인풋 파일 사이즈가 크게 변화할 때 적절히 대응 하기가 어려운데, Spark 3.0 부터는 shuffle 후의 파티션 개수를 파티션 사이즈의 관점에서 adaptive 하게 설정해주는 옵션들 또한 존재한다.

Reference

https://tech.kakao.com/2021/10/08/spark-shuffle-partition/

https://spark.apache.org/docs/latest/sql-performance-tuning.html

profile
Data Engineer

0개의 댓글