📒 Spark(16)

Kimdongki · July 9, 2024


📌 Driver & Executor

Driver์˜ ์—ญํ• 

  • Spark Application = (1 Driver) + (1 + Executor)
  • Driver๋Š” ๋‹ค์Œ ์—ญํ• ์„ ์ˆ˜ํ–‰ํ•œ๋‹ค.
    • main ํ•จ์ˆ˜๋ฅผ ์‹คํ–‰ํ•˜๊ณ  SparkSession/SparkContext๋ฅผ ์ƒ์„ฑํ•œ๋‹ค.
    • ์ฝ”๋“œ๋ฅผ Task๋กœ ๋ณ€ํ™˜ํ•˜์—ฌ DAG๋ฅผ ์ƒ์„ฑํ•œ๋‹ค.
    • ์ด๋ฅผ execution/logical/physical plan์œผ๋กœ ๋ณ€ํ™˜ํ•œ๋‹ค.
    • ๋ฆฌ์†Œ์Šค ๋งค๋‹ˆ์ €์˜ ๋„์›€์„ ๋ฐ›์•„ Task๋“ค์„ ์‹คํ–‰ํ•˜๊ณ  ๊ด€๋ฆฌํ•œ๋‹ค.
      -> Task์˜ ์ˆ˜๊ฐ€ ๋„ˆ๋ฌด ๋งŽ์•„์ง€๋ฉด Driver ๋ฉ”๋ชจ๋ฆฌ ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ•œ๋‹ค.
    • ์œ„ ์ •๋ณด๋“ค์„ ๋ชจ๋‘ Web UI๋กœ ํ‘œํ˜„ํ•œ๋‹ค.(Port : 4040)

Driver memory configuration

spark.driver.memory = 4GB
spark.driver.cores = 4
spark.driver.memoryOverheadFactor = 0.1
# overhead = max(spark.driver.memory * 10%, 384MB)
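A minimal Python sketch of the overhead rule (10% of the memory, but at least 384MB), assuming the default factor of 0.1. `overhead_mb` and `driver_container_mb` are hypothetical helper names for illustration, not Spark APIs; the driver overhead matters mainly in cluster mode, where the driver runs in its own container.

```python
MIN_OVERHEAD_MB = 384  # floor applied to the overhead


def overhead_mb(memory_mb: int, factor: float = 0.1) -> int:
    """Non-JVM overhead: max(memory * factor, 384MB), truncated to whole MB."""
    return max(int(memory_mb * factor), MIN_OVERHEAD_MB)


def driver_container_mb(driver_memory_mb: int, factor: float = 0.1) -> int:
    """JVM heap plus overhead that has to be requested for the driver."""
    return driver_memory_mb + overhead_mb(driver_memory_mb, factor)


print(overhead_mb(4096))          # 409 (10% of 4GB is above the 384MB floor)
print(overhead_mb(2048))          # 384 (10% would only be 204MB)
print(driver_container_mb(4096))  # 4505
```

For small heaps the 384MB floor dominates, so the overhead is proportionally larger than 10%.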

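Executor memory configuration

spark.executor.memory = 8GB
spark.executor.cores = 4
spark.executor.memoryOverheadFactor = 0.1
# overhead = max(spark.executor.memory * 10%, 384MB)
# On YARN, memory + overhead of each executor container must fit within
# yarn.nodemanager.resource.memory-mb (the memory a NodeManager can hand out to containers)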
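To see why yarn.nodemanager.resource.memory-mb matters, here is a rough Python sketch of how many executor containers fit on one NodeManager by memory alone. It assumes YARN rounds each request up to a multiple of yarn.scheduler.minimum-allocation-mb (1024MB by default) and ignores vcores and other scheduler limits; the 64GB node and the helper names are hypothetical.

```python
import math


def yarn_container_mb(request_mb: int, min_allocation_mb: int = 1024) -> int:
    """Round a container request up to a multiple of the YARN minimum allocation."""
    return math.ceil(request_mb / min_allocation_mb) * min_allocation_mb


def executors_per_node(executor_memory_mb: int, overhead_mb: int,
                       node_memory_mb: int, min_allocation_mb: int = 1024) -> int:
    """Executor containers that fit into yarn.nodemanager.resource.memory-mb (memory only)."""
    container = yarn_container_mb(executor_memory_mb + overhead_mb, min_allocation_mb)
    return node_memory_mb // container


# Hypothetical node with yarn.nodemanager.resource.memory-mb = 65536 (64GB).
# 8GB executor: overhead = max(8192 * 0.1, 384) = 819MB -> 9011MB, rounded up to 9216MB.
print(yarn_container_mb(8192 + 819))         # 9216
print(executors_per_node(8192, 819, 65536))  # 7
```

The leftover memory on the node (65536 - 7 * 9216 = 1024MB here) simply goes unused, which is one reason executor sizes are often tuned to the node size.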

Heap memory layout

  • Reserved Memory: 300MB set aside for the Spark engine itself (always reserved).
  • Spark Memory: used for DataFrame operations and caching. (8000MB - 300MB) * 0.6 = 4620MB, where 0.6 is spark.memory.fraction
  • User Memory: UDFs, UDAFs, and similar user objects live here. (8000MB - 300MB) * 0.4 = 3080MB
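The split above can be written as a small Python function. This is a simplified sketch assuming an 8000MB heap as in the numbers above; in a real executor the usable heap reported by the JVM is slightly smaller than -Xmx, so actual figures will differ a little. `heap_layout` is a hypothetical helper, not a Spark API.

```python
RESERVED_MB = 300  # fixed Reserved Memory for the Spark engine


def heap_layout(heap_mb: float, memory_fraction: float = 0.6) -> dict:
    """Split a JVM heap into Reserved, Spark (unified) and User memory."""
    usable = heap_mb - RESERVED_MB       # everything except Reserved Memory
    spark = usable * memory_fraction     # governed by spark.memory.fraction
    user = usable - spark                # the remaining (1 - fraction)
    return {"reserved": RESERVED_MB, "spark": spark, "user": user}


for fraction in (0.6, 0.5):
    layout = heap_layout(8000, fraction)
    print(fraction, {k: round(v) for k, v in layout.items()})
# 0.6 {'reserved': 300, 'spark': 4620, 'user': 3080}
# 0.5 {'reserved': 300, 'spark': 3850, 'user': 3850}
```

Lowering spark.memory.fraction shrinks Spark Memory and grows User Memory by exactly the same amount.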

User Memory

  • User-defined data structure

  • Spark internal metadata

  • UDF

  • RDD conversion operation

  • RDD lineage and dependency

  • Used when you work with RDDs directly.

  • If this region is too small, decrease spark.memory.fraction (User Memory receives the remaining 1 - spark.memory.fraction).

Spark Memory

spark.executor.memory = 8GB
spark.executor.cores = 4
spark.memory.storageFraction = 0.5
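spark.memory.storageFraction only sets the initial boundary between the Storage and Execution pools inside Spark Memory. A minimal Python sketch of that initial split, assuming the 4620MB of Spark Memory from the heap layout section; `split_spark_memory` is a hypothetical helper.

```python
def split_spark_memory(spark_memory_mb: float, storage_fraction: float = 0.5) -> tuple:
    """Initial (not fixed) boundary between the Storage and Execution pools."""
    storage = spark_memory_mb * storage_fraction  # spark.memory.storageFraction
    execution = spark_memory_mb - storage         # the rest starts as Execution
    return storage, execution


# Spark Memory from the heap layout section: (8000MB - 300MB) * 0.6 = 4620MB
print(split_spark_memory(4620))  # (2310.0, 2310.0)
```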

Executor CPU

spark.executor.memory = 8GB
spark.executor.cores = 4
  • 4 Tasks can run in parallel (with the default spark.task.cpus = 1).
  • These slots are actually threads inside the same executor JVM.
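The number of slots follows from spark.executor.cores and spark.task.cpus. A small Python sketch, assuming a hypothetical cluster of 10 executors; the helper names are illustrative only.

```python
def task_slots(executor_cores: int, task_cpus: int = 1) -> int:
    """Concurrent tasks (slots) per executor: spark.executor.cores // spark.task.cpus."""
    if task_cpus < 1 or executor_cores < task_cpus:
        raise ValueError("need spark.executor.cores >= spark.task.cpus >= 1")
    return executor_cores // task_cpus


def cluster_parallelism(num_executors: int, executor_cores: int, task_cpus: int = 1) -> int:
    """Upper bound on tasks running at the same time across all executors."""
    return num_executors * task_slots(executor_cores, task_cpus)


print(task_slots(4))               # 4 slots (threads) in one executor JVM
print(task_slots(4, task_cpus=2))  # 2
print(cluster_parallelism(10, 4))  # 40 (hypothetical 10 executors)
```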

Executor Resource

  • How do the 4 slots share the JVM heap memory?

Executor Memory Pool Management

  • Static Memory Management

    • Before Spark 1.6, memory was divided statically and evenly among the slots.
  • Today Spark uses the Unified Memory Manager.

    • By default, memory is allocated fairly among the Tasks that are currently running.
      -> In other words, only running Tasks share the memory, so a single running Task can take all of it.
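Spark's execution memory pool is documented to give each of N active tasks at most 1/N of the pool, while trying to guarantee at least 1/(2N) before a task is forced to spill. A minimal Python sketch of those bounds, assuming a 2310MB execution pool (half of 4620MB); the pool can grow or shrink at runtime through borrowing, which this ignores. `per_task_execution_bounds` is a hypothetical helper.

```python
def per_task_execution_bounds(execution_pool_mb: float, active_tasks: int) -> tuple:
    """(guaranteed minimum, allowed maximum) execution memory per task with N active tasks."""
    if active_tasks < 1:
        raise ValueError("need at least one active task")
    return execution_pool_mb / (2 * active_tasks), execution_pool_mb / active_tasks


for n in (1, 2, 4):
    print(n, per_task_execution_bounds(2310, n))
# 1 (1155.0, 2310.0)
# 2 (577.5, 1155.0)
# 4 (288.75, 577.5)
```

With one running task, the upper bound is the whole pool; as more tasks start, each task's share shrinks.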

Unified Memory Management

  • What happens when Execution memory starts running out

    • When it runs short, it borrows free memory from the Storage Memory Pool.
    • The ratio set by spark.memory.storageFraction is only the starting split.
      -> However, once both sides start filling up, the boundary is enforced: Execution evicts cached blocks until Storage shrinks back to its spark.memory.storageFraction share.
  • Conversely, what happens when Storage memory runs out

    • When memory for DataFrame caching runs short, it borrows free memory from the Execution Memory Pool.
    • However, Storage can never evict Execution memory, so anything it borrows beyond its spark.memory.storageFraction share can be evicted again when Execution needs it back.
  • What if no memory is left?

    • Data is moved from memory to disk (disk spill).
    • If the data cannot be spilled to disk, an OOM (Out of Memory) error occurs.
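The borrowing and eviction rules can be illustrated with a toy model. This is a simplified Python sketch, not Spark's actual implementation: it ignores LRU eviction among cached blocks, per-task limits, and off-heap memory. `ToyUnifiedMemory` is a hypothetical class.

```python
class ToyUnifiedMemory:
    """Simplified model of the unified Storage/Execution pool (not Spark's code)."""

    def __init__(self, total_mb: float, storage_fraction: float = 0.5):
        self.total = total_mb
        # Cached data within this region cannot be evicted by Execution.
        self.storage_region = total_mb * storage_fraction
        self.storage_used = 0.0
        self.execution_used = 0.0

    def free(self) -> float:
        return self.total - self.storage_used - self.execution_used

    def cache(self, mb: float) -> bool:
        """Storage may borrow any free memory but never evicts Execution memory."""
        if mb <= self.free():
            self.storage_used += mb
            return True
        # No room in memory: depending on the storage level the block is dropped or written to disk.
        return False

    def acquire_execution(self, mb: float) -> float:
        """Execution uses free memory first, then evicts cached data above the storage region."""
        shortfall = mb - self.free()
        if shortfall > 0:
            evictable = max(0.0, self.storage_used - self.storage_region)
            self.storage_used -= min(shortfall, evictable)
        granted = min(mb, self.free())
        self.execution_used += granted
        return granted  # granted < mb means the task has to spill to disk


pool = ToyUnifiedMemory(1000, storage_fraction=0.5)
print(pool.cache(700))              # True: Storage borrows beyond its 500MB region
print(pool.acquire_execution(400))  # 400: 100MB of cached data is evicted
print(pool.acquire_execution(200))  # 100.0: eviction stops at the 500MB boundary, the rest spills
print(pool.cache(50))               # False: Storage cannot evict Execution
print(pool.storage_used, pool.execution_used)  # 500.0 500.0
```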

Spark Executor Memory Configuration

| Name | Default | Description |
|---|---|---|
| spark.driver.memory | 1g | Memory for the driver process |
| spark.driver.cores | 1 | Number of cores for the driver process |
| spark.executor.memoryOverhead | max(executorMemory * spark.executor.memoryOverheadFactor, 384) MB | Memory for non-JVM work |
| spark.executor.memory | 1g | Memory given to the executor JVM (heap) |
| spark.memory.fraction | 0.6 | Spark Memory: share of the JVM heap (after the 300MB reserve) used for DataFrame work, including caching |
| spark.memory.storageFraction | 0.5 | Share of Spark Memory used for caching |
| spark.executor.cores | 1 (YARN) | Typically 1 to 5 CPUs are assigned per executor |
| spark.memory.offHeap.enabled | false | Whether to enable separate off-heap memory outside the JVM heap |
| spark.memory.offHeap.size | 0 | Amount of memory given to off-heap |
| spark.executor.pyspark.memory | (not set) | Memory for the Python process. If unset, Spark does not limit it and Python shares the non-JVM (overhead) memory; if set, Python is limited to this value |
| spark.python.worker.memory | 512m | Memory each Python worker may use during aggregation before spilling to disk |

  • Total memory allocated to one executor
    -> spark.executor.memoryOverhead + spark.executor.memory + spark.memory.offHeap.size + spark.executor.pyspark.memory
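The executor total can be computed with a short Python sketch. It assumes the default overhead factor of 0.1 when spark.executor.memoryOverhead is not set explicitly; spark.python.worker.memory is left out because it is a spill threshold inside each Python worker rather than a separate allocation. On YARN the scheduler may round the request up further. `executor_container_mb` is a hypothetical helper.

```python
from typing import Optional


def executor_container_mb(executor_memory_mb: int,
                          offheap_mb: int = 0,
                          pyspark_memory_mb: int = 0,
                          overhead_mb: Optional[int] = None,
                          overhead_factor: float = 0.1) -> int:
    """Memory requested for one executor.

    executor_memory_mb -> spark.executor.memory (JVM heap)
    offheap_mb         -> spark.memory.offHeap.size (0 unless off-heap is enabled)
    pyspark_memory_mb  -> spark.executor.pyspark.memory (0 if unset)
    overhead_mb        -> spark.executor.memoryOverhead; default max(memory * factor, 384)
    """
    if overhead_mb is None:
        overhead_mb = max(int(executor_memory_mb * overhead_factor), 384)
    return executor_memory_mb + overhead_mb + offheap_mb + pyspark_memory_mb


print(executor_container_mb(8192))                                          # 9011
print(executor_container_mb(8192, offheap_mb=2048, pyspark_memory_mb=1024))  # 12083
```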

Off Heap Memory

  • Spark works best with on-heap memory.
  • However, the JVM heap is subject to garbage collection.
  • The larger the JVM heap, the higher the cost of garbage collection.
  • Memory outside the JVM heap can be used alongside it to ease this:
    • Overhead memory
    • Off-heap memory
      • Set spark.memory.offHeap.enabled to true
      • Set spark.memory.offHeap.size to the desired size
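The two settings are usually passed together. A small Python sketch that builds the matching spark-submit `--conf` arguments; `offheap_conf_args` and `app.py` are hypothetical, and the size check reflects Spark requiring a positive spark.memory.offHeap.size when off-heap memory is enabled.

```python
import re
import shlex


def offheap_conf_args(size: str) -> list:
    """Build spark-submit --conf arguments that turn on off-heap memory of the given size."""
    size = size.strip()
    if not size or re.fullmatch(r"0+[a-zA-Z]*", size):
        # Spark requires a positive spark.memory.offHeap.size when off-heap is enabled.
        raise ValueError("spark.memory.offHeap.size must be > 0 when off-heap is enabled")
    conf = {
        "spark.memory.offHeap.enabled": "true",
        "spark.memory.offHeap.size": size,
    }
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


# "app.py" is a placeholder application file.
print(shlex.join(["spark-submit", *offheap_conf_args("2g"), "app.py"]))
# spark-submit --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=2g app.py
```

The same key/value pairs can also be set in code with SparkSession.builder.config(key, value) before the session is created.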

Off-heap memory in Spark 3.x: summary

  • Spark 3.x is optimized for working with off-heap memory.
    -> It can manage that memory directly, outside the JVM's garbage-collected heap.

  • Spark 3.x uses off-heap memory for DataFrames.
    -> This can reduce garbage collection.

  • So the total off-heap memory is:

    • spark.executor.memoryOverhead + spark.memory.offHeap.size
    • With spark.memory.offHeap.size you can grow off-heap memory without increasing the executor (heap) memory.
