Почему задания Pyspark вымирают в середине процесса без какой-либо конкретной ошибки - PullRequest
1 голос
/ 25 октября 2019

Эксперты, я заметил одну странную вещь с одним из заданий Pyspark в рабочем режиме (работающим в режиме кластера YARN). После выполнения в течение часа + (около 65-75 минут) он просто вымирает, не выдавая какого-либо конкретного сообщения об ошибке. Мы анализировали журналы YARN уже около 2 недель, и в них нет особых ошибок, они просто умирают в середине при выполнении ETL-операций (чтение / запись таблицы кустов, выполнение простых карт, обрезка, лямбда-операции и т. Д.), А не каких-либоконкретный кусок кода, чтобы указать. Иногда повторный запуск исправляет это, иногда требуется больше чем один повторный запуск. Код оптимизирован, в spark-submit --conf есть все правильно оптимизированные параметры. Как мы упоминали ранее, он работает абсолютно идеально для 30 других приложений с очень хорошей статистикой производительности. Это все варианты, которые у нас есть -

spark-submit --conf spark.yarn.maxAppAttempts=1 --conf spark.sql.broadcastTimeout=36000 --conf spark.dynamicAllocation.executorIdleTimeout=1800 --conf spark.dynamicAllocation.minExecutors=8 --conf spark.dynamicAllocation.initialExecutors=8 --conf spark.dynamicAllocation.maxExecutors=32 --conf spark.yarn.executor.memoryOverhead=4096 --conf spark.kryoserializer.buffer.max=512m --driver-memory 2G --executor-memory 8G --executor-cores 2 --deploy-mode cluster --master yarn

Мы хотим проверить, нужна ли какая-либо конфигурация привода для изменения этой проблемы? Или в режиме Spark Cluster есть некоторый автоматический тайм-аут, который можно увеличить? мы используем Spark 1.6 с Python 2.7

