enocder Issue - Spark Структурированный поток - работает только в репле - PullRequest
0 голосов
/ 30 мая 2018

У меня есть рабочий процесс для приема и десериализации сообщения kafka avro с использованием схемы reg.Он прекрасно работает в REPL, но когда я пытаюсь скомпилировать, я получаю

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
[error]       .map(x => {

Я не уверен, нужно ли мне изменять мой объект, но зачем мне это нужно, если REPL работает нормально.

object AgentDeserializerWrapper {
      val props = new Properties()
      props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryURL)
      props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
      val vProps = new kafka.utils.VerifiableProperties(props)
      val deser = new KafkaAvroDecoder(vProps)
      val avro_schema = new RestService(schemaRegistryURL).getLatestVersion(subjectValueNameAgentRead)
      val messageSchema = new Schema.Parser().parse(avro_schema.getSchema)
    }

    case class DeserializedFromKafkaRecord( value: String)

    import spark.implicits._

    val agentStringDF = spark
      .readStream
      .format("kafka")
      .option("subscribe", "agent")
      .options(kafkaParams)
      .load()
      .map(x => {
        DeserializedFromKafkaRecord(AgentDeserializerWrapper.deser.fromBytes(x.getAs[Array[Byte]]("value"), AgentDeserializerWrapper.messageSchema).asInstanceOf[GenericData.Record].toString)
      })

1 Ответ

0 голосов
/ 31 мая 2018

Добавить как [DeserializedFromKafkaRecord], чтобы статически набрать ваш набор данных:

val agentStringDF = spark
      .readStream
      .format("kafka")
      .option("subscribe", "agent")
      .options(kafkaParams)
      .load()
      .as[DeserializedFromKafkaRecord]
      .map(x => {
        DeserializedFromKafkaRecord(AgentDeserializerWrapper.deser.fromBytes(x.getAs[Array[Byte]]("value"), AgentDeserializerWrapper.messageSchema).asInstanceOf[GenericData.Record].toString)
      })
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...