Spark을 사용하여 작업 후, 결과물을 write 할 파티션 개수를 지정할 수 있다. coalesce와 repartition을 주로 사용하는데, 두 개의 차이를 알아보았다.
coalesce 은 아래와 같이 구현이 되어 있다. 인자로 파티션 개수(numPartitions: Int), 셔플 여부(shuffle: Boolean), 파티션 coalesce를 어떻게 할지 define 한 정보 (PartitionCoalescer)를 인자로 받는다.
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](
mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}
아래 인용처럼 coalesce는 파티션 개수를 줄일 때 사용된다. 요청한 변경 파티션의 개수가 현재의 파티션 개수 보다 큰 경우, 원래의 파티션 개수를 유지한다고 한다 (default, shuffle=false
). 원래의 파티션 개수보다 파티션 개수를 늘리고 싶은 경우, shuffle=true
를 인자로 넘겨주면 된다.
if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions. If a larger number of partitions is requested, it will stay at the current number of partitions.
아래 인용을 보면 급격하게 적은 파티션으로 coalesce를 하는 경우, 노드를 적게 쓸 수 있다고 나온다. 예를 들어 worker node 10개, 파티션 100개 인 경우에 coalesce(1)을 해버리면 파티션이 하나가 되어야 하고, worker node 10개에 파티션을 분산시킬 수 없기 때문에 원래 쓰려던 worker node 만큼 쓰지 못하는 것이다. 따라서 이 경우에는 shuffle=true
로 해주는 것이 job performance에 더 도움이 된다.
However, if you're doing a drastic coalesce, (e.g. to numPartitions = 1), this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can pass shuffle = true.
repartition 은 아래와 같이 coalesce를 사용하여 구현되어 있다. coalesce를 shuffle=true
인자를 넣어서 실행한다.
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}