createOrReplaceTempView
: Spark Session이 살아 있는 동안 존재createOrReplaceGlobalTempView
: Spark Driver가 살아 있는 동안 존재.sql
함수로 SQL 결과를 DataFrame으로 받음이 함수들은 DataFrame보다 SQL로 작성하는 것을 추천
SUM
, MIN
, MAX
, AVG
, COUNT
, ...ROW_NUMBER
, FIRST_VALUE
, LAST_VALUE
spark.sql.autoBroadcastJoinThreshold
로 충분히 작은지 여부 결정UPPER
, LOWER
, ...SUM
, MIN
, MAX
, pyspark.sql.functions.udf
: df에서만 사용 가능spark.udf.register
: df, sql 모두 사용 가능.withColumn
, .agg
.sql
upper
함수를 구현한다고 해보자.
# df에 lambda 함수를 적용한 UDF
import pyspark.sql.functions as F
from pyspark.sql.types as *
upperUDF = F.udf(lambda x : x.upper(x))
df.withColumn("Upper Name", upperUDF("Name"))
# python 함수를 사용하여 Spark SQL에 적용
def upper(s):
return s.upper()
upperUDF = spark.udf.register("upper", upper)
df.createOrReplaceTempView("test")
# 아래의 두 함수는 같은 결과
spark.sql("""SELECT name, upper(name) as `Upper Name`
FROM test
""").show()
df.select("name", upperUDF("name").alias("Upper Name")).show()
# pandas_udf를 사용하기 (이 방법을 추천)
# 왜? input으로 레코드들의 집합(Series) 형태로 넣어주기 때문에 빠르게 계산
from pyspark.sql.functions import pandas_udf
import pandas as pd
# 결과 데이터의 타입을 입력
@pandas_udf(StringType())
def upper_udf(s:pd.Series) -> pd.Series:
return s.str.upper()
upperUDF = spark.udf.register("upper_udf", upper_udf)
df.select("name", upperUDF("name").alias("Upper Name")).show()
spark.sql("""SELECT name, upper_udf(name) as `Upper Name`
FROM test
""").show()
DataFrame에 Aggregation을 사용해보자.
Pandas UDF 방식을 권장
# 예시 : 나이 평균 구하기
@pandas_udf(FloatType())
def average(s:pd.Series) -> float:
return s.mean()
averageUDF = spark.udf.register("average", average)
spark.sql("""SELECT average(age) FROM test""").show()
df.select(averageUDF("age")).show()
UDF를 적용하는 건 익숙하게 하지 않으면 계속 까먹을 것 같다. 자주 실습해봐야겠다.