Написание вывода JSON с массивом объектов с использованием Spark - PullRequest
0 голосов
/ 05 июня 2018

Я хочу переформатировать структуру json, используя искровой процесс, в структуру, содержащую массив объектов.Мой входной файл содержит строки:

{ "keyvals" : [[1,"a"], [2, "b"]] }, 
{ "keyvals" : [[3,"c"], [4, "d"]] }

, и я хочу, чтобы мой процесс вывел

{ "keyvals": [{"id": 1, "value": "a"}, {"id": 2, "value": "c"}] },
{ "keyvals": [{"id": 3, "value": "c"}, {"id": 4, "value": "d"}] }

Какой лучший способ сделать это?

Для просмотраПример ввода, который вы можете запустить в scala spark-shell:

var jsonStrings = Seq("""{"keyvals": [[1,"a"], [2, "b"]] }""", """{ "keyvals" : [[3,"c"], [4, "d"]] }""") 
var inputRDD = sc.parallelize(jsonStrings)
var df = spark.sqlContext.read.json(inputRDD)
// reformat goes here ?
df.write.json("myfile.json")

спасибо

1 Ответ

0 голосов
/ 05 июня 2018

Если вы проверите схему, вы увидите, что следующая структура фактически сопоставлена ​​с array<array<string>>

df.printSchema
// root
//  |-- keyvals: array (nullable = true)
//  |    |-- element: array (containsNull = true)
//  |    |    |-- element: string (containsNull = true)

Если количество элементов не фиксировано, вам потребуется udf:

import org.apache.spark.sql.functions._   

case class Record(id: Long, value: String)

val parse = udf((xs: Seq[Seq[String]]) => xs.map {
  case Seq(id, value) => Record(id.toLong, value)
})


val result = df.select(parse($"keyvals").alias("keyvals"))

и результат может быть конвертирован toJSON

result.toJSON.toDF("keyvals").show(false)
// +-------------------------------------------------------+
// |keyvals                                                |
// +-------------------------------------------------------+
// |{"keyvals":[{"id":1,"value":"a"},{"id":2,"value":"b"}]}|
// |{"keyvals":[{"id":3,"value":"c"},{"id":4,"value":"d"}]}|
// +-------------------------------------------------------+

или записан с использованием JSON Writer (result.write.json).

Также можно использовать строго типизированныйDataset:

df.as[Seq[Seq[String]]].map { xs => xs.map {
  case Seq(id, value) => Record(id.toLong, value)
}}.toDF("keyvals")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...