📒 Spark (7)

Kimdongki · June 19, 2024


📌 SQL in Big Data

  • A fundamental skill you must master if you want to work in the data field.
  • As long as structured data is involved, SQL is used regardless of the scale of the data.
  • Every large-scale data warehouse is SQL-based.
    • Redshift, Snowflake, BigQuery
    • Hive, Presto
  • Spark is no exception.
    -> Spark SQL is supported.

📌 Spark SQL

  • A Spark module for processing structured data.

  • DataFrame operations can be handled in SQL.

    • Give a DataFrame a table name, and you can then use the sql function on it.
      -> Pandas has the same pattern, using the sqldf function of the pandasql module.

    • Provides compatibility with HQL (Hive Query Language).
      -> Hive tables can be read and written (via the Hive Metastore).
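
For comparison, the pandasql sqldf pattern mentioned above runs DataFrame queries through an in-memory SQLite database. A minimal sketch of that pattern using only the standard library (the table name and sample rows here are made up for illustration):

```python
import sqlite3

# Sketch of the sqldf pattern: register tabular data under a table name,
# then query it with plain SQL. pandasql does the same for pandas
# DataFrames via an in-memory SQLite database.
rows = [("Mary", "F"), ("John", "M"), ("Anna", "F")]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE namegender (name TEXT, gender TEXT)")
conn.executemany("INSERT INTO namegender VALUES (?, ?)", rows)

result = sorted(conn.execute(
    "SELECT gender, COUNT(1) FROM namegender GROUP BY gender").fetchall())
print(result)  # [('F', 2), ('M', 1)]
```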


📌 Spark SQL vs. DataFrame

  • If a task can be done in SQL, there is little reason to use the DataFrame API instead.
    • Keep in mind that the two can be used together.
  1. Familiarity & Readability
    -> SQL is more readable, and far more people can use it.

  2. Optimization
    -> The Spark SQL engine can optimize it better (SQL is declarative).
    -> Catalyst Optimizer & Project Tungsten

  3. Interoperability & Data Management
    -> SQL is easier to port and makes access-permission checks easier.


📌 How to Use Spark SQL

  • Create a table view from a DataFrame: Create Table
    • createOrReplaceTempView: exists as long as the Spark session is alive.
    • createOrReplaceGlobalTempView: exists as long as the Spark driver is alive.
  • Call the Spark session's sql function and receive the SQL result as a DataFrame.
namegender_df.createOrReplaceTempView("namegender")
namegender_group_df = spark.sql("""
	SELECT gender, count(1) FROM namegender GROUP BY 1
""")

print(namegender_group_df.collect())

📌 Connecting to an External DB from a SparkSession

  • Call the Spark session's read function (specifying login credentials and the table or SQL query to read).
    -> The result is returned as a DataFrame.
df_user_session_channel = spark.read \
	.format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", "jdbc:redshift://HOST:PORT/DB?user=ID&password=PASSWORD") \
    .option("dbtable", "raw_data.user_session_channel") \
    .load()

📌 Aggregation Functions

  • Writing these in SQL rather than with the DataFrame API is recommended.
  • Group By
  • Window
  • Rank
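
Since Spark SQL accepts standard SQL, a GROUP BY aggregate combined with a RANK() window function looks like the query below. The string would normally be handed to spark.sql; it is run here against stdlib SQLite (3.25+ for window functions) only so the snippet is self-contained, and the table and column names are made up:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (channel TEXT, amount INTEGER)")
conn.executemany("INSERT INTO sales VALUES (?, ?)",
                 [("web", 100), ("web", 300), ("app", 200)])

# GROUP BY totals per channel, then RANK() the channels by that total
query = """
    SELECT channel, total,
           RANK() OVER (ORDER BY total DESC) AS rnk
    FROM (SELECT channel, SUM(amount) AS total
          FROM sales GROUP BY channel)
"""
for row in sorted(conn.execute(query).fetchall()):
    print(row)  # ('app', 200, 2) then ('web', 400, 1)
```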

📌 Join

  • A SQL join merges two or more tables on a common field.

  • Used to consolidate information that was spread across the tables of a star schema.

  • If we call the left table LEFT and the right table RIGHT:

    • The result of a join is a new table carrying the fields of both sides, depending on the join type.
    • The join type determines two things:
      • Which records are selected?
      • Which fields are filled in?
  • There are various kinds of joins - see below.

Join Practice

  • Vital

  UserID | VitalID | Date       | Weight
  100    | 1       | 2020-01-01 | 75
  100    | 3       | 2020-01-02 | 78
  101    | 2       | 2020-01-01 | 90
  101    | 4       | 2020-01-02 | 95

  • Alert

  AlertID | VitalID | AlertType      | Date       | UserID
  1       | 4       | WeightIncrease | 2020-01-02 | 101
  2       | NULL    | MissingVital   | 2020-01-04 | 100
  3       | NULL    | MissingVital   | 2020-01-04 | 101

INNER JOIN

  • Returns only the records that match in both tables.

  • Rows are returned with the fields of both tables fully filled in.

SELECT * FROM Vital v
JOIN Alert a ON v.vitalID = a.vitalID;

  v.UserID | v.VitalID | v.Date     | v.Weight | a.AlertID | a.VitalID | a.AlertType    | a.Date     | a.UserID
  101      | 4         | 2020-01-02 | 95       | 1         | 4         | WeightIncrease | 2020-01-02 | 101

LEFT JOIN

  • Returns every record of the left (base) table.

  • The right table's fields are filled in only where they match a left record.

SELECT * FROM raw_data.Vital v
LEFT JOIN raw_data.Alert a ON v.vitalID = a.vitalID;

  v.UserID | v.VitalID | v.Date     | v.Weight | a.AlertID | a.VitalID | a.AlertType    | a.Date     | a.UserID
  100      | 1         | 2020-01-01 | 75       | NULL      | NULL      | NULL           | NULL       | NULL
  100      | 3         | 2020-01-02 | 78       | NULL      | NULL      | NULL           | NULL       | NULL
  101      | 2         | 2020-01-01 | 90       | NULL      | NULL      | NULL           | NULL       | NULL
  101      | 4         | 2020-01-02 | 95       | 1         | 4         | WeightIncrease | 2020-01-02 | 101

FULL JOIN

  • Returns every record of both the left and the right table.

  • Fields from both sides are filled in only where the records match.

SELECT * FROM raw_data.Vital v
FULL JOIN raw_data.Alert a ON v.vitalID = a.vitalID;

  v.UserID | v.VitalID | v.Date     | v.Weight | a.AlertID | a.VitalID | a.AlertType    | a.Date     | a.UserID
  100      | 1         | 2020-01-01 | 75       | NULL      | NULL      | NULL           | NULL       | NULL
  100      | 3         | 2020-01-02 | 78       | NULL      | NULL      | NULL           | NULL       | NULL
  101      | 2         | 2020-01-01 | 90       | NULL      | NULL      | NULL           | NULL       | NULL
  101      | 4         | 2020-01-02 | 95       | 1         | 4         | WeightIncrease | 2020-01-02 | 101
  NULL     | NULL      | NULL       | NULL     | 2         | NULL      | MissingVital   | 2020-01-04 | 100
  NULL     | NULL      | NULL       | NULL     | 3         | NULL      | MissingVital   | 2020-01-04 | 101

CROSS JOIN

  • Returns every combination of the records of the left and right tables.

SELECT * FROM raw_data.Vital v CROSS JOIN raw_data.Alert a;

SELF JOIN

  • Joins a table with itself, using a different alias for each side.

SELECT * FROM raw_data.Vital v1
JOIN raw_data.Vital v2 ON v1.vitalID = v2.vitalID;

  v1.UserID | v1.VitalID | v1.Date    | v1.Weight | v2.UserID | v2.VitalID | v2.Date    | v2.Weight
  100       | 1          | 2020-01-01 | 75        | 100       | 1          | 2020-01-01 | 75
  100       | 3          | 2020-01-02 | 78        | 100       | 3          | 2020-01-02 | 78
  101       | 2          | 2020-01-01 | 90        | 101       | 2          | 2020-01-01 | 90
  101       | 4          | 2020-01-02 | 95        | 101       | 4          | 2020-01-02 | 95
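
The join examples above can be checked end to end. The SQL semantics are identical in Spark SQL, but stdlib SQLite is used here with the same Vital/Alert sample rows so the check is self-contained:

```python
import sqlite3

# The INNER, LEFT, and CROSS JOIN examples above, verified against the
# same sample data: one matching VitalID (4), so INNER yields 1 row,
# LEFT keeps all 4 Vital rows, and CROSS yields 4 x 3 combinations.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE Vital (UserID INT, VitalID INT, Date TEXT, Weight INT);
    CREATE TABLE Alert (AlertID INT, VitalID INT, AlertType TEXT,
                        Date TEXT, UserID INT);
    INSERT INTO Vital VALUES
        (100, 1, '2020-01-01', 75), (100, 3, '2020-01-02', 78),
        (101, 2, '2020-01-01', 90), (101, 4, '2020-01-02', 95);
    INSERT INTO Alert VALUES
        (1, 4, 'WeightIncrease', '2020-01-02', 101),
        (2, NULL, 'MissingVital', '2020-01-04', 100),
        (3, NULL, 'MissingVital', '2020-01-04', 101);
""")

inner = conn.execute(
    "SELECT * FROM Vital v JOIN Alert a ON v.VitalID = a.VitalID").fetchall()
left = conn.execute(
    "SELECT * FROM Vital v LEFT JOIN Alert a ON v.VitalID = a.VitalID").fetchall()
cross = conn.execute(
    "SELECT * FROM Vital v CROSS JOIN Alert a").fetchall()

print(len(inner), len(left), len(cross))  # 1 4 12
```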

Joins from an Optimization Perspective

  • Shuffle Join

    • The default join strategy.
    • Bucket join: repartition both sides on the join key first, then join.
  • Broadcast Join

    • For joining a large dataset with a small one.
    • If one DataFrame is small enough, the small DataFrame is copied out to every server holding the other DataFrame (broadcasting).
      -> The spark.sql.autoBroadcastJoinThreshold parameter decides whether it is small enough.

(Figures: Shuffle JOIN, Broadcast JOIN)

📌 UDF

  • Short for User Defined Function.
  • A user-defined function that can be applied in the DataFrame API or in SQL.
  • Scalar functions vs. aggregation functions
    • Scalar function examples: UPPER, LOWER, ...
    • Aggregation function (UDAF) examples: SUM, MIN, MAX

How to Use a UDF (1)

  • Implement the function
    • As a Python function
    • As a Python lambda
    • As a Python Pandas function
      • Annotated with pyspark.sql.functions.pandas_udf
      • Far more efficient, since Apache Arrow is used to convert between Python and Java objects.

How to Use a UDF (2)

  • Register the function

    • pyspark.sql.functions.udf
      -> Usable only from the DataFrame API.
    • spark.udf.register
      -> Usable from both the DataFrame API and SQL.
  • Use the function

    • .withColumn, .agg
    • SQL
  • If performance matters:

    • Implementing it in Scala or Java is best.
    • If you must use Python, implement it as a Pandas UDF.

UDF - Using with a DataFrame (1)

import pyspark.sql.functions as F
from pyspark.sql.types import *

upperUDF = F.udf(lambda z:z.upper())
df.withColumn("Curated Name", upperUDF("Name"))

def upper(s):
	return s.upper()
    
# Test
upperUDF = spark.udf.register("upper", upper)
spark.sql("SELECT upper('aBcD')").show()

# Apply it in SQL over the DataFrame
df.createOrReplaceTempView("test")
spark.sql("""SELECT name, upper(name) "Curated Name" FROM test""").show()

UDF - Using with a DataFrame (2)

data = [
	{"a":1,"b":2},
    {"a":5,"b":5}
]

df = spark.createDataFrame(data)
df.withColumn("c", F.udf(lambda x, y: x + y)("a", "b"))

def plus(x, y):
	return x + y

plusUDF = spark.udf.register("plus", plus)
spark.sql("SELECT plus(1, 2)").show()

df.createOrReplaceTempView("test")
spark.sql("SELECT a, b, plus(a, b) c FROM test").show()

UDF - Pandas UDF Scalar Function

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
import pandas as pd

@pandas_udf(StringType())
def upper_udf2(s: pd.Series) -> pd.Series:
	return s.str.upper()
    
upperUDF = spark.udf.register("upper_udf", upper_udf2)

df.select("Name", upperUDF("Name")).show()
spark.sql("""SELECT name, upper_udf(name) `Curated Name` FROM test""").show()

UDF - Using Aggregation in DataFrame/SQL

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import FloatType
import pandas as pd

@pandas_udf(FloatType())
def average(v: pd.Series) -> float:
	return v.mean()
    
averageUDF = spark.udf.register('average', average)

spark.sql('SELECT average(b) FROM test').show()
df.agg(averageUDF("b").alias("count")).show()
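
Because the Pandas UDF bodies above are ordinary Series-to-Series and Series-to-scalar functions, they can be sanity-checked without Spark at all (assuming only pandas is installed):

```python
import pandas as pd

# The two UDF bodies from the sections above, exercised directly on
# pandas Series: upper_udf2 maps Series -> Series, average maps
# Series -> scalar.
def upper_udf2(s: pd.Series) -> pd.Series:
    return s.str.upper()

def average(v: pd.Series) -> float:
    return v.mean()

print(upper_udf2(pd.Series(["john", "mary"])).tolist())  # ['JOHN', 'MARY']
print(average(pd.Series([2, 5])))  # 3.5
```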
