Я пытаюсь найти размер информационного кадра в заданиях потоковой передачи в каждой партии. Я могу найти размер в пакетных заданиях успешно, но когда дело доходит до потоковой передачи, я не могу это сделать.
Я разрабатывал приложение spark для блоков данных и пробовал df.queryExecution.optimizedPlan.stats.sizeInBytes в потоковом задании.
Но я получаю следующее исключение:
Запросы с потоковыми источниками должны выполняться с помощью writeStream.start () ;;
Я попытался поместить df.queryExecution.optimizedPlan.stats.sizeInBytes в функцию forEachBatch ():
data.writeStream.foreachBatch { (df: DataFrame, batchId: Long) =>
df.persist()
println("The size of the read is : " + df.queryExecution.optimizedPlan.stats.sizeInBytes)
}.start.option("checkpointLocation", outpath + "/_checkpoint")
Но это создаст новый поток, которого мы должны избегать из-за некоторых ограничений.
val data = spark.readStream
.format("kafka")
.option(....)
.load()
println("The size of the read is : " + data.queryExecution.optimizedPlan.stats.sizeInBytes)
Существует ли какой-либо хак или какой-либо вызов API, который возвращает размер кадра данных при потоковой передаче без использования forEachBatch () или без создания нового потока?