Что произойдет, если агрегатор KGroupedStream вернет ноль? - PullRequest
0 голосов
/ 07 октября 2018
KStream<Integer, Integer> stream;
KGroupedStream<Integer, Integer> grouped = stream.groupByKey();
KTable<Integer, Integer> aggregated = grouped.aggregate(
    () -> 0,
    (k, i, agg) -> {
       if (agg == null)
         agg = 0;
       Integer sum = agg + i;
       return sum > 100 ? null : sum;
    });

Сообщения в моем потоке:

  1. (1, 50)
  2. (1, 75)
  3. (1, 50)

Когда приходит второе сообщение, агрегатор возвращает значение NULL.KTable aggregated получает (1, ноль) и удаляет свое состояние для ключа = 1?

Когда приходит сообщение № 3, имеет значение agg ноль или инициализатор вызывается снова и устанавливает agg в 0?

Что если я использую вместо совокупного сокращение, если редуктор возвращает ноль, будет ли следующее сообщение проходить через редуктор или будет использоваться «как есть», как это было первое сообщение в группе?

Спасибо, Дэвид

1 Ответ

0 голосов
/ 08 октября 2018

Когда приходит второе сообщение, агрегатор возвращает значение NULL.Получает ли агрегированный KTable (1, ноль) и удаляет его состояние для ключа = 1?

Да.

Когда приходит сообщение № 3, оно равно null или является инициализаторомвызывается снова и устанавливает agg равным 0?

Инициализатор вызывается снова.

Что если я использую сокращение вместо агрегата, если редуктор возвращает ноль, то следующее сообщение будет отправленочерез Редуктор или будет использоваться «как есть», как если бы это было первое сообщение в группе?

Редакция работает как агрегат.Таким образом, если вы вернете null, следующее сообщение будет обработано так, как если бы оно было первым сообщением.

Мета-комментарий: почему вы не просто запустите код и попробуете его ???

...