Вы можете сначала drop duplicates
на основе id
и counter
, затем взять max
над окном id
, и, наконец, выполнить фильтрацию, где counter
равно Максимальному значению;
Если порядок id
должен быть сохранен, нам потребуется монотонно увеличивающийся идентификатор, чтобы мы могли отсортировать его позже:
from pyspark.sql.window import Window
w = Window.partitionBy('id')
out =(df.withColumn('idx',F.monotonically_increasing_id())
.drop_duplicates(['id','counter'])
.withColumn("Maximum",F.max(F.col("counter"))
.over(w)).filter("counter==Maximum").orderBy('idx')
.drop(*['idx','Maximum']))
out.show()
+---+-------+-------+
| id|country|counter|
+---+-------+-------+
| A| RU| 1|
| C| RU| 3|
| D| FR| 5|
| B| FR| 5|
+---+-------+-------+
Если порядок id
не является беспокойство, те же логи c, но дополнительный идентификатор не требуется:
from pyspark.sql.window import Window
w = Window.partitionBy('id')
out1 = (df.drop_duplicates(['id','counter']).withColumn("Maximum",F.max(F.col("counter"))
.over(w)).filter("counter==Maximum")
.drop('Maximum'))
out1.show()
+---+-------+-------+
| id|country|counter|
+---+-------+-------+
| B| FR| 5|
| D| FR| 5|
| C| RU| 3|
| A| RU| 1|
+---+-------+-------+