Spark GraphX 【pregel】 количество итераций больше 3, что приводит к FULL G C
Ниже приведен конкретный c код реализации
Spark - 2.4. 5
def vprog(vid: VertexId, vdata: Set[(Int, ArrayBuffer[VertexId])], message: Set[(Int, ArrayBuffer[Long])]): Set[(Int, ArrayBuffer[VertexId])] = {
message.filter(_._1 > 0) ++ vdata
}
def addMaps(spmap1: Set[(Int, ArrayBuffer[Long])], spmap2: Set[(Int, ArrayBuffer[Long])]): Set[(Int, ArrayBuffer[Long])] = {
spmap1 ++ spmap2
}
def sendMsg(e: EdgeTriplet[Set[(Int, ArrayBuffer[Long])], _]): Iterator[(VertexId, Set[(Int, ArrayBuffer[VertexId])])] = {
if (e.srcAttr.isEmpty) {
Iterator.empty
}
else {
val newAttr = e.srcAttr
.filter(!_._2.contains(e.dstId))
.map(lineData => {
(lineData._2.length, lineData._2 :+ e.dstId)
})
Iterator((e.dstId, newAttr))
}
}
val graph = atomicGraph.mapVertices((vid, value) => Set((0, ArrayBuffer(vid))))
atomicGraph.unpersist(true)
val initializeMessage: Set[(Int, ArrayBuffer[Long])] = Set((0, ArrayBuffer()))
val resultGraph = graph.pregel(initializeMessage, totalDegree)(vprog, sendMsg, addMaps)
println(resultGraph.vertices.collect().mkString("\n"))
Программа потребляет много времени в G C
Количество вершин: 183
Количество ребер: 2000 +
Spark-submit:
spark-submit \
--master yarn \
--num-executors 5 \
--deploy-mode cluster \
--driver-memory 20g \
--executor-memory 20g \
--executor-cores 10 \
--driver-cores 10 \
--conf spark.driver.memoryOverhead=20g \
--conf spark.executor.memoryOverhead=30g \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.shuffle.service.enabled=true \