как найти количество вершин, которые достижимы из данной вершины в Spark GraphX - PullRequest
0 голосов
/ 27 апреля 2018

Я хочу узнать количество достижимых вершин из данной вершины в ориентированном графе (см. Изображение ниже), например, для id = 0L, поскольку он подключается к 1L и 2L, 1L подключается к 3L, 2L подключается к 4L, следовательно, вывод должен быть 4. Ниже приведены данные отношения графика:

edgeid from to distance
0 0 1 10.0
1 0 2 5.0
2 1 2 2.0
3 1 3 1.0
4 2 1 3.0
5 2 3 9.0
6 2 4 2.0
7 3 4 4.0
8 4 0 7.0
9 4 3 5.0

Мне удалось настроить график, но я не уверен, как использовать graph.edges.filter для получения результата

val vertexRDD: RDD[(Long, (Double))] = sc.parallelize(vertexArray)
val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
val graph: Graph[(Double), Int] = Graph(vertexRDD, edgeRDD)

enter image description here

1 Ответ

0 голосов
/ 29 апреля 2018

В вашем примере все вершины связаны с направленным путем, поэтому каждая вершина должна иметь значение 4.

Но если вы удалите соединение 4-> 0 (id = 8) , то, конечно, будет другое число.

Поскольку ваша проблема основана на (рекурсивном) обходе графа параллельно, Graphx Pregel API , вероятно, является лучшим подходом.

Для вызова pregel требуется 3 функции

  • vprog для инициализации каждой вершины сообщением (в вашем случае пустым List[VertexId])
  • sendMsg шаг обновления, который применяется к каждой итерации (в вашем случае накапливается соседние VertexId s и возвращается Iterator с сообщениями для отправки на следующую итерацию
  • mergeMsg для объединения двух сообщений (2 List[VertexId] с в 1)

В коде это будет выглядеть так:

  def vprog(id: VertexId, orig: List[VertexId], newly: List[VertexId]) : List[VertexId] = newly

  def mergeMsg(a: List[VertexId], b: List[VertexId]) : List[VertexId] = (a ++ b).distinct

  def sendMsg(trip: EdgeTriplet[List[VertexId],Double]) : Iterator[(VertexId, List[VertexId])] = {
    val recursivelyConnectedNeighbors = (trip.dstId :: trip.dstAttr).filterNot(_ == trip.srcId)

    if (trip.srcAttr.intersect(recursivelyConnectedNeighbors).length != recursivelyConnectedNeighbors.length)
      Iterator((trip.srcId, recursivelyConnectedNeighbors))
    else
      Iterator.empty
  }

  val initList = List.empty[VertexId]

  val result = graph
    .mapVertices((_,_) => initList)
    .pregel(
      initialMsg = initList,
      activeDirection = EdgeDirection.Out
    )(vprog, sendMsg, mergeMsg)
    .mapVertices((_, neighbors) => neighbors.length)

  result.vertices.toDF("vertex", "value").show()

Выход:

+------+-----+
|vertex|value|
+------+-----+
|     0|    4|
|     1|    3|
|     2|    3|
|     3|    1|
|     4|    1|
+------+-----+

Обязательно поэкспериментируйте с spark.graphx.pregel.checkpointInterval, если вы получаете обходные большие графики OoM (или настраиваете maxIterations в pregel init)

...