Python: Вот моя версия преобразования PySpark из версии scala. Результаты одинаковы.
from pyspark.sql.functions import col, max, struct
df = spark.read.option("header","true").csv("test.csv")
keys = [row.key for row in df.select(col("key")).distinct().collect()]
df2 = df.groupBy("id").pivot("key").agg(max("value"))
df2.show()
df2.printSchema()
for key in keys:
df2 = df2.withColumn(key, col(key).cast(key.split('_')[0]))
df2.show()
df2.printSchema()
df3 = df2.select("id", struct("int_key", "double_key", "string_key").alias("attributes"))
jsonArray = df3.toJSON().collect()
for json in jsonArray: print(json)
Scala: Я попытался разделить каждый тип значения, используя сначала pivot
.
val keys = df.select('key).distinct.rdd.map(r => r(0).toString).collect
val df2 = df.groupBy('id).pivot('key, keys).agg(max('value))
df2.show
df2.printSchema
Тогда DataFrame выглядит следующим образом:
+----+-------+----------+----------+
| id|int_key|double_key|string_key|
+----+-------+----------+----------+
|id_2| null| 2.0| null|
|id_1| 1| null| asd|
+----+-------+----------+----------+
root
|-- id: string (nullable = true)
|-- int_key: string (nullable = true)
|-- double_key: string (nullable = true)
|-- string_key: string (nullable = true)
, где тип каждого столбца - это все еще строки. Чтобы использовать его, я использовал foldLeft
,
val df3 = keys.foldLeft(df2) { (df, key) => df.withColumn(key, col(key).cast(key.split("_").head)) }
df3.show
df3.printSchema
, и у результата теперь есть совмещенные типы.
+----+-------+----------+----------+
| id|int_key|double_key|string_key|
+----+-------+----------+----------+
|id_2| null| 2.0| null|
|id_1| 1| null| asd|
+----+-------+----------+----------+
root
|-- id: string (nullable = true)
|-- int_key: integer (nullable = true)
|-- double_key: double (nullable = true)
|-- string_key: string (nullable = true)
Затем вы можете построить json, например
val df4 = df3.select('id, struct('int_key, 'double_key, 'string_key) as "attributes")
val jsonArray = df4.toJSON.collect
jsonArray.foreach(println)
, где последняя строка для проверки результата,
{"id":"id_2","attributes":{"double_key":2.0}}
{"id":"id_1","attributes":{"int_key":1,"string_key":"asd"}}