Функция Spark MapGroupsWithState Update не позволяет выполнять запросы действий, такие как фильтрация, подсчет, показ и т. Д. - PullRequest
1 голос
/ 07 июня 2019

У меня есть приложение IoT, в котором я получаю данные от различных счетчиков энергии и счетчиков инверторов.Эти счетчики непрерывно отправляют значение счетчика, так как количество потребляемых единиц отсутствует.Это значение непрерывно увеличивается с течением времени, и мне приходится рассчитывать одночасовое потребление энергии для каждого счетчика.

Все эти данные я получаю в теме кафки, из которой я создаю структурированный потоковый кадр данных.

На этом расширенном фрейме данных я применяю функцию mapGroupsWithState.Это должно вернуть мне один час поколения.

Проблема: я не могу выполнить какие-либо операции подсчета, показа, фильтрации, агрегирования внутри функции обновления с использованием искрового фрейма данных.

    val df_output = final_df
      .selectExpr("*")
      .as[input_row_druid]
      .groupByKey(_.plant_slug)
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(updateAcrossEvents)
      .writeStream
      .format("console")
      .outputMode("update")
      .start()

    df_output.awaitTermination()


  def updateAcrossEvents(plant_slug:String, inputs: Iterator[input_row_druid],
                         oldState: GroupState[source_state]):out_state = {
    val spark_session = SparkSession.builder().getOrCreate()
    import spark_session.implicits._

    val list_of_list = inputs.toList
    val new_df = list_of_list.toDF
    var my_state:source_state = if (oldState.exists) oldState.get else source_state(plant_slug,list_of_list)

    println("Printing inverter and merter df with counts")
    val inverter_df = new_df.filter($"device_type" === "INVERTER")
    val meter_df = new_df.filter($"device_type" === "METER")
    println(inverter_df.show())
    val inv_count = inverter_df.count()
    println(meter_df.show())
    val meter_count = meter_df.count()
    println(inv_count)
    println(meter_count)

    val new_state = source_state(plant_slug, list_of_list)
    oldState.update(new_state)
    var out = out_state(plant_slug,inv_count.toString,meter_count.toString)
    out
  }

Если я не выполняю никаких операций фильтрацииnew_df.show печатает все данные в датафрейме.Но считать, показывать, фильтр не работает.

Одна вещь, которую я заметил, это то, что всякий раз, когда я отправляю задание на запуск, он запускается в нескольких пакетах, пакет 0 всегда успешен, но зависает в пакете 0. Он никогда не продолжается в следующемпакет.

Это ожидаемый результат

+--------------------+-------+---------+
|          plant_slug|inv_gen|meter_gen|
+--------------------+-------+---------+
|              plant1|     11|       10|
|              plant2|     20|       19|
|              plant3|     40|       38|
|              plant4|     59|       57|
|              plant5|     37|       35|
+--------------------+-------+---------+
...