Как преобразовать Java API-интерфейс KStream в Scala-API KStream? - PullRequest
1 голос
/ 27 сентября 2019

В нашей библиотеке есть процессоры данных (код Scala), которые используют типичный поток kafka:

import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream

Причиной перехода на kafka-stream-scala являются странные типы возврата всякий раз, когда я использую такие функции, как groupBy, selectByKey,и т. д.

(builder: StreamsBuilder, stream: KStream[String, Event]) =>
 val wgCreatedStream = stream.groupBy((_,v) => 
  v.payload match {
   case x:WorkgroupCreated => x.id //this is a String
  })

Возьмите этот код для примера.StreamsBuilder приводит к KGroupedStream[Nothing,Event]

Когда я использовал этот импорт:

import org.apache.kafka.streams.scala.{ByteArrayWindowStore, StreamsBuilder}
import org.apache.kafka.streams.scala.kstream.{Grouped, KStream, Materialized}
import org.apache.kafka.streams.scala.Serdes.{String,Long}

тип возвращаемого значения окончательно изменился на KGroupedStream[String,Event]

Что ядействительно надеемся на: Чтобы использовать kafka-stream-scala без рефакторинга нашего датапроцессора

Если ДА, это фантастика, особенно.если есть примеры!
Если НЕТ ... это будет болезненное путешествие.:( (но все равно спасибо)

1 Ответ

1 голос
/ 27 сентября 2019

Хорошо, после разговора с моим коллегой по работе я сразу же нашел ответ.-___-

из org.apache.kafka.streams.scala.ImplicitConversions,
есть wrapKStream(), который преобразует
streams.kstream.KStream в streams.scala.kstream.KStream

Что касается обратного, просто вызовитеinner метод напрямую.То же самое касается KTable.

...