Сбой задания Pyspark в Google Cloud Dataproc - PullRequest
0 голосов
/ 04 июня 2018

Я создал кластер Dataproc с 1 мастером и 10 узлами.Все они имеют одинаковую конфигурацию процессора и памяти: 32 vCPU, 120 ГБ памяти.Когда я представил работу, которая обрабатывает большой объем данных и расчетов.Работа не удалась.

Из журнала я не совсем уверен, что вызвало сбой.Но я видел сообщение об ошибке, связанной с памятью, от tJob #: job-c46fc848-6: Контейнер уничтожен YARN за превышение пределов памяти.Используется 24,1 ГБ из 24 ГБ физической памяти.Подумайте над улучшением spark.yarn.executor.memoryOverhead.

Поэтому я попробовал несколько решений, которые я нашел из других постов.Например, я пытался увеличить spark.executor.memoryOverhead и spark.driver.maxResultSize в разделе «Свойства» при отправке задания из консоли «Задания».Задание # find-duplicate-job-c46fc848-7 все еще не выполнено.

Я также видел предупреждающие сообщения и не совсем уверен, что это значит: 18/06/04 17:13:25 WARN org.apache.spark.storage.BlockManagerMasterEndpoint: больше нет доступных реплик для rdd_43_155!

Я попытаюсь создать кластер более высокого уровня, чтобы посмотреть, работает ли он.Но я сомневаюсь, что это решит проблему, так как кластер с 1 главным и 10 узлами с 32 vCPU, 120 ГБ памяти уже очень мощный.

Надеемся получить помощь от опытных пользователей и экспертов.Заранее спасибо!

1 Ответ

0 голосов
/ 28 июня 2018

Основная причина сбоя была связана с памятью, вызванной само перекрестным соединением.Это все еще терпело неудачу, даже когда я продолжал увеличивать мощность процессора и память.Таким образом, решение этой проблемы состоит в следующем:

  1. Используйте функцию repartition () для перераспределения после объединения перед следующим преобразованием.Это исправит проблему с перекосом данных.Пример: df_joined = df_joined.repartition (partitions)
  2. Трансляция правой таблицы.
  3. Разбейте его на 10 итераций.В каждой итерации я обрабатываю только 1/10 левой таблицы, объединенной с полными данными правой таблицы.

См. Пример кода:

groups = 10 <br/>
for x in range(0, groups): 
  df_joined = df1.join(broadcast(df2), (df1.authors == df2.authors)).where((col("df1.content_id") % groups == x)) 

При объединении трех вышеуказанных методов я смог завершить работу за 1,5 часа и использовал только 1 главный и 4 рабочих узла (8 ЦП и 30 ГБ для каждого виртуального компьютера).

...