В своей простейшей форме СДР является просто заполнителем цепных вычислений, которые могут быть произвольно запланированы для выполнения на любой машине:
val src = sc.parallelize(0 to 1000)
val rdd = src.mapPartitions { itr =>
Iterator(SparkEnv.get.executorId)
}
for (i <- 1 to 3) {
val vs = rdd.collect()
println(vs.mkString)
}
/* yielding:
1230123012301230
0321032103210321
2130213021302130
*/
Это поведение, очевидно, может быть переопределено путем создания любого из восходящих СДР. сохраняется, так что планировщик Spark минимизирует избыточные вычисления:
val src = sc.parallelize(0 to 1000)
src.persist()
val rdd = src.mapPartitions { itr =>
Iterator(SparkEnv.get.executorId)
}
for (i <- 1 to 3) {
val vs = rdd.collect()
println(vs.mkString)
}
/* yield:
2013201320132013
2013201320132013
2013201320132013
each partition has a fixed executorID
*/
Теперь моя проблема:
Мне не нравится механизм кэширования vanilla (см. этот пост : В Apache Spark, могу ли я постепенно кэшировать раздел RDD? ) и написал свой собственный механизм кэширования (путем реализации нового RDD). Поскольку новый механизм кэширования способен только читать существующие значения с локального диска / памяти, при наличии нескольких исполнителей мой кэш для каждого раздела будет часто пропадать при каждом выполнении раздела в задаче на другом компьютере.
Итак, мой вопрос:
Как мне подражать c Постоянная реализация Spark RDD, чтобы попросить планировщик DAG принудительно включить / предложить планирование задач с учетом локальности? Без фактического вызова метода .persist()
, потому что он не нужен.