GraphX на самом деле не имеет понятия о направлении, если вы не скажете ему использовать его.Если вы посмотрите на внутреннюю работу библиотеки ShortestPaths
, то увидите, что она использует Pregel
, а направление по умолчанию (EdgeDirection.Either
).Это означает, что для всех триплетов он добавит источник и назначение в активный набор.Однако, если вы укажете в функции sendMsg
Pregel
сохранение только srcId в активном наборе (как это происходит в ShortestPaths
lib), некоторые вершины (только с исходящими ребрами) не будут переоценены.
В любом случае, решение состоит в том, чтобы написать свой собственный объект / библиотеку Diameter, возможно, выглядящий так (в значительной степени основанный на ShortestPath
, так что, может быть, есть даже лучшие решения?)
object Diameter extends Serializable {
type SPMap = Map[VertexId, Int]
def makeMap(x: (VertexId, Int)*) = Map(x: _*)
def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }
def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = {
(spmap1.keySet ++ spmap2.keySet).map {
k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
}(collection.breakOut) // more efficient alternative to [[collection.Traversable.toMap]]
}
// Removed landmarks, since all paths have to be taken in consideration
def run[VD, ED: ClassTag](graph: Graph[VD, ED]): Int = {
val spGraph = graph.mapVertices { (vid, _) => makeMap(vid -> 0) }
val initialMessage:SPMap = makeMap()
def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
addMaps(attr, msg)
}
def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)] = {
// added the concept of updating the dstMap based on the srcMap + 1
val newSrcAttr = incrementMap(edge.dstAttr)
val newDstAttr = incrementMap(edge.srcAttr)
List(
if (edge.srcAttr != addMaps(newSrcAttr, edge.srcAttr)) Some((edge.srcId, newSrcAttr)) else None,
if (edge.dstAttr != addMaps(newDstAttr, edge.dstAttr)) Some((edge.dstId, newDstAttr)) else None
).flatten.toIterator
}
val pregel = Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
// each vertex will contain map with all shortest paths, so just get first
pregel.vertices.first()._2.values.max
}
}
val diameter = Diameter.run(graph)