Используйте flatten, array_distinct
с функциями groupBy, collect_list
для этого случая.
Example:
df.show(10,False)
#+----+-----+---------------------------------------------------------+
#|year|month|json_col |
#+----+-----+---------------------------------------------------------+
#|2010|09 |[{"p_id":"vfdvtbe"}, {"p_id":"cdscs"}, {"p_id":"usdvwq"}]|
#|2010|09 |[{"p_id":"ujhbe"}, {"p_id":"cdscs"}, {"p_id":"yjev"}] |
#|2007|10 |[{"p_id":"ukerge"}, {"p_id":"ikrtw"}, {"p_id":"ikwca"}] |
#|2007|10 |[{"p_id":"unvwq"}, {"p_id":"cqwcq"}, {"p_id":"ikwca"}] |
#+----+-----+---------------------------------------------------------+
# Explicit imports instead of wildcard `import *`:
# `pyspark.sql.functions` defines names like sum/min/max/abs/round/filter,
# so a star-import silently shadows the Python builtins of the same name.
from pyspark.sql.types import ArrayType, StringType, StructField, StructType
from pyspark.sql.functions import (
    array_distinct,
    col,
    collect_list,
    expr,
    flatten,
    from_json,
    to_json,
)

# Schema of json_col: a JSON array of objects, each with one string field "p_id".
schema = ArrayType(StructType(
    [
        StructField('p_id', StringType(), True)
    ]
))

# Step 1: parse the JSON string into an array of structs, then use the SQL
# higher-order function `transform` to project each struct down to its p_id,
# leaving every row with a plain array<string> in column "tmp".
df1 = (
    df.withColumn("ff", from_json(col("json_col"), schema))
      .select(
          "year",
          "month",
          expr('transform(ff , f -> f.p_id)').alias("tmp"),
      )
)

# Step 2: per (year, month) group, collect the per-row arrays into an
# array-of-arrays, flatten to one array, drop duplicates, and serialize
# back to a JSON string for display.
(
    df1.groupBy("year", "month")
       .agg(
           to_json(
               array_distinct(flatten(collect_list(col("tmp"))))
           ).alias("p_id")
       )
       .show(10, False)
)
#+----+-----+-------------------------------------------+
#|year|month|p_id |
#+----+-----+-------------------------------------------+
#|2010|09 |["vfdvtbe","cdscs","usdvwq","ujhbe","yjev"]|
#|2007|10 |["ukerge","ikrtw","ikwca","unvwq","cqwcq"] |
#+----+-----+-------------------------------------------+