Идея состоит в том, чтобы сначала упорядочить DataFrame со столбцом e
и с attempt
(в порядке убывания). Как только это будет сделано, мы выбираем верхнюю строку.
# Loading the requisite packages and creating the DataFrame.
from pyspark.sql.window import Window
from pyspark.sql.functions import col, first, row_number
valuesCol = [(1,1,100),(2,1,95),(2,2,55),(3,1,78),(3,2,100),(3,3,88)]
df = spark.createDataFrame(valuesCol,['e','attempt','grade'])
df.show()
+---+-------+-----+
| e|attempt|grade|
+---+-------+-----+
| 1| 1| 100|
| 2| 1| 95|
| 2| 2| 55|
| 3| 1| 78|
| 3| 2| 100|
| 3| 3| 88|
+---+-------+-----+
Теперь мы выбираем значение X
. Как говорит OP, значение attempt
не должно быть больше X
, поэтому мы отфильтровываем все те строки, где attempt
больше X
, а затем упорядочиваем, используя функцию orderBy()
.
X=2
w = Window.partitionBy(col('e')).orderBy(col('attempt').desc())
df = df.where(col('attempt')<=X).orderBy(['e','attempt'], ascending=[1,0])
df.show()
+---+-------+-----+
| e|attempt|grade|
+---+-------+-----+
| 1| 1| 100|
| 2| 2| 55|
| 2| 1| 95|
| 3| 2| 100|
| 3| 1| 78|
+---+-------+-----+
Как только это будет сделано, мы используем функции row_number()
и window
, чтобы выбрать верхнюю строку в этом отсортированном кадре данных.
df = df.withColumn('row_num', row_number().over(w)).where(col('row_num') == 1).drop('row_num')
df.show()
+---+-------+-----+
| e|attempt|grade|
+---+-------+-----+
| 1| 1| 100|
| 3| 2| 100|
| 2| 2| 55|
+---+-------+-----+