Partition이란?
- Partition은 RDDs나 Dataset를 구성하고 있는 최소 단위 객체이며, 스파크의 성능과 리소스 점유량을 크게 좌우할 수 있는 RDD의 가장 기본적인 개념입니다.
- 데이터 파티셔닝은 데이터를 여러 클러스터 노드로 분할하는 메커니즘을 의미합니다.
- 각 Partition은 서로 다른 노드에서 분산 처리됩니다.즉, 1 Core = 1 Task = 1 Partition입니다.
- Spark에서는 하나의 최소 연산을 Task라고 표현하는데, 이 하나의 Task에서 하나의 Partition이 처리됩니다. 또한, 하나의 Task는 하나의 Core가 연산 처리합니다.
- 이처럼 설정된 Partition 수에 따라 각 Partition의 크기가 결정됩니다. 그리고 이 Partition의 크기가 결국 Core 당 필요한 메모리 크기를 결정하게 됩니다.
- Partition 수 → Core 수
- Partition 크기 → 메모리 크기
따라서, Partition의 크기와 수가 Spark 성능에 큰 영향을 미치는데, 통상적으로는 Partition의 크기가 클수록 메모리가 더 필요하고, Partition의 수가 많을수록 Core가 더 필요합니다.
- 적은 수의 Partition = 크기가 큰 Partition
- 많은 수의 Partition = 크기가 작은 Partition
즉, Partition의 수를 늘리는 것은 Task 당 필요한 메모리를 줄이고 병렬화의 정도를 늘립니다.
Partition의 종류
동일한 Partition이지만 쓰이는 때에 따라 분류를 합니다.
- Input Partition
- Output Partition
- Shuffle Partition
1) input partition
- spark.conf.set( "spark.sql.files.maxPartitionBytes", 134217728 )
2) output partition
- coalesce()와 repartition()처럼 write시에 partition을 조정하는 옵션
3) shuffle partition
- spark.conf.set("spark.sql.shuffle.partitions", 1800) 처럼 셔플에 사용되는 파티션 수를 설정하는 옵션,
spark.conf.set("spark.sql.files.maxPartitionBytes",bytes)
- 처음 파일을 읽을 때 생성하는 파티션
- 기본값은 134217728(128MB)
- 파일(HDFS상의 마지막 경로에 존재하는 파일)의 크기가 128MB보다 크다면 Spark에서 128MB만큼 쪼개면서 파일을 읽는다.
- 파일의 크기가 128MB보다 작다면 그대로 읽어 들여, 파일 하나당 Partition하나가 된다.
- file-based sources인 JSON,parquet,ORC 같은 것에만 적용된다.
spark.conf.set("spark.sql.shuffle.partitions",num of partition)
- spark.sql.shuffle.partitions 옵션은 join,groupBy 혹은 aggregation 같은 연산을 할 시 data shuffling되는 파티션 수를 나타냅니다.
- 파티션의 개수는 default로는 200으로 지정 되어 있으며, spark.conf.set("spark.sql.shuffle.partitions", 1800)와 같은 형태로 조정이 가능합니다.
- Core 수에 맞게 설정하라고 하기보단, Partition의 크기에 맞추어서 설정
- Partition의 크기가 크고 연산에 쓰이는 메모리가 부족하다면 Shuffle Spill(데이터를 직렬화하고 스토리지에 저장, 처리 이후에는 역 직렬 화하고 연산 재개함)이 일어남 -> Task가 지연되고 에러가 발생 -> 하둡클러스터에 에러가 나고 spark 강제종료
- Memory Limit Over와 같이, Shuffle Spill도 메모리 부족으로 나타나는데, 보통 이에 대한 대응을 Core 당 메모리를 늘리는 것으로 해결
- 일반적으로, 하나의 Shuffle Partition 크기가 100~200MB정도 나올 수 있도록 수를 조절하는 것이 최적
- 클러스터의 익스큐터 수보다 파티션의 수를 더 크게 지정하는 것이 대체로 좋다. executor num > partition num
- 로컬 머신에서 처리할 경우 병렬로 처리할 수 있는 task가 제한적이므로 이 값을 작게 설정
coalesce(), repartition()은 언제 사용하는지?
- filter()와 같은 transformation 연산을 수행하다 보면 최초에 설정된 파티션 개수가 적합하지 않은 경우가 있는데, 이럴 때 파티션 수를 조정하는 option을 사용합니다.
- 파일을 저장할 때 생성하는 Partition, 이 Partition의 수가 HDFS 상의 마지막 경로의 파일 수를 지정
- 보통 groupBy 집계 후 저장할 때 데이터의 크기가 작아짐 -> spark.sql.shuffle.partitions 설정에 따라 파일 수가 지정되는데, 이때 파일의 크기를 늘리기 위해 repartition와 coalesce을 사용해 Partition 수를 줄임으로서 파일의 크기를 늘림
- df.where()를 통해 필터링을 하고 나서 그대로 저장한다면 파편화가 생김 -> repartition(cnt) (셔플)을 한 후 저장
coalesce() vs repartition()
- 이 2개의 가장 큰 차이는 셔플을 하냐 안하냐의 차이입니다(default로 봤을 때)
- coalesce는 파티션을 줄일 때 사용하고, repartition()은 파티션 수를 늘리거나 줄일 때 사용합니다.
- coalesce(numofpartition,true) 처럼 true 옵션을 줬을 때에는 강제로 셔플을 할 수 있습니다. -> true옵션을 주게 되면 파티션을 늘릴 수도 있습니다.
- default값으로 사용하려면 파티션 수를 줄일 땐 coalesce(), 파티션 수를 늘릴 땐 repartition()사용
spark.conf.set("spark.sql.files.maxRecordsPerFile",num of records)
- spark write시에 파일의 record수를 지정하여 해당 record수 별로 파일을 생성 할 수 있는 config
- write시에 .option("maxRecordsPerFile",100000) 으로도 지정 가능
df write시에 repartition(),coalesce() vs partitionBy()
-
Pyspark repartition(),coalesce()는 Dataframe method로 disk에 쓸 때 메모리에 있는 파티션의 수를 늘리고 줄이는데 사용, 모든 부분의 파일을 하나의 디렉토리에 담겨서 생성
- df.coalesce(10).write.format("parquet").mode("overwrite").save("path")
- df.repartition(10).write.format("parquet").mode("overwrite").save("path")
-
Pyspark partitionBy()는 DataframeWriter class의 method로 disk에 Dataframe을 Partition Column의 유니크한 값으로 sub-directory로써 파티셔닝 할 때 사용
- df.write.partitionBy("columnname").format("parquet").mode("overwrite").save("path")
- df.write.format("parquet").partitionBy("columnname").mode("overwrite").save("path")
- df.write.format("parquet").mode("overwrite").partitionBy("columnname").save("path")
참고