Запутался в состоянии остановки в программе-примере Spark / Graphx / Pregel, чтобы найти «путь» - PullRequest
0 голосов
/ 22 мая 2019

'

Я пробираюсь через Graphx In ​​Action и эту книгу. (исходный код для которого здесь: https://github.com/insidedctm/spark-graphx-in-action) обсуждаются два способа расчета расстояния (количество переходов по ребрам) между корнем дерева и всеми узлами до листьев. Я понимаю примеры кода, которые предоставляются с использованием aggregateMessages. В частности, условие остановки имеет смысл (у меня есть выделил это условие через комментарий, который включает в себя текст «STOP CONDITION», ниже.) После того, как атрибуты на вершинах график перестает меняться, больше не имеет смысла продолжать работу алгоритма.

Я был немного озадачен, когда посмотрел на способ вычисления тот же результат (показан ниже.)

В частности, когда вызывается метод применения Прегеля, maxIterations по умолчанию используется Integer.MAX_VALUE (который для всех практических целей «работает вечно».) Следовательно, это похоже на функцию sendMsg:

               (et:EdgeTriplet[Int,String]) =>
                    Iterator((et.dstId, et.srcAttr+1)),

будет вызываться бесконечно, даже после того, как значения в вершинах сходятся.

Есть ли какой-то механизм, который я упустил, который вызывает остановку программы после конвергенции?

// aggregateMessages approach
// from: https://github.com/insidedctm/spark-graphx-in-action/blob/51e4c667b927466bd02a0a027ca36625b010e0d6/Chapter04/Listing4_10IteratedFurthestVertex.scala

def sendMsg(ec: EdgeContext[Int,String,Int]): Unit = {
  ec.sendToDst(ec.srcAttr+1)
}

def mergeMsg(a: Int, b: Int): Int = {
  math.max(a,b)
}

def propagateEdgeCount(g:Graph[Int,String])
 :Graph[Int,String] = {    
  val verts = 
        g.aggregateMessages[Int](sendMsg, mergeMsg)
  val g2 = 
        Graph(verts, g.edges)
  val check = 
        g2.vertices.join(g.vertices).
           map(x => x._2._1 – x._2._2).
           reduce(_ + _)

  // STOP CONDITION
  // check here ensures stop if nothing changed  (******)
  if (check > 0)            
    propagateEdgeCount(g2)
  else
    g
}

// Pregel approach

val g = Pregel(myGraph.mapVertices((vid,vd) => 0), 0,
               activeDirection = EdgeDirection.Out)(
               (id:VertexId,vd:Int,a:Int) => math.max(vd,a),
               (et:EdgeTriplet[Int,String]) =>
                    Iterator((et.dstId, et.srcAttr+1)),
               (a:Int,b:Int) => math.max(a,b))
g.vertices.collect
...