CreateDirectStream с сообщениями avro - PullRequest
1 голос
/ 18 апреля 2019

В первый момент мне пришлось обработать информацию из текстового файла: C1-4-, C2_4, С1 ______ 10,01 / 12 / 2015,30 / 12 / 2015,123456789, S 12345

Теперь мне нужно обработать ту же информацию, но в формате avro. Как я могу это сделать?

До того, как я использовал этот код:

createDirectStream[String, String, StringDecoder, StringDecoder](ssc,
  Map("metadata.broker.list" -> brokerlist, "auto.offset.reset" ->    "smallest"),
  Set(topic))

Затем я обработал сообщения следующим образом:

val logStream = kafkaStream.map(pair => pair._2)

logStream.foreachRDD(
  rdd => {
    val new_rdd = rdd.map(f = line => {

      val ent = line.split(Utils.COMMA_DELIMITER)(0)
      val cenReg = line.split(Utils.COMMA_DELIMITER)(1)
      .......

      .......
    })
  })

Теперь я пытался сделать что-то вроде этого:

  createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](ssc,
  Map("metadata.broker.list" -> brokerlist, "auto.offset.reset" -> "smallest"),
  Set(topic))


Also I have to use a SchemaRegistry that the client give me.


How can I process the messages avro ?
...