Почему я не могу правильно запустить программу Pregel в Spark GraphX? - PullRequest
0 голосов
/ 15 марта 2020

Я рассмотрел исходный код алгоритмов распространения Label из inte rnet, и я использовал в одной части моего пользовательского алгоритма. код, который я использовал, выглядит следующим образом:


    def run[VD, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int): Graph[VertexId, ED] = {
      require(maxSteps > 0, s"Maximum of steps must be greater than 0, but got ${maxSteps}")

      val lpaGraph: Graph[VD, ED] = graph.mapVertices { case (y,x) => x}
      def sendMessage(e: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, Map[VertexId, Long])] = {
        Iterator((e.srcId, Map(e.dstAttr -> 1L)), (e.dstId, Map(e.srcAttr -> 1L)))
      }
      def mergeMessage(count1: Map[VertexId, Long], count2: Map[VertexId, Long])
      : Map[VertexId, Long] = {
        // Mimics the optimization of breakOut, not present in Scala 2.13, while working in 2.12
        val map = mutable.Map[VertexId, Long]()
        (count1.keySet ++ count2.keySet).map { i =>
          val count1Val = count1.getOrElse(i, 0L)
          val count2Val = count2.getOrElse(i, 0L)
          map.put(i, count1Val + count2Val)
        }
        map
      }
      def vertexProgram(vid: VertexId, attr: Long, message: Map[VertexId, Long]): VertexId = {
        if (message.isEmpty) attr else message.maxBy(_._2)._1
      }
      val initialMessage = Map[VertexId, Long]()
      Pregel(lpaGraph, initialMessage, maxIterations = maxSteps)(
        vprog = vertexProgram,
        sendMsg = sendMessage,
        mergeMsg = mergeMessage)
    }

в исходном исходном коде в начале каждого узла получает свой Id в качестве начальной метки (свойства), такой же, как этот код:

val lpaGraph: Graph[VD, ED] = graph.mapVertices { case (y,x) => y}

но в моем собственном алгоритме я хочу, чтобы узлы имели свою текущую метку (свойство) в качестве своей начальной метки, используя этот код:

val lpaGraph: Graph[VD, ED] = graph.mapVertices { case (y,x) => x}

, но когда я использую приведенный выше код, я получаю некоторые ошибки, например, ошибку в vprog = vertexProgram и ошибка - несоответствие типов:

Error:(176, 17) type mismatch;
 found   : org.apache.spark.graphx.VertexId (which expands to)  Long
 required: VD
    vprog = vertexProgram,

Может ли кто-нибудь помочь мне с этой проблемой ?? я использовал GraphLoader для загрузки списка ребер, чтобы сделать график, и СДР выглядит следующим образом:

(1,2)
(2,2)
(3,8) 
...
...