У нас есть проблема с порядком выполнения преобразований Спарка, который кажется произвольным.
У нас есть 2 RDD со связанными событиями, которые мы классифицируем.Множественные классификаторы применяются к rdd1, но для краткости я изображаю только ту часть, где возникает проблема.
Псевдокод похож на:
total_count = rdd1.size
// filter1
(organics, duplicates) = Deduplicate rdd1 events (classify events as organic and duplicates)
duplicates.persist(DISK_ONLY)
rdd1 = organics
if (specific_client) {
// filter2
Load rdd2 Events
Join rdd1 and rdd2
(organics, invalid) = Classify rdd1 events according to joined information from rdd2
invalid.persist(DISK_ONLY)
}
Check total_count == size(organics + invalid + duplicates) // here it fails
Проблема, с которой мы сталкиваемся, заключается в том, что вв некоторых случаях выполняется проверка размера, а в некоторых - нет.Рассматривая временную шкалу выполнения, мы заметили, что она завершается с ошибкой, когда фильтр1 выполняется после фильтр2.И когда это не удается, количество правых всегда больше начального total_count
.
Почему Spark планирует выполнение filter1 после filter2, когда код указывает, что выход filter1 является входом filter2?
Когда фильтр1 выполняется после фильтра2, почему подсчет всех частей возвращает больше элементов, чем загружено?(Я подозреваю, что вызов join)
Как ни странно, когда мы запускаем код локально на одном узле, он никогда не завершался ошибкой.Когда мы подключались к AWS EMR через оболочку Spark, это никогда не выходило из строя.Это не удавалось, только когда оно выполнялось как обычное задание Spark в кластере AWS EMR с 4 рабочими узлами.
Форумы предлагали нам каким-то образом заставить Spark вычислить filter1 перед filter2.Следующие вещи не работали:
- вызов
organics.persist(MEMORY_ONLY | DISK_ONLY)
после filter1 - вызов
organics.cache()
после filter1 - вызов
organics.count()
после filter1 - вызов
organics.checkpoint()
после filter1 - вызов
organics.persist(DISK_ONLY)
в конце фильтра2
Способ, которым мы сделали это, заключается в том, что после filter1 мы вызываем organics.persist(DISK_ONLY)
, а затем organics.count
.Однако мы не уверены, является ли это оптимальным решением, поскольку оно выглядит довольно жестоким и вызывает перетасовки данных.
Итак, подведем итог.
- Почему Spark планирует выполнение filter1 после filter2,когда код указывает, что выход filter1 является входом filter2?
- Почему он возвращает больше элементов, чем загружено при выполнении в обратном порядке?
- Является ли
persist
+ count
оптимальным решением для принудительного примененияпорядка преобразования?