Spark SQL is the Spark module for structured data processing.
It lets you express DataFrame operations in SQL.
After registering a DataFrame under a table name, you can query it with the sql function.
-> Pandas has the same pattern via the sqldf function of the pandasql module; a sketch follows below.
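A minimal sketch of the Pandas-side pattern (assumes the third-party pandasql package is installed; the DataFrame contents are hypothetical):

```python
import pandas as pd
from pandasql import sqldf  # pip install pandasql

namegender_pd = pd.DataFrame({
    "name": ["Alice", "Bob", "Carol"],
    "gender": ["F", "M", "F"],
})

# sqldf runs SQLite SQL over DataFrames found in the passed namespace
print(sqldf("SELECT gender, COUNT(1) cnt FROM namegender_pd GROUP BY gender", locals()))
```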
Provides compatibility with HQL (Hive Query Language).
-> Can read and write Hive tables through the Hive Metastore.
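A minimal sketch of reading a Hive table (assumes a Hive Metastore is configured; the table name prod.orders is hypothetical):

```python
from pyspark.sql import SparkSession

# enableHiveSupport() connects Spark SQL to the Hive Metastore catalog
spark = (SparkSession.builder
         .appName("hive-example")
         .enableHiveSupport()
         .getOrCreate())

spark.sql("SELECT * FROM prod.orders LIMIT 10").show()  # hypothetical Hive table
```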
Advantages of using SQL:
Familiarity & Readability
-> SQL is more readable, and far more people can use it.
Optimization
-> The declarative nature of SQL gives the Spark SQL engine more room to optimize (see the explain() sketch after this list).
-> Catalyst Optimizer & Project Tungsten
Interoperability & Data Management
-> SQL is easier to port, and access-permission checks are easier.
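To see what Catalyst does with a declarative query, you can print the query plans; a minimal sketch:

```python
# explain(True) prints the parsed, analyzed, optimized, and physical plans
df = spark.createDataFrame([("F", 1), ("M", 2)], ["gender", "cnt"])
df.createOrReplaceTempView("people")
spark.sql("SELECT gender, SUM(cnt) FROM people GROUP BY gender").explain(True)
```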
Example: registering a temp view and querying it with spark.sql:

```python
# Register the DataFrame as a temp view so SQL can refer to it by name
namegender_df.createOrReplaceTempView("namegender")
namegender_group_df = spark.sql("""
    SELECT gender, COUNT(1) FROM namegender GROUP BY 1
""")
print(namegender_group_df.collect())
```
```python
# Load a Redshift table over JDBC
# (the Redshift JDBC driver jar must be on Spark's classpath)
df_user_session_channel = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", "jdbc:redshift://HOST:PORT/DB?user=ID&password=PASSWORD") \
    .option("dbtable", "raw_data.user_session_channel") \
    .load()
```
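A follow-up sketch: the JDBC-loaded DataFrame can be registered as a temp view and queried like any other table (the channel column is hypothetical):

```python
df_user_session_channel.createOrReplaceTempView("user_session_channel")
spark.sql("SELECT channel, COUNT(1) FROM user_session_channel GROUP BY 1").show()
```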
A SQL JOIN merges two or more tables on a common field.
It is used to integrate information that is spread across the tables of a star schema.
In what follows, the left table is called LEFT and the right table RIGHT.
The various kinds of joins are illustrated below with two sample tables.

Vital:

| UserID | VitalID | Date | Weight |
|---|---|---|---|
| 100 | 1 | 2020-01-01 | 75 |
| 100 | 3 | 2020-01-02 | 78 |
| 101 | 2 | 2020-01-01 | 90 |
| 101 | 4 | 2020-01-02 | 95 |
Alert:

| AlertID | VitalID | AlertType | Date | UserID |
|---|---|---|---|---|
| 1 | 4 | WeightIncrease | 2020-01-02 | 101 |
| 2 | NULL | MissingVital | 2020-01-04 | 100 |
| 3 | NULL | MissingVital | 2020-01-04 | 101 |
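To make the join examples below runnable, a minimal sketch that creates the two tables above as temp views (the names vital_df and alert_df are introduced here; the raw_data.-qualified names in the SQL refer to the original warehouse schema, so with these temp views drop the prefix):

```python
vital_df = spark.createDataFrame(
    [(100, 1, "2020-01-01", 75),
     (100, 3, "2020-01-02", 78),
     (101, 2, "2020-01-01", 90),
     (101, 4, "2020-01-02", 95)],
    ["UserID", "VitalID", "Date", "Weight"])
vital_df.createOrReplaceTempView("Vital")

alert_df = spark.createDataFrame(
    [(1, 4, "WeightIncrease", "2020-01-02", 101),
     (2, None, "MissingVital", "2020-01-04", 100),
     (3, None, "MissingVital", "2020-01-04", 101)],
    ["AlertID", "VitalID", "AlertType", "Date", "UserID"])
alert_df.createOrReplaceTempView("Alert")
```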
INNER JOIN: returns only the records that match in both tables.
The fields of both tables come back fully populated.

```sql
SELECT * FROM Vital v
JOIN Alert a ON v.vitalID = a.vitalID;
```
| v.UserID | v.VitalID | v.Date | v.Weight | a.AlertID | a.VitalID | a.AlertType | a.Date | a.UserID |
|---|---|---|---|---|---|---|---|---|
| 101 | 4 | 2020-01-02 | 95 | 1 | 4 | WeightIncrease | 2020-01-02 | 101 |
LEFT JOIN: returns all records of the left (base) table.
The right table's fields are populated only where a right record matches the left record; otherwise they are NULL.

```sql
SELECT * FROM raw_data.Vital v
LEFT JOIN raw_data.Alert a ON v.vitalID = a.vitalID;
```
| v.UserID | v.VitalID | v.Date | v.Weight | a.AlertID | a.VitalID | a.AlertType | a.Date | a.UserID |
|---|---|---|---|---|---|---|---|---|
| 100 | 1 | 2020-01-01 | 75 | NULL | NULL | NULL | NULL | NULL |
| 100 | 3 | 2020-01-02 | 78 | NULL | NULL | NULL | NULL | NULL |
| 101 | 2 | 2020-01-01 | 90 | NULL | NULL | NULL | NULL | NULL |
| 101 | 4 | 2020-01-02 | 95 | 1 | 4 | WeightIncrease | 2020-01-02 | 101 |
FULL (OUTER) JOIN: returns all records from both tables.
Fields from both tables are fully populated only where there is a match; the unmatched side comes back as NULL.

```sql
SELECT * FROM raw_data.Vital v
FULL JOIN raw_data.Alert a ON v.vitalID = a.vitalID;
```
| v.UserID | v.VitalID | v.Date | v.Weight | a.AlertID | a.VitalID | a.AlertType | a.Date | a.UserID |
|---|---|---|---|---|---|---|---|---|
| 100 | 1 | 2020-01-01 | 75 | NULL | NULL | NULL | NULL | NULL |
| 100 | 3 | 2020-01-02 | 78 | NULL | NULL | NULL | NULL | NULL |
| 101 | 2 | 2020-01-01 | 90 | NULL | NULL | NULL | NULL | NULL |
| 101 | 4 | 2020-01-02 | 95 | 1 | 4 | WeightIncrease | 2020-01-02 | 101 |
| NULL | NULL | NULL | NULL | 2 | NULL | MissingVital | 2020-01-04 | 100 |
| NULL | NULL | NULL | NULL | 3 | NULL | MissingVital | 2020-01-04 | 101 |
CROSS JOIN: returns the Cartesian product of the two tables (every Vital record paired with every Alert record, here 4 x 3 = 12 rows), so no ON condition is used.

```sql
SELECT * FROM raw_data.Vital v CROSS JOIN raw_data.Alert a;
```

SELF JOIN: joins a table with itself; aliases (v1, v2) distinguish the two sides.

```sql
SELECT * FROM raw_data.Vital v1
JOIN raw_data.Vital v2 ON v1.vitalID = v2.vitalID;
```
| v1.UserID | v1.VitalID | v1.Date | v1.Weight | v2.UserID | v2.VitalID | v2.Date | v2.Weight |
|---|---|---|---|---|---|---|---|
| 100 | 1 | 2020-01-01 | 75 | 100 | 1 | 2020-01-01 | 75 |
| 100 | 3 | 2020-01-02 | 78 | 100 | 3 | 2020-01-02 | 78 |
| 101 | 2 | 2020-01-01 | 90 | 101 | 2 | 2020-01-01 | 90 |
| 101 | 4 | 2020-01-02 | 95 | 101 | 4 | 2020-01-02 | 95 |
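The same joins can also be written with the DataFrame API; a minimal sketch using the vital_df and alert_df created earlier:

```python
# how= accepts "inner", "left", "full", etc.; the cross join has its own method
inner = vital_df.join(alert_df, vital_df.VitalID == alert_df.VitalID, "inner")
left  = vital_df.join(alert_df, vital_df.VitalID == alert_df.VitalID, "left")
full  = vital_df.join(alert_df, vital_df.VitalID == alert_df.VitalID, "full")
cross = vital_df.crossJoin(alert_df)
```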
How Spark executes a join:
Shuffle Join: the general case; both tables are repartitioned (shuffled) across the cluster by the join key so that matching keys land in the same partition.
Broadcast Join: when one table is small enough (spark.sql.autoBroadcastJoinThreshold), it is copied in full to every executor, avoiding the shuffle of the large table.
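A minimal sketch of forcing a broadcast join with the broadcast hint:

```python
from pyspark.sql.functions import broadcast

# Hint Spark to broadcast the small side instead of shuffling both tables
joined = vital_df.join(broadcast(alert_df), "VitalID", "inner")
joined.explain()  # the physical plan should show BroadcastHashJoin
```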


UDF (User Defined Function):
Registering a function
Using a function
If performance matters? -> use a Pandas UDF (vectorized; see below).
```python
import pyspark.sql.functions as F
from pyspark.sql.types import *

# Inline lambda UDF; the default return type is StringType
upperUDF = F.udf(lambda z: z.upper())
df = df.withColumn("Curated Name", upperUDF("Name"))
```
```python
def upper(s):
    return s.upper()

# Register for SQL use; register() also returns a callable usable on DataFrames
upperUDF = spark.udf.register("upper", upper)

# Test
spark.sql("SELECT upper('aBcD')").show()

# Apply in SQL over a DataFrame-backed temp view
df.createOrReplaceTempView("test")
spark.sql("SELECT name, upper(name) `Curated Name` FROM test").show()
```

```python
data = [
    {"a": 1, "b": 2},
    {"a": 5, "b": 5},
]
df = spark.createDataFrame(data)

# Two-argument UDF; give an explicit return type so the sum stays numeric
df = df.withColumn("c", F.udf(lambda x, y: x + y, LongType())("a", "b"))
```
```python
def plus(x, y):
    return x + y

plusUDF = spark.udf.register("plus", plus)
spark.sql("SELECT plus(1, 2)").show()

df.createOrReplaceTempView("test")
spark.sql("SELECT a, b, plus(a, b) c FROM test").show()
```

```python
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
import pandas as pd

# Pandas UDF: processes a whole pd.Series per call (vectorized via Arrow),
# which is much faster than a row-at-a-time Python UDF
@pandas_udf(StringType())
def upper_udf2(s: pd.Series) -> pd.Series:
    return s.str.upper()

upperUDF = spark.udf.register("upper_udf", upper_udf2)
df.select("Name", upperUDF("Name")).show()
spark.sql("SELECT name, upper_udf(name) `Curated Name` FROM test").show()
```

```python
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import FloatType
import pandas as pd

# Series-to-scalar Pandas UDF: acts as an aggregate function
@pandas_udf(FloatType())
def average(v: pd.Series) -> float:
    return v.mean()

averageUDF = spark.udf.register("average", average)
spark.sql("SELECT average(b) FROM test").show()
df.agg(averageUDF("b").alias("average")).show()
```
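A grouped usage sketch: Series-to-scalar Pandas UDFs also work inside groupBy().agg():

```python
df.groupBy("a").agg(averageUDF("b").alias("avg_b")).show()
```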