KStream<Integer, Integer> stream;
KGroupedStream<Integer, Integer> grouped = stream.groupByKey();
KTable<Integer, Integer> aggregated = grouped.aggregate(
() -> 0,
(k, i, agg) -> {
if (agg == null)
agg = 0;
Integer sum = agg + i;
return sum > 100 ? null : sum;
});
Сообщения в моем потоке:
- (1, 50)
- (1, 75)
- (1, 50)
Когда приходит второе сообщение, агрегатор возвращает значение NULL.KTable aggregated
получает (1, ноль) и удаляет свое состояние для ключа = 1?
Когда приходит сообщение № 3, имеет значение agg
ноль или инициализатор вызывается снова и устанавливает agg
в 0?
Что если я использую вместо совокупного сокращение, если редуктор возвращает ноль, будет ли следующее сообщение проходить через редуктор или будет использоваться «как есть», как это было первое сообщение в группе?
Спасибо, Дэвид