Kafka Streams: Как сгенерировать несколько сессий из одного? - PullRequest
0 голосов
/ 18 октября 2018

Окно по сеансам из раздела просмотров страниц в KafkaStreams, логика агрегации может разбить сеанс (определенный по ключу и времени согласно объекту SessionWindows, давайте назовем их kSessions ) в один или несколько «сеансов» (определенныхкак совокупность просмотров страниц в соответствии с вашей бизнес-логикой, назовем их сеансами ).Примером логики kSession-breaker в данном случае может быть основанный на времени, например, «сократить сеансы до полуночи», но могут быть и другие, основанные на логике (например, получение определенного события прерывания сеанса, такого как заказ).

Вы хотите отслеживать эти сеансы, отправляя их в последующие темы, сохраняя связь с другими событиями, такими как просмотры страниц.

Как правильно смоделировать это в Kafka Streams?

Примеркод:

val sessionWindows = SessionWindows.`with`(TimeUnit.MINUTES.toMillis(30)).until(TimeUnit.DAYS.toMillis(360))
val pageviewWindowStream : SessionWindowedKStream[Key, Value] = topic.
  groupBy((k,v) => new Key(k.a, v.b) ).
  windowedBy(sessionWindows)

val sessionStore : KTable[Windowed[Key], List[AggValue]] = pageviewWindowStream.aggregate(List.empty[AggValue])(
  aggregator = (key: Key, value: Value, aggregator: AggValue) => { ...aggregator code },
  merger = (k, aggValue1, aggValue2) => { ...merger code } )

Примечание. В этой задаче логика сеансов в kSession будет взаимоисключающей с другими сеансами.

Итак, я могу определить AggValue как List[Session].Но затем, когда я запрашиваю его хранилище из его queryableStoreName, если я запрашиваю значение агрегатора для запроса fetch(k, t), я получу весь набор сеансов, полученных из данного значения (который будет List [Session] `и мне нужно будет пройти через них, чтобы получить соответствующую сессию.

Я думал об использовании чего-то вроде .toStream().flatMapValues(... flatmap the sessionsList ... ), но не могу найти способ.

Есть ли способ обойти это?это, или вы обязаны агрегировать для каждого сеанса и сохранять коллекцию произведенных сеансов из kSession как агрегированное значение?

...