Когда я пытался реализовать алгоритм в Graphx с помощью Scala, я не находил возможным активировать все вершины в следующей итерации. Как я могу отправить сообщение всем моим вершинам графа?В моем алгоритме есть несколько супершагов, которые должны быть выполнены всеми вершинами (независимо от того, получают они сообщение или нет, потому что даже не получение сообщения - это событие, которое должно быть обработано в следующей итерации).
Я привожу здесь официальный код алгоритма SSSP, реализованного в логике Прегеля. Вы можете видеть, что только вершины, получившие сообщение, будут выполнять свою программу на следующей итерации, но в моем случае я хочу, чтобы функция pregel выполнялась итеративно, то есть каждый супершагвершины выполняют свои программы, и они могут голосовать, чтобы остановить, если это необходимо !!Рассуждения в этом примере не похожи на логику Прегеля.Пожалуйста, есть идеи, как реализовать настоящую логику Прегеля?
val graph: Graph[Long, Double] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) =>
if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist), // Vertex Program
triplet => { // Send Message
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
(a, b) => math.min(a, b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))
}