Используя Spark2.4 Graphx, я попытался реализовать одну иерархическую проблему. Реализация дает ожидаемый результат, но требует много времени для полного набора данных. Я использовал 4 различных типа стратегии разделения, но производительность не могла быть улучшена. Во время этой реализации используется PREGEL API.
// создаем вершину RDD. первичный ключ, корень, путь
val verticesRDD = vertexDF.rdd
.map { x => (x.get(0), x.get(1), x.get(2)) }
.map { x => (MurmurHash3.stringHash(x._1.toString).toLong, (x._1.asInstanceOf[Any], x._2.asInstanceOf[Any], x._3.asInstanceOf[String])) }
// создать ребро RDD. Сверху вниз отношения
val edgesRDD = edgeDF.rdd.map { x => (x.get(0), x.get(1)) }
.map { x => Edge(MurmurHash3.stringHash(x._1.toString).toLong, MurmurHash3.stringHash(x._2.toString).toLong, "topdown") }
graph = Graph(verticesRDD, edgesRDD).partitionBy(EdgePartition2D).cache()
val pathSeperator = """/"""
val initialMsg = (0L, 0, 0.asInstanceOf[Any], List("dummy"), 0, 1)
val initialGraph = graph.mapVertices((id, v) => (id, 0, v._2, List(v._3), 0, v._3, 1, v._1))
val hrchyRDD = initialGraph.pregel(initialMsg,Int.MaxValue, EdgeDirection.Out)(setMsg,sendMsg,mergeMsg)
def setMsg(vertexId: VertexId, value: (Long, Int, Any, List[String], Int, String, Int, Any), message: (Long, Int, Any, List[String], Int, Int)): (Long, Int, Any, List[String], Int, String, Int, Any) = {
if (message._2 < 1) { //superstep 0 - initialize
(value._1, value._2 + 1, value._3, value._4, value._5, value._6, value._7, value._8)
} else if (message._5 == 1) { // set isCyclic
(value._1, value._2, value._3, value._4, message._5, value._6, value._7, value._8)
} else if (message._6 == 0) { // set isleaf
(value._1, value._2, value._3, value._4, value._5, value._6, message._6, value._8)
} else { // set new values
(message._1, value._2 + 1, message._3, value._6 :: message._4, value._5, value._6, value._7, value._8)
}
}
// send the value to vertices
def sendMsg(triplet: EdgeTriplet[(Long, Int, Any, List[String], Int, String, Int, Any), _]): Iterator[(VertexId, (Long, Int, Any, List[String], Int, Int))] = {
val sourceVertex = triplet.srcAttr
val destinationVertex = triplet.dstAttr
// check for icyclic
if (sourceVertex._1 == triplet.dstId || sourceVertex._1 == destinationVertex._1)
if (destinationVertex._5 == 0) { //set iscyclic
Iterator((triplet.dstId, (sourceVertex._1, sourceVertex._2, sourceVertex._3, sourceVertex._4, 1, sourceVertex._7)))
} else {
Iterator.empty
}
else {
if (sourceVertex._7 == 1) //is NOT leaf
{
Iterator((triplet.srcId, (sourceVertex._1, sourceVertex._2, sourceVertex._3, sourceVertex._4, 0, 0)))
} else { // set new values
Iterator((triplet.dstId, (sourceVertex._1, sourceVertex._2, sourceVertex._3, sourceVertex._4, 0, 1)))
}
}
}
def mergeMsg(msg1: (Long, Int, Any, List[String], Int, Int), msg2: (Long, Int, Any, List[String], Int, Int)): (Long, Int, Any, List[String], Int, Int) = {
msg2
}
Нужна помощь для решения проблемы производительности.