У меня сложилось впечатление, что выполнение RDD и кэширование ленивы: а именно, если RDD кэшируется и используется только его часть, то механизм кэширования будет кэшировать только эту часть, а другая часть будет вычислена по требованию.
К сожалению, следующий эксперимент, похоже, указывает на обратное:
val acc = new LongAccumulator()
TestSC.register(acc)
val rdd = TestSC.parallelize(1 to 100, 16).map { v =>
acc add 1
v
}
rdd.persist()
val sliced = rdd
.mapPartitions { itr =>
itr.slice(0, 2)
}
sliced.count()
assert(acc.value == 32)
При его запуске выдается следующее исключение:
100 did not equal 32
ScalaTestFailureLocation:
Expected :32
Actual :100
Выдает весь СДР был вычислен вместо только первых 2 элементов в каждом разделе. В некоторых случаях это очень неэффективно (например, когда вам нужно определить, быстро ли опустошен СДР). В идеале диспетчер кэширования должен позволять инкрементно записывать и кэшировать буфер кэширования, существует ли эта функция? Если нет, что я должен сделать, чтобы это произошло? (желательно с использованием существующего механизма кэширования памяти и диска)
Большое спасибо за ваше мнение
ОБНОВЛЕНИЕ 1 Похоже, что Spark уже имеет 2 класса:
- ExternalAppendOnlyMap
- ExternalAppendOnlyUnsafeRowArray
, который поддерживает более детальное кэширование многих значений. Более того, они не полагаются на StorageLevel, а сами принимают решение, какое устройство хранения использовать. Я, однако, удивлен, что они не являются вариантами для кэширования RDD / Dataset напрямую, а не для совместной группы / join / streamOps или аккумуляторов.