์ ํ์ํ๊ฐ?
AQE์ ํด๋ฒ
spark.sql.adaptive.skewJoin.skewedPartitionFactor
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes



| ํ๊ฒฝ ๋ณ์ ์ด๋ฆ | ๊ธฐ๋ณธ ๊ฐ | ์ค๋ช |
|---|---|---|
| spark.sql.adaptive.skewJoin.enabled | True | ์ด ๊ฐ๊ณผ spark.sql.adpative.enabled๊ฐ true์ธ ๊ฒฝ์ฐ Sort Merge Join ์ํ์ skew๋ ํํฐ์ ๋ค์ ๋จผ์ ํฌ๊ธฐ๋ฅผ ์ค์ด๊ณ ๋ ๋ค์์ ์กฐ์ธ์ ์ํํ๋ค. |
| spark.sql.adaptive.skewJoin.skewedPartitionFactor | 5 | ํํฐ์ ์ ํฌ๊ธฐ๊ฐ ์ค๊ฐ ํํฐ์ ํฌ๊ธฐ๋ณด๋ค 5๋ฐฐ ์ด์ ํฌ๊ณ spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes์ ๊ฐ๋ณด๋ค ํฌ๋ฉด skew ํํฐ์ ์ผ๋ก ๊ฐ์ฃผํ๋ค. |
| spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 256MB | ํํฐ์ ์ ํฌ๊ธฐ๊ฐ ์ด ๊ฐ๋ณด๋ค ํฌ๊ณ ์ค๊ฐ ํํฐ์ ํฌ๊ธฐ๋ณด๋ค 5๋ฐฐ์ด์ ํฌ๋ฉด skewํํฐ์ ์ผ๋ก ๊ฐ์ฃผํ๋ค. ์ด ๊ฐ์ spark.sql.adaptive.advisoryPartitionSizeInBytes๋ณด๋ค ์ปค์ผํ๋ค. |
CREATE TABLE items
USING parquet
AS
SELECT id,
CAST(rand()* 1000 AS INT) AS price
FROM RANGE(30000000);
CREATE TABLE sales
USING parquet
AS
SELECT CASE WHEN rand() < 0.8 THEN 100 ELSE CAST(rand()* 30000000 AS INT) END AS item_id,
CAST(rand()* 100 AS INT) AS quantity,
DATE_ADD(current_date(), - CAST(rand()* 360 AS INT)) AS date
FROM RANGE(1000000000);
pyspark --driver-memory 2g --executor-memory 2g
SELECT date, sum(quantity) AS q
FROM sales
GROUP BY 1
ORDER BY 2 DESC;
pyspark --driver-memory 2g --executor-memory 2g
SELECT date, sum(quantity * price) AS total_sales
FROM sales s
JOIN items i ON s.item_id = i.id
WHERE price < 10
GROUP BY 1
ORDER BY 2 DESC;
pyspark --driver-memory 4g --executor-memory 8g
SELECT date, sum(quantity * price) AS total_sales
FROM sales s
JOIN items i ON s.item_id = i.id
GROUP BY 1
ORDER BY 2 DESC;
| ํ๊ฒฝ ๋ณ์ ์ด๋ฆ | ๊ธฐ๋ณธ๊ฐ | ์ค๋ช |
|---|---|---|
| spark.sql.adaptive.enalbed | True | Spark 3.2๋ถํฐ ๊ธฐ๋ณธ์ผ๋ก ์ ์ฉ๋๊ธฐ ์์ํ๋ค.(๋์ ์ 1.6) |
| spark.sql.shuffle.partitions | 200 | Shuffle ํ ๋ง๋ค์ด์ง ํํฐ์ ์ ๊ธฐ๋ณธ ์ |
| spark.sql.files.maxPartitionBytes | 128MB | ํ์ผ ๊ธฐ๋ฐ ๋ฐ์ดํฐ ์์ค(Parquet, JSON, ORC ๋ฑ๋ฑ)์์ ๋ฐ์ดํฐํ๋ ์์ ๋ง๋ค ๋ ํ ํํฐ์ ์ ์ต๋ ํฌ๊ธฐ |
| spark.sql.files.openCostInBytes | 4MB | ์ด ์ญ์ ํ์ผ ๊ธฐ๋ฐ ๋ฐ์ดํฐ์์ค์์๋ง ์๋ฏธ๊ฐ ์์ผ๋ฉฐ ์ ๋ ฅ ๋ฐ์ดํฐ๋ฅผ ์ฝ๊ธฐ ์ํด ๋ช ๊ฐ์ ํํฐ์ ์ ์ฌ์ฉํ ์ง ๊ฒฐ์ ํ๋๋ฐ ์ฌ์ฉ๋๋ค. ๋๋ถ๋ถ์ ๊ฒฝ์ฐ ๋ณ ์๋ฏธ ์๋ ๋ณ์์ด์ง๋ง ๊ธฐ๋ณธ์ ์ผ๋ก ์ฝ์ด๋ณ ์ ๋ ฅ ๋ฐ์ดํฐ ํฌ๊ธฐ๋ฅผ ๊ณ์ฐํ์ ๋ ์ด ๋ณ์์ ๊ฐ๋ณด๋ค ์์ ๊ฒฝ์ฐ ํํฐ์ ์ ํฌ๊ธฐ๋ ์ด ํฌ๊ธฐ๋ก ๋ง์ถฐ์ ๋ง๋ค์ด์ง๋ค. ์์ธํ ์ค๋ช |
| spark.sql.files.minPartitionNum | Default parallelism | ์ด ์ญ์ ํ์ผ ๊ธฐ๋ฐ ๋ฐ์ดํฐ์์ค์์๋ง ์๋ฏธ๊ฐ ์์ผ๋ฉฐ ๋ณด์ฅ๋์ง๋ ์์ง๋ง ํ์ผ์ ์ฝ์ด๋ค์ผ ๋ ์ต์ ๋ช ๊ฐ์ ํํฐ์ ์ ์ฌ์ฉํ ์ง ๊ฒฐ์ ํ๋ ๋ณ์์ด๋ค. ์ค์ ๋์ด ์์ง ์๋ค๋ฉด ๊ธฐ๋ณธ๊ฐ์ spark.default.parallelism๊ณผ ๋์ผํ๋ค. |
| spark.default.parallelism | RDD๋ฅผ ์ง์ ์กฐ์ํ๋ ๊ฒฝ์ฐ ์๋ฏธ๊ฐ ์๋ ๋ณ์๋ก shuffling ํ ํํฐ์ ์ ์๋ฅผ ๊ฒฐ์ ํ๋ค. ๊ฐ๋จํ๊ฒ ํด๋ฌ์คํฐ ๋ด ์ด ์ฝ์ด์ ์ ๋ผ๊ณ ํ ์ ์๋ค. |
SELECT item_id, COUNT(1)
FROM sales
GROUP BY 1
ORDER BY 2 DESC
LIMIT 100;

