я получил 1 сообщение в кафке, состоящее из нескольких независимых строк json.я хочу передать это сообщение в формате hdf.проблема в том, что мой код сохраняет только самый первый json и игнорирует остальные.
пример 1 сообщение kafka (не несколько сообщений):
{"field": "1"}
{"field": "2"}
{"field": "3"}
часть кода scala:
val stream = KafkaSource.kafkaStream[String, String, StringDecoder, StringDecoder](
streamingContext, brokers, new ZooKeeperOffsetsStore(zkQuorum, zkPath), topic)
stream.foreachRDD(rdd => {
if (!rdd.isEmpty) {
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
val df = spark.sqlContext.read.format(rdd.map(m => m._2))
df.write.mode(SaveMode.Append).format("json").save(outputPath)
}
})
конкретное решение лежит в части rdd.map(m => m._2)
, где мне нужно отобразить все линии, а не только первую.мне кажется, что rdd
сам по себе уже вырезан и не содержит остальных строк json.