๐Ÿ“’ Spark(14)

Kimdongkiยท2024๋…„ 6์›” 28์ผ

Spark

๋ชฉ๋ก ๋ณด๊ธฐ
14/22

๐Ÿ“Œ Spark Streaming

  • ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•œ Spark API

  • Kafka, Kinesis, Flume, TCP ์†Œ์ผ“ ๋“ฑ์˜ ๋‹ค์–‘ํ•œ ์†Œ์Šค์—์„œ ๋ฐœ์ƒํ•˜๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•  ์ˆ˜์žˆ๋‹ค.

  • Join, Map, Reduce, Window์™€ ๊ฐ™์€ ๊ณ ๊ธ‰ ํ•จ์ˆ˜ ์‚ฌ์šฉ ๊ฐ€๋Šฅ

Spark Streaming ๋™์ž‘ ๋ฐฉ์‹

  • ๋ฐ์ดํ„ฐ๋ฅผ Micro Batch๋กœ ์ฒ˜๋ฆฌ
  • ๊ณ„์†ํ•ด์„œ ์œ„์˜ ๊ณผ์ •์„ ๋ฐ˜๋ณต(๋ฃจํ”„)
  • ์ด๋ ‡๊ฒŒ ์ฝ์€ ๋ฐ์ดํ„ฐ๋ฅผ ์•ž์„œ ์ฝ์€ ๋ฐ์ดํ„ฐ์— Merge
  • Batch๋งˆ๋‹ค ๋ฐ์ดํ„ฐ ์œ„์น˜ ๊ด€๋ฆฌ(์‹œ์ž‘๊ณผ ๋)
  • Fault Tolerance์™€ ๋ฐ์ดํ„ฐ ์žฌ์ฒ˜๋ฆฌ ๊ด€๋ฆฌ(์‹คํŒจํ•  ๊ฒฝ์šฐ)

Spark Streaming ๋‚ด๋ถ€ ๋™์ž‘

  • Spark Streaming์€ ์‹ค์‹œ๊ฐ„ ์ž…๋ ฅ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์„ Batch๋กœ ๋‚˜๋ˆˆ ๋‹ค์Œ Spark Engine์—์„œ ์ฒ˜๋ฆฌํ•˜์—ฌ ์ตœ์ข… ๊ฒจ๊ณผ ์ŠคํŠธ๋ฆผ์„ ์ผ๊ด„์ ์œผ๋กœ ์ƒ์„ฑํ•œ๋‹ค.

  • DStream๊ณผ Structured Streaming ๋‘ ์ข…๋ฅ˜๊ฐ€ ์กด์žฌํ•œ๋‹ค.

Spark Structured Streaming

  • Spark Streaming์€ ์‹ค์‹œ๊ฐ„ ์ž…๋ ฅ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์„ ๋ฐฐ์น˜๋กœ ๋‚˜๋ˆˆ ๋‹ค์Œ Spark Engine์—์„œ ์ฒ˜๋ฆฌํ•˜์—ฌ ์ตœ์ข… ๊ฒฐ๊ณผ ์ŠคํŠธ๋ฆผ์„ ์ผ๊ด„์ ์œผ๋กœ ์ƒ์„ฑํ•œ๋‹ค.
DstreamStructured Streaming
RDD ๊ธฐ๋ฐ˜ ์ŠคํŠธ๋ฆฌ๋ฐ ์ฒ˜๋ฆฌDataFrame ๊ธฐ๋ฐ˜ ์ŠคํŠธ๋ฆฌ๋ฐ ์ฒ˜๋ฆฌ
Spark SQL ์—”์ง„์˜ ์ตœ์ ํ™” ๊ธฐ๋Šฅ ์‚ฌ์šฉ๋ถˆ๊ฐ€๋ŠฅCatalyst ๊ธฐ๋ฐ˜ ์ตœ์ ํ™” ํ˜œํƒ์„ ๊ฐ€์ ธ๊ฐ„๋‹ค.
์ด๋ฒคํŠธ ๋ฐœ์ƒ ์‹œ๊ฐ„ ๊ธฐ๋ฐ˜ ์ฒ˜๋ฆฌ ๋ถˆ๊ฐ€๋Šฅ์ด๋ฒคํŠธ ๋ฐœ์ƒ ์‹œ๊ฐ„ ๊ธฐ๋ฐ˜์„ ์ฒ˜๋ฆฌ ๊ฐ€๋Šฅใ…Ž๋‹ค.
๊ฐœ๋ฐœ์ด ์ค‘๋‹จ๋œ ์ƒํƒœ(RDD ๊ธฐ๋ฐ˜ ๋ชจ๋‘์— ํ•ด๋‹น)๊ณ„์†ํ•ด์„œ ๊ธฐ๋Šฅ์ด ์ถ”๊ฐ€๋˜๊ณ  ์žˆ๋‹ค.

