Я пытаюсь реализовать поток Kafka, который обрабатывает сообщения о посещении веб-сайта (PageEntered
и PageLeft
) и создает другое сообщение (PageVisited
).
Событие ввода выглядит следующим образом:
case class WebsiteEvent(session: String, time: Long, event: Action)
sealed trait Action {
def url: String
}
final case class PageEntered(url: String) extends Action
final case class PageLeft(url: String) extends Action
и вывод:
case class PageVisited(session: String, url: String, duration: Long)
Я сгруппирован по сеансу и URL, но не могу понять, как сопоставить события PageEntered и PageLeft.
val builder = new StreamsBuilder
val stream = builder.stream[String, WebsiteEvent]("website-events-topic")
.groupBy((_, event) => event.session -> event.action.url)
... // stuck here
.map{ case ((session, url), (entered, left)) =>
PageVisited(session, url, left.time - entered.time)
}
.to("page-visited-topic")
Поскольку я новичок в Kafka-Streams, мне было интересно, движусь ли я в правильном направлении и как я могу этого достичь.
PS У меня нет контроля над структурой ввода.