Мы используем kafka KTable для агрегации, и ниже указан тип данных, которые мы получаем во входных данных. Входные данные - детали транзакции (идентификатор транзакции, статус, категория, сумма, ..)
Мы группируем вышеуказанные данные.ниже ключ группирования - (статус, категория)
логика приложения
Grouped Stream. Aggregate(() -> new Instance(), (key, newVal, aggVal) - > addAmount(newVal). (key, oldVal, aggVal) - > removeAmount(oldVal));
Допустим, мы получаем поток данных ниже (транзакция, статус, категория, сумма)
1 - 1, в ожидании, наличные деньги, 10 // (в ожидании, наличные деньги) - 10 агрегированное значение
2 - 2, в ожидании, наличные деньги, 20 // (в ожидании, наличные деньги) -30
3 - 3, фактический, карта, 15 // (актуальный, карта) - 15
4 - 1. Ожидание, карта 9 // (ожидающий, наличные) - 30,(в ожидании, карта) - 9 - - - Это - то, где мы получаем проблему
В # 4, хотя обновление для той же транзакцииId 1, но ключ группировки изменяется (от наличных к карте)теперь, поскольку группировка изменилась, она не вызывает метод removeAmoutn (), а вызывается только метод addAmount ().
Любая идея о том, как мы можем решить эту проблему, если при изменении группировки она должна позаботиться и о ранее агрегированных данных.
Здесь я нашел похожий вариант использования https://stackoverflow.com/a/42685866/2699756
Но не уверен, что было сделано, исправь это.