SELECT item_id, SUM(cnt)
FROM(
SELECT item_id, salt, COUNT(1) cnt
FROM (
SELECT FLOOR(RAND()* 200) salt, item_id
FROM sales
)
GROUP BY 1, 2
)
GROUP BY 1
ORDER BY 2 DESC
LIMIT 100;

SELECT date, sum(quantity*price) AS total_sales
FROM sales s
JOIN items i ON s.item_id = i.id
GROUP BY 1
ORDER BY 2 DESC;
SELECT date, sum(quantity*price) AS total_sales
FROM (
SELECT *, FLOOR(RAND()* 20) AS salt
FROM sales
) s
JOIN(
SELECT *, EXPLODE(ARRAY(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19) AS salt
FROM items
) i ON s.item_id = i.id and s.salt = i.salt
GROUP BY 1
ORDER BY 2 DESC;
SELECT date, sum(quantity * price) AS total_sales
FROM (
SELECT *, CASE WHEN item_id = 100 THEN FLOOR(RAND()*20) ELSE 1 END AS salt
FROM sales
) s
JOIN (
SELECT *, EXPLODE(ARRAY(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19)) AS salt
FROM items
WHERE id = 100
UNION
SELECT *, 1 AS salt
FROM items
WHERE id <> 100
) i ON s.item_id = i.id and s.salt = i.salt
GROUP BY 1
ORDER BY 2 DESC
ํํฐ์ ์ ํฌ๊ธฐ๊ฐ ๋๋ฌด ์ปค์ ๋ฉ๋ชจ๋ฆฌ๊ฐ ๋ถ์กฑํ ๊ฒฝ์ฐ ๊ทธ ์ฌ๋ถ์ ๋์คํฌ์ ์ฐ๋ ๊ฒ์ด๋ค.
Spill์ด ๋ฐ์ํ๋ ๊ฒฝ์ฐ -> ๋ฉ๋ชจ๋ฆฌ๊ฐ ๋ถ์กฑํด์ง๋ ๊ฒฝ์ฐ
Spill(memory)
Spill(disk)
๊ฒฐ๊ตญ Spill(memory)์ Spill(disk)๋ ๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ๋ฆฌํค๋๋ฐ ํ๋๋ ๋ฉ๋ชจ๋ฆฌ์ ์์ ๋์ ํฌ๊ธฐ์ด๊ณ ๋ค๋ฅธ ํ๋๋ ๋์คํฌ์ ์์ ๋์ ํฌ๊ธฐ์ด๋ค
๋ณดํต ์ค๋ ๊ฑธ๋ฆฌ๋ Task๋ค์ Spill์ด ์๋ค.
