У меня есть следующее приложение потока kafka, которое мне нужно для агрегирования данных с помощью пользовательского ключа.Ключ меняется, но для простоты я начал со смены ключа на одно поле (textId в SampleMessage).После группы мне нужно получить сумму (сумму) - (сумма является двойным полем в классе SampleMessage).Это то, что я придумал.
StreamsBuilder builder = new StreamsBuilder();
builder = builder.addStateStore(Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("myStore"),
Serdes.String(),
Serdes.Long()).withLoggingDisabled());
KTable<String, SampleMessage> sampleMsgKtable = builder.table(TOPIC_NAME,
Consumed.with(Serdes.String(), sampleMsgSerde));
KGroupedTable<String, SampleMessage> groupByAggregation = sampleMsgKtable.groupBy((key, value) -> {
String groupBy = getGroupBy(/**Params **/); // key is now textId
return KeyValue.pair(groupBy, value);
}, Serialized.with(Serdes.String(), sampleMsgSerde));
KTable<String, SampleMessage> reduce = groupByAggregation.reduce(
(current, newValue) -> {
double currentAmount = current.getAmount();
double newAmount = newValue.getAmount();
double total = currentAmount + newAmount;
current.setAmount(total);
return current;
},
(val, agg) -> {
double valAmount = val.getAmount();
double aggAmount = agg.getAmount();
double diff = aggAmount - valAmount;
agg.setAmount(diff);
return agg;
});
KTable<String, String> finalData = myTransformer.transformToString(reduce);
finalData.toStream().to("output");
Я тестирую приведенный выше код со следующими сообщениями (используя kafka-streams-test-utils-1.1.0).5 Сообщения выдаются следующим образом:
1. textId = x , amount = 45
2. textId = x , amount = 45
3. textId = x , amount = 45
4. textId = x , amount = 45
5. textId = y , amount = 45
Я получаю следующее
1. textId = x , amount = 45
2. textId = x , amount = 90
3. textId = x , amount = 135
4. textId = x , amount = 180
5. textId = y , amount = 45
Теперь я хочу сделать это агрегирование на основе временного окна (например, агрегация по 5-минутным интервалам времени).Как это сделать с KTables?