๐Ÿ“’ Spark(15)

Kimdongkiยท2024๋…„ 7์›” 9์ผ

Spark

๋ชฉ๋ก ๋ณด๊ธฐ
15/22

๐Ÿ“Œ Broadcast Variable

Broadcast Variable์ด๋ž€

  • ๋ฃฉ์—… ํ…Œ์ด๋ธ”๋“ฑ์„ Broadcastingํ•˜์—ฌ Shuffling์„ ๋ง‰๋Š” ๋ฐฉ์‹์œผ๋กœ ์‚ฌ์šฉํ•œ๋‹ค.

    • Broadcast Join์—์„œ ์‚ฌ์šฉ๋˜๋Š” ๊ฒƒ๊ณผ ๋™์ผํ•œ ํ…Œํฌ๋‹‰์ด๋‹ค.

    • ๋Œ€๋ถ€๋ถ„ ๋ฃฉ์—… ํ…Œ์ด๋ธ”(ํ˜น์€ ๋””๋ฉ˜์…˜ ํ…Œ์ด๋ธ” - 10~20MB)์„ Executor๋กœ ์ „์†กํ•˜๋Š”๋ฐ ์‚ฌ์šฉํ•œ๋‹ค.
      -> ๋งŽ์€ DB์—์„œ ์Šคํƒ€ ์Šคํ‚ค๋งˆ ํ˜•ํƒœ๋กœ ํŒฉํŠธ ํ…Œ์ด๋ธ”๊ณผ ๋””๋ฉ˜์…˜ ํ…Œ์ด๋ธ”์„ ๋ถ„๋ฆฌํ•œ๋‹ค.

    • spark.sparkContext.broadcast๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

LookUP Table - LUT

  • ์ฃผ์–ด์ง„ ์—ฐ์‚ฐ์— ๋Œ€ํ•ด์„œ ๋ฏธ๋ฆฌ ๊ณ„์‚ฐ๋œ ๊ฒฐ๊ณผ๋“ค์˜ ์ง‘ํ•ฉ(๋ฐฐ์—ด)์„ ๊ฐ€๋ฆฌํ‚จ๋‹ค.

  • ์ด ์ง‘ํ•ฉ(๋ฐฐ์—ด)์€ ์ฃผ์–ด์ง„ ์—ฐ์‚ฐ์— ๋Œ€ํ•œ ๊ฒฐ๊ณผ๋ฅผ ๊ณ„์‚ฐํ•˜๋Š” ์‹œ๊ฐ„๋ณด๋‹ค ๋” ๋น ๋ฅด๊ฒŒ ๊ฐ’์„ ์ทจ๋“ํ•ด ๊ฐˆ ์ˆ˜ ์žˆ๋„๋ก ์‚ฌ์šฉ๋˜๋Š” ๋ ˆํผ๋Ÿฐ์Šค๋กœ ์‚ฌ์šฉ๋œ๋‹ค.

  • ์ปดํ“จํ„ฐ ๊ณผํ•™์—์„œ ์ผ๋ฐ˜์ ์œผ๋กœ ๋ฐฐ์—ด์ด๋‚˜ ์—ฐ๊ด€ ๋ฐฐ์—ด๋กœ ๋œ ๋ฐ์ดํ„ฐ ๊ตฌ์กฐ๋ฅผ LUT๋ผ ๋ถ€๋ฅธ๋‹ค.

๋ฃฉ์—… ํ…Œ์ด๋ธ”(ํŒŒ์ผ)์„ UDF๋กœ ๋ณด๋‚ด๋Š” ๋ฐฉ๋ฒ•

  • Closure

    • Serialization์ด Task๋‹จ์œ„๋กœ ์ผ์–ด๋‚œ๋‹ค.
    • UDF์•ˆ์—์„œ Python Data Structure๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ
  • Broadcast

    • Serialization์ด Worker Node๋‹จ์œ„๋กœ ์ผ์–ด๋‚œ๋‹ค.(๊ทธ ์•ˆ์—์„œ ์บ์‹ฑ๋˜๊ธฐ์— ํ›จ์”ฌ ๋” ํšจ์œจ์ ์ด๋‹ค.)
    • UDF์•ˆ์—์„œ Broadcast๋œ Data Structure๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ
  • Broadcast Dataset์˜ ํŠน์ง•

    • Worker node๋กœ ๊ณต์œ ๋˜๋Š” ๋ณ€๊ฒฝ ๋ถˆ๊ฐ€ ๋ฐ์ดํ„ฐ
    • Worker node๋ณ„๋กœ ํ•œ ๋ฒˆ ๊ณต์œ ๋˜๊ณ  ์บ์‹ฑ๋œ๋‹ค.
    • ์ œ์•ฝ์ ์œผ๋กœ๋Š” Task Memory์•ˆ์— ๋“ค์–ด๊ฐˆ ์ˆ˜ ์žˆ์–ด์•ผํ•œ๋‹ค๋Š” ์ ์ด๋‹ค

