как сохранить сообщение JSONLines RDD от kafka - PullRequest
0 голосов
/ 27 декабря 2018

я получил 1 сообщение в кафке, состоящее из нескольких независимых строк json.я хочу передать это сообщение в формате hdf.проблема в том, что мой код сохраняет только самый первый json и игнорирует остальные.

пример 1 сообщение kafka (не несколько сообщений):

{"field": "1"}
{"field": "2"}
{"field": "3"}

часть кода scala:

 val stream = KafkaSource.kafkaStream[String, String, StringDecoder, StringDecoder](
      streamingContext, brokers, new ZooKeeperOffsetsStore(zkQuorum, zkPath), topic)
    stream.foreachRDD(rdd => {
      if (!rdd.isEmpty) {

        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()

        val df = spark.sqlContext.read.format(rdd.map(m => m._2))

        df.write.mode(SaveMode.Append).format("json").save(outputPath)
      }

    })

конкретное решение лежит в части rdd.map(m => m._2), где мне нужно отобразить все линии, а не только первую.мне кажется, что rdd сам по себе уже вырезан и не содержит остальных строк json.

1 Ответ

0 голосов
/ 18 февраля 2019

Я решил это, работая с текстом вместо JSON.основная разница лежит в преобразовании toDF():

stream.foreachRDD(rdd => {

      if (!rdd.isEmpty) {        
        //works as .txt file: 
        rdd.map(m => m._2).toDF().coalesce(1).write.mode(SaveMode.Append).format("text").save(outputPath)


      }
    })
...