Я хочу сделать несколько запросов к нескольким "строкам" строки json.Под множественной строкой json я подразумеваю, что строка json организована следующим образом:
[{
"id" : 5,
"name" : "Jemmy overy",
"data" : {...},
"link" : "http:...",
},
{
"id" : 6,
"name" : "John Smith",
"data" : {...},
"link" : "http:...",
}]
Вот что я пытался сделать:
Прежде всего, у меня есть несколько jsonфайлы , которые я получаю из HDFS:
val df = spark
.read
.format(com.databricks.spark.avro)
.load(namenodeURI)
Схема моих файлов json на данный момент организована в два поля:
Столбец, над которым я хочу работать, - это body , поэтому я использовал spark-sql, чтобы выбрать только этот столбец, представляющий собой данные в формате JSON.
df.createOrReplaceTempView("Rawdata")
import spark.implicits._
val strBody = spark
.sql("SELECT body from Rawdata")
.as[String]
.collect
.mkString
Помните, что у меня было несколько файлов json , поэтому у меня несколько тел.Я действительно не знаю, как получить результат из запроса, я пытался получить его в виде строки.
С этой точки зрения моя цель - выполнить некоторые запросы со строкой strBody.Сначала я преобразую его в набор данных, используя кодировщик:
val ds = spark.createDataset(strBody :: Nil)
val schema = Encoders.product[Root].schema
val ds2 =
spark
.read
.schema(schema)
.json(ds).as[Root]
Root - это класс case, соответствующий схеме Json в телах.
Когда я хочу напечатать каждое содержимоеопределенное поле моих тел выводит только содержимое поля первого тела, возвращенного из моего sql-запроса:
ds2.map(x => x.someField.someAnotherNestedField).foreach(println(_))
// print only one element, the first element from the strBody variable
Я пытался добавить префикс-суффикс и разделитель, когда получаю строку из запроса для сопоставлениясинтаксис нескольких строк json string:
val strBody = spark
.sql("SELECT body from Rawdata")
.as[String]
.collect
.mkString("[",",\n","]")
Поскольку я использую символ вставки "\ n", я задаю параметр multiline, когда я читаю набор данных в конце:
val ds = spark.createDataset(strBody :: Nil)
val schema = Encoders.product[Root].schema
val ds2 =
spark
.read
.option("multiline","true")
.schema(schema)
.json(ds).as[Root]
Но тогда я получаю исключение NullPointerException.На самом деле ds2 не содержит никакого значения.
У кого-нибудь есть идеи, как решить эту проблему?