Я использую kafka-streams для объединения сообщений в KTable. Внутри моей логики агрегации я всегда возвращаю один и тот же аккумулятор, как показано ниже:
streamOfInts
.groupByKey()
.aggregate(Accumulator.empty()) {k,v,acc -> acc}
.toStream()
.to(...)
Я ожидал бы, что - поскольку значение таблицы KTable не изменилось - никакое значение не будет отправлено в нисходящем направлении. Однако, это не так. Функция агрегирования всегда пересылает обновления.
Как лучше всего убедиться, что обновления, которые приводят к одинаковому (или равному) значению, не приведут к пересылке вниз по потоку?