Несправедливое распределение нагрузки между искровыми исполнителями - PullRequest
0 голосов
/ 08 мая 2018

В настоящее время я использую spark для обработки документов. В моем распоряжении два сервера (innov1 и innov2), и я использую yarn в качестве менеджера ресурсов.

Первым шагом является сбор путей к файлам из базы данных, их фильтрация, перераспределение и сохранение их в RDD[String]. Тем не менее, я не могу добиться честного распределения настойчивости между всеми исполнителями:

постоянная память RDD, занятая исполнителями

и это приводит к тому, что исполнители не выполняют тот же объем работы после этого:

Работа, выполняемая каждым исполнителем (здесь нет дела до «мертвых», это другая проблема)

И это происходит случайным образом, иногда это innov1, который берет все персистенты, и тогда только исполнители на innov1 работают (но обычно это innov2 в целом). Прямо сейчас, каждый раз, когда два исполнителя находятся на innov1, я просто убиваю работу, чтобы перезапустить, и я молюсь, чтобы они были на innov2 (что совершенно глупо и ломает цель использования искры).

То, что я пробовал до сих пор (и это не сработало):

  1. переводит драйвер в спящий режим за 60 секунд до загрузки из базы данных (может innov1 занимает больше времени для пробуждения?)
  2. добавить spark.scheduler.minRegisteredResourcesRatio=1.0 при отправке работы (та же идея, что и выше)
  3. сохранится с репликацией x2 (идея из этой ссылки ), надеясь, что часть блока будет реплицирована на innov1

    Обратите внимание, что для пункта 3 иногда сохранялась репликация на одном и том же исполнителе (что немного противоречит интуиции), или даже страннее, не реплицировалась вообще (innov2 не может связаться с innov1?) .

Я открыт для любых предложений или ссылок на подобные проблемы, которые я бы пропустил.

Edit:

Я не могу поместить здесь код, так как это часть продукта моей компании. Я могу дать упрощенную версию, однако:

val rawHBaseRDD : RDD[(ImmutableBytesWritable, Result)] = sc
 .newAPIHadoopRDD(...)
 .map(x => (x._1, x._2)) // from doc of newAPIHadoopRDD
 .repartition(200)
 .persist(MEMORY_ONLY)

val pathsRDD: RDD[(String, String)] = rawHBaseRDD
 .mapPartitions {
  ...
  extract the key and the path from ImmutableBytesWritable and 
  Result.rawCells()
  ...
 }
 .filter(some cond)
 .repartition(200)
 .persist(MEMORY_ONLY)

Для обоих сохраняются, все включено innov2. Возможно ли это потому, что данные только на innov2? даже если это так, я бы предположил, что перераспределение помогает разделить строки между innov1 и innov2, но здесь этого не происходит.

Ответы [ 2 ]

0 голосов
/ 11 мая 2018

К сожалению (к счастью?) Проблема решилась сама собой сегодня. Я предполагаю, что это не связано с искрой, так как я не модифицировал код до разрешения.

Вероятно, это связано с полной перезагрузкой всех сервисов с помощью Ambari (даже если я не уверен на 100%, потому что я уже пробовал это раньше), поскольку это единственное «существенное» изменение, которое произошло сегодня.

0 голосов
/ 09 мая 2018

Ваш постоянный набор данных не очень большой - около 100 МБ, согласно вашему скриншоту. Вы выделили 10 ядер с 20 ГБ памяти, поэтому 100 МБ легко помещаются в память одного исполнителя, и это в основном то, что происходит.

Другими словами, вы выделили гораздо больше ресурсов, чем фактически необходимо, поэтому Spark просто случайным образом выбирает подмножество ресурсов, необходимых для выполнения задания. Иногда эти ресурсы оказываются на одном работнике, иногда на другом, а иногда используются ресурсы обоих работников.

Вы должны помнить, что для Spark не имеет значения, если все ресурсы размещены на одном компьютере или на 100 разных машинах - если вы не пытаетесь использовать больше ресурсов, чем доступно (в этом случае вы получите OOM).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...