Spark coalesce v.s. repartition

Minseop Jeong·2022년 1월 22일
0

Spark을 사용하여 작업 후, 결과물을 write 할 파티션 개수를 지정할 수 있다. coalesce와 repartition을 주로 사용하는데, 두 개의 차이를 알아보았다.

coalesce

coalesce 은 아래와 같이 구현이 되어 있다. 인자로 파티션 개수(numPartitions: Int), 셔플 여부(shuffle: Boolean), 파티션 coalesce를 어떻게 할지 define 한 정보 (PartitionCoalescer)를 인자로 받는다.

  def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](
          mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
          new HashPartitioner(numPartitions)),
        numPartitions,
        partitionCoalescer).values
    } else {
      new CoalescedRDD(this, numPartitions, partitionCoalescer)
    }
  }

아래 인용처럼 coalesce는 파티션 개수를 줄일 때 사용된다. 요청한 변경 파티션의 개수가 현재의 파티션 개수 보다 큰 경우, 원래의 파티션 개수를 유지한다고 한다 (default, shuffle=false). 원래의 파티션 개수보다 파티션 개수를 늘리고 싶은 경우, shuffle=true를 인자로 넘겨주면 된다.

if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions. If a larger number of partitions is requested, it will stay at the current number of partitions.

아래 인용을 보면 급격하게 적은 파티션으로 coalesce를 하는 경우, 노드를 적게 쓸 수 있다고 나온다. 예를 들어 worker node 10개, 파티션 100개 인 경우에 coalesce(1)을 해버리면 파티션이 하나가 되어야 하고, worker node 10개에 파티션을 분산시킬 수 없기 때문에 원래 쓰려던 worker node 만큼 쓰지 못하는 것이다. 따라서 이 경우에는 shuffle=true 로 해주는 것이 job performance에 더 도움이 된다.

However, if you're doing a drastic coalesce, (e.g. to numPartitions = 1), this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can pass shuffle = true.

repartition

repartition 은 아래와 같이 coalesce를 사용하여 구현되어 있다. coalesce를 shuffle=true 인자를 넣어서 실행한다.

  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

정리

  • 파티션 개수를 늘릴 때는 repartition (또는 coalesce(numPartitions, shuffle=true) 를 사용 한다.
  • 파티션 개수를 줄일 때는 coalesce 를 사용한다. 단 급격하게 적은 파티션 개수로 줄일 때 coalesce 는 worker node를 fully 사용하지 못할 수 있으므로 repartition (또는 coalesce(numPartitions, shuffle=true) 를 사용 한다.

Reference

profile
Data Engineer

0개의 댓글