Я не думаю, что вы можете получить именно такой результат, но вы можете подойти ближе. Проблема заключается в именах ваших ключей для столбца 4. В Spark структурам необходимо иметь фиксированный набор заранее известных столбцов. Но оставим это на потом, во-первых, агрегация:
import pyspark
from pyspark.sql import functions as F
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)
data = [('A', 'B', 1), ('A', 'B', 2), ('A', 'C', 1)]
columns = ['Column1', 'Column2', 'Column3']
data = spark.createDataFrame(data, columns)
data.createOrReplaceTempView("data")
data.show()
# Result
+-------+-------+-------+
|Column1|Column2|Column3|
+-------+-------+-------+
| A| B| 1|
| A| B| 2|
| A| C| 1|
+-------+-------+-------+
nested = spark.sql("SELECT Column1, Column2, STRUCT(COLLECT_LIST(Column3) AS data) AS Column4 FROM data GROUP BY Column1, Column2")
nested.toJSON().collect()
# Result
['{"Column1":"A","Column2":"C","Column4":{"data":[1]}}',
'{"Column1":"A","Column2":"B","Column4":{"data":[1,2]}}']
Что является почти тем, что вы хотите, верно? Проблема заключается в том, что если вы заранее не знаете названия своих ключей (то есть значений в столбце 2), Spark не сможет определить структуру ваших данных. Кроме того, я не совсем уверен, как вы можете использовать значение столбца в качестве ключа для структуры, если вы не используете UDF (возможно, с PIVOT
?):
datatype = 'struct<B:array<bigint>,C:array<bigint>>' # Add any other potential keys here.
@F.udf(datatype)
def replace_struct_name(column2_value, column4_value):
return {column2_value: column4_value['data']}
nested.withColumn('Column5', replace_struct_name(F.col("Column2"), F.col("Column4"))).toJSON().collect()
# Output
['{"Column1":"A","Column2":"C","Column4":{"C":[1]}}',
'{"Column1":"A","Column2":"B","Column4":{"B":[1,2]}}']
Это, конечно, имеет тот недостаток, что количество ключей должно быть дискретным и известным заранее, в противном случае другие значения ключей будут игнорироваться.