Kafka Streams - Как сопоставить 2 соответствующих сообщения для создания другого сообщения - PullRequest
1 голос
/ 03 ноября 2019

Я пытаюсь реализовать поток Kafka, который обрабатывает сообщения о посещении веб-сайта (PageEntered и PageLeft) и создает другое сообщение (PageVisited).

Событие ввода выглядит следующим образом:

case class WebsiteEvent(session: String, time: Long, event: Action)
sealed trait Action {
  def url: String
}
final case class PageEntered(url: String) extends Action
final case class PageLeft(url: String) extends Action

и вывод:

case class PageVisited(session: String, url: String, duration: Long)

Я сгруппирован по сеансу и URL, но не могу понять, как сопоставить события PageEntered и PageLeft.

val builder = new StreamsBuilder
val stream = builder.stream[String, WebsiteEvent]("website-events-topic")
                    .groupBy((_, event) => event.session -> event.action.url)
                    ... // stuck here
                    .map{ case ((session, url), (entered, left)) =>
                      PageVisited(session, url, left.time - entered.time)
                    }
                    .to("page-visited-topic")

Поскольку я новичок в Kafka-Streams, мне было интересно, движусь ли я в правильном направлении и как я могу этого достичь.

PS У меня нет контроля над структурой ввода.

...