Приложение My Spark считывает набор данных из базы данных JDBC, перепечатывает его с преобразованиями и сохраняет его для использования в вычислениях ветвлений.
Приложение выполняется в кластере YARN с 3 исполнителями.Я обнаружил, что мой кэшированный набор данных создается из базы данных 3 раза (для каждого исполнителя).Spark не использует кэшированный RDD.
Я хочу устранить избыточные чтения с помощью кэша.
Я создал пример Hello World, чтобы воспроизвести проблему:
object UseAnotherExecutorCache {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(getClass.getSimpleName)
val sc = new SparkContext(conf)
val rddCached = sc.parallelize(Seq(1, 2, 3), numSlices = 1)
.map(number => {
println(s"====== Process number $number ======")
number * 2
})
.persist(StorageLevel.MEMORY_AND_DISK)
assert(rddCached.count == 3)
assert(rddCached.sum == 12)
}
}
Команда отправки:
spark-submit --master yarn --deploy-mode cluster --num-executors 2 .....
Я вижу в журналах исполнителя, что операции распараллеливания и сопоставления выполняются на обоих исполнителях.Я ожидаю, что второй исполнитель будет использовать кэшированный RDD из первого.
Верны ли мои ожидания?Разделяет ли Spark кэшированный RDD с другими исполнителями?