
Spark running out of memory

0 votes / 28 February 2019

My Spark job is running out of space. I have two datasets, I join them on a key, run a map function over the result, and save the output to S3.

I am getting the following exception:

19/02/27 22:44:46 ERROR TaskMemoryManager: error while calling spill() on org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2cd0d451
java.io.IOException: No space left on device
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:326)
    at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
    at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:252)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.write(UnsafeSorterSpillWriter.java:133)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spillIterator(UnsafeExternalSorter.java:498)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:222)
    at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:206)
    at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:285)
    at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:117)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:383)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:407)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:135)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.sort_addToSorter_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
    at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:794)
    at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoinExec.scala:755)
    at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeJoinExec.scala:917)
    at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeJoinExec.scala:953)
    at org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(File19/02/27
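For context, the spill() in this trace writes sort/shuffle data to the executor's local scratch directories, so "No space left on device" refers to the local disks filling up rather than to heap memory. Below is a minimal, hypothetical sketch (the paths and values are examples, not taken from this job) of how the scratch location and shuffle parallelism can be set when the session is built:

import org.apache.spark.sql.SparkSession;

// Hypothetical configuration sketch: point the spill/scratch directories at
// larger mounted volumes and raise shuffle parallelism so each task sorts and
// spills a smaller slice of the join. Paths and values are examples only.
final SparkSession spark = SparkSession.builder()
        .appName("plan-join-job")                         // hypothetical name
        // On EMR/YARN the scratch location is usually taken from
        // yarn.nodemanager.local-dirs; spark.local.dir applies when Spark
        // manages the directories itself.
        .config("spark.local.dir", "/mnt/spark,/mnt1/spark")
        // More shuffle partitions -> smaller per-task spill files for the
        // sort-merge join and the groupByKey/mapGroups shuffle.
        .config("spark.sql.shuffle.partitions", "2000")
        .getOrCreate();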

This is my code:

// Read the plan data as raw rows.
final Dataset<Row> planData = spark.read().json(planDataInput);

// Group the plan rows by key.
final KeyValueGroupedDataset<PlanKey, Row> groupedDataset = planData.groupByKey(new PlanRowGroupKeyFunction(),
        planKeyEncoder);

// Collapse each group into a single PlanEntity.
final Dataset<PlanEntity> planGroupedData = groupedDataset.mapGroups(new rowGroupMapper(), planEncoder);

// Read the input entities with an explicit schema.
final Dataset<T> inputEntity = spark.read()
        .schema(inputEncoder.schema())
        .json(inputPath)
        .as(inputEncoder);

// Left join the input entities with the grouped plan data on "id".
final Dataset<Tuple2<T, PlanEntity>> joinedPlanData = inputEntity.joinWith(planGroupedData,
        planGroupedData.col("id").equalTo(inputEntity.col("id")), "left");

// Apply the computation to every joined pair.
final Dataset<T> processed = joinedPlanData.map(compuationFunction, inputEncoder);

// Write the result out as JSON.
processed.write().json(output);

I am running an EMR cluster of 30 r4.8xlarge nodes, and I have checked Ganglia: we have plenty of memory.
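Since the failure is on local disk rather than RAM, one thing I have seen suggested (a hedged sketch reusing the identifiers from the code above; the partition count is an arbitrary example) is to pre-partition both sides on the join key so each task sorts and spills a smaller chunk:

// Hedged sketch, replacing the joinWith step above; 2000 is an example value.
// Repartitioning both sides on the join key keeps the sort-merge join's
// per-task sort buffers, and therefore its spill files, smaller.
final Dataset<T> inputByKey = inputEntity.repartition(2000, inputEntity.col("id"));
final Dataset<PlanEntity> planByKey = planGroupedData.repartition(2000, planGroupedData.col("id"));

final Dataset<Tuple2<T, PlanEntity>> joinedPlanData = inputByKey.joinWith(planByKey,
        planByKey.col("id").equalTo(inputByKey.col("id")), "left");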
