๐Ÿ“’ Spark(5)

Kimdongkiยท2024๋…„ 6์›” 18์ผ

Spark

๋ชฉ๋ก ๋ณด๊ธฐ
5/22

๐Ÿ“Œ Spark Session ์ƒ์„ฑ

  • Spark ํ”„๋กœ๊ทธ๋žจ์˜ ์‹œ์ž‘์€ SparkSession์„ ๋งŒ๋“œ๋Š” ๊ฒƒ์ด๋‹ค.

    • ํ”„๋กœ๊ทธ๋žจ๋งˆ๋‹ค ํ•˜๋‚˜๋ฅผ ๋งŒ๋“ค์–ด์„œ Spakr Cluster์™€ ํ†ต์‹ ํ•œ๋‹ค. -> Singleton ๊ฐ์ฒด
    • Spark 2.0์—์„œ ์ฒ˜์Œ ์†Œ๊ฐœ๋˜์—ˆ๋‹ค.
  • Spark Session์„ ํ†ตํ•ด Spark์ด ์ œ๊ณตํ•ด์ฃผ๋Š” ๋‹ค์–‘ํ•œ ๊ธฐ๋Šฅ์„ ์‚ฌ์šฉํ•œ๋‹ค.

    • DataFrame, SQL, Streaming, ML API ๋ชจ๋‘ ์ด ๊ฐ์ฒด๋กœ ํ†ต์‹ ํ•œ๋‹ค.
    • config ๋ฉ”์†Œ๋“œ๋ฅผ ์ด์šฉํ•˜์—ฌ ๋‹ค์–‘ํ•œ ํ™˜๊ฒฝ์„ค์ •์ด ๊ฐ€๋Šฅํ•˜๋‹ค.
    • RDD์™€ ๊ด€๋ จ๋œ ์ž‘์—…์„ ํ• ๋•Œ๋Š” SparkSession ์•„๋ž˜์— sparkContext ๊ฐ์ฒด๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.
  • Spark Session API


๐Ÿ“Œ Spark Session Create - PySpark ์˜ˆ์ œ

from pyspark.sql import SparkSession
# SparkSession์€ Singleton์ด๋‹ค.
spark = SparkSession.builder.master("local[*]").appName('PySpark Tutorial').getOrCreate()

spark.stop()
  • Spark SQL Engine์ด ์ค‘์‹ฌ์œผ๋กœ ๋Œ์•„๊ฐ„๋‹ค.

  • Singleton


๐Ÿ“Œ pyspark.sql ์ œ๊ณต ์ฃผ์š” ๊ธฐ๋Šฅ

pyspark.sql.SparkSession

  • Spark APP์˜ ์—”ํŠธ๋ฆฌ ํฌ์ธํŠธ
  • Spark ์ปจํ…์ŠคํŠธ์™€ SQL ์ปจํ…์ŠคํŠธ๋ฅผ ๋ชจ๋‘ ํฌํ•จํ•˜๊ณ  ์žˆ๋‹ค.
  • DataFrame, RDD๋“ฑ์„ ์ƒ์„ฑํ•  ์ˆ˜ ์žˆ๋Š” ๋ฉ”์„œ๋“œ๋ฅผ ์ œ๊ณตํ•œ๋‹ค.

pyspark.sql.DataFrame

  • Spark์˜ ๋ถ„์‚ฐ ๋ฐ์ดํ„ฐ ์ปฌ๋ ‰์…˜
  • ๊ตฌ์กฐํ™”๋œ ๋ฐ์ดํ„ฐ(ํ–‰+์—ด)์„ ์ €์žฅํ•˜๊ณ  ์ฒ˜๋ฆฌํ•œ๋‹ค.
  • SQL ์ฟผ๋ฆฌ, ๋ฐ์ดํ„ฐ ๋ณ€ํ™˜, ํ•„ํ„ฐ๋ง ๋“ฑ์˜ ์ž‘์—… ์ˆ˜ํ–‰์ด ๊ฐ€๋Šฅํ•˜๋‹ค.

