Набор данных контрольных точек Spark Scala, показывающий .isCheckpointed = false после действия, но каталоги записаны - PullRequest
0 голосов
/ 02 января 2019

Кажется, есть несколько сообщений по этому вопросу, но ни одна из них не отвечает тому, что я понимаю.

Следующий код запускается на DataBricks:

spark.sparkContext.setCheckpointDir("/dbfs/FileStore/checkpoint/cp1/loc7")
val checkpointDir = spark.sparkContext.getCheckpointDir.get
val ds = spark.range(10).repartition(2)
ds.cache()
ds.checkpoint()
ds.count()
ds.rdd.isCheckpointed  

Добавлено улучшение сортов:

...
val ds2 = ds.checkpoint(eager=true)
println(ds2.queryExecution.toRdd.toDebugString)
...

возвращается:

(2) MapPartitionsRDD[307] at toRdd at command-1835760423149753:13 []
 |  MapPartitionsRDD[305] at checkpoint at command-1835760423149753:12 []
 |  ReliableCheckpointRDD[306] at checkpoint at command-1835760423149753:12 []
 checkpointDir: String = dbfs:/dbfs/FileStore/checkpoint/cp1/loc10/86cc77b5-27c3-4049-9136-503ddcab0fa9
 ds: org.apache.spark.sql.Dataset[Long] = [id: bigint]
 ds2: org.apache.spark.sql.Dataset[Long] = [id: bigint]
 res53: Boolean = false

Вопрос 1:

ds.rdd.isCheckpointed или ds2.rdd.isCheckpointed оба возвращают False , хотя с подсчетом у меня не ленивая ситуация. Почему, когда, в частности, ../loc 7 и 10 пишутся с (частичными) файлами? Также мы видим, что ReliableCheckPoint!

Не вполне объяснил всю концепцию. Пытаюсь разобраться с этим.

Вопрос 2 - дополнительный вопрос:

Кеш действительно необходим или нет в последних версиях Spark 2.4? Новая ветка на ds, если она не кэшируется, вызовет ли она пересчет или это лучше? Кажется странным, что данные контрольных точек не будут использоваться, или мы можем сказать, что Spark действительно не знает, что лучше?

Из High Performance Spark у меня складывается смешанное впечатление, что проверка наведения не очень рекомендуется, но опять же, это так.

1 Ответ

0 голосов
/ 02 января 2019

TL; DR : Вы не проверяете объект, который фактически проверен:

ds2.queryExecution.toRdd.dependencies(0).rdd.isCheckpointed
// Boolean = true

ds.rdd.isCheckpointed или ds2.rdd.isCheckpointed оба возвратаFalse

Это ожидаемое поведение.Указываемый контрольный объект - это не преобразованный RDD (который является результатом дополнительных преобразований, необходимых для преобразования во внешнее представление), на который вы ссылаетесь, а внутренний объект RDD (фактически, как вы видите выше, это даже не самый последнийвнутренний RDD, но его родитель).

Кроме того, в первом случае вы просто используете неправильный Dataset объект вообще - как объяснено в связанном ответе Dataset.checkpoint новый Dataset

, хотя с подсчетом у меня ленивая ситуация

Это не имеет особого смысла.По умолчанию checkpoint реализация равна eager, поэтому force оценивает .Даже если бы это было не так, Dataset.count - неправильный способ форсировать оценку.

Действительно ли необходим кэш или нет в последней версии

Как видно из связанного источника, Dataset.checkpoint использует RDD.checkpoint внутри, поэтому применяется то же правило.Однако вы уже выполняете отдельное действие для форсирования контрольной точки, поэтому дополнительное кэширование, особенно с учетом стоимости Dataset персистентности, может быть излишним.

Конечно, если вы сомневаетесь, вы могли бы рассмотреть бенчмаркинг в конкретномконтекст.

...