Как найти среднее значение подсчитанных данных в структурированной потоковой передаче Spark - PullRequest
1 голос
/ 21 июня 2019

Я хочу найти среднее значение между каждым фреймом окна количества аналогичной системы с использованием структурированной потоковой передачи Spark (концепция аналогична скользящей средней).Однако структурированная потоковая передача не позволяет нам посчитать аналогичный столбец и найти среднее значение на основе этого подсчитанного значения, у него будет ошибка, в которой говорится, что

Multiple streaming aggregations are not supported with streaming DataFrames/Datasets
Input 
+--------------------+--------------------+
|              window|              system|
+--------------------+--------------------+
|[2019-06-21 09:23...|A                   |    
|[2019-06-21 09:23...|A                   |  
|[2019-06-21 09:24...|A                   |  
|[2019-06-21 09:24...|B                   |  
|[2019-06-21 09:24...|B                   | 
|[2019-06-21 09:25...|C                   |
+--------------------+--------------------+
Output
+--------------------+--------------------+-----+-----+
|              window|              system|count|avg  |
+--------------------+--------------------+-----+-----+
|[2019-06-21 09:23...|A                   |    2|    2|
|[2019-06-21 09:24...|A                   |    1|  1.5|
|[2019-06-21 09:24...|B                   |    2|    2|
|[2019-06-21 09:25...|C                   |    1|    1|
+--------------------+--------------------+-----+-----+

Я уже пытался собрать егов HDFS и используйте его снова, чтобы выполнить отдельную агрегацию по отдельности (что не является предпочтительным решением, поскольку оно будет тратить много времени и памяти), но у него все еще есть проблема с «window.start» при попытке применить водяной знак.

Вот схема

root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- system: string (nullable = true)
 |-- count: long (nullable = true)
avg_data=raw_data\
.withWatermark("window.start", "1 minute")\
.groupBy("system")\
.agg(avg("count").alias("avg"))
 An error occurred while calling o667.withWatermark.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
'EventTimeWatermark 'window.start, interval 1 minutes

Поскольку я делаю это в структурированной потоковой передаче, поэтому Window в этом случае не будет поддерживаться, поскольку

Non-time-based windows are not supported on streaming DataFrames/Datasets;

Вот код, когда я группирую / считаю похожую тему.

groupped_data=raw_data\
.withWatermark("timestamp", "1 minute")\
.groupBy(window('timestamp', "1 minute", "1 minute"),"system")\
.agg(count("system").alias("count"))\
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...