Преобразование структурированных данных в формат JSON с помощью Spark Scala - PullRequest
1 голос
/ 20 сентября 2019

У меня есть «Структурированные данные», как показано ниже, мне нужно преобразовать их в показанный ниже тип «Ожидаемые результаты».Моя «Схема вывода» также показана.Благодарим Вас за помощь в том, как мне этого добиться, используя код Spark Scala.

Примечание: Группировка по структурированным данным должна выполняться по столбцам SN и VIN.Должна быть одна строка для тех же SN и VIN, если либо SN, либо VIN изменяется, то данные должны присутствовать в следующей строке.

Структурированные данные:

+-----------------+-------------+--------------------+---+
|VIN              |ST           |SV                  |SN |
|FU74HZ501740XXXXX|1566799999225|44.0                |APP|
|FU74HZ501740XXXXX|1566800002758|61.0                |APP|
|FU74HZ501740XXXXX|1566800009446|23.39               |ASP|

Ожидаемые результаты:

enter image description here

Схема вывода:

val outputSchema = StructType(
  List(
    StructField("VIN", StringType, true),
    StructField("EVENTS", ArrayType(
        StructType(Array(
          StructField("SN", StringType, true),
          StructField("ST", IntegerType, true),
          StructField("SV", DoubleType, true)
        ))))
  )
)

Ответы [ 2 ]

3 голосов
/ 20 сентября 2019

В Spark 2.1 вы можете добиться этого, используя struct и collect_list.

val df_2 = Seq(
  ("FU74HZ501740XXXX",1566799999225.0,44.0,"APP"),
  ("FU74HZ501740XXXX",1566800002758.0,61.0,"APP"),
  ("FU74HZ501740XXXX",1566800009446.0,23.39,"ASP")
).toDF("vin","st","sv","sn") 

df_2.show(false)
+----------------+-----------------+-----+---+
|vin             |st               |sv   |sn |
+----------------+-----------------+-----+---+
|FU74HZ501740XXXX|1.566799999225E12|44.0 |APP|
|FU74HZ501740XXXX|1.566800002758E12|61.0 |APP|
|FU74HZ501740XXXX|1.566800009446E12|23.39|ASP|
+----------------+-----------------+-----+---+

Используйте collect_list с struct:

df_2.groupBy("vin","sn")
  .agg(collect_list(struct($"st", $"sv",$"sn")).as("events"))
  .withColumn("events",to_json($"events"))
  .drop(col("sn"))

Это дастнежелательный вывод:

+----------------+---------------------------------------------------------------------------------------------+
|vin             |events                                                                                       |
+----------------+---------------------------------------------------------------------------------------------+
|FU74HZ501740XXXX|[{"st":1.566800009446E12,"sv":23.39,"sn":"ASP"}]                                             |
|FU74HZ501740XXXX|[{"st":1.566799999225E12,"sv":44.0,"sn":"APP"},{"st":1.566800002758E12,"sv":61.0,"sn":"APP"}]|
+----------------+---------------------------------------------------------------------------------------------+
1 голос
/ 20 сентября 2019

Вы можете получить его через SparkSession.


val df = spark.read.json("/path/to/json/file/test.json")

здесь искрой является объект SparkSession

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...