๐ Repartition and Coalesce
Repartition์ ํ๋ ์ด์
- ์ ์ฒด์ ์ผ๋ก Partition์ ์๋ฅผ ๋๋ ค ๋ณ๋ ฌ์ฑ์ ์ฆ๊ฐ์ํค๊ธฐ ์ํด์์ด๋ค.
- ๊ต์ฅํ ํฐ ํํฐ์
์ด๋ Skew ํํฐ์
์ ํฌ๊ธฐ๋ฅผ ์กฐ์ ํ๊ธฐ ์ํด์์ด๋ค.
- ํํฐ์
์ ๋ถ์ ํจํด์ ๋ง๊ฒ ์ฌ๋ถ๋ฐฐํ๊ธฐ ์ํด์์ด๋ค. -> Write once, Read many
- ์ด๋ค DataFrame์ ํน์ ์ปฌ๋ผ ๊ธฐ์ค์ผ๋ก ๊ทธ๋ฃนํ์ ํ๊ฑฐ๋ ํํฐ๋ง์ ์์ฃผ ํ๋ ๊ฒฝ์ฐ
-> ๋ฏธ๋ฆฌ ๊ทธ ์ปฌ๋ผ ๊ธฐ์ค์ผ๋ก ์ ์ฅํด ๋์๋ค๋ฉด ๊ทธ๊ฒ Bucketing์ธ ๊ฒ์ด๋ค.
Repartition์ ๋ฐฉ์
-
2๊ฐ์ง ๋ฐฉ์์ด ์กด์ฌํ๋ค.
- repartition
- repartitionByRange
-
Shffling์ด ๋ฐ์ํ๋ค. -> ๋๋ฌธ์ ๋ถ๋ช
ํ ์ด์ ๋ฅผ ๊ฐ์ง๊ณ Repartition์ ์ฌ์ฉํด์ผํ๋ค.
- ๋ง์ ๊ฒฝ์ฐ repartition์ด ๋ณ ์ด์ ์์ด ์ฌ์ฉ๋์ด ์คํ๋ ค ์๊ฐ๊ณผ ๋น์ฉ์ด ์ฆ๊ฐํ๋ค.
-> ๋น์ทํ๊ฒ ๋ถํ์ํ Counting๊ณผ Distinct Counting๊ณผ Duplicate์ ๊ฑฐ ๋น์ฉ์ด ๋ฐ์ํ๋ค.
-
Column์ด ์ฌ์ฉ๋๋ฉด ๊ท ๋ฑํ Partition ํฌ๊ธฐ๋ฅผ ๋ณด์ฅํ ์์๋ค.
-
Partition์๋ฅผ ์ค์ด๋ ์ฉ๋๋ก๋ ์ฌ์ฉ๋ถ๊ฐํ๋ค.
-> ์ค์ด๋ ๊ฒฝ์ฐ์๋ Coalesce๋ฅผ ์ฌ์ฉํด์ผํ๋ค.
-
repartition(numPartitions, *cols)
- Hash ๊ธฐ๋ฐ Partitioning
- repartition(5)
- repartition(5, "city")
- repartition(5, "city", "zipcode")
- repartition("city")
- repartition("city", "zipcode")
-
repartitionByRange(numPartitions, *cols)
- ์ง์ ๋ ์ปฌ๋ผ ๊ฐ์ ๋ฒ์๋ฅผ ๊ธฐ์ค์ผ๋ก ํํฐ์
์ ๋๋๋ ๋ฐฉ์์ด๋ค.
- ๋ฐ์ดํฐ ์ํ๋ง ๊ธฐ๋ฐ์ผ๋ก ํํฐ์
์ ๋๋๊ธฐ์ ๊ฒฐ๊ณผ๊ฐ ๋งค๋ฒ ๋ค๋ฅผ ์ ์๋ค.
- ์ฌ์ฉ๋ฒ ์์ฒด๋ ์์ repartition๊ณผ ๋์ผํ๋ค.
Coalesce ๊ฐ ํ์ํ ๊ฒฝ์ฐ
- ํํฐ์
์ ์๋ฅผ ์ค์ด๋ ์ฉ๋(๋๋ฆฌ์ง ์๋๋ค.)
- Shiffling์ด ๋ฐ์์ํค์ง ์๊ณ ๋ก์ปฌ ํํฐ์
๋ค์ Mergeํ๋ค.
-> ๋ฐ๋ผ์ Skew ํํฐ์
์ ๋ง๋ค์ด๋ผ ์ ์๋ค.
- Column์ด ์ฌ์ฉ๋๋ฉฐ ๊ท ๋ฑํ ํํฐ์
ํฌ๊ธฐ๋ฅผ ๋ณด์ฅํ ์ ์๋ค.
DataFrame๊ด๋ จ Hint
-
Spark SQL Optimizer์๊ฒ Execution Plan์ ๋ง๋ฌ์ ์์ด์ ํน์ ํ ๋ฐฉ์์ ์ฌ์ฉํ๋๋ก ์ ์
-> ์ต์ ํ๋ ๋ฐฉ์์ ๋ณ๊ฒฝํ๊ธฐ ์ํด ์ฌ์ฉํ๋ค.
-
๋ ์ข
๋ฅ์ ํํธ๋ค์ด ์กด์ฌํ๋ค.
- Partition ๊ด๋ จ Hint
- Join ๊ด๋ จ Hint
์ฐธ๊ณ
Partition ๊ด๋ จ Hint
- COALESCE
- REPARTITION
- REPARTITION_BY_RANGE
- REBALANCE
- DataFrame์ ํ
์ด๋ธ๋ก ์ ์ฅํ ๋ ์์ฃผ ์ ์ฉํ๋ค.
- ํ์ผ์ ํฌ๊ธฐ๋ฅผ ์ต๋ํ ๋น์ทํ๊ฒ ๋ง๋ค์ด์ ์ ์ฅํ๋ค.(AQE๊ฐ ํ์ํจ)
df1.join(df2, "id", "inner").hint("COALESCE", 3)
Join ๊ด๋ จ Hint
-
BROADCAST, BROADCASTJOIN, MAPJOIN
- Broadcast Join ์ฌ์ฉ ์ ์
-
MERGE, SHUFFLE_MERGE, MERGEJOIN
- Shuffle Merge Join ์ฌ์ฉ ์ ์
- Spark์ ๊ธฐ๋ณธ ์กฐ์ธ ์ ๋ต
-
SHUFFLE_HASH
- Shuffle Hash Join ์ฌ์ฉ ์ ์
- Full Outer Join์๋ ์ฌ์ฉ ๋ถ๊ฐ
-
SHUFFLE_REPLICATE_NL
- Shuffle-and-replicate (Cross Join) Join ์ฌ์ฉ ์ ์
-
์ฌ๋ฌ ๊ฐ๊ฐ ๋์์ ์ฌ์ฉ๋ ๊ฒฝ์ฐ BROADCAST -> SHFFLE_REPLICATE_NL ์์ผ๋ก ์ฐ์ ์์๊ฐ ๋ฎ์์ง๋ค.
SELECT *
FROM df1 JOIN df2 ON df1.order_month = df2.year_month
์ฐธ๊ณ
DataFrame Hint ์ฌ์ฉ๋ฒ
- Spark SQL
-> /* + hint[,...] */
SELECT * FROM TABLE
SELECT * FROM table1
JOIN table2 ON table1.key = table2.key
- DataFrame API
-> .hint ๋ฉ์๋ ์ฌ์ฉ
join_df = df1.join(df2, "id", "inner").hint("COALESCE", 3)
join_df = df1.join(df2.hint("broadcast"), "id", "inner").hint("COALESCE", 3)
๐ AQE (Adaptive Query Execution)
Spark Optimization์ ์ญ์ฌ
spark.sql.shuffle.partitions
SELECT sku, SUM(price) sales
FROM order
GROUP BY sku;
-
์ด ๋ณ์ ํ๋๋ก ๋ค์ํ ์ํฉ์ Shuffling์ ํด๊ฒฐํ๊ธฐ๋ ์ฝ์ง ์๋ค.
->MapReduce ์ธ์์์ mapreduce.job.reduces์ ๋์ผํ๋ค.
-
์ ์ ์์ Partition์ ๋ณ๋ ฌ์ฑ์ ๋ฎ์ถ๊ณ OOM๊ณผ disk spill์ ๊ฐ๋ฅ์ฑ์ ๋์ธ๋ค.
-
๋ง์ ์์ Partition์ task scheduler์ task ์์ฑ๊ณผ ๊ด๋ จ๋ ์ค๋ฒํค๋๊ฐ ์๊ธฐ๋ฉฐ ๋๋ฌด ํํ ๋คํธ์ํฌ I/O ์์ฒญ์ผ๋ก ๋ณ๋ชฉํ์์ ์ด๋ํ๋ค.
์ด์ ํด๊ฒฐ
- ๋์ฉ๋ DB ๋ถ์ผ์์๋ ์ด ๋ฌธ์ ๋ ์ ์ฐ๊ตฌ๋ ๋ฌธ์ ์ด๋ค.
- Intel Big Dataํ์ด ํ๋กํ ํ์
์ ๊ฐ๋ฐํ ํ DataBricks์ ํ์
์ ํ์๋ค.(Spark 3.0)
- ๊ธฐ๋ณธ์ ์ธ ์์ด๋์ด๋ parsing time ์ต์ ํ์ runtime ์ต์ ํ์ ๋ณํ์ด๋ค.
- Parsing time ์ ๋ณด๋ก ์ ํ๋ physical plan๊ณผ ์ฝ๋ ์ต์ ํ๋ง์ผ๋ก๋ ๋ถ์ถฉ๋ถํ๋ค.
- ํนํ UDF๊ฐ ๋ง์ด ์ฌ์ฉ๋๋ ๊ฒฝ์ฐ ์ด ๋ฌธ์ ๋ ๋์ฑ ์ฌ๊ฐํด์ง๋ค.
AQE - Adaptive Query Execution
-
๋ฐํ์ ํต๊ณ๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ์ฟผ๋ฆฌ ์คํ ์ค์ ๋ฐ์ํ๋ ๋์ ์ฟผ๋ฆฌ ์ต์ ํ์ด๋ค.
-
AQE๋ ์ ํํ ๋ฐํ์ ํต๊ณ๋ฅผ ๋ฐํ์ผ๋ก ๋ชจ๋ ์ต์ ํ ๊ฒฐ์ ์ ๋ด๋ฆฐ๋ค.
-
๊ทธ๋ ๋ค๋ฉด ์ธ์ ์ด๋ฐ ์คํ์๊ฐ ํต๊ณ ์ ๋ณด๋ฅผ ๋ฝ๊ณ ์ต์ ํ ๋ฐฉ์์ ๋ณ๊ฒฝ์ ์ค ์ ์๋ ์ต์ ์ ์์ ์ผ๊น?
- ์ ๋ต์ Stage์ด๋ค.
- Query -> Job -> Stage -> Task
Stage๊ฐ ๊ฐ์ฅ ์ข์ ์ต์ ํ ๋ฐฉ์ ๋ณ๊ฒฝ
- Shuffling/Broadcasting์ด Job์ Stage๋ค๋ก ๋๋๋ค.
- ๋ํ ์ด ๋ ์ค๊ฐ ๊ฒฐ๊ณผ๋ค์ด materialize๋๋ค.
-> ๋ฐ๋ผ์ ๊ฐ์ฅ ์ข์ ์์ ์ด๋ฉฐ ๋ํ Partition์ ์์ ํฌ๊ธฐ ์ ๋ณด๋ค๋ ์ ์ ์๋ค.

