В принципе, я вижу три возможных подхода.
- Вычисление макс.
M_Max
отдельно и использование join
(которого вы хотите избежать. - Использование окна, как предлагается в другом ответе.
- Вычисление max с помощью пивота и агрегирование полученных столбцов с помощью
array_max
.
Скорее всего, подход 1 был бы менее эффективным. Между 2 и 3, однако я не уверен. Вы можете попробовать с вашими данными и сообщите нам; -)
Подход 3 будет go следующим образом:
val df = Seq(
("M1", 100, 200, 1), ("M1", 100, 175, 2), ("M1", 101, 150, 3),
("M1", 100, 125, 4), ("M1", 100, 90, 5), ("M1", 100, 85, 6),
("M2", 200, 1001, 1), ("M2", 200, 500, 2), ("M2", 200, 456, 3),
("M2", 200, 345, 4), ("M2", 200, 231, 5), ("M2", 201, 123, 6)
).toDF("M","M_Max","Sales","Rank")
// we include the max in the pivot, so we have one max column per rank
val df_pivot = df
.groupBy("M").pivot("Rank")
.agg(first('Sales) as "first", max('M_Max) as "max")
val max_cols = df_pivot.columns.filter(_ endsWith "max").map(col)
// then we aggregate these max columns into one
val max_col = array_max(array(max_cols : _*)) as "M_Max"
// let's rename the first columns to match your expected output
val first_cols = df_pivot.columns.filter(_ endsWith "first")
.map(name => col(name) as name.split("_")(0))
// And finally, we wrap everything together
df_pivot
.select($"M" +: first_cols :+ max_col : _*)
.show(false)
, что дает
+---+----+---+---+---+---+---+-----+
|M |1 |2 |3 |4 |5 |6 |M_Max|
+---+----+---+---+---+---+---+-----+
|M1 |200 |175|150|125|90 |85 |101 |
|M2 |1001|500|456|345|231|123|201 |
+---+----+---+---+---+---+---+-----+