Из потока (k, v) я хочу вычислить поток (k, (v, f)), где f - частота появления данного ключа за последние n секунд.Дайте тему (t1), если я использую оконную таблицу для вычисления частоты:
KTable<Windowed<Integer>,Long> t1_velocity_table = t1_stream.groupByKey().windowedBy(TimeWindows.of(n*1000)).count();
Это даст оконную таблицу с частотой каждого ключа.
Предполагая, что я не будубыть в состоянии соединиться с оконным ключом, вместо таблицы выше я сопоставляю поток с таблицей с простым ключом:
t1_Stream.groupByKey()
.windowedBy(TimeWindows.of( n*1000)).count()
.toStream().map((k,v)->new KeyValue<>(k.key(), Math.toIntExact(v))).to(frequency_topic);
KTable<Integer,Integer> t1_frequency_table = builder.table(frequency_topic);
Если я теперь ищу в этой таблице, когда новый ключ прибывает в мой потокКак я узнаю, будет ли эта таблица поиска обновляться первой или сначала произойдет объединение (что приведет к тому, что устаревшая частота будет добавлена в запись, а не текущая обновленная).Будет ли лучше создать поток вместо таблицы, а затем выполнить оконное соединение?Я хочу посмотреть в таблице что-то вроде этого:
KStream<Integer,Tuple<Integer,Integer>> t1_enriched = t1_Stream.join(t1_frequency_table, (l,r) -> new Tuple<>(l, r));
Так что вместо того, чтобы иметь только поток (k, v), у меня есть поток (k, (v, f)), где f эточастота нажатия клавиши k за последние n секунд.
Есть какие-нибудь мысли о том, как правильно достичь этого?Благодарю.