Spark Graphx java.lang.OutOfMemoryError - PullRequest
0 голосов
/ 12 ноября 2018

У меня проблема с модулем Spark GraphX. У меня кластер из 5 узлов, с 23,5 ГБ памяти и 24 ядрами на узел. Я использую spark-shell для отправки своего кода, поэтому я использую Spark в режиме клиента. В моей конфигурации у меня есть 1 главный узел и 4 подчиненных узла. Это мой spark-defaults.conf:

spark.executor.instances                8
spark.executor.memory                   10g
spark.driver.memory                     18g
spark.executor.cores                    10
spark.driver.cores                      18
spark.default.parallelism               144
spark.serializer                        org.apache.spark.serializer.KryoSerializer

Я читаю и храню 2 действительно маленьких файла с общим размером 40 МБ.

Это мой код:

val input1 = sc.textFile("/home/data/spark/nodes.txt")
val vertexArray = input1.map(line => (line.toLong, mutable.Set[VertexId]()))

val input2 = sc.textFile("/home/data/spark/edges.txt")
val splitRdd = input2.map( line => line.split(" ") )
val edgeArray = splitRdd.map(line => Edge(line(0).toLong, line(1).toLong, "bla"))

val vertices: RDD[(VertexId, mutable.Set[VertexId])] = vertexArray
val edges: RDD[Edge[String]] = edgeArray
val graph = Graph(vertices, edges).partitionBy(EdgePartition2D, 4)

var filteredGraph: Graph[mutable.Set[VertexId], String] = graph.mapVertices((vid, vdata) => {
  mutable.Set[VertexId]()
}).cache()
val temp: VertexRDD[mutable.Set[VertexId]] = filteredGraph.aggregateMessages[mutable.Set[VertexId]](
  triplet => {
    if(triplet.dstId < 0){
      triplet.sendToDst(mutable.Set[VertexId](triplet.srcId))
    }
  },
  (oldSet, newSet) => oldSet ++ newSet // Merge Message
)
filteredGraph = filteredGraph.joinVertices(temp)((id, oldSet, newSet) => newSet).cache()
val temp2: VertexRDD[mutable.Set[VertexId]] = filteredGraph.aggregateMessages[mutable.Set[VertexId]](
  triplet => {
    if(triplet.dstId > 0){
      triplet.sendToDst(triplet.srcAttr.filter(id=>triplet.dstId!=id && triplet.dstId < id))
    }
  },
  (oldSet, newSet) => oldSet ++ newSet // Merge Message
)
val candidatesRDD: RDD[(Long, List[Long])] = temp2.map(vertex => {
  (vertex._1.asInstanceOf[Long], vertex._2.asInstanceOf[ mutable.Set[Long]].toList)
})


val newNames = Seq("rid", "candidates")
val candidatesDF = candidatesRDD.toDF(newNames: _*)
val candidatesDFMod = candidatesDF.withColumn("candidates", explode($"candidates"))
candidatesDFMod.show

Если я выполняю вычисления, я получаю через несколько раз исключение java.lang.OutOfMemoryError: Java heap space от одного исполнителя. После этого Spark пытается вычислить его снова, поэтому он перезапускает этапы, но снова попадает в то же исключение. Почему это происходит? Вычисления заполняют всю память исполнителя 10G. У меня неправильная конфигурация Spark? Я попробовал несколько перестановок моего spark-defaults.conf. Я пробовал 3 Executor на узел и более, я изменил размер памяти и так далее. Но каждый раз это заканчивается одним и тем же исключением.

Может быть, у кого-то есть идея для этой проблемы?

С уважением

Седир Мухаммед

1 Ответ

0 голосов
/ 13 ноября 2018

spark.executor.instances 8

spark.executor.cores 10

val graph = Graph(vertices, edges).partitionBy(EdgePartition2D, 4) ^^^

Нет смысла иметь 8 исполнителей с 10 ядрами в каждом, если вы делаете только 4 раздела.Имейте в виду, что все разделы на исполнителе вместе должны помещаться в памяти, чтобы избежать перебора GC.Попробуйте использовать большее количество разделов, чтобы 10 разделов легко помещались в памяти, например, несколько сотен МБ входных данных на раздел.Кроме того, убедитесь, что 10 ГБ ОЗУ фактически доступны на каждом рабочем узле, а 16 ГБ - на компьютере, на котором выполняется драйвер. Если на некоторых ваших сотрудниках нет ОЗУ, вы можете уменьшить количество ядер и количествопамяти в конфигурации Spark.

...