Обработка данных JSON с Kafka и Spark Streaming - PullRequest
0 голосов
/ 31 мая 2018

Я новичок в Spark-Streaming и Kafka.С помощью следующего кода я могу использовать сообщения Кафки, которые приходят в формате JSON:

JavaDStream<String> jsonline = stream.map(new Function<ConsumerRecord<String,String>, String>() {
        @Override
        public String call(ConsumerRecord<String, String> kafkaRecord) throws Exception {
        return kafkaRecord.value();
        }
    });

jsonline.print();

Вывод на консоль:

-------------------------------------------
Time: 1527776685000 ms
-------------------------------------------
{"logtyp":"ERROR","LogTypName":"app.warning.exception","LogZeitpunkt":"Thu May 31 16:24:42 CEST 2018"}
{"logtyp":"ERROR","LogTypName":"app.warning.exception","LogZeitpunkt":"Thu May 31 16:24:44 CEST 2018"}
-------------------------------------------
Time: 1527776690000 ms
-------------------------------------------
{"logtyp":"ERROR","LogTypName":"app.warning.exception","LogZeitpunkt":"Thu May 31 16:24:45 CEST 2018"}
{"logtyp":"ERROR","LogTypName":"app.warning.exception","LogZeitpunkt":"Thu May 31 16:24:46 CEST 2018"}

Можно ли использоватьМетод "foreachRDD" для извлечения JSON-полей из сообщения?

jsonline.foreachRDD...

... поэтому я мог бы записать каждую JSON-запись в mySQL следующим образом:

insert into my_table (logtyp, logtypname, logzeitpunkt) values ("ERROR", "app.warning.exception", "Thu May 31 16:24:46 CEST 2018");
...