Искра: евклидово расстояние поэлементно между двумя столбцами векторов - PullRequest
1 голос
/ 23 апреля 2019

У меня есть два кадра данных df1 и df2, которые имеют следующие структуры:

print(df1)
+-------+------------+-------------+---------+
|     id|      vector|  start_time | end_time|
+-------+------------+-------------+---------+
|      1| [0,0,0,0,0]|          000|      200|
|      2| [1,1,1,1,1]|          200|      500|
|      3| [0,1,0,1,0]|          100|      500|
+-------+------------+-------------+---------+

print(df2)
+-------+------------+-------+
|     id|      vector|   time|
+-------+------------+-------+
|      A| [0,1,1,1,0]|    050|
|      B| [1,0,0,1,1]|    150|
|      C| [1,1,1,1,1]|    250|
|      D| [1,0,1,0,1]|    350|
|      E| [1,1,1,1,1]|    450|
|      F| [1,0,5,0,0]|    550|
+-------+------------+-------+

То, что я хочу: для каждого из данных df1 получить все данные из df2, для которых time находится между start_time и end_time, и для всех этих данных вычислить евклидово расстояние между двумя векторы.

Я начал со следующего кода, но застрял на пути вычисления расстояния:

val joined_DF = kafka_DF.crossJoin(
        hdfs_DF.withColumnRenamed("id","id2").withColumnRenamed("vector","vector2")
    )
      .filter(col("time")>= col("start_time") &&
        col("time")<= col("end_time"))
        .withColumn("distance", ???) // Euclidean distance element-wise between columns vector and column vector2

Вот ожидаемый результат на данных примера:

+-------+------------+-------------+---------+-------+------------+------+----------+
|     id|      vector|  start_time | end_time|    id2|     vector2|  time| distance |
+-------+------------+-------------+---------+-------+------------+------+----------+
|      1| [0,0,0,0,0]|          000|      200|      A| [0,1,1,1,0]|   050|   1.73205|
|      1| [0,0,0,0,0]|          000|      200|      B| [1,0,0,1,1]|   150|   1.73205|
|      2| [1,1,1,1,1]|          200|      500|      C| [1,1,1,1,1]|   250|         0|
|      2| [1,1,1,1,1]|          200|      500|      D| [1,0,1,0,1]|   350|   1.41421|
|      2| [1,1,1,1,1]|          200|      500|      E| [1,1,1,1,1]|   450|         0|
|      3| [0,1,0,1,0]|          100|      500|      B| [1,0,0,1,1]|   150|   1.73205|
|      3| [0,1,0,1,0]|          100|      500|      C| [1,1,1,1,1]|   250|   1.73205|
|      3| [0,1,0,1,0]|          100|      500|      D| [1,0,1,0,1]|   350|   2.23606|
|      3| [0,1,0,1,0]|          100|      500|      E| [1,1,1,1,1]|   450|   1.73205|
+-------+------------+-------------+---------+-------+------------+------+----------+

Примечания:

  • df1 всегда будет иметь небольшое количество данных, поэтому кросс-джойн не рискует взорвать мою память.
  • Мои фреймы данных были созданы с использованием API структурированной потоковой передачи.
  • Я использую Spark 2.3.2

1 Ответ

4 голосов
/ 23 апреля 2019

A udf должно работать в этом случае.

import math._
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.linalg.Vectors

//input two vectors of length n, but must be equal length
//output euclidean distance between the vectors
val euclideanDistance = udf { (v1: Vector, v2: Vector) =>
    sqrt(Vectors.sqdist(v1, v2))
}

Используйте свой новый udf, как это:

joined_DF.withColumn("distance", euclideanDistance($"vector", $"vector2"))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...