Операции peek()
, filter()
и branch()
по своей сути не имеют состояния. Когда вы говорите:
Я хочу выполнить какой-либо запрос, а фильтрация сообщений основывается на результатах
, чем это зависит от того, что вы хотите запросить? Возможно (но не рекомендуется) запросить «внешний» API. Тем не менее, нет встроенной поддержки для него, и есть много вариантов, чтобы сделать его устойчивым. Обратите внимание, что при запросе к внешней системе операция с состоянием .
не выполняется. Если вы хотите работать с состоянием, вы можете использовать transform()
(и родственные элементы) и создавать собственные операторы. Если вы называете всех своих последующих операторов (через Named
и аналогичные), вы можете использовать context.forward(..., To.child(...))
для реализации пользовательской ветви. Для фильтрации вы можете вернуть null
, чтобы ничего не пересылать.
Не уверен, для чего будет использоваться peek () с отслеживанием состояния, но вы также можете сделать это.
В зависимости от использования В этом случае также можно реализовать «фильтр с отслеживанием состояния» через соединение потоковой таблицы или объединение stream-globalTable.