У меня есть приложение IoT, в котором я получаю данные от различных счетчиков энергии и счетчиков инверторов.Эти счетчики непрерывно отправляют значение счетчика, так как количество потребляемых единиц отсутствует.Это значение непрерывно увеличивается с течением времени, и мне приходится рассчитывать одночасовое потребление энергии для каждого счетчика.
Все эти данные я получаю в теме кафки, из которой я создаю структурированный потоковый кадр данных.
На этом расширенном фрейме данных я применяю функцию mapGroupsWithState.Это должно вернуть мне один час поколения.
Проблема: я не могу выполнить какие-либо операции подсчета, показа, фильтрации, агрегирования внутри функции обновления с использованием искрового фрейма данных.
val df_output = final_df
.selectExpr("*")
.as[input_row_druid]
.groupByKey(_.plant_slug)
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(updateAcrossEvents)
.writeStream
.format("console")
.outputMode("update")
.start()
df_output.awaitTermination()
def updateAcrossEvents(plant_slug:String, inputs: Iterator[input_row_druid],
oldState: GroupState[source_state]):out_state = {
val spark_session = SparkSession.builder().getOrCreate()
import spark_session.implicits._
val list_of_list = inputs.toList
val new_df = list_of_list.toDF
var my_state:source_state = if (oldState.exists) oldState.get else source_state(plant_slug,list_of_list)
println("Printing inverter and merter df with counts")
val inverter_df = new_df.filter($"device_type" === "INVERTER")
val meter_df = new_df.filter($"device_type" === "METER")
println(inverter_df.show())
val inv_count = inverter_df.count()
println(meter_df.show())
val meter_count = meter_df.count()
println(inv_count)
println(meter_count)
val new_state = source_state(plant_slug, list_of_list)
oldState.update(new_state)
var out = out_state(plant_slug,inv_count.toString,meter_count.toString)
out
}
Если я не выполняю никаких операций фильтрацииnew_df.show печатает все данные в датафрейме.Но считать, показывать, фильтр не работает.
Одна вещь, которую я заметил, это то, что всякий раз, когда я отправляю задание на запуск, он запускается в нескольких пакетах, пакет 0 всегда успешен, но зависает в пакете 0. Он никогда не продолжается в следующемпакет.
Это ожидаемый результат
+--------------------+-------+---------+
| plant_slug|inv_gen|meter_gen|
+--------------------+-------+---------+
| plant1| 11| 10|
| plant2| 20| 19|
| plant3| 40| 38|
| plant4| 59| 57|
| plant5| 37| 35|
+--------------------+-------+---------+