Pipelining and Stage Skip

이상민 · March 24, 2023

spark

$ ./bin/spark-shell --master local[1]

Runs on a single thread.

scala> val data = sc.parallelize(1 to 10)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:23

scala> val data1 = data.map{x => println("map1_" + x); x}
data1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:23

scala> val data2 = data1.map{x => println("map2_" + x); x}
data2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:23

scala> val data3 = data2.map{x => println("map3_" + x); x}
data3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:23

scala> data3.getNumPartitions
res0: Int = 1

Even in local mode, a driver and an executor both exist at the same time. The map functions run inside the executor.
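One way to see that split from spark-shell is to compare thread names. This is only a small sketch: `driverThread` and `taskThread` are names made up for illustration, and `sc` is the SparkContext that spark-shell provides. The exact thread names depend on the Spark version, so none are shown here.

```scala
// Run inside spark-shell, where `sc` is already defined.

// Evaluated on the driver: the thread that runs the REPL line.
val driverThread = Thread.currentThread.getName

// The function passed to map is executed by a task on the executor side,
// so it reports the name of the thread that ran the task.
val taskThread = sc.parallelize(1 to 1, 1)
  .map(_ => Thread.currentThread.getName)
  .collect()
  .head

println(s"driver: $driverThread")
println(s"task:   $taskThread")
```

With `--master local[1]` both threads live in the same JVM, but the task still runs on a separate executor thread rather than on the driver thread.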

scala> val data4 = data3.repartition(10)
data4: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at repartition at <console>:23

scala> val data5 = data4.map{x => println("map5_" + x); x}
data5: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at map at <console>:23

scala> data5.toDebugString
res1: String =
(10) MapPartitionsRDD[8] at map at <console>:23 []
 |   MapPartitionsRDD[7] at repartition at <console>:23 []
 |   CoalescedRDD[6] at repartition at <console>:23 []
 |   ShuffledRDD[5] at repartition at <console>:23 []
 +-(1) MapPartitionsRDD[4] at repartition at <console>:23 []
    |  MapPartitionsRDD[3] at map at <console>:23 []
    |  MapPartitionsRDD[2] at map at <console>:23 []
    |  MapPartitionsRDD[1] at map at <console>:23 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:23 []

scala> data5.count
map1_1
map2_1
map3_1
map1_2
map2_2
map3_2
map1_3
map2_3
map3_3
map1_4
map2_4
map3_4
map1_5
map2_5
map3_5
map1_6
map2_6
map3_6
map1_7
map2_7
map3_7
map1_8
map2_8
map3_8
map1_9
map2_9
map3_9
map1_10
map2_10
map3_10
map5_10
map5_1
map5_2
map5_3
map5_4
map5_5
map5_6
map5_7
map5_8
map5_9
res2: Long = 10

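The map1, map2, and map3 lines are interleaved per element because the three maps are narrow transformations in the same stage, so Spark pipelines them: each element passes through all three functions before the next element is read. map5 runs after the repartition shuffle, in a separate stage with 10 partitions, so from map5 onward the output no longer follows the original element order.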
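The interleaved map1/map2/map3 lines in the count output above can be imitated without Spark. The sketch below uses plain Scala iterators, which are lazy in the same way: chained `map` calls are applied one element at a time. It is an analogy under that assumption, not Spark's actual task code, and `traced`, `log`, and `pipeline` are hypothetical names.

```scala
import scala.collection.mutable.ArrayBuffer

// Records the order in which the map functions are called.
val log = ArrayBuffer.empty[String]

// Wraps an iterator in a lazy map that logs each element it sees.
def traced(label: String)(it: Iterator[Int]): Iterator[Int] =
  it.map { x => log += s"${label}_$x"; x }

// Three chained maps over one "partition" of data.
val pipeline = traced("map3")(traced("map2")(traced("map1")(Iterator(1, 2, 3))))

// Nothing has run yet. Consuming the iterator plays the role of the action.
val n = pipeline.size

log.foreach(println)
// map1_1, map2_1, map3_1, map1_2, map2_2, map3_2, map1_3, map2_3, map3_3
```

Each element passes through all three functions before the next element is pulled, which is the same ordering as the stage 0 output of data5.count.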

scala> data5.count
map5_10
map5_1
map5_2
map5_3
map5_4
map5_5
map5_6
map5_7
map5_8
map5_9
res3: Long = 10

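Running the same action again skips stage 0 (map1 to map3). Its shuffle output is still available from the first run, so Spark reuses it instead of recomputing the stage, and only the map5 stage executes.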
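Instead of relying on println, the skip can also be checked with an accumulator that counts how often the pre-shuffle map function is called. This is a minimal sketch meant for spark-shell in local mode with no task retries. `mapCalls`, `upstream`, and `shuffled` are hypothetical names, and `sc` is the SparkContext provided by the shell.

```scala
// Run inside spark-shell, where `sc` is already defined.

// Counts invocations of the map function that runs before the shuffle.
val mapCalls = sc.longAccumulator("mapCalls")

val upstream = sc.parallelize(1 to 10).map { x => mapCalls.add(1); x }
val shuffled = upstream.repartition(10).map(x => x)

// First action: the pre-shuffle stage runs and writes its shuffle output.
shuffled.count()
val afterFirst: Long = mapCalls.value

// Second action: if the pre-shuffle stage is skipped, the counter does not grow.
shuffled.count()
val afterSecond: Long = mapCalls.value

println(s"after first count:  $afterFirst")
println(s"after second count: $afterSecond")
```

If both values are equal, the map before the shuffle was not executed again for the second count. The Spark web UI should show the same thing as a skipped stage in the second job.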
