Here's how I did it.
Code
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

# Sample data: (id, time, cat); assumes an active SparkSession named `spark`
df = spark.createDataFrame(
    [(1, 't1', 'a'), (1, 't2', 'b'), (2, 't3', 'b'), (2, 't4', 'c'),
     (2, 't5', 'b'), (3, 't6', 'a'), (3, 't7', 'a'), (3, 't8', 'a')],
    ('id', 'time', 'cat'))

# Count rows per (id, cat), render each pair as 'cat->count',
# then collect the strings into one list per id
(df.groupBy(['id', 'cat'])
 .agg(F.count(F.col('cat')).cast(StringType()).alias('counted'))
 .select(['id', F.concat_ws('->', F.col('cat'), F.col('counted')).alias('arrowed')])
 .groupBy('id')
 .agg(F.collect_list('arrowed'))
 .show()
)
Output
+---+---------------------+
| id|collect_list(arrowed)|
+---+---------------------+
|  1|         [a->1, b->1]|
|  3|               [a->3]|
|  2|         [b->2, c->1]|
+---+---------------------+
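One caveat: collect_list makes no ordering guarantee, so the list contents can come back in a different order between runs. A minimal variant of the same pipeline that wraps the collected list in sort_array for reproducible output:

# Same pipeline, with sort_array around the collected list
# so the element order is deterministic across runs
(df.groupBy('id', 'cat')
 .agg(F.count('cat').cast(StringType()).alias('counted'))
 .select('id', F.concat_ws('->', 'cat', 'counted').alias('arrowed'))
 .groupBy('id')
 .agg(F.sort_array(F.collect_list('arrowed')).alias('arrowed'))
 .show()
)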
Edit: the same counts can also be kept as maps instead of arrow-formatted strings:
# Same counting step, but wrap each (cat, count) pair in a
# single-entry map and collect the maps into one list per id
(df.groupBy(['id', 'cat'])
 .count()
 .select(['id', F.create_map('cat', 'count').alias('map')])
 .groupBy('id')
 .agg(F.collect_list('map').alias('cat'))
 .show()
)
#+---+--------------------+
#| id|                 cat|
#+---+--------------------+
#|  1|[[a -> 1], [b -> 1]]|
#|  3|          [[a -> 3]]|
#|  2|[[b -> 2], [c -> 1]]|
#+---+--------------------+
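If one map per id is more convenient than a list of single-entry maps, the pairs can be merged into a single map instead. A sketch assuming Spark 2.4+, where map_from_entries turns a collected array of (cat, count) structs into one map:

# Collect (cat, count) structs per id, then convert the array
# of structs into a single map (map_from_entries needs Spark 2.4+)
(df.groupBy('id', 'cat')
 .count()
 .groupBy('id')
 .agg(F.map_from_entries(F.collect_list(F.struct('cat', 'count'))).alias('cat'))
 .show(truncate=False)
)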