Я хочу найти среднее значение между каждым фреймом окна количества аналогичной системы с использованием структурированной потоковой передачи Spark (концепция аналогична скользящей средней).Однако структурированная потоковая передача не позволяет нам посчитать аналогичный столбец и найти среднее значение на основе этого подсчитанного значения, у него будет ошибка, в которой говорится, что
Multiple streaming aggregations are not supported with streaming DataFrames/Datasets
Input
+--------------------+--------------------+
| window| system|
+--------------------+--------------------+
|[2019-06-21 09:23...|A |
|[2019-06-21 09:23...|A |
|[2019-06-21 09:24...|A |
|[2019-06-21 09:24...|B |
|[2019-06-21 09:24...|B |
|[2019-06-21 09:25...|C |
+--------------------+--------------------+
Output
+--------------------+--------------------+-----+-----+
| window| system|count|avg |
+--------------------+--------------------+-----+-----+
|[2019-06-21 09:23...|A | 2| 2|
|[2019-06-21 09:24...|A | 1| 1.5|
|[2019-06-21 09:24...|B | 2| 2|
|[2019-06-21 09:25...|C | 1| 1|
+--------------------+--------------------+-----+-----+
Я уже пытался собрать егов HDFS и используйте его снова, чтобы выполнить отдельную агрегацию по отдельности (что не является предпочтительным решением, поскольку оно будет тратить много времени и памяти), но у него все еще есть проблема с «window.start» при попытке применить водяной знак.
Вот схема
root
|-- window: struct (nullable = true)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- system: string (nullable = true)
|-- count: long (nullable = true)
avg_data=raw_data\
.withWatermark("window.start", "1 minute")\
.groupBy("system")\
.agg(avg("count").alias("avg"))
An error occurred while calling o667.withWatermark.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
'EventTimeWatermark 'window.start, interval 1 minutes
Поскольку я делаю это в структурированной потоковой передаче, поэтому Window в этом случае не будет поддерживаться, поскольку
Non-time-based windows are not supported on streaming DataFrames/Datasets;
Вот код, когда я группирую / считаю похожую тему.
groupped_data=raw_data\
.withWatermark("timestamp", "1 minute")\
.groupBy(window('timestamp', "1 minute", "1 minute"),"system")\
.agg(count("system").alias("count"))\