У меня есть KStream
конвейер, который группирует по ключу, а затем через какой-то интервал окна, а затем применяет пользовательское агрегирование для этого:
KStream<String, Integer> input = /* define input stream */
/* group by key and then apply windowing */
KTable<Windowed<String>, MyAggregate> aggregateTable =
input.groupByKey()
.windowedBy(/* window defintion here */)
.aggregate(MyAggregate::new, (key, value, agg) -> agg.addAndReturn(value))
// I need to get a change log of aggregateTable so:
aggregateTable.toStream().to("output-topic");
Проблема в том, что большинство входных записей не изменитсявнутреннее состояние MyAggregate
объекта.Структура похожа на:
class MyAggregate {
private Set<Integer> checkBeforeInsert = /* some predefined values */
private List<Integer> actualState = new ArrayList<>();
public MyAggregate addAndReturn(Integer value) {
/* for 99% of records the if check passes */
if (checkBeforeInsert.contains(value)) {
/* do nothing and return. Note that the state hasn't been changed */
return this;
} else {
actualState.add(value);
return this;
}
}
}
Однако KStream
не имеет ни малейшего представления о том, что объект агрегата не был изменен, он все еще хранит агрегат (который совпадает со старым).Он также распространяется на то же старое значение в теме журнала изменений и также запускает aggregateTable.toStream()
с тем же старым значением.
Хотя семантика моего приложения работает нормально (остальная часть приложения знает об этом факте, что может появиться неизменное состояние), но это создает огромный шумовой трафик по промежуточным темам.Мне нужен способ уведомить KStream
, действительно ли агрегат был изменен и должен быть сохранен, или он совпадает с предыдущей записью (просто игнорируйте ее).