Kafka Streams - Эффективное вычисление мин / макс на KTable - PullRequest
0 голосов
/ 10 декабря 2018

В приложении Kafka Streams у нас есть KTable (не KStream), в котором мы хотим вычислить некоторую статистику, такую ​​как минимум или максимум всех "строк" / записей, которые удовлетворяют определенному свойству.Поэтому мы преобразуем KTable в KGroupedTable, установив свойство для группировки в качестве ключа.Используя эту сгруппированную таблицу, теперь просто вычислить что-то вроде счета или суммы.Нам просто нужно использовать метод aggregate с определенными соответствующими функциями сумматора и вычитателя.(+/- 1 для подсчета, +/- значение для суммы).

Однако для таких агрегатов, как min / max, такой простой функции субстрактора нет.Одним из решений для реализации агрегации мин / макс было бы агрегирование значения в нечто похожее на карту, где функция сумматора добавляет к карте , а вычитает , удаляя с карты .Затем на следующем шаге мы можем map сопоставить это значение со значением min / max, просто перебирая записи.

// Example without types and serdes
KTable sums = myKTable.groupBy((k, v) -> KeyValue.pair(v.getProperty(), v)
                      .aggregate(() -> 0, (k, v, a) -> a + v.getValue(), , (k, v, a) -> a - v.getValue();

KTable mins = myKTable.groupBy((k, v) -> KeyValue.pair(v.getProperty(), v)
                      .aggregate(() -> Map.of(), (k, v, a) -> a.put(v.getId(), v.getValue()), , (k, v, a) -> a.remove(v.getId())
                      .mapValues((k, v) -> StatsHelper.min(v));

Это, однако, не выглядит действительно гладким , поскольку мыпришлось бы сериализовать всю карту со всеми ее записями все время.Есть ли более приятные подходы для достижения этой цели, то есть получение всех соответствующих «строк» ​​непосредственно с помощью фреймворка?Или это просто невозможно?

...