๐Ÿ“Œ Accumulators

Accumulators๋ž€

  • ํŠน์ • ์ด๋ฒค๋“œ์˜ ์ˆ˜๋ฅผ ๊ธฐ๋กํ•˜๋Š”๋ฐ ์‚ฌ์šฉ๋œ๋‹ค. -> ์ผ์ข…์˜ ์ „์—ญ ๋ณ€์ˆ˜
    -> ํ•˜๋‘ก์—์„œ ์นด์šดํ„ฐ์™€ ์•„์ฃผ ํก์‚ฌํ•˜๋‹ค.

  • ์˜ˆ๋ฅผ ๋“ค๋ฉด ๋น„์ •์ƒ์ ์ธ ๊ฐ’์„ ๊ฐ€์ง€๋Š” ๋ ˆ์ฝ”๋“œ์˜ ์ˆ˜๋ฅผ ์นด์šดํŒ…ํ•˜๋Š”๋ฐ ์‚ฌ์šฉ๋œ๋‹ค.

def handle_bad_zipcode(c:int) -> int:
	if len(str(c)) != 5:
    	return None
    return c

Accumulators์˜ ํŠน์ง•

  • ๋ณ€๊ฒฝ ๊ฐ€๋Šฅํ•œ ์ „์—ญ๋ณ€์ˆ˜๋กœ ๋“œ๋ผ์ด๋ฒ„์— ์œ„์น˜ํ•œ๋‹ค.

  • ์Šค์นผ๋ผ๋กœ ๋งŒ๋“ค๋ฉด ์ด๋ฆ„์„ ์ค„ ์ˆ˜ ์žˆ์ง€๋งŒ ๊ทธ์ด์™ธ์—๋Š” ๋ถˆ๊ฐ€๋Šฅํ•˜๋‹ค.
    -> ์ด๋ฆ„์žˆ๋Š” Accumulators๋งŒ Spark Web UI์— ๋‚˜ํƒ€๋‚œ๋‹ค.

  • ๋ ˆ์ฝ”๋“œ ๋ณ„๋กœ ์„ธ๊ฑฐ๋‚˜ ํ•ฉ์„ ๊ตฌํ•˜๋Š”๋ฐ ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•˜๋‹ค.

  • ๋‘ ๊ฐ€์ง€ ๋ฐฉ๋ฒ•์œผ๋กœ ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•˜๋ฉฐ ๊ฐ’์˜ ์ •ํ™•๋„๋„ ๋‹ฌ๋ผ์ง„๋‹ค.

    • Transformation์—์„œ ์‚ฌ์šฉํ•œ๋‹ค.
      -> ์ด ๊ฒฝ์šฐ ๊ฐ’์ด ๋ถ€์ •ํ™•ํ•  ์ˆ˜ ์žˆ๋‹ค.(Task์˜ ์žฌ์‹คํ–‰๊ณผ Speculative Execution)
    • DataFrame/RDD Foreach์—์„œ ์‚ฌ์šฉํ•œ๋‹ค.
      -> ์ถ”์ฒœ๋˜๋Š” ๋ฐฉ์‹์œผ๋กœ ์ด ๊ฒฝ์šฐ ์ •ํ™•ํ•˜๋‹ค.

๐Ÿ“Œ Speculative Execution

Speculative Execution๋ž€

  • ๋А๋ฆฐ Task๋ฅผ ๋‹ค๋ฅธ Worker node์— ์žˆ๋Š” Executor์—์„œ ์ค‘๋ณต ์‹คํ–‰ํ•œ๋‹ค.
    • ์ด๋ฅผ ํ†ตํ•ด Worker node์˜ ํ•˜๋“œ์›จ์–ด ์ด์Šˆ๋“ฑ์œผ๋กœ ๋А๋ ค์ง€๋Š” ๊ฒฝ์šฐ ๋น ๋ฅธ ์‹คํ–‰์„ ๋ณด์žฅํ•œ๋‹ค.
    • ํ•˜์ง€๋งŒ Data Skew๋กœ ์ธํ•ด ์˜ค๋ž˜ ๊ฑธ๋ฆฐ๋‹ค๋ฉด ๋„์›€์ด ์•ˆ๋˜๊ณ  ๋ฆฌ์†Œ์Šค๋งŒ ๋‚ญ๋น„ํ•˜๊ฒŒ ๋œ๋‹ค.
    • ๋‹ค๋ฅธ Worker node๋กœ ์ƒˆ๋กœ์šด Task๋กœ ๋งŒ๋“ค๊ณ  ๋จผ์ € ์‹คํ–‰๋˜๋Š” ๊ฒƒ์„ ์‚ฌ์šฉํ•˜๊ณ  ๋‚˜๋จธ์ง€๋Š” Kill ํ•œ๋‹ค.

