Я использую структурированные потоковые записи загрузки из Redis, код ниже
val image = spark.readStream.format("redis").schema(...).load()
val q = image.writeStream.foreachBatch {
(batchDF: DataFrame, batchId: Long) => {
logger.info(batchDF.count()) // INFO: 4 (or other number)
logger.info(batchDF.count()) // INFO: 0
logger.info(batchDF.count()) // INFO: 0
}
}.start()
q.awaitTermination()
Как показано выше, первый logger.info
, кажется, получает правильный счет batchDF
, но из второго он0.
Что важнее, batchDF
пошел после первого count()
?