Очистить записи KTable после отправки значений в тему вывода - PullRequest
1 голос
/ 06 апреля 2020

У меня есть БД, которая хранит просмотры страниц для каждой веб-страницы. Это достигается за счет использования Kafka topi c с именем pageviews, где каждое сообщение имеет имя страницы в виде key и value в качестве количества просмотров со времени предыдущего сообщения .

Это пример сообщений, ожидаемых в pageviews topi c:

просмотров страниц topi c:

key: "index", value: 349
key: "products", value: 67
key: "index", value: 15
key: "about", value: 11
...

Потребитель pageviews добавляет каждый раз вышеуказанный values к таблице PAGEVIEWS.

Теперь я создаю производителя pageviews topi c. Источником данных этого приложения является viewstream topi c, где для каждого представления создается одно сообщение, например:

viewstream topi c:

key: "index", value: <timestamp>
key: "index", value: <timestamp>
key: "product", value: <timestamp>
...

В приложении Kafka Stream у меня следующая топология:

PageViewsStreamer:

builder.stream("viewstream")
    .groupByKey()
    .aggregate(...) // this builds a KTable with the sums of views per page
    .toStream()
    .to("pageviews")

У меня 2 проблемы с этой топологией:

  1. KTable, который содержит агрегаты, не получает сброс / очистку после выдачи выходного сообщения в pageviews, поэтому, просто добавив агрегированное значение в таблицу БД, мы получим неправильные результаты. Как мне добиться, чтобы каждое сообщение, отправленное на pageviews, не включало просмотры, уже отправленные в предыдущих сообщениях?

  2. Я хочу, чтобы pageviews сообщения отправлялись каждые 15 минут (по умолчанию примерно каждые 30 секунд).

Я пытаюсь работать с окнами для обоих, но пока не получилось.

1 Ответ

2 голосов
/ 06 апреля 2020

Вы можете добиться этого поведения, используя 15-минутное переключение windows и подавлять результаты до тех пор, пока не пройдет время windows (не забудьте добавить льготное время, чтобы ограничить запоздалые события, которые будет принимать предыдущее окно) , Подробнее здесь . Я бы сделал что-то вроде этого:

builder.stream("viewstream")
                .groupByKey()
                //window by a 15-minute time windows, accept event late in 30 second, you can set grace time smaller
                .windowedBy(TimeWindows.of(Duration.ofMinutes(15)).grace(Duration.ofSeconds(30)))
                .aggregate(...) // this builds a KTable with the sums of views per page
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream()
                //re-select key : from window to key
                .selectKey((key, value) -> key.key())
                .to("pageviews");
...