Я новичок в Stackoverflow, так что простите, если вопрос задан плохо. Любая помощь / вдохновение высоко ценится!
Я использую потоки Кафки для фильтрации входящих данных в мою базу данных. Входящие сообщения выглядят как {"ID":"X","time":"HH:MM"}
и несколько других параметров, не имеющих отношения к данному случаю. Мне удалось запустить приложение Java, которое читает из темы и распечатывает входящие сообщения. Теперь я хочу использовать KTables (?) Для группировки входящих сообщений с одинаковым идентификатором, а затем использовать окно сеанса для группировки таблицы по временным интервалам. Я хочу, чтобы временное окно продолжалось непрерывно по оси X.
Прежде всего, конечно, нужно запустить KTable для подсчета входящих сообщений с одинаковым идентификатором. То, что я хотел бы сделать, должно привести к чему-то вроде этого:
ID Count
X 1
Y 3
Z 1
, который постоянно обновляется, поэтому сообщения с устаревшей временной меткой удаляются из таблицы.
Я не уверен на сто процентов, но я думаю, что мне нужны KTables, а не KStreams, я прав? И как мне достичь скользящего окна, если это правильный способ достижения желаемых результатов?
Это код, который я использую сейчас. Он только читает из темы и печатает входящие сообщения.
private static List<String> printEvent(String o) {
System.out.println(o);
return Arrays.asList(o);
}
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream(srcTopic)
.flatMapValues(value -> printEvent(value));
Я хотел бы знать, что я должен добавить для достижения желаемого результата, указанного выше, и куда я поместил его в свой код.
Заранее спасибо за помощь!