Вы можете использовать struct
и to_json
:
import org.apache.spark.sql.functions.{to_json, struct}
to_json(struct($"end_dt", $"start_dt", $"status_cd", $"update_start_dt"))
В качестве обходного пути для устаревших версий Spark вы можете преобразовать весь объект в JSON и извлечь его:
import org.apache.spark.sql.functions.get_json_object
// List of column names to be kept as-is
val scalarColumns: Seq[String] = Seq("bill_id", "trans_id", ...)
// List of column names to be put in JSON
val jsonColumns: Seq[String] = Seq(
"end_dt", "start_dt", "status_cd", "update_start_dt"
)
// Convert all records to JSON, keeping selected fields as a nested document
val json = df.select(
scalarColumns.map(col _) :+
struct(jsonColumns map col: _*).alias("json"): _*
).toJSON
json.select(
// Extract selected columns from JSON field and cast to required types
scalarColumns.map(c =>
get_json_object($"value", s"$$.$c").alias(c).cast(df.schema(c).dataType)) :+
// Extract JSON struct
get_json_object($"value", "$.json").alias("json"): _*
)
Это будет работать только до тех пор, пока у вас есть атомарные типы.В качестве альтернативы вы можете использовать стандартную программу чтения JSON и указать схему для поля JSON.
import org.apache.spark.sql.types._
val combined = df.select(
scalarColumns.map(col _) :+
struct(jsonColumns map col: _*).alias("json"): _*
)
val newSchema = StructType(combined.schema.fields map {
case StructField("json", _, _, _) => StructField("json", StringType)
case s => s
})
spark.read.schema(newSchema).json(combined.toJSON.rdd)