Мне нужен ваш совет, в действительности, в моей задаче мне нужно агрегировать события по двум типам агрегации.Первый тип - onCount
, второй тип - onTime
.
Если событие для агрегации onCount
- оно имеет номер поля - number
события, а totalCount
- какое количествособытия, которые мы должны накопить перед агрегированием.
Если событие для onTime
агрегации - оно имеет поле time
- это date
, после которого мы должны получить все события накопления и начать агрегирование.
Я могу сгруппировать события по типу, окну запуска и установить триггер:
stream
.keyBy(e => (e.clientSystemId, e.onMode))
.window(GlobalWindows.create())
.trigger(new WindowAggregationTrigger())
Но в триггере мне нужно иметь состояние - общее количество или время.И в лучшем решении - мне нужны два разных триггера - первый - для подсчета, а второй - для агрегации времени.
Мой вопрос - как красиво решить эту проблему?Когда мне нужны два триггера с разной логикой - первый о счетчике, второй - о временном триггере.
Я не прошу решить проблему для меня, я прошу совета.
Мы развиваемся наApache Flink 1.4.