есть ли метод потоков Кафки, чтобы уменьшить поток чисел, чтобы "выводить" только при изменении числа - PullRequest
0 голосов
/ 18 октября 2019

Я пытаюсь использовать пары Кафки, чтобы уменьшить последовательность чисел, и я хочу получить запись только тогда, когда данные изменились. Он отлично работает, но проблема в том, что он не догоняет данные из kafka, если служба, выполняющая код, не работает. Итак, я думаю, что решение неверно? Мой код:

KGroupedStream<String, JsonNode> groupedStream = filteredStream.groupByKey( Serdes.String(), jsonSerde);
KTable<String, JsonNode> reducedTable = groupedStream.reduce(
                (aggValue, newValue) ->  Calculate.newValue( newValue, aggValue, logger) ,/* adder */
                "reduced-stream-store" /* state store name */);
KStream<String, JsonNode> reducedStream =  reducedTable.toStream();

метод "Рассчитать":

if (value != oldValue)
 return value
else return  null.

спасибо, если у вас есть комментарии / предложения

1 Ответ

0 голосов
/ 20 октября 2019

return null в вашем коде удалит запись из таблицы результатов. Следовательно, ваш код не выполняет то, что вы ожидаете.

Фактически операторы DSL выдают «при обновлении», а не «при изменении», и, таким образом, вы не можете использовать DSL для своего варианта использования. Есть билет, в котором предлагается добавить семантику «при изменении» (https://issues.apache.org/jira/browse/KAFKA-8770).

. В качестве обходного пути вам нужно будет вместо этого использовать пользовательский transform() с хранилищем статистики. Для каждой входной записи вы проверяетеесли она существует в хранилище. Если нет, отправьте запись и поместите ее в хранилище. Если существует и то же самое, не отправляйте ничего. Если это не так, создайте и обновите хранилище.

...