AQE ์ดํ ์ธ์
- ์์ Group By ์ฟผ๋ฆฌ 2๋ฒ์งธ Stage ์์๋ถ์ AQEShufflingRead๊ฐ ์ฌ์ฉ๋๋ค.

AQE๊ฐ ํ์ํ ๊ฒฝ์ฐ๋ค
- Dynamically coalescing (Post) shuffle partitions(Spark 3)
- Dynamically switching join strategies (Spark 3.2)
- Dynamically optimizing skew joins(Spark 3)
๐ Dynamically coalescing (Post) shuffle partitions
AQE์ ๋์ ๋ฐฉ์
- Stage Dag๋ฅผ ์์ฐจ์ ์ผ๋ก ์คํ
- ๋งค๋ฒ ์๋ก์ด ์ต์ ํ ๊ธฐํ๊ฐ ์๋์ง ์กฐ์ฌ
-> ํ์ํ๋ค๋ฉด ๋ค์ ์คํํ๊ฑฐ๋ ์ฟผ๋ฆฌ ํ๋ ์ ๋ณ๊ฒฝํ๋ค.


Dynamically coalescing shuffle partitions ๋์ ๋ฐฉ์
-
AQE์ ํด๋ฒ์ ๋ค์๊ณผ ๊ฐ๋ค.
-
๋ด๋ถ์ ์ผ๋ก ๋ง์ ์์ ํํฐ์
์ ๊ณ ์๋ก ์์ฑํ๋ค.
-> spark.sql.adaptive.coalescePartitions.initialPartitionNum(200)
-
๋งค Stage๊ฐ ์ข
๋ฃ๋ ๋ ํ์ํ๋ค๋ฉด ์๋์ผ๋ก Coalesce๋ฅผ ์ํํ๋ค.
-> spark.sql.adaptive.coalescePartitions.enabled
-
์ค์ ์ ๋ฐ๋ผ ํํฐ์
์ ํฌ๊ธฐ๋ ์ต์ ํฌ๊ธฐ ํน์ ๋ชฉํ ํฌ๊ธฐ๋ฅผ ๋ง์ถ๋ ค ๋์ํ๋ค.
- spark.sql.adaptive.advisoryPartitionSizeInBytes
- spark.sql.adaptive.coalescePartitions.minPartitionSize
- ๋ฌด์์ ์ ํํ ์ง๋ spark.sql.adaptive.coalescePartitions.parallelismFirst์ ์ํด์ ๊ฒฐ์ ๋๋ค.

