Почему получение сообщений не активирует мои вершины в GraphX? - PullRequest
0 голосов
/ 07 августа 2020

Я пытаюсь подсчитать количество граничных меток «comment» и «LikeBy» из источника на графике с использованием GraphX. На первом этапе все вершины активны, затем я управляю вычислением и отправкой сообщения, используя дополнительное значение, которое хранится в каждой вершине и обновляется с помощью сообщений. На первой итерации только соответствующая вершина вычисляет и отправляет сообщение. Второе значение, которое распространяется с сообщениями, является логическим, которое указывает, было ли используемое ребро помечено «comment» или «LikeBy».

val fstId = ... // Considered as given
var nbComment, nbLike = 0L

sn.mapVertices((_, v) =>  (fstId, v)).pregel((fstId, false))(vprog, sendMsg, mergeMsg)

def vprog(id: Long, value: (Long, String), merged_msg: (Long, Boolean)) = {
  println("activate " + id)
  if (merged_msg != (fstId, false))
    println("received message: " + merged_msg )

  if (merged_msg._1 == id || merged_msg._1 < 0)
    if (merged_msg._2) nbLike += 1L
    else nbComment += 1L
  (merged_msg._1, value._2)
}

def sendMsg(triplet: EdgeTriplet[(Long, String), String]) = {
  if (triplet.srcId == fstId || triplet.srcAttr._1 == -1L)
    if (triplet.attr == "comment"){
      println((-1L, false) + " has been sent to " + triplet.dstId )
      Iterator((triplet.dstId, (-1L, false)))
    }
    else if (triplet.attr == "likedBy") {
      println((-1L, true) + " has been sent to " + triplet.dstId )
      Iterator((triplet.dstId, (-1L, true)))
    }
  Iterator.empty
}

def mergeMsg(m1: (Long, Boolean), m2: (Long, Boolean)) = m1

Теоретически вершина может получать только одно сообщение на итераций. Функция mergeMsg просто определена с учетом подписи pregel.

Моя проблема: сообщения отправляются правильно, но никогда не принимаются получателями, и я не знаю почему.

...