Spark 2.0 (не 2.1) Dataset [Row] или Dataframe - выберите несколько столбцов для JSON - PullRequest
0 голосов
/ 23 мая 2018

У меня есть Spark Dataframe с 10 столбцами, и мне нужно сохранить его в Postgres / RDBMS.Таблица содержит 7 столбцов, а 7-й столбец принимает текст (в формате JSON) для дальнейшей обработки.

Как выбрать 6 столбцов и преобразовать оставшиеся 4 столбца в формате DF в формат JSON?

Если весь DF должен храниться как JSON, то мы могли бы использовать DF.write.format ("json"), но только последние 4 столбца должны быть в формате JSON.

Я попытался создать UDF (с помощью библиотеки Джексона или Lift), но не смог отправить 4 столбца в UDF.

для JSON, ключом является имя столбца DF, DFзначением столбца является значение.

например:

dataset name: ds_base
root
 |-- bill_id: string (nullable = true)
 |-- trans_id: integer (nullable = true)
 |-- billing_id: decimal(3,-10) (nullable = true)
 |-- asset_id: string (nullable = true)
 |-- row_id: string (nullable = true)
 |-- created: string (nullable = true)
 |-- end_dt: string (nullable = true)
 |-- start_dt: string (nullable = true)
 |-- status_cd: string (nullable = true)
 |-- update_start_dt: string (nullable = true)

I want to do,
ds_base
 .select ( $"bill_id",
    $"trans_id",
    $"billing_id",
    $"asset_id",
    $"row_id",
    $"created",
    ?? <JSON format of 4 remaining columns>
    )

1 Ответ

0 голосов
/ 23 мая 2018

Вы можете использовать struct и to_json:

import org.apache.spark.sql.functions.{to_json, struct}

to_json(struct($"end_dt", $"start_dt", $"status_cd", $"update_start_dt"))

В качестве обходного пути для устаревших версий Spark вы можете преобразовать весь объект в JSON и извлечь его:

import org.apache.spark.sql.functions.get_json_object

// List of column names to be kept as-is
val scalarColumns: Seq[String] = Seq("bill_id", "trans_id", ...)
// List of column names to be put in JSON
val jsonColumns: Seq[String] = Seq(
  "end_dt", "start_dt", "status_cd", "update_start_dt"
)

// Convert all records to JSON, keeping selected fields as a nested document
val json = df.select(
  scalarColumns.map(col _) :+ 
  struct(jsonColumns map col: _*).alias("json"): _*
).toJSON

json.select(
  // Extract selected columns from JSON field and cast to required types
  scalarColumns.map(c => 
    get_json_object($"value", s"$$.$c").alias(c).cast(df.schema(c).dataType)) :+ 
  // Extract JSON struct
  get_json_object($"value", "$.json").alias("json"): _*
)

Это будет работать только до тех пор, пока у вас есть атомарные типы.В качестве альтернативы вы можете использовать стандартную программу чтения JSON и указать схему для поля JSON.

import org.apache.spark.sql.types._

val combined = df.select(
  scalarColumns.map(col _) :+ 
  struct(jsonColumns map col: _*).alias("json"): _*
)

val newSchema = StructType(combined.schema.fields map {
   case StructField("json", _, _, _) => StructField("json", StringType)
   case s => s
})

spark.read.schema(newSchema).json(combined.toJSON.rdd)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...