Пример использования следующий. Данные кафки загружаются на основе структурированной потоковой передачи искры. Мы надеемся, что данные для каждой микропакции каждые 10 секунд можно будет обрабатывать и агрегировать в единый фрейм данных, который продолжает отслеживать сумму некоторых значений для каждого идентификатора. Правильно ли следующий способ? Похоже, что старые данные из предыдущих микропакетов все еще находятся в «таблице мониторинга» для каждого запроса, что вызывает нежелательное агрегирование. Как лучше всего решить эту проблему? Спасибо!
val monitoring_stream = df.writeStream
.outputMode("append")
.format("memory")
.queryName("monitoring_table")
.start()
var rounds = 0
while(monitoring_stream.isActive) {
Thread.sleep(10000)
spark.sql("SELECT * from monitoring_table").show()
var tempDF = spark.sql("SELECT * from monitoring_table")
var batchDF_group = tempDF.withWatermark("timestamp", "10 seconds").groupBy("id").sum("download_volume", "upload_volume").withColumnRenamed("sum(download_volume)","total_download_volume_batch").withColumnRenamed("sum(upload_volume)","total_upload_volume_batch")
monitoring_df = monitoring_df.join(batchDF_group, monitoring_df("id") === batchDF_group("id"), "left").select(monitoring_df("id"), monitoring_df("total_download_volume"), monitoring_df("upload_volume"), monitoring_df("total_volume"), batchDF_group("total_download_volume_batch"), batchDF_group("total_upload_volume_batch")).na.fill(0)
monitoring_df = monitoring_df.withColumn("total_upload_volume", monitoring_df("total_upload_volume")+monitoring_df("total_upload_volume_batch"))
monitoring_df = monitoring_df.withColumn("total_download_volume", monitoring_df("total_download_volume")+monitoring_df("total_download_volume_batch"))
monitoring_df = monitoring_df.withColumn("total_volume", monitoring_df("total_download_volume")+monitoring_df("total_upload_volume"))
monitoring_df.show()
}```