Агрегирование (суммирование) количества событий из разных тем Кафки - PullRequest
0 голосов
/ 27 января 2020

В моем приложении есть три темы, которые получают некоторые события, принадлежащие пользователям:

Event Type A -> Topic A
Event Type B -> Topic B
Event Type C -> Topic C

Это будет пример потока сообщений:

Message(user 1 - event A - 2020-01-03) 
Message(user 2 - event A - 2020-01-03) 
Message(user 1 - event C - 2020-01-20)
Message(user 1 - event B - 2020-01-22)

Я хочу иметь возможность генерировать отчеты с общим количеством событий на пользователя в месяц, объединяя все события из трех тем, например:

User 1 - 2020-01 -> 3 total events
User 2 - 2020-01 -> 1 total events

Имея три KStreams (по одному на топи c), как я могу выполнять это сложение в месяц, чтобы иметь суммирование всех событий из трех разных тем? Можете показать код для этого?

1 Ответ

0 голосов
/ 29 января 2020

Поскольку вас интересует только подсчет, самым простым способом было бы просто сохранить идентификатор пользователя в качестве ключа и какое-нибудь фиктивное значение для каждого KStream, объединить все три потока и выполнить счетчик окон после этого (обратите внимание, что windows на основе календаря не поддерживаются "из коробки", вы можете использовать 31-дневное окно в качестве приблизительного значения или создать свой собственный windows):

// just map to dummy empty string (note, that `null` would not work
KStream<UserId, String> streamA = builder.stream("topic-A").mapValues(v -> "");
KStream<UserId, String> streamB = builder.stream("topic-B").mapValues(v -> "");
KStream<UserId, String> streamC = builder.stream("topic-C").mapValues(v -> "");

streamA.merge(streamB).merge(streamC).groupByKey().windowBy(...).count();

Вас также может заинтересовать оператор suppress().

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...