Source & Sink

  • ์†Œ์Šค์™€ ์‹ฑํฌ๋Š” ์™ธ๋ถ€ ์‹œ์Šคํ…œ(์†Œ์Šค)์—์„œ ์ŠคํŠธ๋ฆฌ๋ฐ ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์ง‘ํ•˜๊ณ  ์ฒ˜๋ฆฌ๋œ ๋ฐ์ดํ„ฐ๋ฅผ ์™ธ๋ถ€ ์‹œ์Šคํ…œ(์‹ฑํฌ)์œผ๋กœ ์ถœ๋ ฅํ•˜๋Š” ๊ฒƒ์„ ์šฉ์ดํ•˜๊ฒŒ ํ•˜๋Š” ๊ตฌ์„ฑ ์š”์†Œ์ด๋‹ค.

Source

  • Kafka, Amazon Kinesis, Apache Flume, TCP/IP ์†Œ์ผ“, HDFS, File๋“ฑ์„ Spark Structured Streaming์—์„œ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋„๋ก ํ•ด์ค€๋‹ค.
    • ๊ฒฐ๊ตญ Spark DataFrame์œผ๋กœ ๋ณ€ํ™˜ํ•ด์ค€๋‹ค.
    • ex) Kafka์—์„œ Spark Structured Streaming์œผ๋กœ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์ง‘ํ•˜๋ ค๋Š” ๊ฒฝ์šฐ, Kafka Source๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ Kafka ํด๋Ÿฌ์Šคํ„ฐ์—์„œ ํ•˜๋‚˜ ์ด์ƒ์˜ ํ† ํ”ฝ์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์™€ DataFrame์œผ๋กœ ๋ณ€ํ™˜ ๊ฐ€๋Šฅํ•˜๋‹ค.
  • Spark Data Frame๊ณผ ๋น„๊ตํ•˜๋ฉด readStream์„ ์‚ฌ์šฉํ•˜๋Š” ์ ์ด ๋‹ค๋ฅด๋‹ค
lines_df = spark.readStream \
	.format("socket") \
    .option("host", "localhost") \
    .option("port", "9999") \
    .load()

Sink

  • Spark Structured Streaming์—์„œ ์ฒ˜๋ฆฌ๋œ ๋ฐ์ดํ„ฐ๋ฅผ ์™ธ๋ถ€ ์‹œ์Šคํ…œ์ด๋‚˜ ์Šคํ† ๋ฆฌ์ง€๋กœ ์ถœ๋ ฅ ๊ฐ€๋Šฅํ•˜๊ฒŒ ํ•ด์ค€๋‹ค.

  • ๋ณ€ํ™˜๋˜๊ฑฐ๋‚˜ ์ง‘๊ณ„๋œ ๋ฐ์ดํ„ฑ ์–ด๋–ป๊ฒŒ ์“ฐ์ด๊ฑฐ๋‚˜ ์†Œ๋น„๋˜๋Š”์ง€๋ฅผ ์ •์˜ํ•œ๋‹ค.

    • Source์™€ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ Sink๋Š” Kafka, HDFS, S3, Cassandra, JDBC DB๋“ฑ๊ณผ ๊ฐ™์€ ๋‹ค์–‘ํ•œ ๋Œ€์ƒ์— ๋Œ€ํ•ด ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•˜๋‹ค.
    • Kafka Sink๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ Spark Structured Streaming์—์„œ ์ฒ˜๋ฆฌ๋œ ๋ฐ์ดํ„ฐ๋ฅผ Kafka Topic์œผ๋กœ ์“ฐ๋Š” ๊ฒƒ์ด ๊ฐ€๋Šฅํ•˜๋‹ค.
  • OutputMode : ํ˜„์žฌ Micro Batch์˜ ๊ฒฐ๊ณผ๊ฐ€ Sink์— ์–ด๋–ป๊ฒŒ ์“ฐ์ผ์ง€ ๊ฒฐ์ •

    • Append
    • Update
    • Complete : Full Refresh ๋А๋‚Œ
word_count_query = counts_df.writeStream \
	.format("console") \
    .outputMode("complete") \
    .option("checkpointLocation", "chk-point-dir") \
    .start()