Speculative Execution ์ œ์–ด๋ฐฉ์‹

  • spark.speculation์œผ๋กœ ์ปจํŠธ๋กค ๊ฐ€๋Šฅํ•˜๋ฉฐ ๊ธฐ๋ณธ์€ False์ด๋‹ค.
    -> ํ•˜๋‘ก MapReduce์—์„œ๋ถ€ํ„ฐ ์žˆ๋˜ ๊ธฐ๋Šฅ์ด๋‹ค.
  • ๋‹ค์–‘ํ•œ ํ™˜๊ฒฝ ๋ณ€์ˆ˜๋กœ ์„ธ๋ฐ€ํ•˜๊ฒŒ ์ œ์–ด ๊ฐ€๋Šฅํ•˜๋‹ค.
ํ™˜๊ฒฝ ๋ณ€์ˆ˜ ์ด๋ฆ„DefaultLinkedIn
spark.speculation.interval100ms1 sec
spark.speculation.multiplier1.54
spark.speculation.quantile0.750.9
spark.speculation.minTaskRuntime100ms30 sec
spark.speculation.task.duration.thresholdNoneNone

Spark์˜ ๋ฆฌ์†Œ์Šค ํ• ๋‹น(์Šค์ผ€์ฅด๋ง)

  • Spark Application๋“ค๊ฐ„์˜ ๋ฆฌ์†Œ์Šค ํ• ๋‹น

    • ๊ธฐ๋ฐ˜์ด ๋˜๋Š” ๋ฆฌ์†Œ์Šค ๋งค๋‹ˆ์ €๊ฐ€ ๊ฒฐ์ •๋œ๋‹ค.
      -> YARN์€ ์„ธ ๊ฐ€์ง€ ๋ฐฉ์‹์„ ์ง€์›ํ•œ๋‹ค. FIFO, FAIR, CAPACITY
    • ํ•œ ๋ฒˆ ๋ฆฌ์†Œ์Šค๋ฅผ ํ• ๋‹น ๋ฐ›์œผ๋ฉด ํ•ด๋‹น ๋ฆฌ์†Œ์Šค๋ฅผ ๋๊นŒ์ง€ ๋“ค๊ณ  ๊ฐ€๋Š” ๊ฒƒ์ด ๊ธฐ๋ณธ์ด๋‹ค.
  • ํ•˜๋‚˜์˜ Spark Application์•ˆ์—์„œ Job๋“ค ๊ฐ„์˜ ๋ฆฌ์†Œ์Šค ํ• ๋‹น

    • FIFO ํ˜•ํƒœ๋กœ ์ฒ˜์Œ Job์ด ํ•„์š”ํ•œ ๋Œ€๋กœ ๋ฆฌ์†Œ์Šค๋ฅผ ๋ฐ›์•„์„œ ์“ฐ๋Š” ๊ฒƒ์ด ๊ธฐ๋ณธ์ด๋‹ค.

Spark Application์˜ ๋ฆฌ์†Œ์Šค ์š”๊ตฌ/๋ฆด๋ฆฌ์Šค ๋ฐฉ์‹

  • Static Allocation(๊ธฐ๋ณธ ๋™์ž‘)

    • Spark Application์€ ๋ฆฌ์†Œ์Šค ๋งค๋‹ˆ์ €๋กœ๋ถ€ํ„ฐ (YARN)๋ฐ›์€ ๋ฆฌ์†Œ์Šค๋ฅผ ๋ณดํ†ต ๋๊นŒ์ง€ ๋“ค๊ณ ๊ฐ„๋‹ค.
    • ์ด๋Š” ๋ฆฌ์†Œ์Šค ์‚ฌ์šฉ๋ฅ ์— ์•…์˜ํ–ฅ์„ ์ค„ ๊ฐ€๋Šฅ์„ฑ์ด ๋†’๋‹ค.
  • Dynamic Allocation

    • Spark Application์ด ์ƒํ™ฉ์— ๋”ฐ๋ผ executor๋ฅผ ๋ฆด๋ฆฌ์Šคํ•˜๊ธฐ๋„ ํ•˜๊ณ  ์š”๊ตฌํ•˜๊ธฐ๋„ ํ•œ๋‹ค.
    • ๋‹ค์ˆ˜์˜ Spark Application๋“ค์ด ํ•˜๋‚˜์˜ ๋ฆฌ์†Œ์Šค ๋งค๋‹ˆ์ €๋ฅผ ๊ณต์œ ํ•œ๋‹ค๋ฉด ํ™œ์„ฑํ™”ํ•˜๋Š” ๊ฒƒ์ด ์ข‹๋‹ค.

