Всего этого можно добиться с помощью функций PySpark и оконных функций (Window), как показано ниже.
Создание фрейма данных pyspark:
import pandas as pd
# Sample measurements, one tuple per row: (Instruments, Sers, Sounds).
records = [
    ('A', 'Wind', 42),
    ('B', 'Tool', 21),
    ('A', 'Wind', 34),
    ('B', 'Wind', 56),
    ('A', 'Tool', 43),
    ('C', 'Tool', 61),
    ('C', 'Tool', 24),
    ('B', 'Wind', 23),
]
# Transpose the rows into the column-oriented mapping pandas is built from.
instruments, sers, sounds = (list(column) for column in zip(*records))
data = {'Instruments': instruments, 'Sers': sers, 'Sounds': sounds}
pddf = pd.DataFrame(data)
# NOTE(review): `spark` is not defined in this snippet - presumably the
# SparkSession provided by the pyspark shell / notebook; confirm.
df = spark.createDataFrame(pddf)
df.show()
Вывод:
+-----------+----+------+
|Instruments|Sers|Sounds|
+-----------+----+------+
| A|Wind| 42|
| B|Tool| 21|
| A|Wind| 34|
| B|Wind| 56|
| A|Tool| 43|
| C|Tool| 61|
| C|Tool| 24|
| B|Wind| 23|
+-----------+----+------+
Расчеты:
from pyspark.sql import Window
from pyspark.sql import functions as F
# wI spans all rows of one instrument; wIS spans one (instrument, sers) pair.
wI = Window.partitionBy('Instruments')
wIS = Window.partitionBy('Instruments', 'Sers')
# Orders the rows of an instrument so the most frequent sers comes first.
# 'Sers' is a secondary sort key: F.first() depends on row order, so without a
# tie-breaker two sers with the same count could yield a Chosen_Sers and an
# avg_sounds_of_sers_to_use taken from different sers. On a tie the
# alphabetically first sers wins.
wI_by_count = wI.orderBy(F.desc('tmpCount'), F.col('Sers'))

# Approximate 99th percentile of Sounds per instrument; only rows strictly
# below it are kept.
df = df.withColumn('q', F.expr('percentile_approx(Sounds, 0.99)').over(wI))
df = df.filter(df.Sounds < df.q)

# Marker for Chosen_Sers: number of remaining rows per (instrument, sers);
# a higher number means a more abundant sers.
df = df.withColumn('tmpCount', F.count('Sers').over(wIS))

# The most abundant sers of the instrument, as a string.
df = df.withColumn('Chosen_Sers', F.first('Sers').over(wI_by_count))

# Mean sound for each sers within an instrument.
df = df.withColumn('Average_Sounds_99_Percent', F.mean('Sounds').over(wIS))

# Mean sound of the chosen sers, picked with the same ordering as Chosen_Sers.
df = df.withColumn(
    'avg_sounds_of_sers_to_use',
    F.first(F.col('Average_Sounds_99_Percent')).over(wI_by_count),
)

# Ratio of the chosen sers' mean to this row's sers mean. Reuses the column
# computed above instead of evaluating the same windowed mean a second time.
df = df.withColumn(
    'mean_ratios',
    F.col('avg_sounds_of_sers_to_use') / F.col('Average_Sounds_99_Percent'),
)

# Absolute difference of the two means relative to their midpoint. The ratio is
# rounded to 2 decimals before it is multiplied by 100.
df = df.withColumn(
    'percent_differences',
    100 * F.round(
        F.abs(F.col('avg_sounds_of_sers_to_use') - F.col('Average_Sounds_99_Percent'))
        / ((F.col('avg_sounds_of_sers_to_use') + F.col('Average_Sounds_99_Percent')) / 2),
        2,
    ),
)

# Up to this point the result is still a flat table.
df.show()

# Build the desired structure: one row per instrument / chosen sers, with the
# per-sers values gathered into lists; all other columns are dropped.
# dropDuplicates first leaves a single row per (instrument, sers) so each sers
# contributes one list element.
# NOTE(review): element order inside the collected lists follows row order,
# which Spark does not guarantee after a shuffle - confirm if order matters.
(
    df.dropDuplicates(['Instruments', 'Sers'])
    .groupby('Instruments', 'Chosen_Sers')
    .agg(
        F.collect_list('Sers').alias('Sers'),
        F.collect_list('Average_Sounds_99_Percent').alias('Average_Sounds_99_Percent'),
        F.collect_list('mean_ratios').alias('mean_ratios'),
        F.collect_list('percent_differences').alias('percent_differences'),
    )
    .show(truncate=False)
)
Вывод:
#just a flat table
+-----------+----+------+---+--------+-----------+-------------------------+-------------------------+------------------+-------------------+
|Instruments|Sers|Sounds| q|tmpCount|Chosen_Sers|Average_Sounds_99_Percent|avg_sounds_of_sers_to_use| mean_ratios|percent_differences|
+-----------+----+------+---+--------+-----------+-------------------------+-------------------------+------------------+-------------------+
| B|Tool| 21| 56| 1| Tool| 21.0| 21.0| 1.0| 0.0|
| B|Wind| 23| 56| 1| Tool| 23.0| 21.0|0.9130434782608695| 9.0|
| C|Tool| 24| 61| 1| Tool| 24.0| 24.0| 1.0| 0.0|
| A|Wind| 42| 43| 2| Wind| 38.0| 38.0| 1.0| 0.0|
| A|Wind| 34| 43| 2| Wind| 38.0| 38.0| 1.0| 0.0|
+-----------+----+------+---+--------+-----------+-------------------------+-------------------------+------------------+-------------------+
#desired structure
+-----------+-----------+------------+-------------------------+-------------------------+-------------------+
|Instruments|Chosen_Sers|Sers |Average_Sounds_99_Percent|mean_ratios |percent_differences|
+-----------+-----------+------------+-------------------------+-------------------------+-------------------+
|B |Tool |[Tool, Wind]|[21.0, 23.0] |[1.0, 0.9130434782608695]|[0.0, 9.0] |
|C |Tool |[Tool] |[24.0] |[1.0] |[0.0] |
|A |Wind |[Wind] |[38.0] |[1.0] |[0.0] |
+-----------+-----------+------------+-------------------------+-------------------------+-------------------+