Spark에서 dataframe을 가공할 때 기본적으로 제공되는 함수들로는 원하는 모양으로 가공하기 어려울 때가 있다. 이 때 사용자가 원하는 형태로 함수를 구현할 수 있는 것이 UDF(User Defined Functions) 이다.
User-Defined Functions (UDFs) are user-programmable routines that act on one row.
row 마다 적용되어 새로운 column을 만들 수 있는 사용자 정의 함수 이다. RDBMS의 User Defined Functions와 유사하게 작동하며, scala 에서는 udf()
로 함수를 wrapping하여 사용할 수 있다.
Spark dataframe에서 사용할 수도 있고, Spark SQL 에서도 직접 사용할 수 있다.
val df = Seq(("Tom", 13, "A"), ("Billy", 15, "C"), ("Mason", 12, "B")).toDF("name", "age", "grade")
df.show()
+-----+---+-----+
| name|age|grade|
+-----+---+-----+
| Tom| 13| A|
|Billy| 15| C|
|Mason| 12| B|
+-----+---+-----+
// udf
val generateMessage = udf((grade: String) =>
try {
grade match {
case "A" => "Good grade"
case "B" => "Avergae grade"
case "C" => "Poor grade"
}
} catch {
case e: Exception => "No grade"
}
)
val dfWithMessage = df.withColumn("message", generateMessage(col("grade")))
dfWithMessage.show()
+-----+---+-----+-------------+
| name|age|grade| message|
+-----+---+-----+-------------+
| Tom| 13| A| Good grade|
|Billy| 15| C| Poor grade|
|Mason| 12| B|Avergae grade|
+-----+---+-----+-------------+
위 코드처럼 dataframe의 한 row 마다 접근 후 udf를 적용한 결과값으로 새로운 컬럼을 생성할 수 있다. 기본 제공되는 함수가 아닌 복잡한 새로운 함수를 필요로 하는 결과를 얻어야 할 때 사용된다.
df.createOrReplaceTempView("grade_data")
spark.udf.register("generateMessage", generateMessage)
val grade_date = spark.sql("""
SELECT name, age, grade, generateMessage(grade)
FROM grade_data;
""").show()
+-----+---+-----+----------------------+
| name|age|grade|generateMessage(grade)|
+-----+---+-----+----------------------+
| Tom| 13| A| Good grade|
|Billy| 15| C| Poor grade|
|Mason| 12| B| Avergae grade|
+-----+---+-----+----------------------+
마찬가지로 Spark SQL에서도 udf를 사용할 수 있다.