Я пытаюсь подсчитать количество граничных меток «comment» и «LikeBy» из источника на графике с использованием GraphX. На первом этапе все вершины активны, затем я управляю вычислением и отправкой сообщения, используя дополнительное значение, которое хранится в каждой вершине и обновляется с помощью сообщений. На первой итерации только соответствующая вершина вычисляет и отправляет сообщение. Второе значение, которое распространяется с сообщениями, является логическим, которое указывает, было ли используемое ребро помечено «comment» или «LikeBy».
val fstId = ... // Considered as given
var nbComment, nbLike = 0L
sn.mapVertices((_, v) => (fstId, v)).pregel((fstId, false))(vprog, sendMsg, mergeMsg)
def vprog(id: Long, value: (Long, String), merged_msg: (Long, Boolean)) = {
println("activate " + id)
if (merged_msg != (fstId, false))
println("received message: " + merged_msg )
if (merged_msg._1 == id || merged_msg._1 < 0)
if (merged_msg._2) nbLike += 1L
else nbComment += 1L
(merged_msg._1, value._2)
}
def sendMsg(triplet: EdgeTriplet[(Long, String), String]) = {
if (triplet.srcId == fstId || triplet.srcAttr._1 == -1L)
if (triplet.attr == "comment"){
println((-1L, false) + " has been sent to " + triplet.dstId )
Iterator((triplet.dstId, (-1L, false)))
}
else if (triplet.attr == "likedBy") {
println((-1L, true) + " has been sent to " + triplet.dstId )
Iterator((triplet.dstId, (-1L, true)))
}
Iterator.empty
}
def mergeMsg(m1: (Long, Boolean), m2: (Long, Boolean)) = m1
Теоретически вершина может получать только одно сообщение на итераций. Функция mergeMsg
просто определена с учетом подписи pregel
.
Моя проблема: сообщения отправляются правильно, но никогда не принимаются получателями, и я не знаю почему.