Как получить количество итераций алгоритма Pregel GraphX - PullRequest
0 голосов
/ 19 апреля 2020

Я работаю с Pregel API GraphX ​​для реализации некоторых графовых алгоритмов. Мне нужно найти количество супершагов (итераций), выполненных Прегелем.

Я пробовал это с помощью переменной счетчика локально для вершинной программы, которую я увеличиваю каждый раз, когда вершина активируется, но это не дает правильное число, потому что не все вершины активируются на всех итерациях!

Это количество итераций доступно в исходном коде Pregel, но оно не публикуется c. Я пытался создать свой собственный класс Pregel, но он также не работал, потому что он использует метод, приватный для пакета spark, поэтому я не могу использовать его из своего собственного класса.

package org.apache.spark.graphx

import scala.reflect.ClassTag
import org.apache.spark.graphx.util.PeriodicGraphCheckpointer
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.util.PeriodicRDDCheckpointer

object Pregel extends Logging {    

  def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
           (graph: Graph[VD, ED],
            initialMsg: A,
            maxIterations: Int = Int.MaxValue,
            activeDirection: EdgeDirection = EdgeDirection.Either)
           (vprog: (VertexId, VD, A) => VD,
            sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
            mergeMsg: (A, A) => A)
          : Graph[VD, ED] =
        {
          require(maxIterations > 0, s"Maximum number of iterations must be greater than 0," +
            s" but got ${maxIterations}")

          val checkpointInterval = graph.vertices.sparkContext.getConf
            .getInt("spark.graphx.pregel.checkpointInterval", -1)
          var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg))
          val graphCheckpointer = new PeriodicGraphCheckpointer[VD, ED](
            checkpointInterval, graph.vertices.sparkContext)
          graphCheckpointer.update(g)

          // compute the messages
          var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
          val messageCheckpointer = new PeriodicRDDCheckpointer[(VertexId, A)](
            checkpointInterval, graph.vertices.sparkContext)
          messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
          var activeMessages = messages.count()

          // Loop
          var prevG: Graph[VD, ED] = null
          var i = 0
          while (activeMessages > 0 && i < maxIterations) {
            // Receive the messages and update the vertices.
            prevG = g
            g = g.joinVertices(messages)(vprog)
            graphCheckpointer.update(g)

            val oldMessages = messages
            // Send new messages, skipping edges where neither side received a message. We must cache
            // messages so it can be materialized on the next line, allowing us to uncache the previous
            // iteration.


            messages = GraphXUtils.mapReduceTriplets(
              g, sendMsg, mergeMsg, Some((oldMessages, activeDirection)))
            // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
            // (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
            // and the vertices of g).
            messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
            activeMessages = messages.count()

            logInfo("Pregel finished iteration " + i)

            // Unpersist the RDDs hidden by newly-materialized RDDs
            oldMessages.unpersist(blocking = false)
            prevG.unpersistVertices(blocking = false)
            prevG.edges.unpersist(blocking = false)
            // count the iteration
            i += 1
          }
          messageCheckpointer.unpersistDataSet()
          graphCheckpointer.deleteAllCheckpoints()
          messageCheckpointer.deleteAllCheckpoints()
          g
        } // end of apply
}

Я хочу иметь доступ к переменной i после того, как pregel закончит вычисления, или даже использовать другой лог c, чтобы вычислить его внутри моей вершинной программы, но я не знаю, с чего начать! Любые идеи будут так цениться.

...