Spark - не основанные на времени окна не поддерживаются при потоковой передаче данных DataFrames / Datasets; - PullRequest
0 голосов
/ 14 ноября 2018

Мне нужно написать Spark SQL-запрос с внутренним выбором и разделить на. Проблема в том, что у меня есть AnalysisException. Я уже потратил несколько часов на это, но с другим подходом я не добился успеха.

Исключение:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets;;
Window [sum(cast(_w0#41 as bigint)) windowspecdefinition(deviceId#28, timestamp#30 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS grp#34L], [deviceId#28], [timestamp#30 ASC NULLS FIRST]
+- Project [currentTemperature#27, deviceId#28, status#29, timestamp#30, wantedTemperature#31, CASE WHEN (status#29 = cast(false as boolean)) THEN 1 ELSE 0 END AS _w0#41]

Я предполагаю, что это слишком сложный запрос, чтобы реализовать его следующим образом. Но я не знаю, как это исправить.

 SparkSession spark = SparkUtils.getSparkSession("RawModel");

 Dataset<RawModel> datasetMap = readFromKafka(spark);

 datasetMap.registerTempTable("test");

 Dataset<Row> res = datasetMap.sqlContext().sql("" +
                " select deviceId, grp, avg(currentTemperature) as averageT, min(timestamp) as minTime ,max(timestamp) as maxTime, count(*) as countFrame " +
                " from (select test.*,  sum(case when status = 'false' then 1 else 0 end) over (partition by deviceId order by timestamp) as grp " +
                "  from test " +
                "  ) test " +
                " group by deviceid, grp ");

Любое предложение будет очень оценено. Спасибо.

1 Ответ

0 голосов
/ 20 апреля 2019

Я полагаю, что проблема в спецификации windowing :

over (partition by deviceId order by timestamp) 

раздел должен быть в столбце на основе времени - в вашем случае отметка времени .Должно работать следующее:

over (partition by timestamp order by timestamp) 

Это, конечно, не решит намерение вашего запроса.Можно попытаться сделать следующее: но неясно, будет ли его поддерживать spark:

over (partition by timestamp, deviceId order by timestamp) 

Даже если spark поддерживает , это все равно изменит семантику вашего запроса.

Обновление

Вот окончательный источник: от Татхагата дас , который является ключевым / основным коммиттером на spark streaming : http://apache -spark-user-list.1001560.n3.nabble.com / «Делает-по-заказу-только-по-делу-в-состоянии-случае»td31816.html

enter image description here

...