Micro Batch Trigger Option

  • Unspecified : default ๋ชจ๋“œ, ํ˜„์žฌ Micro Batch๊ฐ€ ๋๋‚˜๋ฉด ๋‹ค์Œ Batch๊ฐ€ ๋ฐ”๋กœ ์‹œ์ž‘ํ•œ๋‹ค.

  • Time Interval : ๊ณ ์ •๋œ ์‹œ๊ฐ„๋งˆ๋‹ค Micro Batch๋ฅผ ์‹œ์ž‘ํ•œ๋‹ค. ํ˜„์žฌ Batch๊ฐ€ ์ง€์ •๋œ ์‹œ๊ฐ„์„ ๋„˜์–ด์„œ ๋๋‚˜๋ฉด ๋๋‚˜์ž๋งˆ์ž ๋‹ค์Œ Batch๊ฐ€ ์‹œ์ž‘๋œ๋‹ค. ์ฝ์„ ๋ฐ์ดํ„ฐ๊ฐ€ ์—†๋Š” ๊ฒฝ์šฐ ์‹œ์ž‘๋˜์ง€ ์•Š๋Š”๋‹ค.

  • One Time -> Available-Now : ์ง€๊ธˆ ์žˆ๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ๋ชจ๋‘ ์ฒ˜๋ฆฌํ•˜๊ณ  ์ค‘๋‹จ

  • Continuous : ์ƒˆ๋กœ์šด ์ €์ง€์—ฐ ์—ฐ์† ์ฒ˜๋ฆฌ ๋ชจ๋“œ์—์„œ ์‹คํ–‰


๐Ÿ“Œ Spark Streaming ๊ฐœ๋ฐœ ํ™˜๊ฒฝ ์„ค์ •

Local Standalone Spark

  • Spark Cluster Manager๋กœ local[n] ์ง€์ •

    • master๋ฅผ local[n]์œผ๋กœ ์ง€์ •ํ•œ๋‹ค.
    • master๋Š” ํด๋Ÿฌ์Šคํ„ฐ ๋งค๋‹ˆ์ €๋ฅผ ์ง€์ •ํ•˜๋Š”๋ฐ ์‚ฌ์šฉํ•œ๋‹ค.
  • ์ฃผ๋กœ ๊ฐœ๋ฐœ์ด๋‚˜ ๊ฐ„๋‹จํ•œ ํ…Œ์ŠคํŠธ ์šฉ๋„์ด๋‹ค.

  • ํ•˜๋‚˜์˜ JVM์—์„œ ๋ชจ๋“  ํ”„๋กœ์„ธ์Šค๋ฅผ ์‹คํ–‰ํ•œ๋‹ค.

    • ํ•˜๋‚˜์˜ Driver์™€ ํ•˜๋‚˜์˜ Executor๊ฐ€ ์‹คํ–‰๋œ๋‹ค.
    • 1+ Thread๊ฐ€ Executor์•ˆ์—์„œ ์‹คํ–‰๋œ๋‹ค.
  • Executor์•ˆ์— ์ƒ์„ฑ๋˜๋Š” Thread ์ˆ˜

    • local : ํ•˜๋‚˜์˜ Thread๋งŒ ์ƒ์„ฑ
    • local[*] : ์ปดํ“จํ„ฐ CPU ์ˆ˜๋งŒํผ Thread ์ƒ์„ฑ

Local Standalone Spark ์„ค์น˜

  • Mac์ด๋ผ๋ฉด Mac Catalina ํ˜น์€ ์ดํ›„ ๋ฒ„์ „๊ธฐ์ค€

    • z Shell์ด ๊ธฐ๋ณธ์œผ๋กœ ์‚ฌ์šฉ๋œ๋‹ค (๊ทธ์ „์—๋Š” Bash shell)
  • Java

    • JDK8/11 ํ•„์š” : ํ„ฐ๋ฏธ๋„์—์„œ java-version ๋ช…๋ น์œผ๋กœ ์ฒดํฌํ•ด์•ผํ•จ
    • JAVA_HOME ํ™˜๊ฒฝ๋ณ€์ˆ˜๋ฅผ Z Shell ์‹œ์ž‘ ์Šคํฌ๋ฆฝํŠธ(~/.zshrc)์— ๋“ฑ๋กํ•œ๋‹ค.
      • echo export "JAVA_HOME=\$(//usr/libexec/java_home)" >> ~/.zshrc
  • Spark ๋‹ค์šด๋กœ๋“œ

0๊ฐœ์˜ ๋Œ“๊ธ€