Я работаю с Pregel API GraphX для реализации некоторых графовых алгоритмов. Мне нужно найти количество супершагов (итераций), выполненных Прегелем.
Я пробовал это с помощью переменной счетчика локально для вершинной программы, которую я увеличиваю каждый раз, когда вершина активируется, но это не дает правильное число, потому что не все вершины активируются на всех итерациях!
Это количество итераций доступно в исходном коде Pregel, но оно не публикуется c. Я пытался создать свой собственный класс Pregel, но он также не работал, потому что он использует метод, приватный для пакета spark, поэтому я не могу использовать его из своего собственного класса.
package org.apache.spark.graphx
import scala.reflect.ClassTag
import org.apache.spark.graphx.util.PeriodicGraphCheckpointer
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.util.PeriodicRDDCheckpointer
object Pregel extends Logging {
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
(graph: Graph[VD, ED],
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] =
{
require(maxIterations > 0, s"Maximum number of iterations must be greater than 0," +
s" but got ${maxIterations}")
val checkpointInterval = graph.vertices.sparkContext.getConf
.getInt("spark.graphx.pregel.checkpointInterval", -1)
var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg))
val graphCheckpointer = new PeriodicGraphCheckpointer[VD, ED](
checkpointInterval, graph.vertices.sparkContext)
graphCheckpointer.update(g)
// compute the messages
var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
val messageCheckpointer = new PeriodicRDDCheckpointer[(VertexId, A)](
checkpointInterval, graph.vertices.sparkContext)
messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
var activeMessages = messages.count()
// Loop
var prevG: Graph[VD, ED] = null
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// Receive the messages and update the vertices.
prevG = g
g = g.joinVertices(messages)(vprog)
graphCheckpointer.update(g)
val oldMessages = messages
// Send new messages, skipping edges where neither side received a message. We must cache
// messages so it can be materialized on the next line, allowing us to uncache the previous
// iteration.
messages = GraphXUtils.mapReduceTriplets(
g, sendMsg, mergeMsg, Some((oldMessages, activeDirection)))
// The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
// (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
// and the vertices of g).
messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
activeMessages = messages.count()
logInfo("Pregel finished iteration " + i)
// Unpersist the RDDs hidden by newly-materialized RDDs
oldMessages.unpersist(blocking = false)
prevG.unpersistVertices(blocking = false)
prevG.edges.unpersist(blocking = false)
// count the iteration
i += 1
}
messageCheckpointer.unpersistDataSet()
graphCheckpointer.deleteAllCheckpoints()
messageCheckpointer.deleteAllCheckpoints()
g
} // end of apply
}
Я хочу иметь доступ к переменной i
после того, как pregel закончит вычисления, или даже использовать другой лог c, чтобы вычислить его внутри моей вершинной программы, но я не знаю, с чего начать! Любые идеи будут так цениться.