Окно по сеансам из раздела просмотров страниц в KafkaStreams, логика агрегации может разбить сеанс (определенный по ключу и времени согласно объекту SessionWindows, давайте назовем их kSessions ) в один или несколько «сеансов» (определенныхкак совокупность просмотров страниц в соответствии с вашей бизнес-логикой, назовем их сеансами ).Примером логики kSession-breaker в данном случае может быть основанный на времени, например, «сократить сеансы до полуночи», но могут быть и другие, основанные на логике (например, получение определенного события прерывания сеанса, такого как заказ).
Вы хотите отслеживать эти сеансы, отправляя их в последующие темы, сохраняя связь с другими событиями, такими как просмотры страниц.
Как правильно смоделировать это в Kafka Streams?
Примеркод:
val sessionWindows = SessionWindows.`with`(TimeUnit.MINUTES.toMillis(30)).until(TimeUnit.DAYS.toMillis(360))
val pageviewWindowStream : SessionWindowedKStream[Key, Value] = topic.
groupBy((k,v) => new Key(k.a, v.b) ).
windowedBy(sessionWindows)
val sessionStore : KTable[Windowed[Key], List[AggValue]] = pageviewWindowStream.aggregate(List.empty[AggValue])(
aggregator = (key: Key, value: Value, aggregator: AggValue) => { ...aggregator code },
merger = (k, aggValue1, aggValue2) => { ...merger code } )
Примечание. В этой задаче логика сеансов в kSession будет взаимоисключающей с другими сеансами.
Итак, я могу определить AggValue
как List[Session]
.Но затем, когда я запрашиваю его хранилище из его queryableStoreName
, если я запрашиваю значение агрегатора для запроса fetch(k, t)
, я получу весь набор сеансов, полученных из данного значения (который будет List [Session] `и мне нужно будет пройти через них, чтобы получить соответствующую сессию.
Я думал об использовании чего-то вроде .toStream().flatMapValues(... flatmap the sessionsList ... )
, но не могу найти способ.
Есть ли способ обойти это?это, или вы обязаны агрегировать для каждого сеанса и сохранять коллекцию произведенных сеансов из kSession
как агрегированное значение?