Spark SQL CSV до JSON с различными типами данных - PullRequest
0 голосов
/ 29 февраля 2020

В настоящее время у меня есть данные в формате csv, такие как:

id,key,value
id_1,int_key,1
id_1,string_key,asd
id_1,double_key,null
id_2,double_key,2.0

Я хотел бы преобразовать эти атрибуты, сгруппированные по их идентификатору, с соответствующим им правильным типом данных в json.

Я ожидаю, что у меня будет json структура, подобная этой:

[{
  id: "id_1"
  attributes: {
    int_key: 1,
    string_key: "asd"
    double_key: null
  }
},
  id: "id_2"
  attributes: {
    double_key: 2.0
  }]

Мое текущее решение - собирать список с to_ json в Spark, который выглядел так:

SELECT to_json(id, map_from_arrays(collect_list(key), collect_list(value)) as attributes GROUP BY id)

Это будет работать, однако, я не могу найти способ привести их к правильным типам данных.

[{
  id: "id_1"
  attributes: {
    int_key: "1",
    string_key: "asd"
    double_key: "null"
  }
},
  id: "id_2"
  attributes: {
    double_key: "2.0"
  }]

Мне также нужно добавить поддержку нулевых значений. Но я уже нашел решение для этого. Я использую опцию ignoreNulls в to_ json. Поэтому, если я попытаюсь перечислить все атрибуты и привести их к соответствующему типу, я включу все определенные атрибуты. Я просто хочу включить атрибуты пользователя, определенные в файле csv.

Кстати, я использую Spark 2.4.

1 Ответ

0 голосов
/ 29 февраля 2020

Python: Вот моя версия преобразования PySpark из версии scala. Результаты одинаковы.

from pyspark.sql.functions import col, max, struct

df = spark.read.option("header","true").csv("test.csv")
keys = [row.key for row in df.select(col("key")).distinct().collect()]

df2 = df.groupBy("id").pivot("key").agg(max("value"))
df2.show()
df2.printSchema()

for key in keys:
    df2 = df2.withColumn(key, col(key).cast(key.split('_')[0]))
df2.show()
df2.printSchema()

df3 = df2.select("id", struct("int_key", "double_key", "string_key").alias("attributes"))
jsonArray = df3.toJSON().collect()

for json in jsonArray: print(json)

Scala: Я попытался разделить каждый тип значения, используя сначала pivot.

val keys = df.select('key).distinct.rdd.map(r => r(0).toString).collect
val df2 = df.groupBy('id).pivot('key, keys).agg(max('value))
df2.show
df2.printSchema

Тогда DataFrame выглядит следующим образом:

+----+-------+----------+----------+
|  id|int_key|double_key|string_key|
+----+-------+----------+----------+
|id_2|   null|       2.0|      null|
|id_1|      1|      null|       asd|
+----+-------+----------+----------+

root
 |-- id: string (nullable = true)
 |-- int_key: string (nullable = true)
 |-- double_key: string (nullable = true)
 |-- string_key: string (nullable = true)

, где тип каждого столбца - это все еще строки. Чтобы использовать его, я использовал foldLeft,

val df3 = keys.foldLeft(df2) { (df, key) => df.withColumn(key, col(key).cast(key.split("_").head)) }
df3.show
df3.printSchema

, и у результата теперь есть совмещенные типы.

+----+-------+----------+----------+
|  id|int_key|double_key|string_key|
+----+-------+----------+----------+
|id_2|   null|       2.0|      null|
|id_1|      1|      null|       asd|
+----+-------+----------+----------+

root
 |-- id: string (nullable = true)
 |-- int_key: integer (nullable = true)
 |-- double_key: double (nullable = true)
 |-- string_key: string (nullable = true)

Затем вы можете построить json, например

val df4 = df3.select('id, struct('int_key, 'double_key, 'string_key) as "attributes")
val jsonArray = df4.toJSON.collect
jsonArray.foreach(println)

, где последняя строка для проверки результата,

{"id":"id_2","attributes":{"double_key":2.0}}
{"id":"id_1","attributes":{"int_key":1,"string_key":"asd"}}
...