Здесь вы идете с решением. Не стесняйтесь спрашивать, если вам нужно что-то понять:
val data = spark.read.json("sample.json")
val readJsonDf = data.select($"index", $"type", $"id", $"source.link_data.source_id".as("source_id"), $"source.attribute_data.*")
readJsonDf.show()
Начальный выход:
+--------+--------+------+---------+--------------------+--------------------+
| index| type| id|source_id| first| second|
+--------+--------+------+---------+--------------------+--------------------+
|identity|identity|100000| 0011245|[2011,WrappedArra...|[2010,WrappedArra...|
+--------+--------+------+---------+--------------------+--------------------+
Затем я сделал динамическое преобразование, используя следующие строки кода:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
def transposeColumnstoRows(df: DataFrame, constantCols: Seq[String]): DataFrame = {
val (cols, types) = df.dtypes.filter{ case (c, _) => !constantCols.contains(c)}.unzip
//a check if the required columns that needs to be transformed to rows are of the same structure
require(types.distinct.size == 1, s"${types.distinct.toString}.length != 1")
val keyColsWIthValues = explode(array(
cols.map(c => struct(lit(c).alias("columnKey"), col(c).alias("value"))): _*
))
df.select(constantCols.map(col(_)) :+ keyColsWIthValues.alias("keyColsWIthValues"): _*)
}
val newDf = transposeColumnstoRows(readJsonDf, Seq("index","type","id","source_id"))
val requiredDf = newDf.select($"index",$"type",$"id",$"source_id",$"keyColsWIthValues.columnKey".as("attribute_data"),$"keyColsWIthValues.value.updated_at".as("updated_at"),$"keyColsWIthValues.value.val".as("val"))
requiredDf.show()
Окончательный вывод:
| index| type| id|source_id|attribute_data|updated_at| val|
+--------+--------+------+---------+--------------+----------+------+
|identity|identity|100000| 0011245| first| 2011|[true]|
|identity|identity|100000| 0011245| second| 2010|[true]|
Надеюсь, это решит вашу проблему!