zeppelin 으로 작업하다 만난 Task not serializable 해결 과정을 기록한다.
두 gps 좌표간의 거리를 계산하는 udf를 아래와 같이 만들어 사용하려 했다.
def deg2rad (deg: Double) = { deg * Math.PI / 180.0 }
def rad2deg (rad: Double) = { rad * 180 / Math.PI }
val distance = udf { (lat1: Double, lon1: Double, lat2: Double, lon2: Double) => {
val theta = lon1 - lon2
var dist = Math.sin(deg2rad(lat1)) * Math.sin(deg2rad(lat2))
+ Math.cos(deg2rad(lat1)) * Math.cos(deg2rad(lat2)) * Math.cos(deg2rad(theta))
dist = Math.acos(dist)
dist = rad2deg(dist)
dist = dist * 60 * 1.1515
dist * 1.609344
}
}
// UDF 사용
df.withColumn("distance", distance(col("pos_lat"), col("pos_lon"), col("lag_pos_lat"), col("lag_pos_lon")))
distance를 적용 후 액션
org.apache.spark.SparkException: Task not serializable
.
.
.
.
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: lag(pos_lat, 1, NULL) OVER (ORDER BY diff ASC NULLS FIRST unspecifiedframe$()))
LAG를 사용하는데 문제가 있나해서 검색하여 @transient 를 적용
@transient val window = Window.orderBy("id")
@transient val lagCol = lag(col("ego_speed"), 1).over(window)
해결 안됨.
udf에 serializable 적용
object distance extends java.io.Serializable {
}
해결 안됨.
udf 밖에 선언한 deg2rad와 rag2deg를 udf 안에 선언하여 해결 완료.
val distance = udf { (lat1: Double, lon1: Double, lat2: Double, lon2: Double) => {
def deg2rad (deg: Double) = { deg * Math.PI / 180.0 }
def rad2deg (rad: Double) = { rad * 180 / Math.PI }
val theta = lon1 - lon2
var dist = Math.sin(deg2rad(lat1)) * Math.sin(deg2rad(lat2))
+ Math.cos(deg2rad(lat1)) * Math.cos(deg2rad(lat2)) * Math.cos(deg2rad(theta))
dist = Math.acos(dist)
dist = rad2deg(dist)
dist = dist * 60 * 1.1515
dist * 1.609344
}
}