Я думаю, что для ваших условий вам не нужен spark-graphx. Ваша проблема просто решена путем присоединения вашей базы DataFrame
к себе, посмотрите на код:
предположим, что у нас есть DataFrame с прямыми ссылками от X до Y:
val df = Seq(
(1234, 4567),
(1234, 6789),
(1234, 3456),
(4567, 9876),
(5, 6),
(6, 7),
(6, 8),
(6, 9),
(5, 9),
(6, 10)
).toDF("X", "Y")
мы видим, что некоторые Строки имеют то же значение Y
, что и другие строки X
. Это означает, что мы можем соединить DataFrame с самим собой (давайте используем псевдонимы a
и b
) по условию: a.Y
должно быть равно b.X
:
import org.apache.spark.sql.functions._
val twoHopCitation = df.as("a").join(
df.as("b"),
col("a.Y") === col("b.X")
)
.select(col("a.X").as("X"), col("b.Y").as("Y"))
теперь мы видим все переходные ссылки от a.X
до b.Y
:
twoHopCitation.show()
+----+----+
| X| Y|
+----+----+
|1234|9876|
| 5| 10|
| 5| 9|
| 5| 8|
| 5| 7|
+----+----+
, поэтому все, что нам нужно, это объединить эти два DataFrames и агрегировать по X
считая Y
и сортируя по count Y
по убыванию:
df.union(
twoHopCitation
)
.groupBy("X")
.agg(count(col("Y")).as("cntY"))
.sort(col("cntY").desc)
.show()
+----+----+
| X|cntY|
+----+----+
| 5| 6|
| 6| 4|
|1234| 4|
|4567| 1|
+----+----+