Я новичок в Spark-Streaming и Kafka.С помощью следующего кода я могу использовать сообщения Кафки, которые приходят в формате JSON:
JavaDStream<String> jsonline = stream.map(new Function<ConsumerRecord<String,String>, String>() {
@Override
public String call(ConsumerRecord<String, String> kafkaRecord) throws Exception {
return kafkaRecord.value();
}
});
jsonline.print();
Вывод на консоль:
-------------------------------------------
Time: 1527776685000 ms
-------------------------------------------
{"logtyp":"ERROR","LogTypName":"app.warning.exception","LogZeitpunkt":"Thu May 31 16:24:42 CEST 2018"}
{"logtyp":"ERROR","LogTypName":"app.warning.exception","LogZeitpunkt":"Thu May 31 16:24:44 CEST 2018"}
-------------------------------------------
Time: 1527776690000 ms
-------------------------------------------
{"logtyp":"ERROR","LogTypName":"app.warning.exception","LogZeitpunkt":"Thu May 31 16:24:45 CEST 2018"}
{"logtyp":"ERROR","LogTypName":"app.warning.exception","LogZeitpunkt":"Thu May 31 16:24:46 CEST 2018"}
Можно ли использоватьМетод "foreachRDD" для извлечения JSON-полей из сообщения?
jsonline.foreachRDD...
... поэтому я мог бы записать каждую JSON-запись в mySQL следующим образом:
insert into my_table (logtyp, logtypname, logzeitpunkt) values ("ERROR", "app.warning.exception", "Thu May 31 16:24:46 CEST 2018");