В вашем примере все вершины связаны с направленным путем, поэтому каждая вершина должна иметь значение 4.
Но если вы удалите соединение 4-> 0 (id = 8) , то, конечно, будет другое число.
Поскольку ваша проблема основана на (рекурсивном) обходе графа параллельно, Graphx Pregel API , вероятно, является лучшим подходом.
Для вызова pregel требуется 3 функции
vprog
для инициализации каждой вершины сообщением (в вашем случае пустым List[VertexId]
)
sendMsg
шаг обновления, который применяется к каждой итерации (в вашем случае накапливается соседние VertexId
s и возвращается Iterator
с сообщениями для отправки на следующую итерацию
mergeMsg
для объединения двух сообщений (2 List[VertexId]
с в 1)
В коде это будет выглядеть так:
def vprog(id: VertexId, orig: List[VertexId], newly: List[VertexId]) : List[VertexId] = newly
def mergeMsg(a: List[VertexId], b: List[VertexId]) : List[VertexId] = (a ++ b).distinct
def sendMsg(trip: EdgeTriplet[List[VertexId],Double]) : Iterator[(VertexId, List[VertexId])] = {
val recursivelyConnectedNeighbors = (trip.dstId :: trip.dstAttr).filterNot(_ == trip.srcId)
if (trip.srcAttr.intersect(recursivelyConnectedNeighbors).length != recursivelyConnectedNeighbors.length)
Iterator((trip.srcId, recursivelyConnectedNeighbors))
else
Iterator.empty
}
val initList = List.empty[VertexId]
val result = graph
.mapVertices((_,_) => initList)
.pregel(
initialMsg = initList,
activeDirection = EdgeDirection.Out
)(vprog, sendMsg, mergeMsg)
.mapVertices((_, neighbors) => neighbors.length)
result.vertices.toDF("vertex", "value").show()
Выход:
+------+-----+
|vertex|value|
+------+-----+
| 0| 4|
| 1| 3|
| 2| 3|
| 3| 1|
| 4| 1|
+------+-----+
Обязательно поэкспериментируйте с spark.graphx.pregel.checkpointInterval
, если вы получаете обходные большие графики OoM (или настраиваете maxIterations
в pregel init)