У меня проблемы с преобразованием root записи JSOM в фрейм данных для неопределенного количества записей.
У меня есть фрейм данных, сгенерированный с помощью JSON, подобный следующему:
val exampleJson = spark.createDataset(
"""
{"ITEM1512":
{"name":"Yin",
"address":{"city":"Columbus",
"state":"Ohio"}
},
"ITEM1518":
{"name":"Yang",
"address":{"city":"Working",
"state":"Marc"}
}
}""" :: Nil)
Когда я читаю его со следующей инструкцией
val itemsExample = spark.read.json(exampleJson)
Схема и сгенерированный фрейм данных выглядит следующим образом:
+-----------------------+-----------------------+
|ITEM1512 |ITEM1518 |
+-----------------------+-----------------------+
|[[Columbus, Ohio], Yin]|[[Working, Marc], Yang]|
+-----------------------+-----------------------+
root
|-- ITEM1512: struct (nullable = true)
| |-- address: struct (nullable = true)
| | |-- city: string (nullable = true)
| | |-- state: string (nullable = true)
| |-- name: string (nullable = true)
|-- ITEM1518: struct (nullable = true)
| |-- address: struct (nullable = true)
| | |-- city: string (nullable = true)
| | |-- state: string (nullable = true)
| |-- name: string (nullable = true)
Но я хочу сгенерировать что-то вроде этого:
+-----------------------+-----------------------+
|Item |Values |
+-----------------------+-----------------------+
|ITEM1512 |[[Columbus, Ohio], Yin]|
|ITEM1518 |[[Working, Marc], Yang]|
+-----------------------+-----------------------+
Итак, чтобы проанализировать эти JSON данные, мне нужно прочитать все столбцы и добавил их в запись в фрейме данных, потому что я пишу в качестве примера более двух элементов. Фактически, есть миллионы элементов, которые я хотел бы добавить во фрейм данных.
Я пытаюсь воспроизвести решение, найденное здесь: Как анализировать данные JSON с помощью Spark- Scala с этим кодом:
val columns:Array[String] = itemsExample.columns
var arrayOfDFs:Array[DataFrame] = Array()
for(col_name <- columns){
val temp = itemsExample.selectExpr("explode("+col_name+") as element")
.select(
lit(col_name).as("Item"),
col("element.E").as("Value"))
arrayOfDFs = arrayOfDFs :+ temp
}
val jsonDF = arrayOfDFs.reduce(_ union _)
jsonDF.show(false)
Но я столкнулся с проблемой, когда в примере, читающем другой вопрос, root находится в массиве, в моем случае root это StrucType. Поэтому генерируется следующее исключение:
org. apache .spark. sql .AnalysisException: невозможно разрешить 'explode (ITEM1512
)' из-за несоответствия типа данных: вход для функции разнесен должен быть типом массива или карты, а не структурой, имя: строка>