Почему при вызове нескольких groupBy с помощью pyspark вызывается файл «java.nio.channels.ClosedByInterruptExceptio»? - PullRequest
0 голосов
/ 01 июня 2018

Я запускаю задание pyspark (python 3.5, spark 2.1, java8) в режиме Yarn-Client с пограничного узла с помощью spark2-submit.Задание выполнено успешно, результирующий фрейм данных записан на HDFS и кажется правильным (мы еще не обнаружили ошибок с данными в таком фрейме).

Проблема в том, что я вижу много (6'000) сообщений об ОШИБКАХ и хотел бы понять, что не так, и если это повлияет или нет на окончательный фрейм данных.

Все сообщения об ОШИБКАХ выглядяткак этот:

18/06/01 14:08:36 INFO codegen.CodeGenerator: Code generated in 45.712788 ms

18/06/01 14:08:37 INFO executor.Executor: Finished task 33.0 in stage 34.0 (TID 2312). 4600 bytes result sent to driver

18/06/01 14:08:37 INFO executor.Executor: Finished task 117.0 in stage 34.0 (TID 2316). 3801 bytes result sent to driver

18/06/01 14:08:40 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 2512

18/06/01 14:08:40 INFO executor.Executor: Running task 190.1 in stage 34.0 (TID 2512)

18/06/01 14:08:40 INFO storage.ShuffleBlockFetcherIterator: Getting 28 non-empty blocks out of 193 blocks

18/06/01 14:08:40 INFO storage.ShuffleBlockFetcherIterator: Started 5 remote fetches in 1 ms

18/06/01 14:08:40 INFO executor.Executor: Executor is trying to kill task 190.1 in stage 34.0 (TID 2512)

18/06/01 14:08:40 ERROR storage.DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /...../yarn/nm/usercache/../appcache/application_xxxx/blockmgr-xxxx/temp_shuffle_xxxxx

java.nio.channels.ClosedByInterruptException

            at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
            at sun.nio.ch.FileChannelImpl.truncate(FileChannelImpl.java:372)

            at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:212)

            at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:238)

            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)

            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)

            at org.apache.spark.scheduler.Task.run(Task.scala:99)

            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)

            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

            at java.lang.Thread.run(Thread.java:748)

ОШИБКА запускается после некоторой инженерии фетров (select, groupby ..), и я вижу ОШИБКУ при добавлении этих строк:

df = (df.groupby('x','y')
        .agg(func.sum('x').alias('x_sum'))
        .groupby('y')
        .agg(func.mean('y').alias('py_sum_avg')))

Так что я предполагаюПерестановка данных запускается groupBy.

Сначала я подумал, что это проблема с памятью, поэтому я добавил гораздо больше памяти и служебной памяти для драйвера и исполнителя без реального успеха (это то, что вы можете найтив какой-то другой теме).В коде у меня есть другой groupBy, и кажется, что это вызывает некоторые проблемы на этом этапе.

Я также вижу, что это может быть связано с слишком большим количеством открытых файлов или если диск заполнен, но сообщения об ошибкахнемного отличается в этих 2 случаях.

Я новичок в pysaprk, поэтому я ищу совет для устранения такой проблемы.

Как я могу найти причину, по которой это называется java.nio.channels.ClosedByInterruptException?Я думаю, что это причина, по которой ERROR storage.DiskBlockObjectWriter.Это правильно ?Это срабатывает Executor: Executor is trying to kill task 190 Если это стандартный процесс для уничтожения некоторых задач, почему это вызывает ОШИБКИ?Могу ли я получить некоторую подсказку, посмотрев на интерфейс Sprak (я вижу, что некоторые задачи были убиты). Могу ли я получить больше информации из трассировки?

Как можно исправить эти проблемы?Любое предложение, как приступить к отладке таких вещей?Я не уверен, как приступить к устранению этой проблемы и где искать (память, проблема в коде pysaprk, проблема с настройкой кластера или моих параметров искры)

Я работаю над HadoopData Lake с Cloudera CDH 5.8.

1 Ответ

0 голосов
/ 06 июля 2018

Существует проблема с использованием spark.speculation в Spark 2.1, который я использую.Связанная ошибка вверх по течению - SPARK-19293.Трассировка стека исключений в моей ситуации немного отличается от трассировки в SPARK-19293.Установка

- conf spark.speculation = false

и ОШИБКА исчезли в моем тесте

...