Как уменьшить пару записей до 1 в KStream в KTable - PullRequest
0 голосов
/ 18 июня 2020

Это ввод Kafka topi c, который содержит ConnectionEvent:

ConnectionEvent("John", "123", "CONNECTED")
ConnectionEvent("John", "123", "DISCONNECTED")
ConnectionEvent("Anna", "222", "CONNECTED")
ConnectionEvent("Rohan", "334", "CONNECTED")
ConnectionEvent("Anna", "199", "CONNECTED")
ConnectionEvent("Anna", "255", "CONNECTED")
ConnectionEvent("Anna", "255", "DISCONNECTED")
ConnectionEvent("Anna", "222", "DISCONNECTED")

Логи потоковой передачи и сокращения c

Каждый элемент в топах c отправляется с использованием ключа сообщения в качестве идентификатора пользователя . Например, «Анна».
Поток должен обрабатываться следующим образом:

  • У Джона есть только 1 сеанс 123, который подключился и отключился. Итак, он не в сети.
  • У Рохана только 1 сеанс 334, который не отключен. Итак, он онлайн
  • У Анны 3 сеанса (222, 199, 255), из которых 2 отключены. Итак, она в сети.

В KTable должны быть следующие данные:

John Offline
Rohan Online
Anna Online

Что я пробовал это:

KTable<String, String> connectedSessions = stream.groupBy((k,v) -> v.getSessionId()) //Group by user and then by sessionId
            .reduce((agg, newVal) -> agg)  //Take latest value ie, reduce pair of records for each session to 1
            .filter(x -> x.getState == CONNECTED)  //Filter only session records which has CONNECTED has last state

Но теперь, как я могу разгруппировать составной ключ (user, sessionId) только для пользователя, а затем пометить пользователя как подключенный / автономный на основе количества идентификаторов сеанса с последним состоянием как CONNECTED?

1 Ответ

0 голосов
/ 18 июня 2020

AFAIU пользователь находится в сети, пока количество его событий CONNECTED больше, чем DISCONNECTED. Таким образом, вы можете агрегировать количество подключений в своем потоке и проверить, является ли оно положительным. Что-то вроде:

        KTable<String, String> connectedSessions = stream.groupByKey()
        .aggregate(
            () -> 0,
            (k, v, numberOfConnections) -> v.getState == CONNECTED ? numberOfConnections++ : numberOfConnections--)
        .mapValues((k, numberOfConnections) -> numberOfConnections > 0 ? "Online" : "Offline");
...