📒 Spark(19)

Kimdongki · July 10, 2024

📌 Repartition and Coalesce

Why repartition

  • To raise parallelism by increasing the overall number of partitions.
  • To resize very large partitions or skewed partitions.
  • To redistribute partitions so they match the analysis pattern. -> Write once, read many
    • This applies when a DataFrame is frequently grouped or filtered by a particular column.
      -> Storing the data organized by that column ahead of time is exactly what bucketing is.

Repartition methods

  • There are two methods.

    • repartition
    • repartitionByRange
  • Both trigger shuffling. -> Repartition should therefore be used only for a clear reason.

    • In many cases repartition is called for no particular reason, which only increases time and cost.
      -> Unnecessary counting, distinct counting, and duplicate removal add cost in a similar way.
  • When columns are used, evenly sized partitions cannot be guaranteed.

  • It is a poor fit for reducing the number of partitions, because it still performs a full shuffle.
    -> To reduce the partition count, use coalesce instead.

  • repartition(numPartitions, *cols)

    • Hash-based partitioning on the given columns (with no columns, rows are distributed round-robin)
      • repartition(5)
      • repartition(5, "city")
      • repartition(5, "city", "zipcode")
      • repartition("city")
      • repartition("city", "zipcode")
  • repartitionByRange(numPartitions, *cols)

    • Splits partitions by ranges of the values in the specified columns.
    • The range boundaries come from sampling the data, so the result can differ from run to run.
      • Nondeterministic
    • The call signature is the same as repartition above.
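
To make the difference between the two methods concrete, here is a minimal pure-Python sketch of how rows could be assigned to partitions. It is not Spark's implementation: `zlib.crc32` is an assumed stand-in for Spark's internal hash, and `hash_partition`, `range_bounds`, and `range_partition` are hypothetical helper names used only for illustration.

```python
import zlib
from bisect import bisect_left
from collections import Counter


def hash_partition(key, num_partitions):
    # CRC32 is an assumed stand-in for Spark's hash; equal keys always land together.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def range_bounds(sample_keys, num_partitions):
    # Split points are taken from a sorted sample, so a different sample can
    # produce different bounds (the source of the nondeterminism noted above).
    ordered = sorted(sample_keys)
    step = len(ordered) / num_partitions
    return [ordered[int(step * i)] for i in range(1, num_partitions)]


def range_partition(key, bounds):
    # Keys up to and including bounds[0] go to partition 0, and so on.
    return bisect_left(bounds, key)


# Illustrative data: one city dominates, so its partition ends up much larger.
cities = ["Seoul"] * 6 + ["Busan"] * 2 + ["Daegu", "Incheon"]
hash_sizes = Counter(hash_partition(city, 5) for city in cities)
bounds = range_bounds(cities, 3)
range_sizes = Counter(range_partition(city, bounds) for city in cities)
print("hash partition sizes :", dict(hash_sizes))
print("range partition sizes:", dict(range_sizes))
```

Either way, every row with the same key value ends up in one partition, which is why a column-based repartition cannot promise evenly sized partitions.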

When coalesce is needed

  • It is for reducing the number of partitions (it never increases it).
  • It merges local partitions without triggering a shuffle.
    -> As a result, it can create skewed partitions.
  • It takes no column argument and cannot guarantee evenly sized partitions.
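
The skew risk can be sketched in a few lines of plain Python. This is a simplified model, not Spark's algorithm (Spark also takes data locality into account when grouping partitions), and the `coalesce` function below is a hypothetical helper that only merges whole partitions without moving individual rows.

```python
def coalesce(partitions, num_partitions):
    """Merge whole partitions into at most num_partitions groups, never splitting one."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if num_partitions >= len(partitions):
        # Coalesce never increases the partition count.
        return [list(part) for part in partitions]
    merged = [[] for _ in range(num_partitions)]
    for index, part in enumerate(partitions):
        # Neighbouring partitions are grouped together; no row is rebalanced.
        target = index * num_partitions // len(partitions)
        merged[target].extend(part)
    return merged


# Five partitions with 1, 1, 8, 1 and 1 rows (illustrative data).
parts = [[0], [1], list(range(8)), [2], [3]]
merged_sizes = [len(part) for part in coalesce(parts, 2)]
print(merged_sizes)
```

Because the large partition is merged as a whole, the two resulting partitions hold 10 and 2 rows, where a full shuffle could have balanced them.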

DataFrame-related hints

  • A hint suggests to the Spark SQL optimizer that it use a specific approach when building the execution plan.
    -> It is used to override the approach the optimizer would otherwise choose.

  • There are two kinds of hints.

    • Partition-related hints
    • Join-related hints

Partition-related hints

  • COALESCE
  • REPARTITION
  • REPARTITION_BY_RANGE
  • REBALANCE
    • Very useful when saving a DataFrame as a table.
    • It writes files whose sizes are as similar as possible (requires AQE).
df1.join(df2, "id", "inner").hint("COALESCE", 3)

Join-related hints

  • BROADCAST, BROADCASTJOIN, MAPJOIN

    • Suggests a broadcast join
  • MERGE, SHUFFLE_MERGE, MERGEJOIN

    • Suggests a shuffle sort merge join
    • Spark's default join strategy
  • SHUFFLE_HASH

    • Suggests a shuffle hash join
    • Not usable for full outer joins in older releases (Spark 3.1 added full outer support for shuffled hash join)
  • SHUFFLE_REPLICATE_NL

    • Suggests a shuffle-and-replicate nested loop join (cross join)
  • When several hints are given together, priority decreases in the order BROADCAST -> MERGE -> SHUFFLE_HASH -> SHUFFLE_REPLICATE_NL.

SELECT /*+ MERGE(df2) */ *
FROM df1 JOIN df2 ON df1.order_month = df2.year_month
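
The priority rule for conflicting join hints can be written as a small lookup. This is only a sketch of the ordering described above, not Spark's resolver: `pick_join_hint`, `JOIN_HINT_PRIORITY`, and `HINT_ALIASES` are hypothetical names, and the real planner also considers which side of the join each hint is attached to.

```python
# Highest priority first, following the order described above.
JOIN_HINT_PRIORITY = ["BROADCAST", "MERGE", "SHUFFLE_HASH", "SHUFFLE_REPLICATE_NL"]

# Alternative spellings listed above, mapped to one canonical name.
HINT_ALIASES = {
    "BROADCASTJOIN": "BROADCAST",
    "MAPJOIN": "BROADCAST",
    "SHUFFLE_MERGE": "MERGE",
    "MERGEJOIN": "MERGE",
}


def pick_join_hint(hints):
    """Return the highest-priority join hint among those given, or None."""
    normalized = {HINT_ALIASES.get(h.upper(), h.upper()) for h in hints}
    for name in JOIN_HINT_PRIORITY:
        if name in normalized:
            return name
    return None


print(pick_join_hint(["shuffle_hash", "MAPJOIN"]))
print(pick_join_hint(["SHUFFLE_REPLICATE_NL", "MERGEJOIN"]))
```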

How to use DataFrame hints

  • Spark SQL
    -> /*+ hint [, ...] */
SELECT /*+ REPARTITION(3) */ * FROM TABLE
SELECT /*+ BROADCAST(table1) */ * FROM table1
JOIN table2 ON table1.key = table2.key
  • DataFrame API
    -> Use the .hint method
join_df = df1.join(df2, "id", "inner").hint("COALESCE", 3)
join_df = df1.join(df2.hint("broadcast"), "id", "inner").hint("COALESCE", 3)

📌 AQE (Adaptive Query Execution)

History of Spark optimization

  • Spark 1.x : Catalyst Optimizer & Tungsten Project

    • The former performs rule-based optimization (predicate pushdown, projection pushdown).
    • The latter optimizes execution while sidestepping JVM limitations (it manages off-heap memory directly to avoid GC).
  • Spark 2.x : CBO (Cost-Based Optimizer)

    • It uses DataFrame statistics to generate an efficient execution plan.
      -> Total size, record count, and per-column characteristics (min, max, histogram, and so on)

spark.sql.shuffle.partitions

SELECT sku, SUM(price) sales
FROM order
GROUP BY sku;
  • The GROUP BY in this query produces two stages.

  • The value of spark.sql.shuffle.partitions determines the number of partitions after shuffling.

  • It is hard to handle every shuffle scenario with this single variable.
    -> It plays the same role as mapreduce.job.reduces in the MapReduce world.

  • Too few partitions lowers parallelism and raises the risk of OOM and disk spill.

  • Too many partitions adds task scheduling and task creation overhead, and the flood of small network I/O requests becomes a bottleneck.
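
A quick back-of-the-envelope calculation shows why no single value fits every shuffle. The numbers below (100 GB of shuffle data, 100 cores) are assumptions chosen for illustration, and `shuffle_profile` is a hypothetical helper, not a Spark API.

```python
import math

MB = 1024 * 1024


def shuffle_profile(shuffle_bytes, num_partitions, total_cores):
    """Return (average partition size in MB, number of task waves)."""
    avg_partition_mb = shuffle_bytes / num_partitions / MB
    task_waves = math.ceil(num_partitions / total_cores)
    return avg_partition_mb, task_waves


shuffle_bytes = 100 * 1024 * MB  # assumed: 100 GB shuffled
for num_partitions in (20, 200, 20000):
    size_mb, waves = shuffle_profile(shuffle_bytes, num_partitions, total_cores=100)
    print(f"{num_partitions:>6} partitions: {size_mb:8.2f} MB each, {waves} task wave(s)")
```

With 20 partitions each task has to process about 5 GB (spill and OOM risk, and most cores sit idle), while 20,000 partitions means roughly 5 MB per task and 200 scheduling waves.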

Resolving the issue

  • In the large-scale database field, this is a well-studied problem.
  • The Intel Big Data team developed a prototype and then collaborated with Databricks. (Spark 3.0)
  • The basic idea is to combine parsing-time optimization with runtime optimization.
    • A physical plan and code optimization chosen from parsing-time information alone are not enough.
    • The problem gets worse when UDFs are used heavily.

AQE - Adaptive Query Execution

  • AQE is dynamic query optimization that happens during query execution, based on runtime statistics.

  • AQE makes all of its optimization decisions from accurate runtime statistics.

  • So when is the best moment to collect these runtime statistics and change the optimization approach?

    • The answer is the stage.
    • Query -> Job -> Stage -> Task

Why the stage is the best point to change the plan

  • Shuffling and broadcasting split a job into stages.
  • Intermediate results are also materialized at this point.
    -> That makes it the best moment, and the number and size of the partitions are known as well.

The world after AQE

  • In the GROUP BY query shown earlier, AQEShuffleRead is used at the start of the second stage.

Cases where AQE is needed

  • Dynamically coalescing (Post) shuffle partitions(Spark 3)
  • Dynamically switching join strategies (Spark 3.2)
  • Dynamically optimizing skew joins(Spark 3)

📌 Dynamically coalescing (Post) shuffle partitions

How AQE works

  • It runs the stage DAG in order.
  • After each stage, it checks whether a new optimization opportunity has appeared.
    -> If so, it re-runs planning or changes the query plan.

How dynamically coalescing shuffle partitions works

  • AQE's solution is as follows.

  • Internally, it deliberately creates a large number of partitions.
    -> spark.sql.adaptive.coalescePartitions.initialPartitionNum (falls back to spark.sql.shuffle.partitions, 200 by default, when unset)

  • When each stage finishes, it coalesces automatically if needed.
    -> spark.sql.adaptive.coalescePartitions.enabled

  • Depending on the settings, it aims for either the minimum partition size or the target partition size.

    • spark.sql.adaptive.advisoryPartitionSizeInBytes
    • spark.sql.adaptive.coalescePartitions.minPartitionSize
    • Which one applies is decided by spark.sql.adaptive.coalescePartitions.parallelismFirst.
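
The coalescing step can be pictured as greedily packing adjacent shuffle partitions up to a target size. The sketch below is a simplified model under that assumption, not Spark's actual code: `coalesce_shuffle_partitions` is a hypothetical function, and it ignores minPartitionSize and parallelismFirst.

```python
def coalesce_shuffle_partitions(sizes, target_size):
    """Group adjacent partition indexes so each group stays within target_size when possible."""
    groups, current, current_size = [], [], 0
    for index, size in enumerate(sizes):
        # Close the current group if adding this partition would exceed the target.
        if current and current_size + size > target_size:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Post-shuffle partition sizes in MB (illustrative), with a 64 MB target.
sizes_mb = [10, 20, 30, 70, 5, 5, 5]
groups = coalesce_shuffle_partitions(sizes_mb, target_size=64)
print(groups)
```

Here seven small partitions become three: the first three are merged (60 MB), the 70 MB partition is left alone because coalescing never splits a partition, and the last three are merged (15 MB).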

What is dynamically coalescing shuffle partitions?

  • Why is it needed?
    • An appropriate partition size and count has a major effect on performance.
    • Too many small partitions
      • Scheduler overhead
      • Task setup overhead
      • Inefficient I/O (file system and network)
    • Too few large partitions
      • GC nightmare -> OOM (Out Of Memory)
      • Disk spill
    • The single variable spark.sql.shuffle.partitions is not enough.

Settings related to dynamically coalescing shuffle partitions (Spark 3.3.1)

| Setting name | Default | Description |
| --- | --- | --- |
| spark.sql.adaptive.coalescePartitions.enabled | true | When spark.sql.adaptive.enabled is also true, the number of post-shuffle partitions is reduced dynamically, and Spark tries to size them according to spark.sql.adaptive.advisoryPartitionSizeInBytes. |
| spark.sql.adaptive.advisoryPartitionSizeInBytes | 64MB | Target partition size when reducing the number of partitions after a shuffle. |
| spark.sql.adaptive.coalescePartitions.parallelismFirst | true | When true, the target size above is ignored in favor of parallelism, and only spark.sql.adaptive.coalescePartitions.minPartitionSize is respected. |
| spark.sql.adaptive.coalescePartitions.initialPartitionNum | (none) | Number of partitions before coalescing. If unset, spark.sql.shuffle.partitions is used. |
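
As a usage sketch, the settings above could be passed to `spark-submit` as `--conf key=value` pairs. The values here are illustrative assumptions rather than recommendations (in particular `initialPartitionNum` = 400 and `parallelismFirst` = false, chosen so the advisory size is honored), and `to_submit_args` is a hypothetical helper.

```python
# Assumed example values; tune them for the actual workload.
AQE_COALESCE_CONF = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.advisoryPartitionSizeInBytes": "64MB",
    "spark.sql.adaptive.coalescePartitions.parallelismFirst": "false",
    "spark.sql.adaptive.coalescePartitions.initialPartitionNum": "400",
}


def to_submit_args(conf):
    """Turn a settings dict into a flat list of spark-submit --conf arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


submit_args = to_submit_args(AQE_COALESCE_CONF)
print(" ".join(submit_args))
```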

Dynamically coalescing shuffle partitions

  • Shuffle without coalescing

  • Shuffle with AQE coalescing applied

AQE demo environment

  • Two tables are created and used to test the three AQE features.

  • Table_1 (items) : a kind of dimension table

    • Contains 30,000,000 records.
    • Consists of the id and price fields.
  • Table_2 (sales) : a kind of fact table

    • Contains 1,000,000,000 records.
    • Consists of the item_id, quantity, and date fields.
  • Both tables are created with Spark SQL.


📌 Dynamically switching join strategies

What is dynamically switching join strategies?

  • Why is it needed?

    • The static query plan can miss a BHJ (Broadcast Hash Join) opportunity for several reasons.
      • Missing statistics about the DataFrames being joined (for example, after filtering)
      • UDFs are used
  • AQE's solution

    • It changes the join strategy based on runtime statistics.
      -> It re-runs query planning after the upstream stages finish and before the join starts.
    • Two options are available.
      • Broadcast join (recommended, and takes priority)
      • Shuffle hash join

How dynamically switching join strategies works

  1. Run the leaf stages.
  2. Look at the results and redo query planning (optimization).
    -> If table B drops below the broadcast threshold after filtering, a broadcast join is performed with B.
  3. This performs worse than a broadcast join chosen by static query planning (but it still helps).
spark.sql.adaptive.autoBroadcastJoinThreshold
spark.sql.autoBroadcastJoinThreshold

Settings related to dynamically switching join strategies (Spark 3.3.1)

| Setting name | Default | Description |
| --- | --- | --- |
| spark.sql.join.preferSortMergeJoin | true | Whether to prefer sort merge join by default when joining DataFrames. It does not mean sort merge join is always used. |
| spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold | 0 | Maximum size per partition that may be used to build the local hash map for a hash join. If this value is not smaller than spark.sql.adaptive.advisoryPartitionSizeInBytes and every partition is no larger than this value, shuffled hash join is chosen, regardless of spark.sql.join.preferSortMergeJoin. |
| spark.sql.adaptive.autoBroadcastJoinThreshold | (none) | Maximum size of a DataFrame that can be broadcast. Setting it to -1 disables broadcasting. The default is the same as spark.sql.autoBroadcastJoinThreshold, and the setting is only used when AQE is enabled. |
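
How these three settings interact can be sketched as a small decision function. This is a simplified reading of the descriptions above, not Spark's planner: `choose_join_strategy` and its parameter names are hypothetical, and the 10 MB broadcast threshold used in the example calls is an assumption.

```python
MB = 1024 * 1024


def choose_join_strategy(small_side_bytes, partition_bytes, broadcast_threshold,
                         local_map_threshold, advisory_partition_bytes):
    """Pick a join strategy from runtime sizes, following the settings described above."""
    # A threshold of -1 disables broadcasting entirely.
    if broadcast_threshold >= 0 and small_side_bytes <= broadcast_threshold:
        return "broadcast_hash_join"
    # Shuffled hash join needs the local map threshold to be at least the advisory
    # partition size, and every partition to fit within that threshold.
    if (local_map_threshold >= advisory_partition_bytes
            and all(size <= local_map_threshold for size in partition_bytes)):
        return "shuffled_hash_join"
    return "sort_merge_join"


partitions = [30 * MB, 40 * MB]
print(choose_join_strategy(5 * MB, partitions, 10 * MB, 0, 64 * MB))
print(choose_join_strategy(500 * MB, partitions, 10 * MB, 0, 64 * MB))
print(choose_join_strategy(500 * MB, partitions, 10 * MB, 64 * MB, 64 * MB))
```

With the default local map threshold of 0, the second branch never fires, so a join that is too large to broadcast falls back to sort merge join.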
