'
Я пробираюсь через Graphx In Action и эту книгу.
(исходный код для которого здесь: https://github.com/insidedctm/spark-graphx-in-action)
обсуждаются два способа расчета расстояния
(количество переходов по ребрам) между корнем дерева и всеми узлами
до листьев. Я понимаю примеры кода, которые предоставляются с использованием
aggregateMessages. В частности, условие остановки имеет смысл (у меня есть
выделил это условие через комментарий, который включает в себя
текст «STOP CONDITION», ниже.) После того, как атрибуты на вершинах
график перестает меняться, больше не имеет смысла продолжать работу алгоритма.
Я был немного озадачен, когда посмотрел на способ вычисления
тот же результат (показан ниже.)
В частности, когда вызывается метод применения Прегеля, maxIterations
по умолчанию используется Integer.MAX_VALUE (который для всех практических целей «работает вечно».)
Следовательно, это похоже на функцию sendMsg:
(et:EdgeTriplet[Int,String]) =>
Iterator((et.dstId, et.srcAttr+1)),
будет вызываться бесконечно, даже после того, как значения в вершинах сходятся.
Есть ли какой-то механизм, который я упустил, который
вызывает остановку программы после конвергенции?
// aggregateMessages approach
// from: https://github.com/insidedctm/spark-graphx-in-action/blob/51e4c667b927466bd02a0a027ca36625b010e0d6/Chapter04/Listing4_10IteratedFurthestVertex.scala
def sendMsg(ec: EdgeContext[Int,String,Int]): Unit = {
ec.sendToDst(ec.srcAttr+1)
}
def mergeMsg(a: Int, b: Int): Int = {
math.max(a,b)
}
def propagateEdgeCount(g:Graph[Int,String])
:Graph[Int,String] = {
val verts =
g.aggregateMessages[Int](sendMsg, mergeMsg)
val g2 =
Graph(verts, g.edges)
val check =
g2.vertices.join(g.vertices).
map(x => x._2._1 – x._2._2).
reduce(_ + _)
// STOP CONDITION
// check here ensures stop if nothing changed (******)
if (check > 0)
propagateEdgeCount(g2)
else
g
}
// Pregel approach
val g = Pregel(myGraph.mapVertices((vid,vd) => 0), 0,
activeDirection = EdgeDirection.Out)(
(id:VertexId,vd:Int,a:Int) => math.max(vd,a),
(et:EdgeTriplet[Int,String]) =>
Iterator((et.dstId, et.srcAttr+1)),
(a:Int,b:Int) => math.max(a,b))
g.vertices.collect