Почему я получаю эту ошибку компиляции: «не удалось найти неявное значение для kstream.Consumed» и как я могу это исправить? - PullRequest
0 голосов
/ 08 апреля 2020

У нас есть следующие зависимости:

libraryDependencies += "org.apache.kafka"       %% "kafka-streams-scala"         % kafkaVersion
libraryDependencies += "io.confluent"           % "kafka-streams-avro-serde"     % confluentVersion
libraryDependencies += "io.confluent"           % "kafka-schema-registry-client" % confluentVersion
libraryDependencies += "ch.qos.logback"         % "logback-classic"              % "1.2.3"
libraryDependencies += "com.typesafe"           % "config"                       % "1.4.0"
libraryDependencies += "com.sksamuel.avro4s"    %% "avro4s-core"                 % "3.0.4"

Мы используем генератор кода для генерации Scala классов наблюдений из файлов схемы AVRO. Один такой сгенерированный класс наблюдений имеет в качестве одного из своих полей значение Either. В схеме AVRO это выражается с помощью type = [t1, t2], поэтому генерация выглядит приличной, то есть типом суммы: может быть тип t1 или тип t2.

Вопрос становится тем, чего не хватает на путь десериализации от topi c к классу дел (двоичный файл -> Avro Map -> класс дел).

В основном я получаю эту ошибку в настоящее время:

could not find implicit value for parameter consumed: org.apache.kafka.streams.scala.kstream.Consumed[String, custom.UserEvent]
[error]       .stream[String, UserEvent]("schma.avsc")

Первой мыслью было kafka-streams-avro-serde, но, возможно, эта библиотека обеспечивает только Serde [GenericRecord] для AVRO Map, не для тематических классов. Таким образом, одна из других зависимостей помогает с AVRO GenericRecord для отображения классов и обратно. У нас также есть некоторый рукописный код, который генерирует классы дел из схем, который, кажется, работает напрямую со спреем json.

Я думаю, что в (двоичном <-> Avro GenericRecord <-> случае преобразований, существует пробел, и это может быть тот факт, что в случае дела есть поле Either?

Сейчас я беру путь, чтобы попытаться создать экземпляр Serde [UserEvent] , Так что, в моем понимании, потребуется преобразование между UserEvent и AVRO GenericRecord, аналогично Map, а затем между AVRO Record и двоичным файлом - что, вероятно, покрывается зависимостью kafka-streams-avro-serde, как будто должен быть Serde [GenericRecord] или аналогичный.

Мудрый импорт, у нас есть это для импорта последствий:


import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.Consumed

Ответы [ 2 ]

0 голосов
/ 09 апреля 2020

На самом деле импорт отсутствовал. Теперь это работает, чтобы скомпилировать. Вот импорт:

import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
0 голосов
/ 09 апреля 2020

Вы импортировали соответствующую упаковку?

import org.apache.kafka.streams.scala.ImplicitConversions._

Ср. https://kafka.apache.org/24/documentation/streams/developer-guide/dsl-api.html#scala -dsl

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...