У меня есть БД, которая хранит просмотры страниц для каждой веб-страницы. Это достигается за счет использования Kafka topi c с именем pageviews
, где каждое сообщение имеет имя страницы в виде key
и value
в качестве количества просмотров со времени предыдущего сообщения .
Это пример сообщений, ожидаемых в pageviews
topi c:
просмотров страниц topi c:
key: "index", value: 349
key: "products", value: 67
key: "index", value: 15
key: "about", value: 11
...
Потребитель pageviews
добавляет каждый раз вышеуказанный values
к таблице PAGEVIEWS.
Теперь я создаю производителя pageviews
topi c. Источником данных этого приложения является viewstream
topi c, где для каждого представления создается одно сообщение, например:
viewstream topi c:
key: "index", value: <timestamp>
key: "index", value: <timestamp>
key: "product", value: <timestamp>
...
В приложении Kafka Stream у меня следующая топология:
PageViewsStreamer:
builder.stream("viewstream")
.groupByKey()
.aggregate(...) // this builds a KTable with the sums of views per page
.toStream()
.to("pageviews")
У меня 2 проблемы с этой топологией:
KTable, который содержит агрегаты, не получает сброс / очистку после выдачи выходного сообщения в pageviews
, поэтому, просто добавив агрегированное значение в таблицу БД, мы получим неправильные результаты. Как мне добиться, чтобы каждое сообщение, отправленное на pageviews
, не включало просмотры, уже отправленные в предыдущих сообщениях?
Я хочу, чтобы pageviews
сообщения отправлялись каждые 15 минут (по умолчанию примерно каждые 30 секунд).
Я пытаюсь работать с окнами для обоих, но пока не получилось.