да, это возможно с помощью Kafka Streams State Store.Логика зависит от того, что именно вам нужно сделать для достижения количества обработанных сообщений.
если вам нужно распространить данные на следующий процессор или узел приемника, вам нужно сохранить агрегированные значения в виде списка объектов в хранилище состояний ключ-значение.внутри Processor.process(..)
вы помещаете данные в хранилище значений ключей, после чего проверяете, достигнуто ли количество элементов, и выполняете ли логику (например, processorContext.forward(..)
).пожалуйста, посмотрите на подобный пример здесь .
, если вам нужно выполнить некоторую логику после достижения числа и не нуждаться в значениях, вы можете хранить только счетчик и с шагом Processor.process(..)
это значение.