В настоящее время я использую spark
для обработки документов. В моем распоряжении два сервера (innov1
и innov2
), и я использую yarn
в качестве менеджера ресурсов.
Первым шагом является сбор путей к файлам из базы данных, их фильтрация, перераспределение и сохранение их в RDD[String]
. Тем не менее, я не могу добиться честного распределения настойчивости между всеми исполнителями:
постоянная память RDD, занятая исполнителями
и это приводит к тому, что исполнители не выполняют тот же объем работы после этого:
Работа, выполняемая каждым исполнителем (здесь нет дела до «мертвых», это другая проблема)
И это происходит случайным образом, иногда это innov1
, который берет все персистенты, и тогда только исполнители на innov1
работают (но обычно это innov2
в целом). Прямо сейчас, каждый раз, когда два исполнителя находятся на innov1
, я просто убиваю работу, чтобы перезапустить, и я молюсь, чтобы они были на innov2
(что совершенно глупо и ломает цель использования искры).
То, что я пробовал до сих пор (и это не сработало):
- переводит драйвер в спящий режим за 60 секунд до загрузки из базы данных (может
innov1
занимает больше времени для пробуждения?)
- добавить
spark.scheduler.minRegisteredResourcesRatio=1.0
при отправке работы (та же идея, что и выше)
сохранится с репликацией x2 (идея из этой ссылки ), надеясь, что часть блока будет реплицирована на innov1
Обратите внимание, что для пункта 3 иногда сохранялась репликация на одном и том же исполнителе (что немного противоречит интуиции), или даже страннее, не реплицировалась вообще (innov2
не может связаться с innov1
?) .
Я открыт для любых предложений или ссылок на подобные проблемы, которые я бы пропустил.
Edit:
Я не могу поместить здесь код, так как это часть продукта моей компании. Я могу дать упрощенную версию, однако:
val rawHBaseRDD : RDD[(ImmutableBytesWritable, Result)] = sc
.newAPIHadoopRDD(...)
.map(x => (x._1, x._2)) // from doc of newAPIHadoopRDD
.repartition(200)
.persist(MEMORY_ONLY)
val pathsRDD: RDD[(String, String)] = rawHBaseRDD
.mapPartitions {
...
extract the key and the path from ImmutableBytesWritable and
Result.rawCells()
...
}
.filter(some cond)
.repartition(200)
.persist(MEMORY_ONLY)
Для обоих сохраняются, все включено innov2
. Возможно ли это потому, что данные только на innov2
? даже если это так, я бы предположил, что перераспределение помогает разделить строки между innov1
и innov2
, но здесь этого не происходит.