Dynamically coalescing shuffle partitions๋?
- ์ ํ์ํ๊ฐ?
- ์ ๋นํ Partition์ ํฌ๊ธฐ์ ์๋ ์ฑ๋ฅ์ ์ง๋ํ ์ํฅ์ ๋ผ์น๋ค.
- ๋๋ฌด ๋ง์ ์์ ์์ Partition
- ์ค์ผ์ฅด๋ฌ ์ค๋ฒํค๋
- ํ
์คํธ ์ค๋น ์ค๋ฒํค๋
- ๋นํจ์จ์ ์ธ I/O (ํ์ผ ์์คํ
/๋คํธ์ํฌ)
- ์ ์ ์์ ํฐ Partition
- GC์
๋ชฝ -> OOM = Out Of Memory
- Disk Spill
- spark.sql.shuffle.partitions๋ผ๋ ํ๋์ ๋ณ์๋ก๋ ๋ถ์ถฉ๋ถํ๋ค.
Dynamically coalescing shuffle partitions ๊ด๋ จ ๋ณ์๋ค(3.3.1)
| ํ๊ฒฝ ๋ณ์ ์ด๋ฆ | ๊ธฐ๋ณธ ๊ฐ | ์ค๋ช
|
|---|
| spark.sql.adaptive.coalescePartitions.enabled | True | spark.sql.adaptive.enabled๋ true์ธ ๊ฒฝ์ฐ ์
ํ ํ ํํฐ์
์๋ฅผ ๋์ ์ผ๋ก ์ค์ด๋ฉฐ ํํฐ์
์ ํฌ๊ธฐ๋ ์๋ ๋ณ์ (advisoryPartitionSizeInBytes)๋ก ๋ง์ถ๋ ค ์๋ํ๋ค. |
| spark.sql.adaptive.advisoryPartitionSizeInBytes | 64MB | ์
ํ๋ง ํ ํํฐ์
์๋ฅผ ์ค์ผ ๋ ๋ชฉํ๋ก ํ๋ ํํฐ์
์ ํฌ๊ธฐ |
| spark.sql.adaptive.coalescePartitions.parallelismFirst | True | ์ด ๊ฐ์ด True์ด๋ฉด ๋ณ๋ ฌ์ฑ ๋ณด์ฅ์ ์ํด ์์ ๋ชฉํ ํฌ๊ธฐ๊ฐ ๋ฌด์๋๊ณ ์๋ minPartitionSize๋ง ๋ณด์ฅ๋๋ค. |
| spark.sql.adaptive.coalescePartitions.initialPartitionNum | None | Coalespcing ์ ์ ํํฐ์
์, ์์ผ๋ฉด spark.sql.shuffle.partitions๋ก ์ค์ ํ๋ค. |
Dynamically coalescing shuffle partitions
AQE ๋ฐ๋ชจ ํ๊ฒฝ
-
2๊ฐ์ ํ
์ด๋ธ์ ๋ง๋ค์ด์ 3๊ฐ์ง์ AQE ๊ธฐ๋ฅ์ ํ
์คํธํ๋๋ฐ ์ฌ์ฉํ๋ค.
-
Table_1 (items) : ์ผ์ข
์ dimension ํ
์ด๋ธ
- 30,000,000๊ฐ์ ๋ ์ฝ๋๊ฐ ์กด์ฌ.
- id์ price ํ๋๋ก ๊ตฌ์ฑ
-
Table_2(sales) : ์ผ์ข
์ Fact ํ
์ด๋ธ
- 1,000,000,000๊ฐ์ ๋ ์ฝ๋๊ฐ ์กด์ฌ.
- item_id์ quantity์ date ํ๋๋ก ๊ตฌ์ฑ
-
์ ๋ ๊ฐ์ ํ
์ด๋ธ์ Spark SQL์ ์ฌ์ฉํ์ฌ ์์ฑํ๋ค.
๐ Dynamically switching join strategies
Dynamically switching join strategies๋
-
์ ํ์ํ๊ฐ
- Static Query Plan์ด ์ฌ๋ฌ ์ด์ ๋ก BHJ(Broadcast Hash Join)๊ธฐํ๋ฅผ ๋์น ๊ฒฝ์ฐ
- ์กฐ์ธ๋์ DataFrame๋ค์ ๋ํ ํต๊ณ ์ ๋ณด ๋ถ์กฑ(ํํฐ๋ง ๋ฑ๋ฑ)
- UDF๊ฐ ์ฌ์ฉ๋ ๊ฒฝ์ฐ
-
AQE์ ํด๋ฒ
- Runtime ํต๊ณ์ ๋ณด๋ฅผ ๋ฐํ์ผ๋ก ์กฐ์ธ ์ ๋ต์ ๋ณ๊ฒฝํ๋ค.
-> ์ด๋ Stage๋ค์ด ๋๋๊ณ ์กฐ์ธ๋๊ธฐ ์ ์ ๋ค์ ์ฟผ๋ฆฌ ํ๋๋์ ์ํํ๋ค.
- ์๋ ๋ ๊ฐ์ง ์ต์
์ด ์กด์ฌํ๋ค.
- Broadcast Join (์ถ์ฒ๋๋ฉฐ ์ฐ์ ์์๋ฅผ ๊ฐ๋๋ค.)
- Shuffle Hash Join
Dynamically switching join strategies ๋์๋ฐฉ์
- Leaf stage ์คํ
- ๊ฒฐ๊ณผ๋ฅผ ๋ณด๊ณ ๋ค์ query planning(์ต์ ํ)
-> B๊ฐ ํํฐ๋ง ํ์ Broadcastr ํํฐ๋ง ๋ฐ์ผ๋ก ๋ด๋ ค๊ฐ๋ ๊ฒฝ์ฐ B๋ก Broadcast์กฐ์ธ์ ์ํํ๋ค.
- Static query planning๋ณด๋ค๋ ์ฑ๋ฅ์ด ๋จ์ด์ง๋ค.(ํ์ง๋ง ์ฌ์ ํ ๋์์ด ๋๋ค.)
spark.sql.adaptive.autoBroadcastJoinThreshold
spark.autoBroadcastJoinThreshold

