Аккумулятор Spark, вызывающий сбой приложения - PullRequest
0 голосов
/ 04 февраля 2020

У меня есть приложение, которое обрабатывает записи в rdd и помещает их в кеш. Я поместил пару Spark Accumulators в свое приложение, чтобы отслеживать обработанные и неудачные записи. Эти статистические данные отправляются в statsD до закрытия приложения. Вот простой пример кода:

val sc: SparkContext = new SparkContext(conf)
val jdbcDF: DataFrame = sqlContext.read.format("jdbc").options(Map(...)).load().persist(StorageLevel.MEMORY_AND_DISK)
logger.info("Processing table with " + jdbcDF.count + " rows")

val processedRecords = sc.accumulator(0L, "processed records")
val erroredRecords = sc.accumulator(0L, "errored records")

jdbcDF.rdd.foreachPartition(iterator => {
    processedRecords += iterator.length // Problematic line
    val cache = getCacheInstanceFromBroadcast()
    processPartition(iterator, cache, erroredRecords) // updates cache with iterator documents
}

submitStats(processedRecords, erroredRecords)

Я собрал и запустил его в своем кластере, и он, кажется, функционировал правильно, задание было помечено Spark как УСПЕХ. Я запросил статистику с использованием Grafana, и оба счета были точными.

Однако, когда я запросил свой кеш, Couchbase, ни один из документов не был там. Я просмотрел журналы как драйверов, так и исполнителей, чтобы выяснить, нет ли каких-либо ошибок, но я ничего не смог найти. Я думаю, что это некоторая проблема с памятью, но пары длинных аккумуляторов достаточно, чтобы вызвать проблему?

Мне удалось заставить этот код работать, закомментировав строку, которая увеличивает processingRecords - см. строку во фрагменте, отмеченную Problemati c line .

Кто-нибудь знает, почему комментирование этой строки решает проблему? Кроме того, почему Spark отказывается молча и не помечает задание как НЕИСПРАВНОСТЬ?

1 Ответ

1 голос
/ 04 февраля 2020

Приложение само по себе не «перестает работать». Основная проблема в том, что итераторы могут быть «повторены» только один раз.

Вызов iterator.length фактически проходит и исчерпывает итератор. Таким образом, когда processPartition получает iterator, итератор уже исчерпан и выглядит пустым (поэтому записи не будут обрабатываться).

Ссылка Scala документы для подтверждения того, что size - это «количество возвращаемых им элементов. Примечание: после этой операции он будет в конце!» - вы также можете просмотреть исходный код, чтобы подтвердить это.

Обходное решение

Если переписать processPartition, чтобы вернуть длинное значение, которое может быть передано в аккумулятор. Кроме того, sc.accumulator устарела в последних версиях Spark. Обходной путь может выглядеть примерно так:

val acc = sc.longAccumulator("total processed records")
...
df.rdd.foreachPartition(iterator => {
    val cache = getCacheInstanceFromBroadcast()
    acc.add(processPartition(iterator, cache, erroredRecords))
})
...
// do something else
...