В первый момент мне пришлось обработать информацию из текстового файла:
C1-4-, C2_4, С1 ______ 10,01 / 12 / 2015,30 / 12 / 2015,123456789, S 12345
Теперь мне нужно обработать ту же информацию, но в формате avro. Как я могу это сделать?
До того, как я использовал этот код:
createDirectStream[String, String, StringDecoder, StringDecoder](ssc,
Map("metadata.broker.list" -> brokerlist, "auto.offset.reset" -> "smallest"),
Set(topic))
Затем я обработал сообщения следующим образом:
val logStream = kafkaStream.map(pair => pair._2)
logStream.foreachRDD(
rdd => {
val new_rdd = rdd.map(f = line => {
val ent = line.split(Utils.COMMA_DELIMITER)(0)
val cenReg = line.split(Utils.COMMA_DELIMITER)(1)
.......
.......
})
})
Теперь я пытался сделать что-то вроде этого:
createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](ssc,
Map("metadata.broker.list" -> brokerlist, "auto.offset.reset" -> "smallest"),
Set(topic))
Also I have to use a SchemaRegistry that the client give me.
How can I process the messages avro ?