📒 Spark (21)

Kimdongki · July 10, 2024


📌 All about Partitions

Three kinds of partitions (the life cycle of a partition)

  1. Partition count and size when loading input data
  2. Partition count and size produced by a shuffle
  3. Partition count and size when finally writing the data
  • A good partition size is 128MB - 1GB.

1. Partition count and size when loading input data

  • By default, this is determined by the maximum partition size.
    -> spark.sql.files.maxPartitionBytes (default 128MB)

  • Ultimately it depends heavily on how the data was stored.

    • What is the file format, and is it compressed? If compressed, with which algorithm?
      -> In the end: is it splittable -> can one large file be split across multiple partitions when loaded?
    • Other related Spark configuration settings

์ž…๋ ฅ ๋ฐ์ดํ„ฐ ํŒŒํ‹ฐ์…˜ ์ˆ˜์™€ ํฌ๊ธฐ๋ฅผ ๊ฒฐ์ •ํ•ด์ฃผ๋Š” ๋ณ€์ˆ˜๋“ค

  • When reading data saved with bucketBy

    • The number of buckets, the bucketing columns, and the sort columns
  • Whether the input data files are splittable

    • This is one reason PARQUET/AVRO are preferred: they are always splittable.
    • For JSON/CSV, a file whose records span multiple lines is not splittable.
      -> Even with single-line records, a compressed file is splittable only with bzip2.
  • The total size of the input data (sum of all file sizes)
    -> The number of files making up the input data

  • The number of CPUs requested from the resource manager
    -> number of executors X CPUs per executor

์ž…๋ ฅ ๋ฐ์ดํ„ฐ ํŒŒํ‹ฐ์…˜ ์ˆ˜์™€ ํฌ๊ธฐ ๊ฒฐ์ •๋ฐฉ์‹

  • First, maxSplitBytes is computed with the formula below.

    • bytesPerCore = (Full size of DataFiles + Files Num * OpenCostInBytes) / default.parallelism
    • maxSplitBytes = Min(maxPartitionBytes, Max(bytesPerCore, OpenCostInBytes))
  • Then, for each file making up the input data:

    • If it is splittable, it is cut into file chunks of up to maxSplitBytes each.
    • If it is not splittable, or is smaller than maxSplitBytes, it becomes a single file chunk.
  • Next, partitions are built from the file chunks created above.

    • By default a partition consists of one or more file chunks.
    • Chunks keep being merged into the current partition as long as adding the next chunk's size + openCostInBytes does not push the total past maxSplitBytes.
      -> i.e., files are packed together into a single partition.

Spark Scala code: maxSplitBytes

def maxSplitBytes(
    sparkSession: SparkSession,
    selectedPartitions: Seq[PartitionDirectory]): Long = {
  val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
  val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
  val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
    .getOrElse(sparkSession.leafNodeDefaultParallelism)
  // Every file is charged an extra openCostInBytes on top of its real length.
  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
  val bytesPerCore = totalBytes / minPartitionNum

  Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
}

Spark Scala code: partition creation

val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
// Assign files to partitions using "Next Fit Decreasing"
partitionedFiles.foreach { file =>
  if (currentSize + file.length > maxSplitBytes) {
    closePartition()
  }
  // Add the given file to the current partition.
  currentSize += file.length + openCostInBytes
  currentFiles += file
}
closePartition()

์ž…๋ ฅ ๋ฐ์ดํ„ฐ ํŒŒํ‹ฐ์…˜ ์ˆ˜์™€ ํฌ๊ธฐ ๊ฒฐ์ • ๋ฐฉ์‹ ์˜ˆ

  • ํŒŒ์ผ ์ˆ˜ : 50
  • ํŒŒ์ผ ํฌ๋งท : PARQUET
  • ํŒŒ์ผ ํฌ๊ธฐ : 65MB
  • Spark Application์— ํ• ๋‹น๋œ ์ฝ”์–ด ์ˆ˜ : 10
  • spark.sql.files.maxPartitionBytes : 128MB
  • spark.default.parallelism : 10
  • spark.sql.files.openCostInBytes : 4MB
  • bytesPerCore = (50 * 65MB + 50 * 4MB) / 10 = 345MB
  • maxSplitBytes = Min(128MB, 345MB) = 128MB
  • ์ตœ์ข… ํŒŒํ‹ฐ์…˜ ์ˆ˜ : 50
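The walkthrough above can be reproduced with a small pure-Python sketch. This is an illustration of the formula and the packing loop, not Spark's actual code; sizes are in MB and the helper names are my own:

```python
# Sketch of the maxSplitBytes formula and the chunk-packing loop (sizes in MB).

def max_split_bytes(file_sizes, open_cost, max_partition_bytes, parallelism):
    # bytesPerCore = (full size of data files + numFiles * openCostInBytes) / parallelism
    bytes_per_core = (sum(file_sizes) + len(file_sizes) * open_cost) / parallelism
    # maxSplitBytes = Min(maxPartitionBytes, Max(bytesPerCore, openCostInBytes))
    return min(max_partition_bytes, max(bytes_per_core, open_cost))

def pack_chunks(chunk_sizes, open_cost, max_split):
    # Close the current partition when the next chunk would push it past
    # maxSplitBytes; every chunk also adds openCostInBytes to the running size.
    partitions, current, current_size = [], [], 0.0
    for size in sorted(chunk_sizes, reverse=True):  # "Next Fit Decreasing"
        if current and current_size + size > max_split:
            partitions.append(current)
            current, current_size = [], 0.0
        current.append(size)
        current_size += size + open_cost
    if current:
        partitions.append(current)
    return partitions

# 50 parquet files of 65MB each, 10 cores:
files = [65.0] * 50
msb = max_split_bytes(files, open_cost=4.0, max_partition_bytes=128.0, parallelism=10)
# Each file is below maxSplitBytes, so each becomes one chunk; two chunks
# (65 + 4 + 65 = 134MB) no longer fit, so every file gets its own partition.
print(msb, len(pack_chunks(files, open_cost=4.0, max_split=msb)))  # 128.0 50
```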

Introduction to bucketing (bucketBy)

  • Store the data ahead of time, organized by a frequently used column, and reuse it.

  • This enables a variety of optimizations:

    • If the tables being joined are bucketed on the join key, a shuffle-free join is possible.
    • If only one side is bucketed, a one-side shuffle-free join is possible (depending on the bucket sizes).
    • Bucket pruning becomes possible.
    • Shuffle-free aggregation
  • The bucket metadata is stored in the metastore, and the Spark query planner makes use of it.

    • sortBy is often used as well, to fix the ordering within buckets ahead of time.
    • This metadata can only be used when the data is saved and loaded as a Spark table.
      -> saveAsTable, spark.table()

How bucketBy writes data

  • Up to (number of buckets X number of partitions) files are created.

    • ex) If the DataFrame has 10 partitions and 4 buckets, 40 files are created.
    • When read back, one partition is created per bucket (4 here), each combining that bucket's files.
  • So on re-read the data is reorganized by bucket rather than by the original partitioning.

  • Work keyed on the bucketing column then needs no shuffle.
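The file-count multiplication can be shown with a toy count (my own illustration, not Spark code): each writing task, one per partition, emits a separate file for every bucket it holds rows for. Bucket assignment is shown as a simple modulo; Spark really uses a Murmur3-based hash, but the counting argument is the same.

```python
# Count the output files bucketBy would create: one file per
# (partition, bucket) pair that actually holds data.

NUM_PARTITIONS = 10
NUM_BUCKETS = 4

output_files = set()
for partition in range(NUM_PARTITIONS):
    # Pretend each partition holds ids spread over many values.
    for row_id in range(partition * 100, partition * 100 + 100):
        bucket = row_id % NUM_BUCKETS   # stand-in for Spark's bucket hash
        output_files.add((partition, bucket))

print(len(output_files))  # 40
```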

The Small Files Syndrome

  • Why are many small files a problem?

    • Reading one 64MB file vs. reading a million 64-byte files
    • Each of these API calls is a network RPC call.
    • File-system access overhead
      • Accessing a single file takes multiple API calls.
      • This is why the openCostInBytes overhead seen earlier is charged per file.
  • The partition count can be reduced while reading, but the overhead is large.
    -> It is better to consolidate into reasonably sized files at write time.

Reading JSON-stored data, three ways

  • Number of files: 100
  • File format: JSON
    -> Compression: No, Yes (gzip), Yes (bzip2)
  • Size of each file
    -> 10.5MB, 955KB, 481KB
  • Cores allocated to the Spark application: 10
  • spark.sql.files.maxPartitionBytes: 128MB
  • spark.default.parallelism: 10
  • spark.sql.files.openCostInBytes: 4MB
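Applying the earlier rules gives a rough answer for each variant (a pure-Python approximation of the accounting; exact counts in a real run can differ slightly). Uncompressed single-line JSON and bzip2 are splittable while gzip is not, but every file here is already smaller than maxSplitBytes, so each file becomes a single chunk either way and only the packing differs:

```python
# Approximate partition count for N equally sized files (sizes in MB).
def partition_count(num_files, size_mb, open_cost=4.0, max_pb=128.0, parallelism=10):
    bytes_per_core = (num_files * size_mb + num_files * open_cost) / parallelism
    max_split = min(max_pb, max(bytes_per_core, open_cost))
    # Pack one chunk per file, charging open_cost per chunk.
    partitions, current_size, current_empty = 0, 0.0, True
    for _ in range(num_files):
        if not current_empty and current_size + size_mb > max_split:
            partitions += 1
            current_size, current_empty = 0.0, True
        current_size += size_mb + open_cost
        current_empty = False
    return partitions + (0 if current_empty else 1)

print(partition_count(100, 10.5))   # no compression: 12 partitions (9 files each)
print(partition_count(100, 0.933))  # gzip:  10 partitions (maxSplitBytes ~49MB)
print(partition_count(100, 0.470))  # bzip2: 10 partitions (maxSplitBytes ~45MB)
```

Note how the small compressed files pull maxSplitBytes down via bytesPerCore, so the packing lands on the parallelism (10) rather than on maxPartitionBytes.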

2. Partition count and size after a shuffle

  • spark.sql.shuffle.partitions
  • Always use AQE (Adaptive Query Execution).
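As a sketch, the relevant settings look like this (these are standard Spark config keys; the 128MB advisory size is a value chosen to match the sizing advice above, not a universal default):

```
# Baseline number of partitions a shuffle produces (default 200)
spark.sql.shuffle.partitions                    200
# Turn on Adaptive Query Execution
spark.sql.adaptive.enabled                      true
# Let AQE merge small post-shuffle partitions automatically
spark.sql.adaptive.coalescePartitions.enabled   true
# Target size AQE coalesces toward
spark.sql.adaptive.advisoryPartitionSizeInBytes 128MB
```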

3. Partition count and size when writing data

  • Three approaches exist:

    • default
    • bucketBy
    • partitionBy
  • File size can also be controlled by record count.

    • spark.sql.files.maxRecordsPerFile
      -> The default is 0, meaning no record-count limit.

3-1. Default

  • When neither bucketBy nor partitionBy is used

    • Each partition is written out as one file.
    • saveAsTable vs. save
  • Finding an appropriate partition size and count is what matters.

    • Many small partitions are a problem.
    • A few very large partitions are also a problem.
      -> especially when stored in a format that is not splittable
  • Use repartition or coalesce appropriately.
    -> AQE's partition coalescing can help here. -> repartition

  • Use the PARQUET format
    -> with Snappy compression

3-2. bucketBy

  • If you know the data's characteristics well, split it on a chosen ID and save it as a table.

    • Later jobs load this table instead, cutting the time spent on repeated processing.
      -> Specify the number of buckets and the key ID.
    • Usable when the characteristics of the data are well understood.
  • The bucket count and key must be specified.

    • df.write.mode("overwrite").bucketBy(3, key).saveAsTable(table)
    • sortBy is sometimes used as well to fix the ordering.
    • This metadata is stored alongside in the metastore.
CREATE TABLE bucketed_table (
  id INT,
  name STRING,
  age INT
)
USING PARQUET
CLUSTERED BY (id)
INTO 4 BUCKETS;

3-3. partitionBy

  • What if you have a very large log file that is mostly read by data-creation time?

    • Store the data itself in a year-month-day folder structure.

      • This optimizes the read path (scanning is reduced or eliminated entirely).
      • Data management also becomes easier (e.g. when applying a retention policy).
    • But a badly chosen partition key creates an enormous number of files.
      -> Never use a high-cardinality column as the key.

  • The partitioning key(s) must be specified.

    • df.write.mode("overwrite").partitionBy("order_month").saveAsTable("order")
    • df.write.mode("overwrite").partitionBy("year", "month", "day").saveAsTable("appl_stock")
CREATE TABLE partitioned_table (
  id INT,
  value STRING
)
USING parquet
PARTITIONED BY (id);
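The file explosion a high-cardinality partitionBy key causes can be shown with a toy count (my own illustration, not Spark code): each writing task emits one file per distinct key value it holds.

```python
# Count output files as (task, key-directory) pairs that hold data.
def output_file_count(num_tasks, rows_per_task, distinct_keys):
    files = set()
    for task in range(num_tasks):
        for row in range(rows_per_task):
            key = (task * rows_per_task + row) % distinct_keys
            files.add((task, key))
    return len(files)

# Low-cardinality key (e.g. 12 months): bounded by tasks * key values.
print(output_file_count(num_tasks=10, rows_per_task=1000, distinct_keys=12))    # 120
# High-cardinality key (e.g. a user id): one file per key per task.
print(output_file_count(num_tasks=10, rows_per_task=1000, distinct_keys=5000))  # 10000
```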

bucketBy & partitionBy

  • Apply partitionBy first, then bucketBy.
  • partitionBy on the filtering pattern, then bucketBy on the grouping/join pattern.
df.write.mode("overwrite").partitionBy("dept").bucketBy(5, "employeeid").saveAsTable("employee")

Checking the statistics of a Spark persistent table

  • spark.sql("DESCRIBE EXTENDED Table_Name")
    -> This returns the table's bucketing/partitioning metadata.
spark.sql("DESCRIBE EXTENDED appl_stock").show()
