๐Ÿ“’ Spark(20)

Kimdongki · July 10, 2024


๐Ÿ“Œ Dynamically optimizing skew joins

Dynamically optimizing skew joins๋ž€

  • ์™œ ํ•„์š”ํ•œ๊ฐ€?

    • Skew ํŒŒํ‹ฐ์…˜์œผ๋กœ ์ธํ•œ ์„ฑ๋Šฅ ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•˜๊ธฐ ์œ„ํ•ด์„œ ํ•„์š”ํ•˜๋‹ค.
      • ํ•œ ๋‘๊ฐœ์˜ ์˜ค๋ž˜ ๊ฑธ๋ฆฌ๋Š” Task๋“ค๋กœ ์ธํ•œ ์ „์ฒด Job/Stage ์ข…๋ฃŒ ์ง€์—ฐ
      • ์ด ๋•Œ disk spill์ด ๋ฐœ์ƒํ•œ๋‹ค๋ฉด ๋” ๋А๋ ค์ง€๊ฒŒ ๋œ๋‹ค.
  • AQE์˜ ํ•ด๋ฒ•

    • ๋จผ์ € skew ํŒŒํ‹ฐ์…˜์˜ ์กด์žฌ ์—ฌ๋ถ€๋ฅผ ํŒŒ์•…ํ•œ๋‹ค.
    • ๋‹ค์Œ์œผ๋กœ skew ํŒŒํ‹ฐ์…˜์„ ์ž‘๊ฒŒ ๋‚˜๋ˆˆ๋‹ค.
    • ๋‹ค์Œ์œผ๋กœ ์ƒ๋Œ€ ์กฐ์ธ ํŒŒํ‹ฐ์…˜์„ ์ค‘๋ณตํ•˜์—ฌ ๋งŒ๋“ค๊ณ  ์กฐ์ธ์„ ์ˆ˜ํ–‰ํ•œ๋‹ค.

Dynamically optimizing skew joins ๋™์ž‘๋ฐฉ์‹

  1. Leaf stage ์‹คํ–‰
  2. ํ•œ์ชฝ์— skew partition์ด ๋ณด์ด๋Š” ๊ฒฝ์šฐ skew reader๋ฅผ ํ†ตํ•ด ์ž‘์€ ๋‹ค์ˆ˜์˜ ํŒŒํ‹ฐ์…˜์œผ๋กœ ์žฌ๊ตฌ์„ฑ
  3. ์กฐ์ธ ๋Œ€์ƒ์ด ๋˜๋Š” ๋ฐ˜๋Œ€ํŽธ ํŒŒํ‹ฐ์…˜์€ ์•ž์„œ ๋‹ค์ˆ˜์˜ ๋ถ€๋ถ„ ํŒŒํ‹ฐ์…˜์ชฝ์œผ๋กœ ์ค‘๋ณตํ•ด์„œ ํŒŒํ‹ฐ์…˜์„ ์ƒ์„ฑ
spark.sql.adaptive.skewJoin.skewedPartitionFactor
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
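The detection rule implied by these two properties can be sketched in plain Python. This is a simplified illustration, not Spark's actual implementation; the function name and the use of the median are assumptions based on the property descriptions:

```python
from statistics import median

def is_skewed(partition_sizes, idx, factor=5, threshold=256 * 1024 * 1024):
    """A partition counts as skewed only if it is both `factor` times larger
    than the median partition size AND above the byte threshold."""
    size = partition_sizes[idx]
    return size > factor * median(partition_sizes) and size > threshold

# One 2 GB partition among nine 100 MB ones is skewed ...
sizes = [100 * 1024 * 1024] * 9 + [2 * 1024 ** 3]
print(is_skewed(sizes, 9))                                          # True
# ... but a 300 MB partition is not: it clears the byte threshold
# yet is only 3x the median, short of the 5x factor.
print(is_skewed([100 * 1024 * 1024] * 9 + [300 * 1024 * 1024], 9))  # False
```

Both conditions must hold, which is why lowering only one of the two properties may not change what gets flagged.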

Dynamically optimizing skew joins ๊ทธ๋ฆผํ‘œํ˜„

Dynamically optimizing skew joins ํ™˜๊ฒฝ๋ณ€์ˆ˜(Spark 3.3.1)

ํ™˜๊ฒฝ ๋ณ€์ˆ˜ ์ด๋ฆ„๊ธฐ๋ณธ ๊ฐ’์„ค๋ช…
spark.sql.adaptive.skewJoin.enabledTrue์ด ๊ฐ’๊ณผ spark.sql.adpative.enabled๊ฐ€ true์ธ ๊ฒฝ์šฐ Sort Merge Join ์ˆ˜ํ–‰์‹œ skew๋œ ํŒŒํ‹ฐ์…˜๋“ค์„ ๋จผ์ € ํฌ๊ธฐ๋ฅผ ์ค„์ด๊ณ  ๋‚œ ๋‹ค์Œ์— ์กฐ์ธ์„ ์ˆ˜ํ–‰ํ•œ๋‹ค.
spark.sql.adaptive.skewJoin.skewedPartitionFactor5ํŒŒํ‹ฐ์…˜์˜ ํฌ๊ธฐ๊ฐ€ ์ค‘๊ฐ„ ํŒŒํ‹ฐ์…˜ ํฌ๊ธฐ๋ณด๋‹ค 5๋ฐฐ ์ด์ƒ ํฌ๊ณ  spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes์˜ ๊ฐ’๋ณด๋‹ค ํฌ๋ฉด skew ํŒŒํ‹ฐ์…˜์œผ๋กœ ๊ฐ„์ฃผํ•œ๋‹ค.
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes256MBํŒŒํ‹ฐ์…˜์˜ ํฌ๊ธฐ๊ฐ€ ์ด ๊ฐ’๋ณด๋‹ค ํฌ๊ณ  ์ค‘๊ฐ„ ํŒŒํ‹ฐ์…˜ ํฌ๊ธฐ๋ณด๋‹ค 5๋ฐฐ์ด์ƒ ํฌ๋ฉด skewํŒŒํ‹ฐ์…˜์œผ๋กœ ๊ฐ„์ฃผํ•œ๋‹ค. ์ด ๊ฐ’์€ spark.sql.adaptive.advisoryPartitionSizeInBytes๋ณด๋‹ค ์ปค์•ผํ•œ๋‹ค.
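For reference, these properties can be overridden when creating the session. The values below are arbitrary examples for experimentation, not recommendations:

```python
from pyspark.sql import SparkSession

# Hypothetical tuning values, chosen only for illustration.
spark = (
    SparkSession.builder
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    # Flag partitions that are 3x the median instead of the default 5x ...
    .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "3")
    # ... and lower the absolute threshold from 256MB to 64MB.
    .config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "64MB")
    .getOrCreate()
)
```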

Demo

CREATE TABLE items
USING parquet
AS
SELECT id,
       CAST(rand() * 1000 AS INT) AS price
FROM RANGE(30000000);

CREATE TABLE sales
USING parquet
AS
-- ~80% of rows share item_id 100, deliberately creating a skewed key.
SELECT CASE WHEN rand() < 0.8 THEN 100 ELSE CAST(rand() * 30000000 AS INT) END AS item_id,
       CAST(rand() * 100 AS INT) AS quantity,
       DATE_ADD(current_date(), - CAST(rand() * 360 AS INT)) AS date
FROM RANGE(1000000000);
  • Coalescing

pyspark --driver-memory 2g --executor-memory 2g

SELECT date, sum(quantity) AS q
FROM sales
GROUP BY 1
ORDER BY 2 DESC;
  • JOIN Strategies

pyspark --driver-memory 2g --executor-memory 2g

SELECT date, sum(quantity * price) AS total_sales
FROM sales s
JOIN items i ON s.item_id = i.id
WHERE price < 10
GROUP BY 1
ORDER BY 2 DESC;
  • SKEW JOIN optimization

pyspark --driver-memory 4g --executor-memory 8g

SELECT date, sum(quantity * price) AS total_sales
FROM sales s
JOIN items i ON s.item_id = i.id
GROUP BY 1
ORDER BY 2 DESC;

๐Ÿ“Œ Salting์„ ํ†ตํ•œ Data Skew ์ฒ˜๋ฆฌ

ํŒŒํ‹ฐ์…˜ ๊ด€๋ จ ํ™˜๊ฒฝ ์„ค์ • ๋ณ€์ˆ˜๋“ค (3.3.1 ๊ธฐ์ค€) - ์ฐธ๊ณ 

ํ™˜๊ฒฝ ๋ณ€์ˆ˜ ์ด๋ฆ„๊ธฐ๋ณธ๊ฐ’์„ค๋ช…
spark.sql.adaptive.enalbedTrueSpark 3.2๋ถ€ํ„ฐ ๊ธฐ๋ณธ์œผ๋กœ ์ ์šฉ๋˜๊ธฐ ์‹œ์ž‘ํ–ˆ๋‹ค.(๋„์ž…์€ 1.6)
spark.sql.shuffle.partitions200Shuffle ํ›„ ๋งŒ๋“ค์–ด์งˆ ํŒŒํ‹ฐ์…˜์˜ ๊ธฐ๋ณธ ์ˆ˜
spark.sql.files.maxPartitionBytes128MBํŒŒ์ผ ๊ธฐ๋ฐ˜ ๋ฐ์ดํ„ฐ ์†Œ์Šค(Parquet, JSON, ORC ๋“ฑ๋“ฑ)์—์„œ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ ๋งŒ๋“ค ๋•Œ ํ•œ ํŒŒํ‹ฐ์…˜์˜ ์ตœ๋Œ€ ํฌ๊ธฐ
spark.sql.files.openCostInBytes4MB์ด ์—ญ์‹œ ํŒŒ์ผ ๊ธฐ๋ฐ˜ ๋ฐ์ดํ„ฐ์†Œ์Šค์—์„œ๋งŒ ์˜๋ฏธ๊ฐ€ ์žˆ์œผ๋ฉฐ ์ž…๋ ฅ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ๊ธฐ ์œ„ํ•ด ๋ช‡ ๊ฐœ์˜ ํŒŒํ‹ฐ์…˜์„ ์‚ฌ์šฉํ• ์ง€ ๊ฒฐ์ •ํ•˜๋Š”๋ฐ ์‚ฌ์šฉ๋œ๋‹ค. ๋Œ€๋ถ€๋ถ„์˜ ๊ฒฝ์šฐ ๋ณ„ ์˜๋ฏธ ์—†๋Š” ๋ณ€์ˆ˜์ด์ง€๋งŒ ๊ธฐ๋ณธ์ ์œผ๋กœ ์ฝ”์–ด๋ณ„ ์ž…๋ ฅ ๋ฐ์ดํ„ฐ ํฌ๊ธฐ๋ฅผ ๊ณ„์‚ฐํ—€์„ ๋•Œ ์ด ๋ณ€์ˆ˜์˜ ๊ฐ’๋ณด๋‹ค ์ž‘์€ ๊ฒฝ์šฐ ํŒŒํ‹ฐ์…˜์˜ ํฌ๊ธฐ๋Š” ์ด ํฌ๊ธฐ๋กœ ๋งž์ถฐ์„œ ๋งŒ๋“ค์–ด์ง„๋‹ค. ์ž์„ธํ•œ ์„ค๋ช…
spark.sql.files.minPartitionNumDefault parallelism์ด ์—ญ์‹œ ํŒŒ์ผ ๊ธฐ๋ฐ˜ ๋ฐ์ดํ„ฐ์†Œ์Šค์—์„œ๋งŒ ์˜๋ฏธ๊ฐ€ ์žˆ์œผ๋ฉฐ ๋ณด์žฅ๋˜์ง€๋Š” ์•Š์ง€๋งŒ ํŒŒ์ผ์„ ์ฝ์–ด๋“ค์ผ ๋•Œ ์ตœ์†Œ ๋ช‡ ๊ฐœ์˜ ํŒŒํ‹ฐ์…˜์„ ์‚ฌ์šฉํ• ์ง€ ๊ฒฐ์ •ํ•˜๋Š” ๋ณ€์ˆ˜์ด๋‹ค. ์„ค์ •๋˜์–ด ์žˆ์ง€ ์•Š๋‹ค๋ฉด ๊ธฐ๋ณธ๊ฐ’์€ spark.default.parallelism๊ณผ ๋™์ผํ•˜๋‹ค.
spark.default.parallelismRDD๋ฅผ ์ง์ ‘ ์กฐ์ž‘ํ•˜๋Š” ๊ฒฝ์šฐ ์˜๋ฏธ๊ฐ€ ์žˆ๋Š” ๋ณ€์ˆ˜๋กœ shuffling ํ›„ ํŒŒํ‹ฐ์…˜์˜ ์ˆ˜๋ฅผ ๊ฒฐ์ •ํ•œ๋‹ค. ๊ฐ„๋‹จํ•˜๊ฒŒ ํด๋Ÿฌ์Šคํ„ฐ ๋‚ด ์ด ์ฝ”์–ด์˜ ์ˆ˜ ๋ผ๊ณ  ํ•  ์ˆ˜ ์žˆ๋‹ค.

spark.sql.shuffle.partitions

  • Set it per job, taking the cluster resources and the size of the data into account.
  • When processing large data, set it to the total number of cores in the cluster.
  • When using AQE, it is better to set it somewhat higher than that.

Salting์ด๋ž€?

  • Salting์ด๋ž€ Skew Partition์„ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•œ ํ…Œํฌ๋‹‰์ด๋‹ค.
    • AQE์˜ ๋“ฑ์žฅ์œผ๋กœ ์ธํ•ด ๊ทธ๋ ‡๊ฒŒ ๋งŽ์ด ์“ฐ์ด์ง€๋Š” ์•Š๋Š”๋‹ค.
      -> ๋‹จ AQE๋งŒ์œผ๋กœ Skew partition ์ด์Šˆ๊ฐ€ ์‚ฌ๋ผ์ง€์ง€ ์•Š๋Š”๋‹ค๋ฉด ์‚ฌ์šฉ์ด ํ•„์š”ํ•  ์ˆ˜๋„ ์žˆ๋‹ค.
  • ๋žœ๋ค ํ•„๋“œ๋ฅผ ๋งŒ๋“ค๊ณ  ๊ทธ ๊ธฐ์ค€์œผ๋กœ ํŒŒํ‹ฐ์…˜์„ ์ƒˆ๋กœ ๋งŒ๋“ค์–ด์„œ ์ฒ˜๋ฆฌํ•œ๋‹ค.
    • Aggregation ์ฒ˜๋ฆฌ์˜ ๊ฒฝ์šฐ์—๋Š” ํšจ๊ณผ์ ์ผ ์ˆ˜ ์žˆ๋‹ค.
    • Join์˜ ๊ฒฝ์šฐ์—๋Š” ๊ทธ๋ฆฌ ํšจ๊ณผ์ ์ด์ง€ ์•Š๋‹ค
      • AQE์— ์˜์กดํ•˜๋Š” ๊ฒƒ์ด ์ข‹๋‹ค.
      • ์‚ฌ์‹ค์ƒ Join์‹œ ์‚ฌ์šฉ Salting ํ…Œํฌ๋‹‰์„ ์ผ๋ฐ˜ํ™”ํ•œ ๊ฒƒ์ด AQE์˜ skew Join์ฒ˜๋ฆฌ๋ฐฉ์‹์ด๋‹ค.

Salting์„ Aggregation์— ์‚ฌ์šฉํ•œ ์˜ˆ์‹œ

SELECT item_id, COUNT(1)
FROM sales
GROUP BY 1
ORDER BY 2 DESC
LIMIT 100;

SELECT item_id, SUM(cnt)
FROM (
    SELECT item_id, salt, COUNT(1) AS cnt
    FROM (
        SELECT FLOOR(RAND() * 200) AS salt, item_id
        FROM sales
    )
    GROUP BY 1, 2
)
GROUP BY 1
ORDER BY 2 DESC
LIMIT 100;
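The two-stage salted aggregation above can be sanity-checked outside Spark with a small pure-Python simulation (the data distribution mirrors the demo's sales table; the row count and the 200 salts are illustrative):

```python
import random
from collections import Counter

random.seed(42)
# Skewed input: ~80% of rows share item_id 100, as in the demo's sales table.
rows = [100 if random.random() < 0.8 else random.randrange(1000)
        for _ in range(10_000)]

# Stage 1: count per (item_id, salt) -- the hot key is spread over 200 groups.
stage1 = Counter((item_id, random.randrange(200)) for item_id in rows)

# Stage 2: sum the partial counts back together per item_id.
stage2 = Counter()
for (item_id, _salt), cnt in stage1.items():
    stage2[item_id] += cnt

# The salted two-stage result matches a direct, unsalted count.
assert stage2 == Counter(rows)
```

Each stage-1 group now holds roughly 1/200th of the hot key's rows, which is exactly what keeps any single task from ballooning.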

Salting์„ Skew Join์— ์‚ฌ์šฉํ•œ ์˜ˆ์‹œ

SELECT date, sum(quantity*price) AS total_sales
FROM sales s
JOIN items i ON s.item_id = i.id
GROUP BY 1
ORDER BY 2 DESC;
  1. Skew Partition์ด ์žˆ๋Š” ์ชฝ์— salt์ถ”๊ฐ€
  2. ๋ฐ˜๋Œ€ Partition์— ์ค‘๋ณต ๋ ˆ์ฝ”๋“œ ๋งŒ๋“ค๊ธฐ
  3. Join ์กฐ๊ฑด์— salt ์ถ”๊ฐ€ํ•˜๊ธฐ
SELECT date, sum(quantity * price) AS total_sales
FROM (
    SELECT *, FLOOR(RAND() * 20) AS salt
    FROM sales
) s
JOIN (
    SELECT *, EXPLODE(ARRAY(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19)) AS salt
    FROM items
) i ON s.item_id = i.id AND s.salt = i.salt
GROUP BY 1
ORDER BY 2 DESC;
A more targeted variant salts only the known hot key (item_id = 100), so non-skewed keys are neither randomized nor replicated:

SELECT date, sum(quantity * price) AS total_sales
FROM (
    SELECT *, CASE WHEN item_id = 100 THEN FLOOR(RAND() * 20) ELSE 1 END AS salt
    FROM sales
) s
JOIN (
    SELECT *, EXPLODE(ARRAY(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19)) AS salt
    FROM items
    WHERE id = 100
    UNION ALL  -- the two branches are disjoint, so skip UNION's dedup shuffle
    SELECT *, 1 AS salt
    FROM items
    WHERE id <> 100
) i ON s.item_id = i.id AND s.salt = i.salt
GROUP BY 1
ORDER BY 2 DESC;
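The same hot-key-only strategy can be simulated in a few lines of pure Python to see why the join still produces correct totals (toy data; 20 salts as in the SQL above):

```python
import random

random.seed(0)
HOT, SALTS = 100, 20

# sales side: the hot key gets a random salt, every other key gets salt 1.
sales = [(100, 5), (100, 3), (7, 2)]                  # (item_id, quantity)
salted_sales = [
    (item_id, random.randrange(SALTS) if item_id == HOT else 1, qty)
    for item_id, qty in sales
]

# items side: replicate the hot item once per salt; others keep salt 1.
items = {100: 9, 7: 4}                                # item_id -> price
salted_items = [(HOT, s, items[HOT]) for s in range(SALTS)]
salted_items += [(i, 1, p) for i, p in items.items() if i != HOT]

# Join on (item_id, salt), then aggregate quantity * price.
lookup = {(i, s): p for i, s, p in salted_items}
total = sum(q * lookup[(s_id, s_salt)] for s_id, s_salt, q in salted_sales)
assert total == (5 + 3) * 9 + 2 * 4                   # same result as unsalted
```

Every salted sales row still finds exactly one matching items row, because the hot item was replicated once per possible salt value.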

Spill์ด๋ž€?

  • ํŒŒํ‹ฐ์…˜์˜ ํฌ๊ธฐ๊ฐ€ ๋„ˆ๋ฌด ์ปค์„œ ๋ฉ”๋ชจ๋ฆฌ๊ฐ€ ๋ถ€์กฑํ•œ ๊ฒฝ์šฐ ๊ทธ ์—ฌ๋ถ„์„ ๋””์Šคํฌ์— ์“ฐ๋Š” ๊ฒƒ์ด๋‹ค.

    • Spill์ด ๋ฐœ์ƒํ•˜๋ฉด ์‹คํ–‰์‹œ๊ฐ„์ด ๋Š˜์–ด๋‚˜๊ณ  OOM์ด ๋ฐœ์ƒํ•  ๊ฐ€๋Šฅ์„ฑ์ด ์˜ฌ๋ผ๊ฐ„๋‹ค.
  • Spill์ด ๋ฐœ์ƒํ•˜๋Š” ๊ฒฝ์šฐ -> ๋ฉ”๋ชจ๋ฆฌ๊ฐ€ ๋ถ€์กฑํ•ด์ง€๋Š” ๊ฒฝ์šฐ

    • Skew Partition ๋Œ€์ƒ Aggregation
    • Skew Partition ๋Œ€์ƒ Join
    • ๊ต‰์žฅํžˆ ํฐ explode ์ž‘์—…
    • ํฐ ํŒŒํ‹ฐ์…˜(spark.sql.files.maxPartitionBytes)์ด๋ผ๋ฉด ์œ„์˜ ์ž‘์—…์‹œ Spill๊ฐ€๋Šฅ์„ฑ์ด ๋” ๋†’์•„์ง„๋‹ค

Spill์˜ ์ข…๋ฅ˜

  • Spill(memory)

    • ๋””์Šคํฌ๋กœ Spill๋œ ๋ฐ์ดํ„ฐ๊ฐ€ ๋ฉ”๋ชจ๋ฆฌ์— ์žˆ์„ ๋•Œ์˜ ํฌ๊ธฐ
    • Deserialized ํ˜•ํƒœ๋ผ์„œ ํฌ๊ธฐ๊ฐ€ ๋ณดํ†ต 8~10๋ฐฐ ์ •๋„ ๋” ํฌ๋‹ค.
  • Spill(disk)

    • ๋ฉ”๋ชจ๋ฆฌ์—์„œ spill๋œ ๋ฐ์ดํ„ฐ๊ฐ€ ๋””์Šคํฌ์—์„œ ์ฐจ์ง€ํ•˜๋Š” ํฌ๊ธฐ
    • Serialized ํ˜•ํƒœ๋ผ์„œ ๋ณดํ†ต ํฌ๊ธฐ๊ฐ€ ํ›จ์”ฌ ๋” ์ž‘๋‹ค
  • ๊ฒฐ๊ตญ Spill(memory)์™€ Spill(disk)๋Š” ๊ฐ™์€ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€๋ฆฌํ‚ค๋Š”๋ฐ ํ•˜๋‚˜๋Š” ๋ฉ”๋ชจ๋ฆฌ์— ์žˆ์„ ๋•Œ์˜ ํฌ๊ธฐ์ด๊ณ  ๋‹ค๋ฅธ ํ•˜๋‚˜๋Š” ๋””์Šคํฌ์— ์žˆ์„ ๋•Œ์˜ ํฌ๊ธฐ์ด๋‹ค

  • ๋ณดํ†ต ์˜ค๋ž˜ ๊ฑธ๋ฆฌ๋Š” Task๋“ค์€ Spill์ด ์žˆ๋‹ค.

0๊ฐœ์˜ ๋Œ“๊ธ€