У меня есть искровой фрейм данных, ради аргумента давайте возьмем его:
val df = sc.parallelize(
Seq(("a",1,2),("a",1,4),("b",5,6),("b",10,2),("c",1,1))
).toDF("id","x","y")
+---+---+---+
| id| x| y|
+---+---+---+
| a| 1| 2|
| a| 1| 4|
| b| 5| 6|
| b| 10| 2|
| c| 1| 1|
+---+---+---+
Я хотел бы вычислить все попарные различия между записями в кадре данных с тем же идентификатором и вывести результат в другой кадр данных. Для небольшого информационного кадра я могу сделать это:
df.crossJoin(
df.select(
(df.columns.map(x=>col(x).as("_"+x))):_*)
).where(
col("id")===col("_id")
).select(
col("id"),
(col("x")-col("_x")).as("dx"),
(col("y")-col("_y")).as("dy")
)
+---+---+---+
| id| dx| dy|
+---+---+---+
| c| 0| 0|
| b| 0| 0|
| b| -5| 4|
| b| 5| -4|
| b| 0| 0|
| a| 0| 0|
| a| 0| -2|
| a| 0| 2|
| a| 0| 0|
+---+---+---+
Однако для больших фреймов данных это не разумный подход, поскольку crossJoin будет в основном генерировать данные, которые будут отбрасываться последующим предложением where.
Я все еще довольно новичок в зажигании, и groupBy показался мне естественным местом, чтобы начать искать, но я не могу понять, как этого добиться с помощью groupBy. Любая помощь будет приветствоваться.
В конечном итоге я хотел бы удалить избыточность, например:
val df1 = df.withColumn("idx",monotonicallyIncreasingId)
df.crossJoin(
df.select(
(df.columns.map(x=>col(x).as("_"+x))):_*)
).where(
col("id")===col("_id") && col("idx") < col("_idx")
).select(
col("id"),
(col("x")-col("_x")).as("dx"),
(col("y")-col("_y")).as("dy")
)
+---+---+---+
| id| dx| dy|
+---+---+---+
| b| -5| 4|
| a| 0| -2|
+---+---+---+
Но если это легче сделать с помощью избыточности, то я могу жить с этим.
Это не редкое преобразование для выполнения в ML, поэтому я подумал, что что-то из MLlib может быть уместным, но опять же я тоже ничего там не нашел.