Почему я получаю ошибку несоответствия типов в Pregel в GraphX? - PullRequest
1 голос
/ 05 августа 2020

Я работаю над графиком с помощью GraphX. Я хочу использовать Pregel API для параллельных операций с графиком. Структура графика, над которым я работаю, выглядит следующим образом:

(1,(4,0.08))
(2,(9,0.9))
(3,(3,0.01))
(4,(1, 0.31))
...

Ключом является NodeID, а свойство состоит из label, равного vertexid, и оценки метки, равной Double.

Я хочу реализовать алгоритм таким образом, чтобы каждый узел отправлял свое свойство своим соседям, а каждый сосед получал сообщения и в это время просто сохранял свойство, что его оценка является наивысшей. Например, сообщения, которые отправляются на узел, такие как: (4,0.96), (8,0.1), (15,0.8), .... первое число - это метка, а второе число - оценка. этого лейбла. в этом случае будет выбрана метка 4, которая имеет оценку 0,96, потому что она имеет наивысший балл на тот момент. В конце алгоритма каждый узел имеет список, в котором хранятся метки с наивысшими оценками на каждой итерации.

Ожидаемая окончательная структура аналогична этой структуре:

(1,List((2,0.49),(8,0.9),(13,0.79)))
(2,List((11,0.89),(6,0.68),(13,0.79),(10,0.57)))
(3,List((20,0.0.8),(1,0.66)))
...

Структура выше означает, что узел 1 получил три метки: 2,8 и 13.

Я пытаюсь использовать Pregel для своего алгоритма, но столкнулся с некоторыми проблемами с несоответствием типов. Может ли кто-нибудь помочь мне с реализацией кода с помощью Pregel? Я буду очень благодарен.

Это код, который я пытаюсь написать, но не могу его завершить sh его!

      def run[VD, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int): Graph[VD, ED] = {

      val temp_graph: Graph[(VertexId, Double), ED] = graph.mapVertices((x, y) => (x,1.toDouble))

      def sendMessage(e: EdgeTriplet[(VertexId, Double), ED]): Iterator[(VertexId, (VertexId, Double))] = {
        Iterator((e.srcId, e.dstAttr), (e.dstId, e.srcAttr))
      }

      def mergeMessage(count1: Map[VertexId, Double], count2: Map[VertexId, Double]): mutable.Map[VertexId, Double] = {

        val map = mutable.Map[VertexId, Double]()
        (count1.keySet ++ count2.keySet).foreach { i =>
          val count1Val = count1.getOrElse(i, 0D)
          val count2Val = count2.getOrElse(i, 0D)
          map.put(i, count1Val + count2Val)
        }
        map
      }

      def vertexProgram(vid: VertexId, attr: (VertexId, Double), message: Map[VertexId, Double]): (VertexId, Double) = {

        if (message.isEmpty) attr
        else message.maxBy(_._2) 
        // I have some problem here in adding the the label with maximum score to label list of node

      }

      val initialMessage = Map[VertexId, Double]()
      Pregel(temp_graph, initialMessage, maxIterations = maxSteps)(
        vprog = vertexProgram,
        sendMsg = sendMessage,
        mergeMsg = mergeMessage)
    }

Это ошибка, которую я получаю:

введите описание изображения здесь

...