Что эквивалентно потокам java / scala kafka для соединения KSQL WHERE? - PullRequest
2 голосов
/ 03 мая 2019

Допустим, у меня есть 2 потока kafka (библиотека kafka-streams-scala, версия 2.2.0):

val builder: StreamsBuilder = new StreamsBuilder
val stream1: KStream[String, GenericRecord] = builder.stream[String, GenericRecord]("topic1")
val stream2: KStream[String, GenericRecord] = builder.stream[String, GenericRecord]("topic2")

и их объединение:

val stream3: KStream[String, MyClass] = flights.join(schedules)((r1, r2) =>  MyClass(r1.get("f1"), r2.get("f2")), JoinWindows.of(Duration.ofSeconds(30))

Что эквивалентнопредложения WHERE доступно в KSQL ?(см. поток late_orders) для API потоков?Это хорошая идея, чтобы просто использовать stream3.filter?Будет ли этот подход иметь ту же эффективность, что и поток, созданный в KSQL?

1 Ответ

2 голосов
/ 06 мая 2019

Что эквивалентно предложению WHERE, доступному в KSQL?(см. поток late_orders) для API потоков?

Это:

  • KStream#filter(), который возвращает отфильтрованный KStream
  • KTable#filter(), который возвращает отфильтрованный KTable

https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#stateless-transformations

Это хорошая идея, просто использовать stream3.filter?

Да.

Будет ли этот подход иметь ту же эффективность, что и поток, созданный в KSQL?

Да.

...