Необходимо разобрать файл json - PullRequest
0 голосов
/ 09 мая 2020
root
 |-- eid: string (nullable = true)
 |-- keys: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- type: string (nullable = true)
 |-- values: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

Необходимо проанализировать jsonfile с приведенной выше схемой с использованием искрового фрейма данных в структурированный формат. столбец ключей имеет имена столбцов, которые имеют значения в столбце «значения».

образец файла данных: {'type': 'logs', 'eid': '1', 'keys': ['crt_ts', 'id', 'upd_ts', 'km', 'pivl', 'distance', 'speed'], 'values': [['12343.0000.012', 'AAGA1567', '1333.333.333', '565656' , '10 .5 ',' 121 ',' 64 ']]}

ожидаемый результат:

eid crt_ts id  upd_ts km  pivl distance speed type
  1  12343.0000.012 AAGA1567 1333.333.333 565656 10.5 121 64 logs

1 Ответ

0 голосов
/ 10 мая 2020

Пожалуйста, проверьте код ниже, я использовал groupBy, pivot и agg:

scala> val js = Seq(""" {'type': 'logs', 'eid': '1', 'keys': ['crt_ts', 'id', 'upd_ts', 'km', 'pivl', 'distance', 'speed'], 'values': [['12343.0000.012', 'AAGA1567', '1333.333.333', '565656', '10.5', '121', '64']]}""").toDS
js: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val jdf = spark.read.json(js)
jdf: org.apache.spark.sql.DataFrame = [eid: string, keys: array<string> ... 2 more fields]

scala> jdf.printSchema
root
 |-- eid: string (nullable = true)
 |-- keys: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- type: string (nullable = true)
 |-- values: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)


scala> jdf.show(false)
+---+-----------------------------------------------+----+-----------------------------------------------------------------+
|eid|keys                                           |type|values                                                           |
+---+-----------------------------------------------+----+-----------------------------------------------------------------+
|1  |[crt_ts, id, upd_ts, km, pivl, distance, speed]|logs|[[12343.0000.012, AAGA1567, 1333.333.333, 565656, 10.5, 121, 64]]|
+---+-----------------------------------------------+----+-----------------------------------------------------------------+

scala> :paste
// Entering paste mode (ctrl-D to finish)

jdf.select($"eid",$"keys",explode($"values").as("values"),$"type")
.select($"eid",$"type",explode(arrays_zip($"keys",$"values")).as("azip"))
.select($"eid",$"azip.*",$"type")
.groupBy($"type",$"eid")
.pivot($"keys")
.agg(first("values"))
.show(false)

// Exiting paste mode, now interpreting.

+----+---+--------------+--------+--------+------+----+-----+------------+
|type|eid|crt_ts        |distance|id      |km    |pivl|speed|upd_ts      |
+----+---+--------------+--------+--------+------+----+-----+------------+
|logs|1  |12343.0000.012|121     |AAGA1567|565656|10.5|64   |1333.333.333|
+----+---+--------------+--------+--------+------+----+-----+------------+

...