Я пытаюсь выполнить пользовательскую агрегацию при структурированной потоковой передаче с оконным временем событий.
Сначала я попытался использовать интерфейс #Aggregator (typed-UDAF) с функцией .agg, что-то вроде:
val aggregatedDataset = streamDataset
.select($"id",$"time", $"eventType", $"eventValue"))
.groupBy(window($"time", "1 hour"), $"id").agg(CustomAggregator("time","eventType","eventValue").toColumn.as("aggregation"))
Тем не менее, эта агрегация (в функции сокращения) работает только с новым элементом ввода, а не со всей группой
Поэтому я пытаюсь использовать функцию GroupState (mapGroupsWithState, flapMapGroupWithState) или даже просто функцию mapGroups (без состояния) для выполнения агрегации
Но моя операция groupBy возвращает RelationalGroupedDataset, и мне нужен KeyValueGroupedDataset, чтобы использовать функции карты. groupByKey не работает с окнами.
Как мне управлять пользовательской агрегацией со структурированным потоковым и синхронизированным событием?
Спасибо!