Экспорт данных Spark в виде массива JSon с пользовательскими метаданными - PullRequest
0 голосов
/ 20 марта 2020

Я сохранил в своем MongoDB несколько JSON документов. Каждый документ выглядит так: {"businessData":{"capacity":{"fuelCapacity":282}, ..}.

После прочтения всех документов я хочу экспортировать их как действительный файл JSON. В частности:

// Read JSON data from the DB
val df: DataFrame = MongoSpark.load(sparkSession, readConfig)
df.show
// Export into the file system
df.coalesce(1).write.mode(SaveMode.Overwrite).json("export.json")
// The show command only shows the .json values
+--------------------+
|        businessData|
+--------------------+
|[[282],0,[true,20...|
|[[280],0,[true,20...|
|[[290],0,[true,20...|
|[[292],0,[true,20...|
|[[282],16,[true,2...|
+--------------------+

// export.json
{"businessData":{"capacity":{"fuelCapacity":282}, ..}
{"businessData":{"capacity":{"fuelCapacity":280}, ..}
{"businessData":{"capacity":{"fuelCapacity":290}, ..}
{"businessData":{"capacity":{"fuelCapacity":292}, ..}
{"businessData":{"capacity":{"fuelCapacity":282}, ..}

Но когда я экспортирую в файловую систему, я хочу объединить эти 5 строк в массив, а также добавить некоторые пользовательские метаданные. Например:

{
  "metadata" : { "exportTime": "20/20/2020" , ...} 
  "allBusinessData" : [
    {"businessData":{"capacity":{"fuelCapacity":282}, ..},
    // all 5 rows from above
  ]
}

Я видел вопросы здесь и здесь , советующие против него. Они также частично отвечают на вопрос, так как не добавляют пользовательскую структуру json к экспорту.

Предполагая, однако, что это единственный способ продолжить, как я могу это сделать?

Большое спасибо!

1 Ответ

0 голосов
/ 21 марта 2020

Из Spark-2.2 +:

Вы можете попробовать использовать to_json (или) создать struct<array<...etc>> поле в искре, затем запишите df в формате json, чтобы получить требуемый вывод.

  • для выборочных данных, которые я принял exportedtime как current_timestamp ()

Example:

val df=spark.read.json(Seq("""[{"businessData":{"capacity":{"fuelCapacity":282}}},{"businessData":{"capacity":{"fuelCapacity":456}}}""").toDS)

//creating a struct field called metadata and write data in json format.
df.selectExpr("struct(current_timestamp() as exporttime,struct(collect_list(businessData) as businessData)as allBusinessData) as metadata").write.format("json").mode("overwrite").save("json_path")

//using .to_json to create json object in dataframe
df.selectExpr("to_json(struct(current_timestamp() as exporttime,struct(collect_list(businessData) as businessData)as allBusinessData))metadata").show(false)

//+-------------------------------------------------------------------------------------------------------------------------------------------------------+
//|metadata                                                                                                                                               |
//+-------------------------------------------------------------------------------------------------------------------------------------------------------+
//|{"exporttime":"2020-03-21T15:17:54.769-05:00","allBusinessData":{"businessData":[{"capacity":{"fuelCapacity":282}},{"capacity":{"fuelCapacity":456}}]}}|
//+-------------------------------------------------------------------------------------------------------------------------------------------------------+

//using  .toJSON to view json in shell(non-prod use only)
df.selectExpr("struct(current_timestamp() as exporttime,struct(collect_list(businessData) as businessData)as allBusinessData)metadata").toJSON.collect()

//Array[String] = Array({"metadata":{"exporttime":"2020-03-21T15:19:35.890-05:00","allBusinessData":{"businessData":[{"capacity":{"fuelCapacity":282}},{"capacity":{"fuelCapacity":456}}]}}})
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...