Please check the code below; I used groupBy, pivot, and agg:
scala> val js = Seq(""" {'type': 'logs', 'eid': '1', 'keys': ['crt_ts', 'id', 'upd_ts', 'km', 'pivl', 'distance', 'speed'], 'values': [['12343.0000.012', 'AAGA1567', '1333.333.333', '565656', '10.5', '121', '64']]}""").toDS
js: org.apache.spark.sql.Dataset[String] = [value: string]
scala> val jdf = spark.read.json(js)
jdf: org.apache.spark.sql.DataFrame = [eid: string, keys: array<string> ... 2 more fields]
scala> jdf.printSchema
root
|-- eid: string (nullable = true)
|-- keys: array (nullable = true)
| |-- element: string (containsNull = true)
|-- type: string (nullable = true)
|-- values: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
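Note that values is an array of string arrays: each inner array lines up positionally with keys, which is what lets arrays_zip pair them up below.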
scala> jdf.show(false)
+---+-----------------------------------------------+----+-----------------------------------------------------------------+
|eid|keys |type|values |
+---+-----------------------------------------------+----+-----------------------------------------------------------------+
|1 |[crt_ts, id, upd_ts, km, pivl, distance, speed]|logs|[[12343.0000.012, AAGA1567, 1333.333.333, 565656, 10.5, 121, 64]]|
+---+-----------------------------------------------+----+-----------------------------------------------------------------+
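The plan: explode the outer values array so there is one row per inner array, zip keys with values via arrays_zip, explode the zipped pairs into (key, value) rows, then pivot the key names back into columns, taking first as the aggregate. Both arrays_zip and pivot on a Column require Spark 2.4+.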
scala> :paste
// Entering paste mode (ctrl-D to finish)
jdf.select($"eid",$"keys",explode($"values").as("values"),$"type")
.select($"eid",$"type",explode(arrays_zip($"keys",$"values")).as("azip"))
.select($"eid",$"azip.*",$"type")
.groupBy($"type",$"eid")
.pivot($"keys")
.agg(first("values"))
.show(false)
// Exiting paste mode, now interpreting.
+----+---+--------------+--------+--------+------+----+-----+------------+
|type|eid|crt_ts |distance|id |km |pivl|speed|upd_ts |
+----+---+--------------+--------+--------+------+----+-----+------------+
|logs|1 |12343.0000.012|121 |AAGA1567|565656|10.5|64 |1333.333.333|
+----+---+--------------+--------+--------+------+----+-----+------------+
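If the key set is fixed and known up front, the groupBy/pivot can be skipped entirely. Below is a minimal alternative sketch, assuming Spark 2.4+ (for map_from_arrays), the same spark-shell session as above, and that the seven keys shown are the complete set (keyNames is a name I introduce here for illustration):

// Sketch only: builds a key -> value map per row and projects each key directly.
// Assumes the spark-shell context above (spark.implicits._ in scope) and Spark 2.4+.
import org.apache.spark.sql.functions._

val keyNames = Seq("crt_ts", "id", "upd_ts", "km", "pivl", "distance", "speed")

jdf.select($"type", $"eid", explode($"values").as("values"), $"keys")    // one row per inner values array
  .select($"type", $"eid", map_from_arrays($"keys", $"values").as("kv")) // zip into a string -> string map
  .select(Seq($"type", $"eid") ++ keyNames.map(k => $"kv".getItem(k).as(k)): _*) // one column per key
  .show(false)

With the pivot approach, passing the expected values explicitly, e.g. .pivot($"keys", keyNames), would likewise spare Spark the extra job it otherwise runs to discover the distinct pivot values.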