Могу ли я выполнить несколько операций с состоянием в режиме просмотра, фильтрации или ветки приложений Kafka Stream? - PullRequest
1 голос
/ 08 апреля 2020

Как мы знаем, в потоке kafka c, peek, filter, branch являются операциями без сохранения состояния? Тем не менее, я хочу сделать несколько операций с состоянием в этом процессоре? Например, я хочу сделать какой-нибудь запрос и отфильтровать сообщения на основе результатов, могу ли я это сделать?

Ответы [ 2 ]

1 голос
/ 09 апреля 2020

Операции peek(), filter() и branch() по своей сути не имеют состояния. Когда вы говорите:

Я хочу выполнить какой-либо запрос, а фильтрация сообщений основывается на результатах

, чем это зависит от того, что вы хотите запросить? Возможно (но не рекомендуется) запросить «внешний» API. Тем не менее, нет встроенной поддержки для него, и есть много вариантов, чтобы сделать его устойчивым. Обратите внимание, что при запросе к внешней системе операция с состоянием .

не выполняется. Если вы хотите работать с состоянием, вы можете использовать transform() (и родственные элементы) и создавать собственные операторы. Если вы называете всех своих последующих операторов (через Named и аналогичные), вы можете использовать context.forward(..., To.child(...)) для реализации пользовательской ветви. Для фильтрации вы можете вернуть null, чтобы ничего не пересылать.

Не уверен, для чего будет использоваться peek () с отслеживанием состояния, но вы также можете сделать это.

В зависимости от использования В этом случае также можно реализовать «фильтр с отслеживанием состояния» через соединение потоковой таблицы или объединение stream-globalTable.

0 голосов
/ 08 апреля 2020

IMO, лучший способ сделать это - использовать поиск в таблице , используя KStream#...join или Processor API , чтобы получить доступ к базовому хранилищу состояний (используя KStream#transformValues) .

Вы могли бы сделать это, но код будет очень неприятным (не рекомендуется), но вы можете получить доступ только для чтения к ReadOnlyKeyValueStore только после того, как состояние Stream переместилось из REBALANCING в RUNNING:

kafkaStreams.setStateListener((newState, oldState) -> {
    if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
        ReadOnlyKeyValueStore<Object, Object> kvStore = kafkaStreams.store("stateStore", QueryableStoreTypes.keyValueStore());
        //assign this kvStore to some place so you can later using this referrer access this in filter or in peek
    }
});
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...