У нас есть следующие зависимости:
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % kafkaVersion
libraryDependencies += "io.confluent" % "kafka-streams-avro-serde" % confluentVersion
libraryDependencies += "io.confluent" % "kafka-schema-registry-client" % confluentVersion
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"
libraryDependencies += "com.typesafe" % "config" % "1.4.0"
libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" % "3.0.4"
Мы используем генератор кода для генерации Scala классов наблюдений из файлов схемы AVRO. Один такой сгенерированный класс наблюдений имеет в качестве одного из своих полей значение Either. В схеме AVRO это выражается с помощью type = [t1, t2], поэтому генерация выглядит приличной, то есть типом суммы: может быть тип t1 или тип t2.
Вопрос становится тем, чего не хватает на путь десериализации от topi c к классу дел (двоичный файл -> Avro Map -> класс дел).
В основном я получаю эту ошибку в настоящее время:
could not find implicit value for parameter consumed: org.apache.kafka.streams.scala.kstream.Consumed[String, custom.UserEvent]
[error] .stream[String, UserEvent]("schma.avsc")
Первой мыслью было kafka-streams-avro-serde, но, возможно, эта библиотека обеспечивает только Serde [GenericRecord] для AVRO Map, не для тематических классов. Таким образом, одна из других зависимостей помогает с AVRO GenericRecord для отображения классов и обратно. У нас также есть некоторый рукописный код, который генерирует классы дел из схем, который, кажется, работает напрямую со спреем json.
Я думаю, что в (двоичном <-> Avro GenericRecord <-> случае преобразований, существует пробел, и это может быть тот факт, что в случае дела есть поле Either?
Сейчас я беру путь, чтобы попытаться создать экземпляр Serde [UserEvent] , Так что, в моем понимании, потребуется преобразование между UserEvent и AVRO GenericRecord, аналогично Map, а затем между AVRO Record и двоичным файлом - что, вероятно, покрывается зависимостью kafka-streams-avro-serde, как будто должен быть Serde [GenericRecord] или аналогичный.
Мудрый импорт, у нас есть это для импорта последствий:
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.Consumed