Конвертировать FluemEvent в CaseClass - PullRequest
0 голосов
/ 19 декабря 2018
//Streaming read from Kafka
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerPool)
  .option("subscribe", topic)
  .option("startingOffsets", "latest")
  .load()

//Trying to decode it to a Dataset of type MyClass (avro generated class)
val result: DataSet[MyCaseClass] = df.mapPartitions(
  partition => {
    val flumeReader = new SpecificDatumReader(classOf[AvroFlumeEvent])
    val datumReader = new SpecificDatumReader(classOf[MyCaseClass])
    partition
      .map(
        row => {
          val flumeBytes = row.getAs[Array[Byte]]("value")
          val flumeBinaryDecoder =
            DecoderFactory.get.binaryDecoder(flumeBytes, null)
          flumeReader.read(null, flumeBinaryDecoder)
        }
      )
      .map(flumeRecord => {
        val recordBytes = flumeRecord.getBody
        val recordBinaryDecoder =
          DecoderFactory.get.binaryDecoder(recordBytes.array(), null)
        datumReader.read(null, recordBinaryDecoder)
      })
  }
)

//Streaming write to HDFS
val streamingQuery = result.writeStream
  .format("parquet")
  .option("startingOffsets", "latest")
  .option("path", "/foo/bar")
  .option("checkpointLocation", "bar/foo")
  .start()
  .awaitTermination()

Когда я пытаюсь использовать паркетные инструменты для чтения, я вижу только двоичные данные.

Единственный способ увидеть фактические данные - это прочитать данные вот так:

result.map(_.getName.toString)(Encoders.STRING)
  1. Есть ли лучший способ сделать это?
  2. Как сделатьЯ читаю более сложные структуры данных, такие как Map?
  3. Вместо всего этого, могу ли я преобразовать AvroFlumeEvent непосредственно в класс случая scala.Если да, то как?
...