У меня есть два кадра данных df1
и df2
, которые имеют следующие структуры:
print(df1)
+-------+------------+-------------+---------+
| id| vector| start_time | end_time|
+-------+------------+-------------+---------+
| 1| [0,0,0,0,0]| 000| 200|
| 2| [1,1,1,1,1]| 200| 500|
| 3| [0,1,0,1,0]| 100| 500|
+-------+------------+-------------+---------+
print(df2)
+-------+------------+-------+
| id| vector| time|
+-------+------------+-------+
| A| [0,1,1,1,0]| 050|
| B| [1,0,0,1,1]| 150|
| C| [1,1,1,1,1]| 250|
| D| [1,0,1,0,1]| 350|
| E| [1,1,1,1,1]| 450|
| F| [1,0,5,0,0]| 550|
+-------+------------+-------+
То, что я хочу: для каждого из данных df1
получить все данные из df2
, для которых time
находится между start_time
и end_time
, и для всех этих данных вычислить евклидово расстояние между двумя векторы.
Я начал со следующего кода, но застрял на пути вычисления расстояния:
val joined_DF = kafka_DF.crossJoin(
hdfs_DF.withColumnRenamed("id","id2").withColumnRenamed("vector","vector2")
)
.filter(col("time")>= col("start_time") &&
col("time")<= col("end_time"))
.withColumn("distance", ???) // Euclidean distance element-wise between columns vector and column vector2
Вот ожидаемый результат на данных примера:
+-------+------------+-------------+---------+-------+------------+------+----------+
| id| vector| start_time | end_time| id2| vector2| time| distance |
+-------+------------+-------------+---------+-------+------------+------+----------+
| 1| [0,0,0,0,0]| 000| 200| A| [0,1,1,1,0]| 050| 1.73205|
| 1| [0,0,0,0,0]| 000| 200| B| [1,0,0,1,1]| 150| 1.73205|
| 2| [1,1,1,1,1]| 200| 500| C| [1,1,1,1,1]| 250| 0|
| 2| [1,1,1,1,1]| 200| 500| D| [1,0,1,0,1]| 350| 1.41421|
| 2| [1,1,1,1,1]| 200| 500| E| [1,1,1,1,1]| 450| 0|
| 3| [0,1,0,1,0]| 100| 500| B| [1,0,0,1,1]| 150| 1.73205|
| 3| [0,1,0,1,0]| 100| 500| C| [1,1,1,1,1]| 250| 1.73205|
| 3| [0,1,0,1,0]| 100| 500| D| [1,0,1,0,1]| 350| 2.23606|
| 3| [0,1,0,1,0]| 100| 500| E| [1,1,1,1,1]| 450| 1.73205|
+-------+------------+-------------+---------+-------+------------+------+----------+
Примечания:
df1
всегда будет иметь небольшое количество данных, поэтому кросс-джойн не рискует взорвать мою память.
- Мои фреймы данных были созданы с использованием API структурированной потоковой передачи.
- Я использую Spark 2.3.2