Я запускаю задание pyspark (python 3.5, spark 2.1, java8) в режиме Yarn-Client с пограничного узла с помощью spark2-submit.Задание выполнено успешно, результирующий фрейм данных записан на HDFS и кажется правильным (мы еще не обнаружили ошибок с данными в таком фрейме).
Проблема в том, что я вижу много (6'000) сообщений об ОШИБКАХ и хотел бы понять, что не так, и если это повлияет или нет на окончательный фрейм данных.
Все сообщения об ОШИБКАХ выглядяткак этот:
18/06/01 14:08:36 INFO codegen.CodeGenerator: Code generated in 45.712788 ms
18/06/01 14:08:37 INFO executor.Executor: Finished task 33.0 in stage 34.0 (TID 2312). 4600 bytes result sent to driver
18/06/01 14:08:37 INFO executor.Executor: Finished task 117.0 in stage 34.0 (TID 2316). 3801 bytes result sent to driver
18/06/01 14:08:40 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 2512
18/06/01 14:08:40 INFO executor.Executor: Running task 190.1 in stage 34.0 (TID 2512)
18/06/01 14:08:40 INFO storage.ShuffleBlockFetcherIterator: Getting 28 non-empty blocks out of 193 blocks
18/06/01 14:08:40 INFO storage.ShuffleBlockFetcherIterator: Started 5 remote fetches in 1 ms
18/06/01 14:08:40 INFO executor.Executor: Executor is trying to kill task 190.1 in stage 34.0 (TID 2512)
18/06/01 14:08:40 ERROR storage.DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /...../yarn/nm/usercache/../appcache/application_xxxx/blockmgr-xxxx/temp_shuffle_xxxxx
java.nio.channels.ClosedByInterruptException
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
at sun.nio.ch.FileChannelImpl.truncate(FileChannelImpl.java:372)
at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:212)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:238)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
ОШИБКА запускается после некоторой инженерии фетров (select, groupby ..), и я вижу ОШИБКУ при добавлении этих строк:
df = (df.groupby('x','y')
.agg(func.sum('x').alias('x_sum'))
.groupby('y')
.agg(func.mean('y').alias('py_sum_avg')))
Так что я предполагаюПерестановка данных запускается groupBy.
Сначала я подумал, что это проблема с памятью, поэтому я добавил гораздо больше памяти и служебной памяти для драйвера и исполнителя без реального успеха (это то, что вы можете найтив какой-то другой теме).В коде у меня есть другой groupBy, и кажется, что это вызывает некоторые проблемы на этом этапе.
Я также вижу, что это может быть связано с слишком большим количеством открытых файлов или если диск заполнен, но сообщения об ошибкахнемного отличается в этих 2 случаях.
Я новичок в pysaprk, поэтому я ищу совет для устранения такой проблемы.
Как я могу найти причину, по которой это называется java.nio.channels.ClosedByInterruptException
?Я думаю, что это причина, по которой ERROR storage.DiskBlockObjectWriter
.Это правильно ?Это срабатывает Executor: Executor is trying to kill task 190
Если это стандартный процесс для уничтожения некоторых задач, почему это вызывает ОШИБКИ?Могу ли я получить некоторую подсказку, посмотрев на интерфейс Sprak (я вижу, что некоторые задачи были убиты). Могу ли я получить больше информации из трассировки?
Как можно исправить эти проблемы?Любое предложение, как приступить к отладке таких вещей?Я не уверен, как приступить к устранению этой проблемы и где искать (память, проблема в коде pysaprk, проблема с настройкой кластера или моих параметров искры)
Я работаю над HadoopData Lake с Cloudera CDH 5.8.