spark-submit --num-executors 100 --executor-cores 4 --executor-memory 32G


๐Ÿ“Œ Scheduler

Scheduler๋ž€

  • ํ•˜๋‚˜์˜ Spark Application๋‚ด์˜ Job๋“ค์— ๋ฆฌ์†Œ์Šค๋ฅผ ๋‚˜๋ˆ„์–ด์ฃผ๋Š” ์ •์ฑ…์ด๋‹ค.
    -> Spark Application๋“ค๊ฐ„์— ๋ฆฌ์†Œ์Šค๋ฅผ ๋‚˜๋ˆ„์–ด์ฃผ๋Š” ๋ฐฉ์‹์€ ๋ฆฌ์†Œ์Šค ๋งค๋‹ˆ์ €์—๊ฒŒ ๋‹ฌ๋ ค์žˆ๋‹ค.

  • ๋‹ค์Œ 2๊ฐ€์ง€๊ฐ€ ์กด์žฌํ•œ๋‹ค.

    • FIFO(๊ธฐ๋ณธ)

      • ๋ฆฌ์†Œ์Šค๋ฅผ ์ฒ˜์Œ ์š”์ฒญํ•œ Job์—๊ฒŒ ๋ฆฌ์†Œ์Šค ์šฐ์„  ์ˆœ์œ„๊ฐ€ ๊ฐ„๋‹ค.
    • FAIR

      • ๋ผ์šด๋“œ๋กœ๋นˆ ๋ฐฉ์‹์œผ๋กœ ๋ชจ๋“  Job์—๊ฒŒ ๊ณ ๋ฅด๊ฒŒ ๋ฆฌ์†Œ์Šค๋ฅผ ๋ถ„๋ฐฐํ•˜๋Š” ๋ฐฉ์‹์ด๋‹ค.
      • ์ด ์•ˆ์—์„œ Pool์ด๋ž€ ํ˜•ํƒœ๋กœ ๋ฆฌ์†Œ์Šค๋ฅผ ๋‚˜๋ˆ„์–ด์„œ ์šฐ์„ ์ˆœ์œ„๋ฅผ ๊ณ ๋ คํ•œ ํ˜•ํƒœ๋กœ ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•˜๋‹ค.
        -> Pool์•ˆ์—์„œ ๋ฆฌ์†Œ์Šค ๋ถ„๋ฐฐ๋„ FAIR ํ˜น์€ FIFO๋กœ ์ง€์ • ๊ฐ€๋Šฅํ•˜๋‹ค.

Scheduler๋ฅผ ํ™œ์šฉํ•œ ๋ณ‘๋ ฌ์„ฑ ์ฆ๋Œ€

  • ๋ณ‘๋ ฌ์„ฑ ์ฆ๋Œ€ -> Thread ํ™œ์šฉ์ด ํ•„์š”ํ•˜๋‹ค.

    • ์ด๋Š” FAIR ๋ชจ๋“œ์˜ ์Šค์ผ€์ฅด๋Ÿฌ์ผ ๊ฒฝ์šฐ ๋” ํšจ๊ณผ์ ์ด๋‹ค.
  • ๊ด€๋ จ ํ™˜๊ฒฝ ๋ณ€์ˆ˜

    • spark.scheduler.mode : FIFO(๊ธฐ๋ณธ) or FAIR
    • spark.scheduler.allocation.file: "FAIR"์ธ ๊ฒฝ์šฐ ํ•„์š”ํ•˜๋ฉฐ Pool์„ ์ •์˜ํ•ด๋†“๋Š” ํ˜•ํƒœ๋กœ ์‚ฌ์šฉ๋œ๋‹ค.


๐Ÿ“Œ Dynamic Resource Allocation

  • ์•„๋ž˜ ํ™˜๊ฒฝ๋ณ€์ˆ˜๋“ค๋กœ ์ œ์–ด๊ฐ€ ๊ฐ€๋Šฅํ•˜๋‹ค.
  • spark.dynamicAllocation.enabled = tru
  • spark.dynamicAllocation.shuffleTracking.enabled = true
  • spark.dynamicAllocation.executorldleTimeout = 60s(๋ฆด๋ฆฌ์Šค ํƒ€์ด๋ฐ ๊ฒฐ์ •)
  • spark.dynamicAllocation.schedulerBacklogTimeout = 1s
  • spark.dynamicAllocation.minExecutors
  • spark.dynamicAllocation.maxExecutors
  • spark.dynamicAllocation.initialExecutors
  • spark.dynamicAllocation.executorAllocationRatio

์ฐธ๊ณ 

0๊ฐœ์˜ ๋Œ“๊ธ€