Допустим, у меня есть 2 потока kafka (библиотека kafka-streams-scala, версия 2.2.0):
val builder: StreamsBuilder = new StreamsBuilder
val stream1: KStream[String, GenericRecord] = builder.stream[String, GenericRecord]("topic1")
val stream2: KStream[String, GenericRecord] = builder.stream[String, GenericRecord]("topic2")
и их объединение:
val stream3: KStream[String, MyClass] = flights.join(schedules)((r1, r2) => MyClass(r1.get("f1"), r2.get("f2")), JoinWindows.of(Duration.ofSeconds(30))
Что эквивалентнопредложения WHERE доступно в KSQL ?(см. поток late_orders) для API потоков?Это хорошая идея, чтобы просто использовать stream3.filter?Будет ли этот подход иметь ту же эффективность, что и поток, созданный в KSQL?