Я пытаюсь использовать пары Кафки, чтобы уменьшить последовательность чисел, и я хочу получить запись только тогда, когда данные изменились. Он отлично работает, но проблема в том, что он не догоняет данные из kafka, если служба, выполняющая код, не работает. Итак, я думаю, что решение неверно? Мой код:
KGroupedStream<String, JsonNode> groupedStream = filteredStream.groupByKey( Serdes.String(), jsonSerde);
KTable<String, JsonNode> reducedTable = groupedStream.reduce(
(aggValue, newValue) -> Calculate.newValue( newValue, aggValue, logger) ,/* adder */
"reduced-stream-store" /* state store name */);
KStream<String, JsonNode> reducedStream = reducedTable.toStream();
метод "Рассчитать":
if (value != oldValue)
return value
else return null.
спасибо, если у вас есть комментарии / предложения