Вы определенно можете сделать вышеописанное с помощью функции Window!
Это в PySpark, а не Scala, но почти нет разницы, когда используются только собственные функции Spark.
Приведенный ниже код работает только для столбца карты, который содержит 1 ключ, пару значений на строку, как это показано в примере данных, но его можно настроить для работы со столбцами карты с несколькими записями.
from pyspark.sql import Window
map_col = 'mappingColumn'
group_cols = ['id', 'fruit', 'misc']
# or, a lazier way if you have a lot of columns to group on
cols = df.columns # save as list
group_cols_2 = cols.remove('mappingCol') # remove what you're not grouping by
w = Window.partitionBy(group_cols)
# unpack map value and key into a pair struct column
df1 = df.withColumn(map_col , F.struct(F.map_keys(map_col)[0], F.map_values(map_col)[0]))
# Collect all key values into an array of structs, here each row
# contains the map entries for all rows in the group/window
df1 = df1.withColumn(map_col , F.collect_list(map_col).over(w))
# drop duplicate values, as you only want one row per group
df1 = df1.dropDuplicates(group_cols)
# return the values for map type
df1 = df1.withColumn(map_col , F.map_from_entries(map_col))
Вы можете сохранить выходные данные каждого шага в новом столбце, чтобы увидеть, как работает каждый шаг, как я сделал ниже.
from pyspark.sql import Window
map_col = 'mappingColumn'
group_cols = list('id', 'fruit', 'misc')
w = Window.partitionBy(group_cols)
df1 = df.withColumn('test', F.struct(F.map_keys(map_col)[0], F.map_values(map_col)[0]))
df1 = df1.withColumn('test1', F.collect_list('test').over(w))
df1 = df1.withColumn('test2', F.map_from_entries('test1'))
df1.show(truncate=False)
df1.printSchema()
df1 = df1.dropDuplicates(group_cols)