KTable агрегат пересылает те же сообщения - PullRequest
0 голосов
/ 17 октября 2019

Я использую kafka-streams для объединения сообщений в KTable. Внутри моей логики агрегации я всегда возвращаю один и тот же аккумулятор, как показано ниже:

  streamOfInts
    .groupByKey()
    .aggregate(Accumulator.empty()) {k,v,acc -> acc}
    .toStream()
    .to(...)

Я ожидал бы, что - поскольку значение таблицы KTable не изменилось - никакое значение не будет отправлено в нисходящем направлении. Однако, это не так. Функция агрегирования всегда пересылает обновления.

Как лучше всего убедиться, что обновления, которые приводят к одинаковому (или равному) значению, не приведут к пересылке вниз по потоку?

1 Ответ

0 голосов
/ 20 октября 2019

Операторы DSL выдают «при обновлении», а не «при изменении» по атм. Существует билет JIRA, в котором предлагается добавить семантику «emit on change» (https://issues.apache.org/jira/browse/KAFKA-8770).

. В качестве обходного пути вы можете реализовать пользовательский transform() с хранилищем состояний - для каждой входной записи вы проверяетесохранить, если он новый (-> emit and put to store) или если он изменился (-> emit and update store). Если он существует и не изменился, ничего не генерировать.

...