У меня есть приложение, которое обрабатывает записи в rdd и помещает их в кеш. Я поместил пару Spark Accumulators в свое приложение, чтобы отслеживать обработанные и неудачные записи. Эти статистические данные отправляются в statsD до закрытия приложения. Вот простой пример кода:
val sc: SparkContext = new SparkContext(conf)
val jdbcDF: DataFrame = sqlContext.read.format("jdbc").options(Map(...)).load().persist(StorageLevel.MEMORY_AND_DISK)
logger.info("Processing table with " + jdbcDF.count + " rows")
val processedRecords = sc.accumulator(0L, "processed records")
val erroredRecords = sc.accumulator(0L, "errored records")
jdbcDF.rdd.foreachPartition(iterator => {
processedRecords += iterator.length // Problematic line
val cache = getCacheInstanceFromBroadcast()
processPartition(iterator, cache, erroredRecords) // updates cache with iterator documents
}
submitStats(processedRecords, erroredRecords)
Я собрал и запустил его в своем кластере, и он, кажется, функционировал правильно, задание было помечено Spark как УСПЕХ. Я запросил статистику с использованием Grafana, и оба счета были точными.
Однако, когда я запросил свой кеш, Couchbase, ни один из документов не был там. Я просмотрел журналы как драйверов, так и исполнителей, чтобы выяснить, нет ли каких-либо ошибок, но я ничего не смог найти. Я думаю, что это некоторая проблема с памятью, но пары длинных аккумуляторов достаточно, чтобы вызвать проблему?
Мне удалось заставить этот код работать, закомментировав строку, которая увеличивает processingRecords - см. строку во фрагменте, отмеченную Problemati c line .
Кто-нибудь знает, почему комментирование этой строки решает проблему? Кроме того, почему Spark отказывается молча и не помечает задание как НЕИСПРАВНОСТЬ?