Ошибка задания кластера блоков данных из памяти - PullRequest
0 голосов
/ 23 марта 2020

Мой кластер spe c выглядит следующим образом: 1 головной узел с 140 гигабайтами памяти, 20 ЦПУ и 2 подчиненных узла, а также 140 гигабайт памяти и 20 ЦПУ.

Конфигурация Spark установлена ​​следующим образом:

spark_session = SparkSession \
        .builder \
        .appName("BatchScore") \
        .config('spark.executor.memory', '128g') \
        .config('spark.executor.cores', '4') \
        .config('spark.cores.max', '8') \
        .config('spark.driver.memory', '128g') \
        .getOrCreate() \
        .sparkContext
sql_context = SQLContext(spark_session)

Я получаю следующую трассировку стека:

java.lang.OutOfMemoryError
        at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.scheduler.TaskSetManager$$anonfun$16.apply(TaskSetManager.scala:553)
        at org.apache.spark.scheduler.TaskSetManager$$anonfun$16.apply(TaskSetManager.scala:533)
        at scala.Option.map(Option.scala:146)
        at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:533)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:380)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
        at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:375)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$14.apply(TaskSchedulerImpl.scala:477)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$14.apply(TaskSchedulerImpl.scala:474)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:474)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:461)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:461)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$2.apply(CoarseGrainedSchedulerBackend.scala:253)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$2.apply(CoarseGrainedSchedulerBackend.scala:245)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$$withLock(CoarseGrainedSchedulerBackend.scala:709)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:245)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:140)
        at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
        at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
        at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
        at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:226)
        at java.lang.Thread.run(Thread.java:748)

Когда я проверяю использование кластера, память не даже 60% использовали через работу.

Пожалуйста, помогите.

Я проследил проблему до того места, где я записал прогнозы в файл паркета, используя scores.write.parquet, но я пытаюсь coalesce(1) получить все результаты в один файл, который вызывает ООМ. Любые идеи о том, как получить один выходной файл без coalesce (1) или repartition (1) ??

...