Мне нравится нормализовать результаты (...) Мне нравится использовать MinMaxScaler
Эти два требования являются взаимоисключающими. MinMaxScaler
нельзя использовать для работы с группами. Вы можете использовать оконные функции
from pyspark.sql.functions import min, max, col
from pyspark.sql.window import Window
df = spark.createDataFrame(
[(1, 6, 0.5), (1, 7, 10.2), (1, 8, 5.7), (2, 6, 11.0), (2, 10, 22.3)],
("day", "time", "result"))
w = Window.partitionBy("day")
scaled_result = (col("result") - min("result").over(w)) / (max("result").over(w) - min("result").over(w))
df.withColumn("scaled_result", scaled_result).show()
# +---+----+------+------------------+
# |day|time|result| scaled_result|
# +---+----+------+------------------+
# | 1| 6| 0.5| 0.0|
# | 1| 7| 10.2| 1.0|
# | 1| 8| 5.7|0.5360824742268042|
# | 2| 6| 11.0| 0.0|
# | 2| 10| 22.3| 1.0|
# +---+----+------+------------------+
или группировать, объединять и объединять:
minmax_result = df.groupBy("day").agg(min("result").alias("min_result"), max("result").alias("max_result"))
minmax_result.join(df, ["day"]).select(
"day", "time", "result",
((col("result") - col("min_result")) / (col("max_result") - col("min_result"))).alias("scaled_result")
).show()
# +---+----+------+------------------+
# |day|time|result| scaled_result|
# +---+----+------+------------------+
# | 1| 6| 0.5| 0.0|
# | 1| 7| 10.2| 1.0|
# | 1| 8| 5.7|0.5360824742268042|
# | 2| 6| 11.0| 0.0|
# | 2| 10| 22.3| 1.0|
# +---+----+------+------------------+