pyspark.sql.Column

  • DataFrame์˜ ์—ด์„ ๋‚˜ํƒ€๋‚ด๋Š” ๊ฐ์ฒด
  • ์‚ฐ์ˆ  ์—ฐ์‚ฐ, ์กฐ๊ฑด๋ฌธ, ํ•จ์ˆ˜ ์ ์šฉ ๋“ฑ ๋‹ค์–‘ํ•œ ์ž‘์—… ์ˆ˜ํ–‰์ด ๊ฐ€๋Šฅํ•˜๋‹ค.

pyspark.sql.Row

  • DataFrame์˜ ํ–‰์„ ๋‚˜ํƒ€๋‚ด๋Š” ๊ฐ์ฒด์ด๋‹ค.
  • ํ–‰ ๋ฐ์ดํ„ฐ์— ์ ‘๊ทผํ•˜๊ณ  ์กฐ์ž‘์ด ๊ฐ€๋Šฅํ•˜๋‹ค.

pyspark.sql.functions

SparkSQL์—์„œ ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•œ ๋‹ค์–‘ํ•œ ํ•จ์ˆ˜๋ฅผ ์ œ๊ณตํ•œ๋‹ค.

  • ๋ฌธ์ž์—ด ์ฒ˜๋ฆฌ, ๋‚ ์งœ ์ฒ˜๋ฆฌ, ์ˆ˜ํ•™ ํ•จ์ˆ˜ ๋“ฑ์„ ํฌํ•จํ•œ๋‹ค.

pyspark.sql.types

SparkSQL์—์„œ ์‚ฌ์šฉ๋˜๋Š” ๋ฐ์ดํ„ฐ ํƒ€์ž…์„ ์ •์˜ํ•œ๋‹ค.

pyspark.sql.Window

  • ์œˆ๋„์šฐ ํ•จ์ˆ˜๋ฅผ ์‚ฌ์šฉํ•˜๊ธฐ ์œ„ํ•œ ๊ฐ์ฒด์ด๋‹ค.
  • ํ–‰ ๊ธฐ๋ฐ˜ ๊ณ„์‚ฐ์„ ์ˆ˜ํ–‰ํ•  ๋•Œ ์‚ฌ์šฉํ•œ๋‹ค. (ex: ์ˆœ์œ„, ๋ˆ„์ ํ•ฉ๋“ฑ๋“ฑ)

๐Ÿ“Œ Spark Session ํ™˜๊ฒฝ๋ณ€์ˆ˜

  • ์ฐธ์กฐ

  • executor ๋ณ„ ๋ฉ”๋ชจ๋ฆฌ : spark.executor.memory (๊ธฐ๋ณธ : 1gb)

  • executor ๋ณ„ CPU๊ฐœ์ˆ˜ : spark.executor.cores (YARN์—์„œ๋Š” ๊ธฐ๋ณธ๊ฐ’ 1)

  • driver ๋ฉ”๋ชจ๋ฆฌ : spark.driver.memory (๊ธฐ๋ณธ : 1gb)

  • Shuffle ํ›„ Partition์˜ ์ˆ˜ : spark.sql.shuffle.partitions (๊ธฐ๋ณธ : ์ตœ๋Œ€ 200)

  • ์‚ฌ์šฉํ•˜๋Š” Resource Manager์— ๋”ฐ๋ผ ํ™˜๊ฒฝ๋ณ€์ˆ˜๊ฐ€ ๋งŽ์ด ๋‹ฌ๋ผ์ง„๋‹ค.


๐Ÿ“Œ Spark Session ํ™˜๊ฒฝ์„ค์ • ๋ฐฉ๋ฒ•

  • ํ™˜๊ฒฝ๋ณ€์ˆ˜

  • $SPARK_HOME/conf/spark_defaults.conf

  • 2๊ฐœ๋Š” ๋ณดํ†ต Spark Cluster Admin์ด ๊ด€๋ฆฌํ•œ๋‹ค.

  • spark-submit ๋ช…๋ น์˜ ์ปค๋งจ๋“œ๋ผ์ธ ํŒŒ๋ผ๋ฏธํ„ฐ

  • SparkSession์„ ๋งŒ๋“ค๋•Œ ์ง€์ •
    -> SparkConf

