Как запланировать периодическое задание в зависимости от количества обработанных сообщений? - PullRequest
0 голосов
/ 30 декабря 2018

Я хочу использовать Kafka Processor API для обработки сообщений от Kafka.Я хотел бы вызывать некоторую периодически выполняемую функцию - что-то вроде: context.schedule(IntervalMS,punctuationType, somePunctuator), где somePunctuator выполняет некоторую периодическую работу, но вместо использования интервального времени в качестве триггера я хотел бы вызвать эту задачу после обработки некоторого количества сообщений

Isможно ли сделать такой запуск в потоках Кафки?

1 Ответ

0 голосов
/ 30 декабря 2018

да, это возможно с помощью Kafka Streams State Store.Логика зависит от того, что именно вам нужно сделать для достижения количества обработанных сообщений.

если вам нужно распространить данные на следующий процессор или узел приемника, вам нужно сохранить агрегированные значения в виде списка объектов в хранилище состояний ключ-значение.внутри Processor.process(..) вы помещаете данные в хранилище значений ключей, после чего проверяете, достигнуто ли количество элементов, и выполняете ли логику (например, processorContext.forward(..)).пожалуйста, посмотрите на подобный пример здесь .

, если вам нужно выполнить некоторую логику после достижения числа и не нуждаться в значениях, вы можете хранить только счетчик и с шагом Processor.process(..)это значение.

...