In the DataFrame API, use groupBy and agg with the collect_list function.
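For reference, a minimal sketch to build df1 (assuming an existing SparkSession named spark; id and value_tier are created as strings here so that concat_ws and split below apply directly):

df1 = spark.createDataFrame(
    [("105", "5"), ("117", "5"), ("108", "10"),
     ("110", "12"), ("105", "10"), ("112", "10")],
    ["id", "value_tier"])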
df1.show()
#+---+----------+
#| id|value_tier|
#+---+----------+
#|105| 5|
#|117| 5|
#|108| 10|
#|110| 12|
#|105| 10|
#|112| 10|
#+---+----------+
from pyspark.sql.functions import col, collect_list, collect_set, concat_ws, count, size, split
#1) one row per id with its tiers joined into a string,
#2) keep ids that have exactly one tier,
#3) count ids per tier and list them
df1.groupBy("id").\
    agg(concat_ws(',', collect_list(col("value_tier"))).alias("value_tier")).\
    filter(size(split(col("value_tier"), ",")) <= 1).\
    groupBy("value_tier").\
    agg(count(col("id")).alias("num"), concat_ws(",", collect_list(col("id"))).alias("ids")).\
    show()
#+----------+---+-------+
#|value_tier|num| ids|
#+----------+---+-------+
#| 5| 1| 117|
#| 10| 2|112,108|
#| 12| 1| 110|
#+----------+---+-------+
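Note that id 105 does not appear in the result: it maps to two tiers (5 and 10), so the size(...) <= 1 filter removes it before the second aggregation.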
#use collect_set instead of collect_list to eliminate duplicate tiers per id,
#so an id that repeats the same tier still passes the size <= 1 filter
df1.groupBy("id").\
    agg(concat_ws(',', collect_set(col("value_tier"))).alias("value_tier")).\
    filter(size(split(col("value_tier"), ",")) <= 1).\
    groupBy("value_tier").\
    agg(count(col("id")).alias("num"), concat_ws(",", collect_list(col("id"))).alias("ids")).\
    show()
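On this sample data the collect_set variant prints the same table as above, since no id repeats the same tier; the two only differ when an id has duplicate tier values. As a sketch of the same pipeline without the concat_ws/split round trip, the tiers can be kept as an array (the intermediate column name tiers is mine):

#keep tiers as an array, filter on its size, then unpack the single tier
df1.groupBy("id").\
    agg(collect_set(col("value_tier")).alias("tiers")).\
    filter(size(col("tiers")) == 1).\
    withColumn("value_tier", col("tiers")[0]).\
    groupBy("value_tier").\
    agg(count(col("id")).alias("num"), concat_ws(",", collect_list(col("id"))).alias("ids")).\
    show()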