Один и два хоп цитирования в scala - PullRequest
0 голосов
/ 19 апреля 2020

У меня есть список «от узла» и «до узла», который выглядит следующим образом:

1234        4567
1234        6789
1234        3456
4567        9876
….

Суть в том, чтобы найти, какой узел является наиболее значимым, что означает, какой из них имеет самый и цитата из двух прыжков: 1234 с (4567,6789,3456,9876 (потому что он связан с 4567))

В настоящее время все, что я сделал, - это карта и функция сокращения, чтобы получить наиболее появляющийся узел, который бы покрывал цитирование одного узла. Но мне нужно охватить случаи, когда A -> B и B -> C для этого A -> C.

текущий код для поиска десяти лучших узлов:

val textFile = sc.textFile("cit-Patents.txt")

val arrayForm = textFile.filter(_.charAt(0)!='#')

val mapreduce = arrayForm.flatMap(line => line.split("\\s+")).map(word => (word,1)).reduceByKey(_ + _).sortBy(_._2,ascending=false).take(10);

Я знаю, что GraphX ​​может помочь с этим, но я не знал, как это сделать.

Если вам нужна дополнительная информация, пожалуйста, дайте мне знать. Спасибо.

1 Ответ

1 голос
/ 20 апреля 2020

Я думаю, что для ваших условий вам не нужен spark-graphx. Ваша проблема просто решена путем присоединения вашей базы DataFrame к себе, посмотрите на код:

предположим, что у нас есть DataFrame с прямыми ссылками от X до Y:

val df = Seq(
  (1234, 4567),
  (1234, 6789),
  (1234, 3456),
  (4567, 9876),
  (5, 6),
  (6, 7),
  (6, 8),
  (6, 9),
  (5, 9),
  (6, 10)
).toDF("X", "Y")

мы видим, что некоторые Строки имеют то же значение Y, что и другие строки X. Это означает, что мы можем соединить DataFrame с самим собой (давайте используем псевдонимы a и b) по условию: a.Y должно быть равно b.X:

import org.apache.spark.sql.functions._
val twoHopCitation = df.as("a").join(
  df.as("b"), 
  col("a.Y") === col("b.X")
)
  .select(col("a.X").as("X"), col("b.Y").as("Y"))

теперь мы видим все переходные ссылки от a.X до b.Y:

twoHopCitation.show()
+----+----+
|   X|   Y|
+----+----+
|1234|9876|
|   5|  10|
|   5|   9|
|   5|   8|
|   5|   7|
+----+----+

, поэтому все, что нам нужно, это объединить эти два DataFrames и агрегировать по X считая Y и сортируя по count Y по убыванию:

df.union(
  twoHopCitation
)
  .groupBy("X")
  .agg(count(col("Y")).as("cntY"))
  .sort(col("cntY").desc)
  .show()
+----+----+
|   X|cntY|
+----+----+
|   5|   6|
|   6|   4|
|1234|   4|
|4567|   1|
+----+----+
...