Агрегация Кафки: проблема при группировке ключевых изменений - PullRequest
1 голос
/ 22 мая 2019

Мы используем kafka KTable для агрегации, и ниже указан тип данных, которые мы получаем во входных данных. Входные данные - детали транзакции (идентификатор транзакции, статус, категория, сумма, ..)

Мы группируем вышеуказанные данные.ниже ключ группирования - (статус, категория)

логика приложения

Grouped Stream. Aggregate(() -> new Instance(), (key, newVal, aggVal) - > addAmount(newVal). (key, oldVal, aggVal) - > removeAmount(oldVal));

Допустим, мы получаем поток данных ниже (транзакция, статус, категория, сумма)

1 - 1, в ожидании, наличные деньги, 10 // (в ожидании, наличные деньги) - 10 агрегированное значение

2 - 2, в ожидании, наличные деньги, 20 // (в ожидании, наличные деньги) -30

3 - 3, фактический, карта, 15 // (актуальный, карта) - 15

4 - 1. Ожидание, карта 9 // (ожидающий, наличные) - 30,(в ожидании, карта) - 9 - - - Это - то, где мы получаем проблему

В # 4, хотя обновление для той же транзакцииId 1, но ключ группировки изменяется (от наличных к карте)теперь, поскольку группировка изменилась, она не вызывает метод removeAmoutn (), а вызывается только метод addAmount ().

Любая идея о том, как мы можем решить эту проблему, если при изменении группировки она должна позаботиться и о ранее агрегированных данных.

Здесь я нашел похожий вариант использования https://stackoverflow.com/a/42685866/2699756

Но не уверен, что было сделано, исправь это.

...