Обрабатывать большой набор иерархических данных с помощью PREGEL API очень плохо - PullRequest
0 голосов
/ 18 июня 2019

Используя Spark2.4 Graphx, я попытался реализовать одну иерархическую проблему. Реализация дает ожидаемый результат, но требует много времени для полного набора данных. Я использовал 4 различных типа стратегии разделения, но производительность не могла быть улучшена. Во время этой реализации используется PREGEL API.

// создаем вершину RDD. первичный ключ, корень, путь

val verticesRDD = vertexDF.rdd
  .map { x => (x.get(0), x.get(1), x.get(2)) }
  .map { x => (MurmurHash3.stringHash(x._1.toString).toLong, (x._1.asInstanceOf[Any], x._2.asInstanceOf[Any], x._3.asInstanceOf[String])) }

// создать ребро RDD. Сверху вниз отношения

val edgesRDD = edgeDF.rdd.map { x => (x.get(0), x.get(1)) }
      .map { x => Edge(MurmurHash3.stringHash(x._1.toString).toLong, MurmurHash3.stringHash(x._2.toString).toLong, "topdown") }  
graph = Graph(verticesRDD, edgesRDD).partitionBy(EdgePartition2D).cache()
val pathSeperator = """/"""
val initialMsg = (0L, 0, 0.asInstanceOf[Any], List("dummy"), 0, 1)
val initialGraph = graph.mapVertices((id, v) => (id, 0, v._2, List(v._3), 0, v._3, 1, v._1))
val hrchyRDD = initialGraph.pregel(initialMsg,Int.MaxValue,     EdgeDirection.Out)(setMsg,sendMsg,mergeMsg)   

def setMsg(vertexId: VertexId, value: (Long, Int, Any, List[String], Int, String, Int, Any), message: (Long, Int, Any, List[String], Int, Int)): (Long, Int, Any, List[String], Int, String, Int, Any) = {
    if (message._2 < 1) { //superstep 0 - initialize
      (value._1, value._2 + 1, value._3, value._4, value._5, value._6, value._7, value._8)
    } else if (message._5 == 1) { // set isCyclic
      (value._1, value._2, value._3, value._4, message._5, value._6, value._7, value._8)
    } else if (message._6 == 0) { // set isleaf
      (value._1, value._2, value._3, value._4, value._5, value._6, message._6, value._8)
    } else { // set new values
      (message._1, value._2 + 1, message._3, value._6 :: message._4, value._5, value._6, value._7, value._8)
    }
  }

  // send the value to vertices
  def sendMsg(triplet: EdgeTriplet[(Long, Int, Any, List[String], Int, String, Int, Any), _]): Iterator[(VertexId, (Long, Int, Any, List[String], Int, Int))] = {
    val sourceVertex = triplet.srcAttr
    val destinationVertex = triplet.dstAttr
    // check for icyclic
    if (sourceVertex._1 == triplet.dstId || sourceVertex._1 == destinationVertex._1)
      if (destinationVertex._5 == 0) { //set iscyclic
        Iterator((triplet.dstId, (sourceVertex._1, sourceVertex._2, sourceVertex._3, sourceVertex._4, 1, sourceVertex._7)))
      } else {
        Iterator.empty
      }
    else {
      if (sourceVertex._7 == 1) //is NOT leaf
      {
        Iterator((triplet.srcId, (sourceVertex._1, sourceVertex._2, sourceVertex._3, sourceVertex._4, 0, 0)))
      } else { // set new values
        Iterator((triplet.dstId, (sourceVertex._1, sourceVertex._2, sourceVertex._3, sourceVertex._4, 0, 1)))
      }
    }
  }

  def mergeMsg(msg1: (Long, Int, Any, List[String], Int, Int), msg2: (Long, Int, Any, List[String], Int, Int)): (Long, Int, Any, List[String], Int, Int) = {
    msg2
  }

Нужна помощь для решения проблемы производительности.

...