📒 Spark(18)

Kimdongki · July 9, 2024


📌 Caching and Persist

Caching

  • Keeping frequently used DataFrames in memory speeds up processing.

    • However, you need to check that the DataFrame is actually in memory.
    • In some cases, recomputing it may be faster.
  • Caching also increases memory consumption, so there is no need to cache everything.

How to Cache a DataFrame

  • There are two ways to do it:

    • cache()
    • persist()
  • Both keep a DataFrame in memory, on disk, or off-heap.

    • Both are lazy: nothing is cached until the data is actually needed (i.e., until an action runs).
    • Caching always stores data one whole partition at a time.
      -> A single partition is never partially cached.

Persist

  • persist() allows fine-grained control through its arguments (the fields of a StorageLevel):

  • useDisk = True

  • useMemory = True

  • useOffHeap = False
    -> Requires off-heap memory to be enabled and sized (spark.memory.offHeap.enabled, spark.memory.offHeap.size).

  • deserialized = False

    • A trade-off between saving memory and saving CPU: deserialized storage saves CPU work (no deserialization on read), while serialized storage saves memory.
    • deserialized = True is only possible for in-memory storage.
  • replication = 1
    -> Sets how many copies are stored, each on a different executor.

  • Commonly used combinations of persist() arguments are available as single constants:

    • DISK_ONLY
    • MEMORY_ONLY
    • MEMORY_AND_DISK
    • MEMORY_ONLY_SER (Scala/Java only)
    • MEMORY_AND_DISK_SER (Scala/Java only)
    • OFF_HEAP
    • MEMORY_ONLY_2
    • DISK_ONLY_3 (a _2 or _3 suffix sets replication to 2 or 3)
  • By default, persist() keeps the DataFrame in memory and spills it to disk when it does not fit (MEMORY_AND_DISK), storing a single copy (replication = 1).

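  • cache() takes no arguments and is shorthand for persist() with a fixed storage level. For an RDD, that level is MEMORY_ONLY:

    • disk = False
    • memory = True
    • offHeap = False
    • deserialized = True
    • replication = 1
  • For a DataFrame, cache() is the same as persist() with its default level (MEMORY_AND_DISK), so disk = True.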
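
The flags and the named constants above can be summarized as data. In real PySpark they are the fields of `pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)`, passed as e.g. `df.persist(StorageLevel.MEMORY_AND_DISK)`. The sketch below uses a hypothetical `Level` record instead so it runs without Spark; the values in `LEVELS` reflect our understanding of the Scala/Java constants, and `problems()` encodes the rules from the notes above, not Spark's own validation.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class Level:
    """Hypothetical stand-in for the five persist() flags (not Spark's own class)."""
    use_disk: bool
    use_memory: bool
    use_off_heap: bool
    deserialized: bool
    replication: int = 1


# Flag values as we understand Spark's Scala/Java StorageLevel constants.
# PySpark differs slightly: its MEMORY_ONLY is serialized and it has no *_SER names.
LEVELS: Dict[str, Level] = {
    "DISK_ONLY":           Level(True,  False, False, False),
    "MEMORY_ONLY":         Level(False, True,  False, True),
    "MEMORY_AND_DISK":     Level(True,  True,  False, True),
    "MEMORY_ONLY_SER":     Level(False, True,  False, False),
    "MEMORY_AND_DISK_SER": Level(True,  True,  False, False),
    "OFF_HEAP":            Level(True,  True,  True,  False),
    "MEMORY_ONLY_2":       Level(False, True,  False, True, 2),
    "DISK_ONLY_3":         Level(True,  False, False, False, 3),
}


def problems(level: Level, off_heap_enabled: bool = False) -> List[str]:
    """List the reasons a flag combination would not make sense."""
    issues: List[str] = []
    if not (level.use_disk or level.use_memory or level.use_off_heap):
        issues.append("no storage selected")
    if level.use_off_heap and not off_heap_enabled:
        # Needs spark.memory.offHeap.enabled=true and a positive spark.memory.offHeap.size
        issues.append("off-heap memory is not configured")
    if level.deserialized and (level.use_off_heap or not level.use_memory):
        # Disk and off-heap data are always serialized; only on-heap memory holds live objects
        issues.append("deserialized=True only applies to on-heap memory")
    if level.replication < 1:
        issues.append("replication must be at least 1")
    return issues


def describe(name: str) -> str:
    """Summarize a named constant: where it stores data, in what form, how many copies."""
    lv = LEVELS[name]
    places = [p for p, on in (("disk", lv.use_disk), ("memory", lv.use_memory),
                              ("off-heap", lv.use_off_heap)) if on]
    form = "deserialized" if lv.deserialized else "serialized"
    return f"{name}: {'+'.join(places)}, {form}, replication={lv.replication}"
```

For example, `describe("MEMORY_AND_DISK")` gives `"MEMORY_AND_DISK: disk+memory, deserialized, replication=1"`. The flags listed for cache() correspond to MEMORY_ONLY in this table, and the numeric suffix of a name always equals its replication count.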

Caching

  • Caching with Spark SQL

    • spark.sql("cache table table_name")  (eager: caches immediately)
    • spark.sql("cache lazy table table_name")  (lazy: caches on first use)
    • spark.sql("uncache table table_name")
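  • How do you remove cached data?

    • DataFrame.unpersist(); Spark also evicts cached data on its own, in LRU (Least Recently Used) order, when memory runs short
    • spark.sql("uncache table table_name")
    • spark.catalog.isCached("table_name") (checks whether a table is cached)
    • spark.catalog.clearCache() (removes all cached tables)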
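
The commands above can be modeled with a small in-memory manager. This is a hypothetical sketch (`ToyCacheManager` and its `capacity`, counted in whole tables, are made up): eager versus lazy caching mirrors CACHE TABLE and CACHE LAZY TABLE, and eviction is least-recently-used. Spark evicts individual cached blocks rather than whole tables, but as in this model, an evicted table stays marked as cached and is recomputed and cached again on its next use.

```python
from collections import OrderedDict
from typing import Callable, Dict, List, Set


class ToyCacheManager:
    """Hypothetical model of table caching with LRU eviction (not Spark's CacheManager)."""

    def __init__(self, capacity: int, loaders: Dict[str, Callable[[], List[int]]]) -> None:
        if capacity < 1:
            raise ValueError("capacity must be at least 1")
        self.capacity = capacity           # how many tables fit in memory at once
        self.loaders = loaders             # table name -> function that computes its rows
        self.registered: Set[str] = set()  # tables marked as cached (eager or lazy)
        self.data: "OrderedDict[str, List[int]]" = OrderedDict()  # materialized, oldest first
        self.loads = 0                     # how many times a table was computed

    def _load(self, name: str) -> List[int]:
        self.loads += 1
        return self.loaders[name]()

    def _materialize(self, name: str) -> None:
        self.data[name] = self._load(name)
        self.data.move_to_end(name)
        while len(self.data) > self.capacity:
            self.data.popitem(last=False)  # evict the least recently used table

    def cache_table(self, name: str, lazy: bool = False) -> None:
        self.registered.add(name)
        if not lazy:                       # CACHE TABLE is eager, CACHE LAZY TABLE is not
            self._materialize(name)

    def read(self, name: str) -> List[int]:
        if name in self.data:
            self.data.move_to_end(name)    # mark as recently used
            return self.data[name]
        if name in self.registered:        # marked, but not (or no longer) in memory
            self._materialize(name)
            return self.data[name]
        return self._load(name)            # never cached: recompute on every read

    def uncache_table(self, name: str) -> None:
        self.registered.discard(name)
        self.data.pop(name, None)

    def is_cached(self, name: str) -> bool:
        return name in self.registered

    def clear_cache(self) -> None:
        self.registered.clear()
        self.data.clear()
```

The methods map loosely onto the commands listed above: `cache_table` to CACHE [LAZY] TABLE, `uncache_table` to UNCACHE TABLE or unpersist(), `is_cached` to spark.catalog.isCached(), and `clear_cache` to spark.catalog.clearCache().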

Caching Best Practices

  • Make it explicit that the cached DataFrame is the one being reused

    • cachedDF = df.cache()
    • cachedDF.select(...)
  • If there are many columns, cache only the columns you actually need

    • cachedDF = df.select(col1, col2, col3, col4).cache()
  • Uncache (unpersist) DataFrames once they are no longer needed

  • Sometimes recomputing a DataFrame each time can be faster than caching it.

    • This can be the case when a large dataset is stored in a format such as Parquet.
    • Cache only a small number of DataFrames.
    • Do not cache large DataFrames.
    • Do not rely on caching too much.
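
Whether caching beats recomputation can be estimated with a back-of-the-envelope cost model. The function and its cost parameters are hypothetical, and the model ignores memory pressure (the cost of evicting other cached data), so treat it only as a way to reason about the trade-off.

```python
def cache_pays_off(uses: int, compute_cost: float, write_cost: float, read_cost: float) -> bool:
    """Rough check: is caching cheaper than recomputing the DataFrame on every use?

    All costs share one hypothetical unit (e.g. seconds per full pass over the data).
    The model ignores memory pressure, i.e. the cost of evicting other cached data.
    """
    if uses < 1:
        raise ValueError("uses must be at least 1")
    without_cache = uses * compute_cost
    # First use: compute and write the cache. Later uses: read it back.
    with_cache = compute_cost + write_cost + (uses - 1) * read_cost
    return with_cache < without_cache
```

With hypothetical numbers: five uses with compute_cost=10, write_cost=2 and read_cost=1 cost 16 units cached versus 50 recomputed. A cheap scan (compute_cost=1, as a well-pruned Parquet read might be) used three times, with write_cost=2 and read_cost=0.8, costs 4.6 cached versus 3 recomputed, so recomputing wins.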

📌 Dynamic Partition Pruning

Filter(Predicate) Pushdown

  • Applies filters while reading from the data source, minimizing the amount of data read.
  • Only available for certain data sources (e.g., Parquet, when column statistics are available).
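
Statistics-based pushdown can be illustrated with a toy scanner. Parquet keeps per-row-group column statistics such as min and max; the `RowGroup` class and `scan_with_pushdown` function below are hypothetical stand-ins that use those two values to skip groups that cannot contain a match.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class RowGroup:
    """Hypothetical row group: min/max column statistics plus the rows themselves."""
    min_value: int
    max_value: int
    rows: List[int]


def scan_with_pushdown(groups: List[RowGroup], low: int, high: int) -> Tuple[List[int], int]:
    """Return (rows with low <= x <= high, number of row groups actually read)."""
    result: List[int] = []
    groups_read = 0
    for g in groups:
        # The statistics prove no row in this group can match, so skip it unread.
        if g.max_value < low or g.min_value > high:
            continue
        groups_read += 1
        # Statistics only rule groups out; surviving rows still need the real filter.
        result.extend(x for x in g.rows if low <= x <= high)
    return result, groups_read
```

For the predicate 12 <= x <= 22 over groups covering 1 to 10, 11 to 20 and 21 to 30, the first group is skipped without reading any of its rows.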

Partition Pruning

  • A form of optimization.
  • The optimizer distinguishes the data that is actually needed from the data that is not, and reads only what is needed.

Static Partition Pruning

  • Applies when the data source is partitioned on the column being filtered
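
Static pruning works because a partitioned data source encodes the partition value in its directory layout (Hive-style `column=value` paths). In this minimal sketch the paths and the `prune_partitions` helper are hypothetical; since the filter value is known when the plan is built, directories for other values are never read.

```python
from typing import List, Set


def prune_partitions(paths: List[str], column: str, wanted: Set[str]) -> List[str]:
    """Keep only Hive-style partition directories (e.g. 'sales/date=2024-07-01') whose
    value for `column` is in `wanted`; all other directories are never opened."""
    kept: List[str] = []
    for path in paths:
        # 'sales/date=2024-07-01' -> {'date': '2024-07-01'}; plain segments are ignored
        values = dict(seg.split("=", 1) for seg in path.strip("/").split("/") if "=" in seg)
        if values.get(column) in wanted:
            kept.append(path)
    return kept
```

Filtering on `date = '2024-07-02'` over three daily directories keeps only `sales/date=2024-07-02`; a filter on a column that is not a partition column prunes nothing here and would have to be applied row by row instead.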

Partition Pruning and the Execution Plan

  • Partition pruning happens during the logical plan optimization phase.

The Problem with Static Partition Pruning

  • Partitioning is usually applied to large tables -> fact tables

  • What if, when a fact table is joined with a dimension table, the filter is applied to the dimension table?
    -> Static pruning cannot help here, because no filter is applied directly to the fact table's partition column.

What Is Dynamic Partition Pruning?

  • Applying the filter from the non-partitioned table to the partitioned table, through the join key, at runtime
  • If the non-partitioned table is a small dimension table, combining this with a broadcast join is even better.
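
The idea can be sketched in plain Python with a hypothetical fact table partitioned by `date` and a small date dimension. The function below is an illustration, not Spark's implementation: it filters the dimension first, turns the surviving join keys into a partition filter, and only then scans the fact table.

```python
from typing import Callable, Dict, List, Tuple

Row = Dict[str, object]


def dynamic_partition_pruning(
    fact_partitions: Dict[str, List[Row]],  # partition value (the join key) -> fact rows
    dim_rows: List[Row],                    # small, non-partitioned dimension table
    dim_filter: Callable[[Row], bool],      # filter written against the dimension table
    join_key: str,
) -> Tuple[List[Row], List[str]]:
    """Return (joined rows, partition values that were actually scanned)."""
    # 1. Filter the small dimension table first (in Spark, this side can be broadcast).
    selected = [d for d in dim_rows if dim_filter(d)]
    dim_by_key = {d[join_key]: d for d in selected}
    # 2. The surviving join keys become a filter on the fact table's partition column.
    scanned = {p: rows for p, rows in fact_partitions.items() if p in dim_by_key}
    # 3. Scan and join only those partitions.
    joined = [{**row, **dim_by_key[p]} for p, rows in scanned.items() for row in rows]
    return joined, list(scanned)
```

If the dimension is filtered on a hypothetical `is_holiday` flag that is true for two of three dates, only those two fact partitions are scanned, even though the query never filtered the fact table directly.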
