๊ธฐ๋ณธ์ ์ผ๋ก๋ ํํฐ์
์ ์ต๋ ํฌ๊ธฐ์ ์ํด ๊ฒฐ์ ๋๋ค.
-> spark.sql.files.maxpartitionbytes(128MB)
๊ฒฐ๊ตญ ํด๋น ๋ฐ์ดํฐ๊ฐ ์ด๋ป๊ฒ ์ ์ฅ๋์๋์ง์ ์ฐ๊ด์ด ๋ง๋ค.

bucketBy๋ก ์ ์ฅ๋ ๋ฐ์ดํฐ๋ฅผ ์ฝ๋ ๊ฒฝ์ฐ
์ฝ์ด๋ค์ด๋ ๋ฐ์ดํฐ ํ์ผ์ด splittableํ์ง?
์
๋ ฅ ๋ฐ์ดํฐ์ ์ ์ฒด ํฌ๊ธฐ(๋ชจ๋ ํ์ผํฌ๊ธฐ์ ํฉ)
-> ์
๋ ฅ ๋ฐ์ดํฐ๋ฅผ ๊ตฌ์ฑํ๋ ํ์ผ์ ์
๋ด๊ฐ ๋ฆฌ์์ค ๋งค๋์ ์๊ฒ ์์ฒญํ CPU์ ์
-> executor์ ์ X executor ๋ณ CPU ์
๋จผ์ ์๋ ๊ณต์์ผ๋ก maxSplitBytes๋ฅผ ๊ฒฐ์ ํ๋ค.
์ ๋ ฅ ๋ฐ์ดํฐ๋ฅผ ๊ตฌ์ฑํ๋ ๊ฐ ํ์ผ์ ๋ํ์ฌ ๋ค์์ ์งํํ๋ค.
๋ค์์ผ๋ก ์์์ ๋ง๋ค์ด์ง๋ File Chunk๋ค๋ก ๋ถํฐ ํํฐ์ ์ ์์ฑํ๋ค.
def maxSplitBytes(
sparkSession: SparkSession,
selectedPartitions: Seq[PartitionDirectory]): Lang = {
val defaultMaxSplitBytes = sparkSession.sessionState.conf.fileMaxPartitionBytes
val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
.getOrElse(sparkSession.leafNodeDefaultParallelism)
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
val bytesPerCore = totalBytes / minPartitionNum
Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
}
val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
// Assign files to partitions using "Next Fit Decreasing"
partitionedFiles.foreach { fiel =>
if (currentSize + file.length > maxSplitBytes) {
closePartition()
}
// Add the given file to the current partition.
currentSize += file.length + openCostInBytes
currentFiles += file
}
closePartition()
๋ฐ์ดํฐ๊ฐ ์์ฃผ ์ฌ์ฉ๋๋ ์ปฌ๋ผ ๊ธฐ์ค์ผ๋ก ๋ฏธ๋ฆฌ ์ ์ฅํด๋๊ณ ํ์ฉํ๋ค.
๋ค์ํ ์ต์ ํ๊ฐ ๊ฐ๋ฅํ๋ค
Bucket ์ ๋ณด๊ฐ Metastore์ ์ ์ฅ๋๊ณ Spark Compiler๋ ์ด๋ฅผ ํ์ฉํ๋ค.
Bucket์ ์ X Partition์ ์ ๋งํผ์ ํ์ผ์ด ๋ง๋ค์ด์ง๋ค
๋ค์ ์ฝ์ด๋ค์ผ ๋ ์๋ Partition์ ์๋งํผ์ผ๋ก ์ฌ๊ตฌ์ฑ๋๋ค.
Bucketingํค๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ์์ ์ Shuffle์ด ์์ด์ง๋ค.
์ ์์ ํฌ๊ธฐ์ ๋ง์ ํ์ผ์ด ๋ฌธ์ ๊ฐ ๋๋๊ฐ?
์ฝ์ด ๋ค์ด๋ฉด์ Partition์ ์๋ฅผ ์ค์ผ ์ ์์ง๋ง ์ค๋ฒํค๋๊ฐ ํฌ๋ค.
-> ํ์ผ๋ก ์ธ ๋ ์ด๋ ์ ๋ ์ ๋ฆฌ๋ฅด ํด์ฃผ๋ ๊ฒ์ด ํ์ํ๋ค.

3๊ฐ์ง ๋ฐฉ์์ด ์กด์ฌํ๋ค.
ํ์ผ ํฌ๊ธฐ๋ฅผ ๋ ์ฝ๋ ์๋ก๋ ์ ์ด ๊ฐ๋ฅํ๋ค.
bucketBy๋ partitionBy๋ฅผ ์ฌ์ฉํ์ง ์๋ ๊ฒฝ์ฐ
์ ๋นํ ํฌ๊ธฐ์ ์์ ํํฐ์ ์ ์ฐพ๋ ๊ฒ์ด ์ค์ํ๋ค.
Repartition ํน์ coalesce๋ฅผ ์ ์ ํ๊ฒ ์ฌ์ฉํ๋ค.
-> ์ด ๊ฒฝ์ฐ AQE์ Coalescing์ด ๋์๋ ์ ์๋ค. -> repartition
PARQUET ํฌ๋งท ์ฌ์ฉ
-> Snappy compression ์ฌ์ฉ
๋ฐ์ดํฐ ํน์ฑ์ ์ ์๋ ๊ฒฝ์ฐ ํน์ ID๋ฅผ ๊ธฐ์ค์ผ๋ก ๋๋ ์ ํ ์ด๋ธ๋ก ์ ์ฅํ๋ค.
bucket์ ์์ ํค๋ฅผ ์ง์ ํด์ผํ๋ค.
CREATE TABLE bucketed_table(
id INT,
name STRING,
age INT
)
USING PARQUET
CLUSTERED BY (id)
INTO 4 BUCKETS;
๊ต์ฅํ ํฐ ๋ก๊ทธ ํ์ผ์ ๋ฐ์ดํฐ ์์ฑ ์๊ฐ ๊ธฐ๋ฐ์ผ๋ก ๋ฐ์ดํฐ ์ฝ๊ธฐ๋ฅผ ๋ง์ด ํ๋ค๋ฉด?
๋ฐ์ดํฐ ์์ฒด๋ฅผ ์ฐ-์-์ผ์ ํด๋ ๊ตฌ์กฐ๋ก ์ ์ฅํ๋ค.
ํ์ง๋ง Partition key๋ฅผ ์๋ชป ์ ํํ๋ฉด ์์ฒญ๋๊ฒ ๋ง์ ํ์ผ๋ค์ด ์์ฑ๋๋ค.
-> Cardinality๊ฐ ๋์ ์ปฌ๋ผ์ ํค๋ก ์ฌ์ฉํ๋ฉด ์๋๋ค.
partitioningํ ํค๋ฅผ ์ง์ ํด์ผ ํ๋ค.
CREATE TABLE partitioned_table (
id INT,
value STRING
)
USING parquet
PARTITIONED BY (id);
df.write.mode("overwrite").partitionBy("dept").bucketBy(5, "employeeid")
spark.sql("DESCRIBE EXTENDED appl_stock").show()