У меня есть вопрос о TimeWindows с Kafka Streams, некоторые концепции меня действительно смущают.
У нас есть тема, которая получает 10 миллионов событий в день, и мы сохраняем журнал 6 дней, так что в целом темасодержит 60 миллионов событий.
На самом деле нам интересны только события текущего дня, остальное мы оставляем на 5 дней только по причинам одитинга.
Теперь я создал KTable из него, я делаюзагрузить все операции и перебрать события.Как я упоминал ранее, на самом деле нас интересуют только события текущего дня, а не 60 миллионов событий, поэтому я поместил эти данные в окно определения KTable.
.windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(1)).until(TimeUnit.DAYS.toMillis(1))
теперь, когда я загружаю все события с помощью следующего оператора, все выполняетсяхорошо.
store().fetchAll(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1), System.currentTimeMillis())
Проблема с этим, в то время как в начале дня, это загрузит позволяет 1 миллион событий, а затем 10 миллионов, поэтому мне нужно перебрать более 10 миллионов событий, пока мы работаем в пакетном режимеЯ думал, что смогу еще больше оптимизировать это и загружать только события за последний час, поэтому для той же конфигурации KTable я попытался использовать следующее утверждение.
store().fetchAll(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1), System.currentTimeMillis())
, но, к моему удивлению, это не вернуло никаких данных.
Может кто-нибудь объяснить, почему это не возвращает никаких результатов, я полагаю, что я неправильно истолковываю что-то в концепции TimeWindow.
Затем я провел некоторые дополнительные тесты и изменил свою конфигурацию KTable на следующую.
.windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1)).until(TimeUnit.DAYS.toMillis(1)))
теперь этот запрос работает так, как мне хотелось бы
store().fetchAll(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1), System.currentTimeMillis())
но я неЕсли я нахожусь на правильном пути ...
Если я буду использовать следующее утверждение для последней конфигурации KTable, это доставит мне 10 миллионов событий с текущего дня?
store().fetchAll(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1), System.currentTimeMillis())