Spark Zeppelin - Task not serializable

larvatus·2021년 11월 17일
0

zeppelin 으로 작업하다 만난 Task not serializable 해결 과정을 기록한다.

두 gps 좌표간의 거리를 계산하는 udf를 아래와 같이 만들어 사용하려 했다.

def deg2rad (deg: Double) = { deg * Math.PI / 180.0 }
def rad2deg (rad: Double) = { rad * 180 / Math.PI }

val distance = udf { (lat1: Double, lon1: Double, lat2: Double, lon2: Double) => {
            
      val theta = lon1 - lon2
      var dist = Math.sin(deg2rad(lat1)) * Math.sin(deg2rad(lat2)) 
      	+ Math.cos(deg2rad(lat1)) * Math.cos(deg2rad(lat2)) * Math.cos(deg2rad(theta))
    
      dist = Math.acos(dist)
      dist = rad2deg(dist)
      dist = dist * 60 * 1.1515
    
      dist * 1.609344
    
    }
}
    
   
// UDF 사용
df.withColumn("distance", distance(col("pos_lat"), col("pos_lon"), col("lag_pos_lat"), col("lag_pos_lon"))) 

distance를 적용 후 액션

org.apache.spark.SparkException: Task not serializable
.
.
.
.
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
	- object not serializable (class: org.apache.spark.sql.Column, value: lag(pos_lat, 1, NULL) OVER (ORDER BY diff ASC NULLS FIRST unspecifiedframe$()))

LAG를 사용하는데 문제가 있나해서 검색하여 @transient 를 적용

@transient val window = Window.orderBy("id")
@transient val lagCol = lag(col("ego_speed"), 1).over(window)

해결 안됨.

udf에 serializable 적용

object distance extends java.io.Serializable {
    
}

해결 안됨.
udf 밖에 선언한 deg2rad와 rag2deg를 udf 안에 선언하여 해결 완료.

val distance = udf { (lat1: Double, lon1: Double, lat2: Double, lon2: Double) => {
    
    def deg2rad (deg: Double) = { deg * Math.PI / 180.0 }
    def rad2deg (rad: Double) = { rad * 180 / Math.PI }

    val theta = lon1 - lon2
    var dist = Math.sin(deg2rad(lat1)) * Math.sin(deg2rad(lat2)) 
    + Math.cos(deg2rad(lat1)) * Math.cos(deg2rad(lat2)) * Math.cos(deg2rad(theta))

    dist = Math.acos(dist)
    dist = rad2deg(dist)
    dist = dist * 60 * 1.1515

    dist * 1.609344

    }
}
profile
물고기를 좋아합니다.

0개의 댓글