Ваша функция необходима для использования 4 входных переменных, поэтому ваш фрейм данных также должен иметь эти переменные для вычисления. Я думаю, что это может быть достигнуто с помощью функций Window
и lag
.
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy("VehicleID", "Date").orderBy("id")
val df2 = df.withColumn("id", monotonically_increasing_id)
.withColumn("Longitude2", lag("Longitude", 1).over(w))
.withColumn("Latitude2", lag("Latitude", 1).over(w))
.orderBy("id")
df2.show(false)
Результат:
+---------+---------+--------+----------+---+----------+---------+
|VehicleID|Longitude|Latitude|Date |id |Longitude2|Latitude2|
+---------+---------+--------+----------+---+----------+---------+
|12311 |55.55431 |25.45631|01/02/2020|0 |null |null |
|12311 |55.55432 |25.45634|01/02/2020|1 |55.55431 |25.45631 |
|12311 |55.55433 |25.45637|02/02/2020|2 |null |null |
|12311 |55.55431 |25.45621|02/02/2020|3 |55.55433 |25.45637 |
|12309 |55.55427 |25.45627|01/02/2020|4 |null |null |
|12309 |55.55436 |25.45655|02/02/2020|5 |null |null |
|12412 |55.55441 |25.45657|01/02/2020|6 |null |null |
|12412 |55.55442 |25.45656|02/02/2020|7 |null |null |
+---------+---------+--------+----------+---+----------+---------+
Затем зарегистрируйте свою функцию как пользовательскую функцию, например as
def haversine_distance(longitude1 : Double,longitude2 : Double,latitude1 : Double,latitude2 : Double) : Double= {
val R = 6372.8
val dlat = math.toRadians(latitude2 - latitude1)
val dlog = math.toRadians(longitude2 - longitude1)
val a = math.sin(dlat / 2) * math.sin(dlat / 2) + math.cos(math.toRadians(latitude1)) * math.cos(math.toRadians(latitude2)) * math.sin(dlog / 2) * math.sin(dlog / 2)
val c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
val distance = R * c
distance
}
spark.udf.register("haversine_distance", haversine_distance(_: Double, _: Double, _: Double, _: Double): Double)
Наконец, вы можете использовать эту функцию в искре SQL:
df2.withColumn("haversine_distance", expr("haversine_distance(Longitude, Longitude2, Latitude, Latitude2)"))
.show(false)
, которая дает конечный результат:
+---------+---------+--------+----------+---+----------+---------+---------------------+
|VehicleID|Longitude|Latitude|Date |id |Longitude2|Latitude2|haversine_distance |
+---------+---------+--------+----------+---+----------+---------+---------------------+
|12311 |55.55431 |25.45631|01/02/2020|0 |null |null |null |
|12311 |55.55432 |25.45634|01/02/2020|1 |55.55431 |25.45631 |0.0034846437813896825|
|12311 |55.55433 |25.45637|02/02/2020|2 |null |null |null |
|12311 |55.55431 |25.45621|02/02/2020|3 |55.55433 |25.45637 |0.017909203100004076 |
|12309 |55.55427 |25.45627|01/02/2020|4 |null |null |null |
|12309 |55.55436 |25.45655|02/02/2020|5 |null |null |null |
|12412 |55.55441 |25.45657|01/02/2020|6 |null |null |null |
|12412 |55.55442 |25.45656|02/02/2020|7 |null |null |null |
+---------+---------+--------+----------+---+----------+---------+---------------------+