Потоки Кафки: агрегированные результаты с использованием временных окон? - PullRequest
0 голосов
/ 04 июня 2018

У меня есть следующее приложение потока kafka, которое мне нужно для агрегирования данных с помощью пользовательского ключа.Ключ меняется, но для простоты я начал со смены ключа на одно поле (textId в SampleMessage).После группы мне нужно получить сумму (сумму) - (сумма является двойным полем в классе SampleMessage).Это то, что я придумал.

StreamsBuilder builder = new StreamsBuilder();

builder = builder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("myStore"),
            Serdes.String(),
            Serdes.Long()).withLoggingDisabled());


KTable<String, SampleMessage> sampleMsgKtable = builder.table(TOPIC_NAME,
            Consumed.with(Serdes.String(), sampleMsgSerde));

KGroupedTable<String, SampleMessage> groupByAggregation = sampleMsgKtable.groupBy((key, value) -> {
       String groupBy = getGroupBy(/**Params **/); // key is now textId
       return KeyValue.pair(groupBy, value);
    }, Serialized.with(Serdes.String(), sampleMsgSerde));

KTable<String, SampleMessage> reduce = groupByAggregation.reduce(
            (current, newValue) -> {

                double currentAmount = current.getAmount();
                double newAmount = newValue.getAmount();
                double total = currentAmount + newAmount;
                current.setAmount(total);

                return current;
            },
            (val, agg) -> {

                double valAmount = val.getAmount();
                double aggAmount = agg.getAmount();
                double diff = aggAmount - valAmount;
                agg.setAmount(diff);

                return agg;
            });

 KTable<String, String> finalData = myTransformer.transformToString(reduce);

 finalData.toStream().to("output");

Я тестирую приведенный выше код со следующими сообщениями (используя kafka-streams-test-utils-1.1.0).5 Сообщения выдаются следующим образом:

1. textId = x , amount = 45
2. textId = x , amount = 45
3. textId = x , amount = 45
4. textId = x , amount = 45
5. textId = y , amount = 45

Я получаю следующее

1. textId = x , amount = 45
2. textId = x , amount = 90
3. textId = x , amount = 135
4. textId = x , amount = 180
5. textId = y , amount = 45

Теперь я хочу сделать это агрегирование на основе временного окна (например, агрегация по 5-минутным интервалам времени).Как это сделать с KTables?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...