Это ввод Kafka topi c, который содержит ConnectionEvent
:
ConnectionEvent("John", "123", "CONNECTED")
ConnectionEvent("John", "123", "DISCONNECTED")
ConnectionEvent("Anna", "222", "CONNECTED")
ConnectionEvent("Rohan", "334", "CONNECTED")
ConnectionEvent("Anna", "199", "CONNECTED")
ConnectionEvent("Anna", "255", "CONNECTED")
ConnectionEvent("Anna", "255", "DISCONNECTED")
ConnectionEvent("Anna", "222", "DISCONNECTED")
Логи потоковой передачи и сокращения c
Каждый элемент в топах c отправляется с использованием ключа сообщения в качестве идентификатора пользователя . Например, «Анна».
Поток должен обрабатываться следующим образом:
- У Джона есть только 1 сеанс 123, который подключился и отключился. Итак, он не в сети.
- У Рохана только 1 сеанс 334, который не отключен. Итак, он онлайн
- У Анны 3 сеанса (222, 199, 255), из которых 2 отключены. Итак, она в сети.
В KTable должны быть следующие данные:
John Offline
Rohan Online
Anna Online
Что я пробовал это:
KTable<String, String> connectedSessions = stream.groupBy((k,v) -> v.getSessionId()) //Group by user and then by sessionId
.reduce((agg, newVal) -> agg) //Take latest value ie, reduce pair of records for each session to 1
.filter(x -> x.getState == CONNECTED) //Filter only session records which has CONNECTED has last state
Но теперь, как я могу разгруппировать составной ключ (user, sessionId) только для пользователя, а затем пометить пользователя как подключенный / автономный на основе количества идентификаторов сеанса с последним состоянием как CONNECTED?