Вы упомянули в комментарии выше
У меня огромный набор данных в kafka. Я пытаюсь читать из kafka и писать в hdfs через spark, используя scala.Я использую json parserно не удалось разобрать из-за проблемы column3. так что нужно манипулировать сообщением, чтобы оно превратилось в json
Таким образом, у вас должен быть сбор искаженных jsons, как в вопросе.Я создал список как
val kafkaMsg = List("""{"column1":"abc","column2":"123","column3":qwe"r"ty,"column4":"abc123"}""", """{"column1":"defhj","column2":"45","column3":asd"f"gh,"column4":"def12d"}""")
, и вы читаете его через Spark, так что вы должны иметь rdds как
val rdd = sc.parallelize(kafkaMsg)
Все, что вам нужно, это какой-то синтаксический анализ искаженного текста json длясделайте его действительной строкой json как
val validJson = rdd.map(msg => msg.replaceAll("[}\"{]", "").split(",").map(_.split(":").mkString("\"", "\":\"", "\"")).mkString("{", ",", "}"))
validJson
должно быть
{"column1":"abc","column2":"123","column3":"qwerty","column4":"abc123"}
{"column1":"defhj","column2":"45","column3":"asdfgh","column4":"def12d"}
Вы можете создать фрейм данных из validJson rdd как
sqlContext.read.json(validJson).show(false)
, что должно дать вам
+-------+-------+-------+-------+
|column1|column2|column3|column4|
+-------+-------+-------+-------+
|abc |123 |qwerty |abc123 |
|defhj |45 |asdfgh |def12d |
+-------+-------+-------+-------+
Или вы можете сделать согласно вашему требованию.