Кафка Стрим - TimeWindows - PullRequest
       12

Кафка Стрим - TimeWindows

0 голосов
/ 13 декабря 2018

У меня есть вопрос о TimeWindows с Kafka Streams, некоторые концепции меня действительно смущают.

У нас есть тема, которая получает 10 миллионов событий в день, и мы сохраняем журнал 6 дней, так что в целом темасодержит 60 миллионов событий.

На самом деле нам интересны только события текущего дня, остальное мы оставляем на 5 дней только по причинам одитинга.

Теперь я создал KTable из него, я делаюзагрузить все операции и перебрать события.Как я упоминал ранее, на самом деле нас интересуют только события текущего дня, а не 60 миллионов событий, поэтому я поместил эти данные в окно определения KTable.

.windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(1)).until(TimeUnit.DAYS.toMillis(1))

теперь, когда я загружаю все события с помощью следующего оператора, все выполняетсяхорошо.

store().fetchAll(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1), System.currentTimeMillis())

Проблема с этим, в то время как в начале дня, это загрузит позволяет 1 миллион событий, а затем 10 миллионов, поэтому мне нужно перебрать более 10 миллионов событий, пока мы работаем в пакетном режимеЯ думал, что смогу еще больше оптимизировать это и загружать только события за последний час, поэтому для той же конфигурации KTable я попытался использовать следующее утверждение.

store().fetchAll(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1), System.currentTimeMillis())

, но, к моему удивлению, это не вернуло никаких данных.

Может кто-нибудь объяснить, почему это не возвращает никаких результатов, я полагаю, что я неправильно истолковываю что-то в концепции TimeWindow.

Затем я провел некоторые дополнительные тесты и изменил свою конфигурацию KTable на следующую.

.windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1)).until(TimeUnit.DAYS.toMillis(1)))

теперь этот запрос работает так, как мне хотелось бы

store().fetchAll(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1), System.currentTimeMillis())

но я неЕсли я нахожусь на правильном пути ...

Если я буду использовать следующее утверждение для последней конфигурации KTable, это доставит мне 10 миллионов событий с текущего дня?

store().fetchAll(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1), System.currentTimeMillis())

1 Ответ

0 голосов
/ 13 декабря 2018

Когда вы используете интерактивные запросы в оконном хранилище, временной диапазон применяется к отметке времени начала окна.Таким образом, если у вас есть 1-дневное окно, и запросите данные с отметкой времени начала окна от [now - 1 hour, now), вы не найдете подходящих окон, потому что в этом диапазоне времени окно не запускается.

...