Я пытаюсь реализовать как direct Dijkstra, так и его обратную версию (которая находит самый длинный путь вместо самых коротких), но у меня возникли некоторые проблемы, потому что яполучение бесконечных расстояний для не отключенных узлов в взвешенном неориентированном графе (и нулевые расстояния в обратной версии).
До сих пор я доверял и модифицировал эту реализацию, которую нашел в сети: [http://note.yuhc.me/2015/03/graphx-pregel-shortest-path/]
Мои реализации для обеих функций следующие:
Прямая Дейкстра:
// Implementation of Dijkstra algorithm using Pregel API
def computeMinDistance(u: VertexId, k1: VertexId): Double = {
val g: Graph[(Double, VertexId), Double] = this.uncertainGraph.mapVertices((id, _) =>
if (id == u) (0.0, id) else (Double.PositiveInfinity, id)
)
println("Computing Digkstra distance info fof id: " + u.toString)
val sssp: Graph[(Double, VertexId), Double] = g.pregel[(Double, VertexId)]((Double.PositiveInfinity, Long.MaxValue), Int.MaxValue, EdgeDirection.Either)(
(id, dist, newDist) => {
if(dist._1 < newDist._1) {
(dist._1, id)
} else {
(newDist._1, id)
}
},
triplet => { // Send Message
if (triplet.srcAttr._1 + triplet.attr < triplet.dstAttr._1) {
println("triplet.srcAttr._1 = " + triplet.srcAttr._1 .toString)
println("triplet.dstAttr._1 = " + triplet.dstAttr._1 .toString)
Iterator((triplet.dstId, (triplet.srcAttr._1 + triplet.attr, triplet.srcId)))
} else {
Iterator.empty
}
},
(a, b) => (math.min(a._1, b._1), a._2) // Merge Message
)
sssp.vertices.take(20).foreach(println(_))
sssp.vertices.filter(element => element._1 == k1).map(element => element._2._1).collect()(0)
}
Обратная Дейкстра:
def computeMaxDistance(node: VertexId, center: VertexId): Double = {
val g: Graph[(Double, VertexId), Double] = this.uncertainGraph.mapVertices((id, _) =>
if (id != node) (0.0, id) else (Double.PositiveInfinity, id)
)
val sslp: Graph[(Double, VertexId), Double] = g.pregel[(Double, VertexId)]((Double.PositiveInfinity, Long.MaxValue), Int.MaxValue, EdgeDirection.Either)(
(id, dist, newDist) => {
if(dist._1 > newDist._1) {
(dist._1, id)
} else {
(newDist._1, id)
}
},
triplet => { // Send Message
if (triplet.srcAttr._1 + triplet.attr > triplet.dstAttr._1) {
println("triplet.srcAttr._1 = " + triplet.srcAttr._1 .toString)
println("triplet.dstAttr._1 = " + triplet.dstAttr._1 .toString)
Iterator((triplet.dstId, (triplet.srcAttr._1 + triplet.attr, triplet.srcId)))
} else {
Iterator.empty
}
},
(a, b) => (math.max(a._1, b._1), a._2) // Merge Message
)
sslp.vertices.take(20).foreach(println(_))
sslp.vertices.filter(element => element._1 == center).map(element => element._2._1).collect()(0)
}
Любаяпомощь высоко ценится.Я не очень опытен со Scala и SparkЗаранее спасибо.