Dynamically switching join ๊ด๋ จ ํ๊ฒฝ๋ณ์(Spark 3.3.1)
| ํ๊ฒฝ ๋ณ์ ์ด๋ฆ | ๊ธฐ๋ณธ ๊ฐ | ์ค๋ช
|
|---|
| spark.sql.join.preferSortMergeJoin | True | ๋ฐ์ดํฐํ๋ ์ ์กฐ์ธ์ Sort Merge Join์ ๊ธฐ๋ณธ์ผ๋ก ์ฌ์ฉํ ์ง ์ฌ๋ถ, ํญ์ Sort Merge Join์ ์ด๋ค๋ ์๋ฏธ๋ ์๋๋ค. |
| spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold | 0 | Hash Join์ ํํฐ์
๋ณ๋ก ํด์๋งต ์์ฑ์ ์ฌ์ฉ๊ฐ๋ฅํ ์ต๋ ํฌ๊ธฐ ์ง์ . ์ด ๊ฐ์ด spark.sql.adaptive.advisorPartitionSizeInBytes๋ณด๋ค ํฌ๊ณ ๋ชจ๋ ํํฐ์
ํฌ๊ธฐ๊ฐ ์ด ๊ฐ๋ณด๋ค ์๋ค๋ฉด Hash Join์ ์ ํํ์ฌ ์กฐ์ธ์ ์งํํ๋ค. spark.sql.join.preferSortMergeJoin์ ๊ฐ์ ๋ฌด์๋๋ค. |
| spark.sql.adaptive.autoBroadcastJoinThreshold | ์์ | ๋ธ๋ก๋์บ์คํธ ๊ฐ๋ฅํ ๋ฐ์ดํฐํ๋ ์์ ์ต๋ ํฌ๊ธฐ. ์ด ๊ฐ์ -1๋ก ์ค์ ํ๋ฉด ๋ธ๋ก๋์บ์คํธ๋ ์ฌ์ฉ๋์ง ์๋๋ค. ๊ธฐ๋ณธ๊ฐ์ spark.sql.autoBroadcastJoinThreshold์ ๋์ผํ๋ฉฐ AQE๊ฐ ํ์ฑํ๋ ๊ฒฝ์ฐ์๋ง ์ฌ์ฉ๋๋ค. |