У меня есть поток сообщений с разными ключами. Для каждого ключа я хочу создать окно сеанса времени события и выполнить некоторую обработку для него, только если:
MIN_EVENTS
количество событий было накоплено в окне (по существу, состояние ключа)
Для каждой клавиши MIN_EVENTS
отличается и может измениться во время выполнения . Я испытываю трудности с реализацией этого. В частности, я реализую эту логику c следующим образом:
inputStream.keyBy(key).
window(EventTimeSessionWindow(INACTIVITY_PERIOD).
trigger(new MyCustomCountTrigger()).
apply(new MyProcessFn())
Я пытаюсь создать пользовательский MyCustomCountTrigger()
, который должен быть способен читать из хранилища состояний, такого как MapState<String, Integer> stateStore
, который отображает key
к параметру MIN_EVENTS
. Мне известно, что я могу получить доступ к хранилищу состояний с помощью объекта TriggerContext ctx
, который доступен всем триггерам.
Как инициализировать это хранилище состояний вне класса CountTrigger ()? Мне не удалось найти примеры для этого.