
- SparkSQL
- UDF (User Defined Function)
구조화된 데이터 처리를 위한 Spark 모듈로, 데이터 프레임 작업을 SQL로 처리 가능하게 해준다.
- 데이터 프레임에 테이블 이름 지정 후 sql함수 사용 가능
- pandas에도 pandasql 모듈의 sqldf 함수를 이용하는 동일한 패턴 존재
- HQL(Hive Query Language)과의 호환도 제공
- Hive 테이블들을 읽고 쓸 수 있음 (Hive Meatastore)
DataFrame과 비교해서 Spark SQL은 다음과 같은 장점이 있다.
데이터 프레임을 기반으로 테이블 뷰를 생성 : 테이블이 만들어짐
createOrReplaceTempView : spark Session이 살아있는 동안 존재createOrReplaceGlobalTempView : Spark 드라이버가 살아있는 동안 존재Spark Session의 sql 함수로 SQL 결과를 데이터 프레임으로 받음
namegender_df.createOrReplaceTempView("namegender")
namegender_group_df = spark.sql("""
SELECT gender, count(1) FROM namegender GROUP BY 1
""")
print(namegender_group_df.collect())
JDBC를 이용하여 외부의 관계형 데이터베이스와 연결하려면 Spark Session의 read 함수를 호출하면 결과가 데이터프레임으로 리턴됨
df_user_session_channel = spark.read \
.format("jdbc") \
.option("driver","com.amazon.redshift.jdbc42.Driver") \
.option("url", "jdbc:redshift//HOST:PORT/DB?user=ID&password=PASSWORD") \
.option("dbtable", "raw_data.user_session_channel") \
.load()
특정 조건을 만족시키기 위해 새로운 함수를 만들어야 할 경우에 유용하게 사용할 수 있는 것이 UDF이다. 데이터프레임의 경우 .withColumn 함수와 같이 사용하는 것이 일반적이다.
Aggregation용 UDAF도 존재한다. (GROUP BY에서 사용되는 SUM, AVG와 같은 함수를 만드는 것, PySpark에서는 지원되지 않음)
# 예시(Pandas UDF Scalar 함수)
from pyspark.sql.functions import pandas_udf
import pandas as pd
# UDF 생성
@pandas_udf(StringType())
def upper_udf2(s: pd.Series) -> pd.Series:
return s.str.upper()
upperUDF = spark.udf.register("upper_udf", upper_udf2)
# 결과 출력
df.select("Name", upperUDF("Name")).show()
spark.sql("""SELECT name, upper_udf(name) 'Curated Name' FROM test""").show()
Aggregation용 UDF도 만들어보자
# 예시(Pandas UDF로 Aggregation 사용)
from pyspark.sql.functions import pandas_udf
import pandas as pd
# UDF 생성
@pandas_udf(FloatType())
def average(v: pd.Series) -> float:
return v.mean()
averageUDF = spark.udf.register('average', average)
# 결과 출력
spark.sql('SELECT average(b) FROM test').show()
df.agg(averageUDF("b").alias("count")).show()