Spark Structured Streaming - настраиваемая агрегация с событием времени окна - PullRequest
0 голосов
/ 09 мая 2018

Я пытаюсь выполнить пользовательскую агрегацию при структурированной потоковой передаче с оконным временем событий.
Сначала я попытался использовать интерфейс #Aggregator (typed-UDAF) с функцией .agg, что-то вроде:

val aggregatedDataset = streamDataset
  .select($"id",$"time", $"eventType", $"eventValue"))
  .groupBy(window($"time", "1 hour"), $"id").agg(CustomAggregator("time","eventType","eventValue").toColumn.as("aggregation"))

Тем не менее, эта агрегация (в функции сокращения) работает только с новым элементом ввода, а не со всей группой

Поэтому я пытаюсь использовать функцию GroupState (mapGroupsWithState, flapMapGroupWithState) или даже просто функцию mapGroups (без состояния) для выполнения агрегации

Но моя операция groupBy возвращает RelationalGroupedDataset, и мне нужен KeyValueGroupedDataset, чтобы использовать функции карты. groupByKey не работает с окнами.

Как мне управлять пользовательской агрегацией со структурированным потоковым и синхронизированным событием?

Спасибо!

1 Ответ

0 голосов
/ 09 мая 2018

Функции GroupState (s) - mapGroupsWithState, flapMapGroupWithState или mapGroups (без состояния) используются для выполнения агрегации только тогда, когда нам нужно работать в режиме вывода Update.

Но если мы используем режим вывода Complete, нам не нужны функции GroupState.

Итак, если вы измените режим вывода запроса aggregatedDataset на Complete, тогда он будет работать как положено.

Надеюсь, это поможет!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...