В приложении Kafka Streams у нас есть KTable (не KStream), в котором мы хотим вычислить некоторую статистику, такую как минимум или максимум всех "строк" / записей, которые удовлетворяют определенному свойству.Поэтому мы преобразуем KTable в KGroupedTable, установив свойство для группировки в качестве ключа.Используя эту сгруппированную таблицу, теперь просто вычислить что-то вроде счета или суммы.Нам просто нужно использовать метод aggregate
с определенными соответствующими функциями сумматора и вычитателя.(+/- 1 для подсчета, +/- значение для суммы).
Однако для таких агрегатов, как min / max, такой простой функции субстрактора нет.Одним из решений для реализации агрегации мин / макс было бы агрегирование значения в нечто похожее на карту, где функция сумматора добавляет к карте , а вычитает , удаляя с карты .Затем на следующем шаге мы можем map
сопоставить это значение со значением min / max, просто перебирая записи.
// Example without types and serdes
KTable sums = myKTable.groupBy((k, v) -> KeyValue.pair(v.getProperty(), v)
.aggregate(() -> 0, (k, v, a) -> a + v.getValue(), , (k, v, a) -> a - v.getValue();
KTable mins = myKTable.groupBy((k, v) -> KeyValue.pair(v.getProperty(), v)
.aggregate(() -> Map.of(), (k, v, a) -> a.put(v.getId(), v.getValue()), , (k, v, a) -> a.remove(v.getId())
.mapValues((k, v) -> StatsHelper.min(v));
Это, однако, не выглядит действительно гладким , поскольку мыпришлось бы сериализовать всю карту со всеми ее записями все время.Есть ли более приятные подходы для достижения этой цели, то есть получение всех соответствующих «строк» непосредственно с помощью фреймворка?Или это просто невозможно?