Доступ к хранилищу состояний под ключ в Apache Flink, который изменяется динамически - PullRequest
0 голосов
/ 03 февраля 2020

У меня есть поток сообщений с разными ключами. Для каждого ключа я хочу создать окно сеанса времени события и выполнить некоторую обработку для него, только если:

  • MIN_EVENTS количество событий было накоплено в окне (по существу, состояние ключа)

Для каждой клавиши MIN_EVENTS отличается и может измениться во время выполнения . Я испытываю трудности с реализацией этого. В частности, я реализую эту логику c следующим образом:

        inputStream.keyBy(key).
        window(EventTimeSessionWindow(INACTIVITY_PERIOD).
        trigger(new MyCustomCountTrigger()).
        apply(new MyProcessFn())

Я пытаюсь создать пользовательский MyCustomCountTrigger(), который должен быть способен читать из хранилища состояний, такого как MapState<String, Integer> stateStore, который отображает key к параметру MIN_EVENTS. Мне известно, что я могу получить доступ к хранилищу состояний с помощью объекта TriggerContext ctx, который доступен всем триггерам.

Как инициализировать это хранилище состояний вне класса CountTrigger ()? Мне не удалось найти примеры для этого.

1 Ответ

0 голосов
/ 03 февраля 2020

Вы можете инициализировать состояние на основе параметров, отправленных в конструктор вашего класса Trigger. Но вы не можете получить доступ к состоянию за пределами этого класса.

Если вам нужна большая гибкость, я предлагаю вам использовать функцию процесса вместо окна.

...