У меня есть рабочий процесс для приема и десериализации сообщения kafka avro с использованием схемы reg.Он прекрасно работает в REPL, но когда я пытаюсь скомпилировать, я получаю
Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
[error] .map(x => {
Я не уверен, нужно ли мне изменять мой объект, но зачем мне это нужно, если REPL работает нормально.
object AgentDeserializerWrapper {
val props = new Properties()
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryURL)
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
val vProps = new kafka.utils.VerifiableProperties(props)
val deser = new KafkaAvroDecoder(vProps)
val avro_schema = new RestService(schemaRegistryURL).getLatestVersion(subjectValueNameAgentRead)
val messageSchema = new Schema.Parser().parse(avro_schema.getSchema)
}
case class DeserializedFromKafkaRecord( value: String)
import spark.implicits._
val agentStringDF = spark
.readStream
.format("kafka")
.option("subscribe", "agent")
.options(kafkaParams)
.load()
.map(x => {
DeserializedFromKafkaRecord(AgentDeserializerWrapper.deser.fromBytes(x.getAs[Array[Byte]]("value"), AgentDeserializerWrapper.messageSchema).asInstanceOf[GenericData.Record].toString)
})