Если в некоторых ваших сообщениях используется один и тот же ключ, ваш код считается с двойным счетом.
Обратите внимание, что функция, переданная методу foreach () в KTable, выполняется не один раз для каждой строки, а один раз для обновления строки (возможно, не каждое обновление из-за кэширования). Смотри: https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/kstream/KTable.html#foreach-org.apache.kafka.streams.kstream.ForeachAction-
Выполните действие для каждой обновленной записи этого KTable. Обратите внимание, что
это терминальная операция, которая возвращает void.
Обратите внимание, что foreach () не применяется к внутреннему хранилищу состояний и
вызывается только для каждой новой обновленной записи KTable.
Представьте, что у вас есть 3 сообщения с клавишей "A". KTable, созданный агрегатом count (), будет обновлен 3 раза, а ваша функция (лямбда-выражение) будет вызвана 3 раза со следующими параметрами:
(«A», 1), («A», 2), («A», 3), в результате чего счет увеличивается на 1 + 2 + 3 = 6 вместо увеличения на 3.
KStream и KTable представляют «данные в движении», а их методы обычно работают с потоком данных. Если вы хотите работать с текущим снимком данных, попробуйте вместо этого использовать интерактивные запросы. Возможно, из-за того, что метод KTable.foreach поначалу может сбивать с толку, он не рекомендуется с таким комментарием:
Запрещены. Используйте API-интерфейсы интерактивных запросов (например,
KafkaStreams.store (String, QueryableStoreType), за которым следует
ReadOnlyKeyValueStore.all ()) для перебора ключей KTable.
В качестве альтернативы преобразуйте в KStream с помощью toStream (), а затем используйте
foreach (действие) на результат.