Потоки и окна Кафки, чтобы вести счет во временном окне - PullRequest
0 голосов
/ 02 мая 2018

Я новичок в Stackoverflow, так что простите, если вопрос задан плохо. Любая помощь / вдохновение высоко ценится!

Я использую потоки Кафки для фильтрации входящих данных в мою базу данных. Входящие сообщения выглядят как {"ID":"X","time":"HH:MM"} и несколько других параметров, не имеющих отношения к данному случаю. Мне удалось запустить приложение Java, которое читает из темы и распечатывает входящие сообщения. Теперь я хочу использовать KTables (?) Для группировки входящих сообщений с одинаковым идентификатором, а затем использовать окно сеанса для группировки таблицы по временным интервалам. Я хочу, чтобы временное окно продолжалось непрерывно по оси X.

Прежде всего, конечно, нужно запустить KTable для подсчета входящих сообщений с одинаковым идентификатором. То, что я хотел бы сделать, должно привести к чему-то вроде этого:

ID       Count
X          1
Y          3
Z          1

, который постоянно обновляется, поэтому сообщения с устаревшей временной меткой удаляются из таблицы.

Я не уверен на сто процентов, но я думаю, что мне нужны KTables, а не KStreams, я прав? И как мне достичь скользящего окна, если это правильный способ достижения желаемых результатов?

Это код, который я использую сейчас. Он только читает из темы и печатает входящие сообщения.

private static List<String> printEvent(String o) {
    System.out.println(o);
    return Arrays.asList(o);
}

final StreamsBuilder builder = new StreamsBuilder();
    builder.<String, String>stream(srcTopic)
    .flatMapValues(value -> printEvent(value));

Я хотел бы знать, что я должен добавить для достижения желаемого результата, указанного выше, и куда я поместил его в свой код.

Заранее спасибо за помощь!

1 Ответ

0 голосов
/ 02 мая 2018

Да, вам нужен Ktable и раздвижное окно, я также рекомендую обратить внимание на функцию водяного знака , чтобы обработать сообщение о поздней доставке. Пример

KTable<Windowed<Key>, Value> oneMinuteWindowed = yourKStream

.groupByKey()

.reduce(/*your adder*/, TimeWindows.of(60*1000, 60*1000), "store1m");
        //where your adder can be as simple as (val, agg) -> agg + val
        //for primitive types or as complex as you need
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...