Я выполняю задание на потоковую структурированную обработку, которое включает создание пустого информационного кадра и его обновление с использованием каждой микропакета, как показано ниже. С каждым выполнением микропакета число этапов увеличивается на 4. Чтобы избежать повторного вычисления, я сохраняю обновленный StaticDF в памяти после каждого обновления внутри l oop. Это помогает пропустить те дополнительные этапы, которые создаются с каждой новой микропакетой.
Мои вопросы -
1) Даже если общее количество завершенных этапов остается таким же, как увеличенные этапы всегда пропущено, но может ли это привести к проблемам с производительностью, поскольку в один момент времени на пропущенных этапах могут быть миллионы?
2) Что происходит, когда какая-то часть или весь кэшированный RDD недоступен? (ошибка узла / исполнителя). Документация Spark гласит, что до сих пор не материализованы все данные, полученные из нескольких микропакетов, поэтому это означает, что ему потребуется снова прочитать все события из Kafka для регенерации staticDF?
// one time creation of empty static(not streaming) dataframe
val staticDF_schema = new StructType()
.add("product_id", LongType)
.add("created_at", LongType)
var staticDF = sparkSession
.createDataFrame(sparkSession.sparkContext.emptyRDD[Row], staticDF_schema)
// Note : streamingDF was created from Kafka source
streamingDF.writeStream
.trigger(Trigger.ProcessingTime(10000L))
.foreachBatch {
(micro_batch_DF: DataFrame) => {
// fetching max created_at for each product_id in current micro-batch
val staging_df = micro_batch_DF.groupBy("product_id")
.agg(max("created").alias("created"))
// Updating staticDF using current micro batch
staticDF = staticDF.unionByName(staging_df)
staticDF = staticDF
.withColumn("rnk",
row_number().over(Window.partitionBy("product_id").orderBy(desc("created_at")))
).filter("rnk = 1")
.drop("rnk")
.cache()
}