๋งŒ์•ฝ ํ™˜๊ฒฝ ์„ค์ •์ด ์ถฉ๋Œ๋  ๊ฒฝ์šฐ SparkConf -> Spark-submit -> $SPARK_HOME -> ํ™˜๊ฒฝ๋ณ€์ˆ˜ ์ˆœ์„œ์˜ ์šฐ์„  ์ˆœ์œ„๋ฅผ ๊ฐ€์ง„๋‹ค.

1. Spark Session ํ™˜๊ฒฝ ์„ค์ • - SparkSession ์ƒ์„ฑ์‹œ ์ผ์ผํžˆ ์ง€์ •

  • ์ด ์‹œ์ ์˜ Spark Configuration์€ ์•ž์„œ ์–ธ๊ธ‰ํ•œ ํ™˜๊ฒฝ๋ณ€์ˆ˜์™€ spark_defaults.conf์™€ spark-submit๋กœ ๋“ค์–ด์˜จ ํ™˜๊ฒฝ์„ค์ •์ด ์šฐ์„ ์ˆœ์œ„๋ฅผ ๊ณ ๋ คํ•œ ์ƒํƒœ๋กœ ์ •๋ฆฌ๋œ ์ƒํƒœ
from pyspark.sql import SparkSession

spark = SparkSession.builder\
	.master("local[*]")\
	.appName('PySpark Tutorial')\
	.config("spark.some.config.option1", "some-value") \
	.config("spark.some.config.option2", "some-value") \
	.getOrCreate()

2. Spark Session ํ™˜๊ฒฝ ์„ค์ • - SparkConf ๊ฐ์ฒด์— ํ™˜๊ฒฝ ์„ค์ •ํ•˜๊ณ  SparkSession์— ์ง€์ •

from pyspark.sql import SparkSession
from pyspark import SparkConf

# SparkConf ๊ฐ์ฒด ์ƒ์„ฑ
conf = SparkConf()
# set ๋ฉ”์†Œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ง€์ •
conf.set("spark.app.name", "PySpark Tutorial")
conf.set("spark.master", "local[*]")

spark = SparkSession.builder\
	.config(conf=conf) \
	.getOrCreate()

๐Ÿ“Œ ์ „์ฒด์ ์ธ ํ๋ฆ„

  • Spark ์„ธ์…˜์„ ๋งŒ๋“ค๊ธฐ
  • ์ž…๋ ฅ ๋ฐ์ดํ„ฐ ๋กœ๋”ฉ
  • ๋ฐ์ดํ„ฐ ์กฐ์ž‘ ์ž‘์—…
    • DataFrame API๋‚˜ Spark SQL์„ ์‚ฌ์šฉํ•œ๋‹ค.
    • ์›ํ•˜๋Š” ๊ฒฐ๊ณผ๊ฐ€ ๋‚˜์˜ฌ๋•Œ๊นŒ์ง€ ์ƒˆ๋กœ์šด DataFrame์„ ์ƒ์„ฑํ•œ๋‹ค.
  • ์ตœ์ข… ๊ฒฐ๊ณผ ์ €์žฅ

๐Ÿ“Œ Spark Session์ด ์ง€์›ํ•˜๋Š” ๋ฐ์ดํ„ฐ ์†Œ์Šค

  • ์ฐธ๊ณ 

  • spark.read(DataFrameReader)๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ DataFrame์œผ๋กœ ๋กœ๋“œํ•œ๋‹ค.

  • DataFrame.write(DataFrameWriter)์„ ์‚ฌ์šฉํ•˜์—ฌ DataFrame์„ ์ €์žฅํ•œ๋‹ค.

  • ๋งŽ์ด ์‚ฌ์šฉ๋˜๋Š” ๋ฐ์ดํ„ฐ ์†Œ์Šค

    • HDFS File
      • CSV, JSON, Parquet, ORC, Text, Avro
        - Hive Table
    • JDBC ๊ด€๊ณ„ํ˜• DB
    • Cloud ๊ธฐ๋ฐ˜ ๋ฐ์ดํ„ฐ ์‹œ์Šคํ…œ
    • ์ŠคํŠธ๋ฆฌ๋ฐ ์‹œ์Šคํ…œ

0๊ฐœ์˜ ๋Œ“๊ธ€