Kafka Streams - Несоответствие количества сообщений в теме - PullRequest
0 голосов
/ 25 апреля 2018

Предположим, у нас есть тема Kafka, которая содержит 1000 сообщений.Мы создаем из него поток (в последующем мы называем его st) и делаем следующее:

int count = 0;

st.groupByKey().count().foreach((key, value) -> {

       count += value)
       System.out.println(count)
});

Когда обработка «заканчивается», она возвращает число, немного превышающее 1000. Что может вызватьэто странное поведение?

1 Ответ

0 голосов
/ 25 апреля 2018

Если в некоторых ваших сообщениях используется один и тот же ключ, ваш код считается с двойным счетом. Обратите внимание, что функция, переданная методу foreach () в KTable, выполняется не один раз для каждой строки, а один раз для обновления строки (возможно, не каждое обновление из-за кэширования). Смотри: https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/kstream/KTable.html#foreach-org.apache.kafka.streams.kstream.ForeachAction-

Выполните действие для каждой обновленной записи этого KTable. Обратите внимание, что это терминальная операция, которая возвращает void.

Обратите внимание, что foreach () не применяется к внутреннему хранилищу состояний и вызывается только для каждой новой обновленной записи KTable.

Представьте, что у вас есть 3 сообщения с клавишей "A". KTable, созданный агрегатом count (), будет обновлен 3 раза, а ваша функция (лямбда-выражение) будет вызвана 3 раза со следующими параметрами: («A», 1), («A», 2), («A», 3), в результате чего счет увеличивается на 1 + 2 + 3 = 6 вместо увеличения на 3.

KStream и KTable представляют «данные в движении», а их методы обычно работают с потоком данных. Если вы хотите работать с текущим снимком данных, попробуйте вместо этого использовать интерактивные запросы. Возможно, из-за того, что метод KTable.foreach поначалу может сбивать с толку, он не рекомендуется с таким комментарием:

Запрещены. Используйте API-интерфейсы интерактивных запросов (например, KafkaStreams.store (String, QueryableStoreType), за которым следует ReadOnlyKeyValueStore.all ()) для перебора ключей KTable. В качестве альтернативы преобразуйте в KStream с помощью toStream (), а затем используйте foreach (действие) на результат.

...