Ошибка выглядит следующим образом (есть несколько сообщений, в которых говорится -

ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM

Но это происходит сбой при обнаружении ошибки драйвера (происходит в конце)-

ERROR executor.CoarseGrainedExecutorBackend: Driver XX.XXX.XXX.XXX:XXXXX disassociated! Shutting down

Вот журнал -

19/10/24 16:17:03 INFO compress.CodecPool: Got brand-new compressor [.gz]
19/10/24 16:17:03 INFO output.FileOutputCommitter: Saved output of task 'attempt_201910241617_0152_m_000323_0' to hdfs://myserver/production/out/TBL/_temporary/0/task_201910241617_0152_m_000323
19/10/24 16:17:03 INFO mapred.SparkHadoopMapRedUtil: attempt_201910241617_0152_m_000323_0: Committed
19/10/24 16:17:03 INFO executor.Executor: Finished task 323.0 in stage 152.0 (TID 27419). 2163 bytes result sent to driver
19/10/24 16:17:03 INFO output.FileOutputCommitter: Saved output of task 'attempt_201910241617_0152_m_000135_0' to hdfs://myserver/production/out/TBL/_temporary/0/task_201910241617_0152_m_000135
19/10/24 16:17:03 INFO mapred.SparkHadoopMapRedUtil: attempt_201910241617_0152_m_000135_0: Committed
19/10/24 16:17:03 INFO executor.Executor: Finished task 135.0 in stage 152.0 (TID 27387). 2163 bytes result sent to driver
19/10/24 16:18:04 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
19/10/24 16:18:04 INFO storage.DiskBlockManager: Shutdown hook called
19/10/24 16:18:04 INFO util.ShutdownHookManager: Shutdown hook called

19/10/24 16:21:12 INFO executor.Executor: Finished task 41.0 in stage 163.0 (TID 29954). 2210 bytes result sent to driver
19/10/24 16:21:12 INFO executor.Executor: Finished task 170.0 in stage 163.0 (TID 29986). 2210 bytes result sent to driver
19/10/24 16:21:13 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 30047
19/10/24 16:21:13 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 30079
19/10/24 16:21:13 INFO executor.Executor: Running task 10.0 in stage 165.0 (TID 30047)
19/10/24 16:21:13 INFO executor.Executor: Running task 42.0 in stage 165.0 (TID 30079)
19/10/24 16:21:13 INFO spark.MapOutputTrackerWorker: Updating epoch to 56 and clearing cache
19/10/24 16:21:13 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 210
19/10/24 16:21:13 INFO storage.MemoryStore: Block broadcast_210_piece0 stored as bytes in memory (estimated size 29.4 KB, free 3.8 GB)
19/10/24 16:21:13 INFO broadcast.TorrentBroadcast: Reading broadcast variable 210 took 3 ms
19/10/24 16:21:13 INFO storage.MemoryStore: Block broadcast_210 stored as values in memory (estimated size 83.4 KB, free 3.8 GB)
19/10/24 16:21:13 INFO executor.Executor: Finished task 10.0 in stage 165.0 (TID 30047). 931 bytes result sent to driver
19/10/24 16:21:13 INFO executor.Executor: Finished task 42.0 in stage 165.0 (TID 30079). 931 bytes result sent to driver
19/10/24 16:21:15 WARN executor.CoarseGrainedExecutorBackend: An unknown (rxxxxxx1.hadoop.com:XXXXX) driver disconnected.
19/10/24 16:21:15 ERROR executor.CoarseGrainedExecutorBackend: Driver XX.XXX.XXX.XXX:XXXXX disassociated! Shutting down.
19/10/24 16:21:15 INFO storage.DiskBlockManager: Shutdown hook called
19/10/24 16:21:15 INFO util.ShutdownHookManager: Shutdown hook called

Спасибо, Сид

Ответы [ 2 ]

4 голосов
/ 26 октября 2019

Без какой-либо очевидной трассировки стека было бы неплохо подумать о проблеме с двух сторон: это либо проблема кода , либо проблема данных .

В любом случае вы должны начать с предоставления водителю достаточного объема памяти, чтобы исключить это как вероятную причину. Увеличивайте driver.memory и driver.memoryOverhead до тех пор, пока вы не диагностируете проблему.

Распространенные проблемы с кодом:

  1. Слишком большое количество преобразований приводит к тому, что происхождение становится слишком большим. Если на фрейме данных происходят какие-либо итеративные операции, то хорошей идеей будет усечь группу DAG, сделав checkpoint между ними. В Spark 2.x вы можете позвонить dataFrame.checkpoint() напрямую и не иметь доступа к RDD. Также ответ @ Sagar описывает, как это сделать для Spark 1.6

  2. Попытка транслировать слишком большие кадры данных. Это обычно приводит к исключению OOM, но иногда может привести к зависанию задания. Решение не вызывать broadcast, если вы явно делаете это. В противном случае проверьте, установлено ли для spark.sql.autoBroadcastJoinThreshold какое-либо пользовательское значение, и попробуйте уменьшить его или вообще отключить широковещательную рассылку (настройка -1).

  3. Недостаточное количество разделов может вызвать выполнение каждой задачи. запустить горячийСамый простой способ диагностировать это - проверить представление этапов в пользовательском интерфейсе Spark и увидеть размер данных, считываемых и записываемых для каждой задачи. В идеале это должно быть в диапазоне от 100 до 500 МБ. В противном случае увеличьте spark.sql.shuffle.partitions и spark.default.parallelism до более высоких значений, чем значение по умолчанию 200.

Распространенные проблемы с данными:

  1. Перекос данных. Так как ваша работа не справляется с определенной рабочей нагрузкой, это может привести к перекосу данных в конкретной работе. Диагностируйте это, проверив, что среднее время завершения задачи сравнимо с 75-процентным, что сравнимо с 90-процентным на представлении сцены в пользовательском интерфейсе Spark. Есть много способов исправить перекос данных, но я считаю, что лучше всего написать пользовательскую функцию соединения, которая объединяет ключи соединения перед соединением. Это разбивает перекошенный раздел на несколько меньших разделов за счет взрыва данных постоянного размера.

  2. Формат входного файла или количество файлов. Если ваш входной файл не разбит на разделы, и вы выполняете только узкие преобразования (те, которые не приводят к перемешиванию данных), тогда все ваши данные будут проходить через одного исполнителя и на самом деле не получат выгоду от настройки распределенного кластера. Выполните диагностику в интерфейсе Spark, проверив, сколько задач создается на каждом этапе конвейера. Это должно быть порядка вашего spark.default.parallelism значения. Если нет, то сделайте .repartition(<some value>) сразу после шага чтения данных перед любыми преобразованиями. Если формат файла CSV (не идеальный), убедитесь, что у вас отключено multiLine, если это не требуется в вашем конкретном случае, в противном случае это заставляет одного исполнителя прочитать весь файл CSV.

Счастливая отладка!

1 голос
/ 26 октября 2019

Ты нарушаешь родословную? Если нет, то проблема может быть в родословной. Можете ли вы попробовать разорвать линию между кодом где-нибудь и попробовать это.

#Spark 1.6 code
sc.setCheckpointDit('.')
#df is the original dataframe name you are performing transformations on
dfrdd = df.rdd
dfrdd.checkpoint()
df=sqlContext.createDataFrame(dfrdd)
print df.count()

Дайте мне знать, если это поможет.

...