Получить максимальное значение в паспарке, вставить его и удалить дубликаты - PullRequest
0 голосов
/ 17 апреля 2020

Я застрял в решении своей проблемы.

Что у меня есть: Фрейм данных Pyspark, который выглядит следующим образом:

+----+---------+---------+
| id | country | counter |
+====+=========+=========+
|  A |      RU |       1 |
+----+---------+---------+
|  B |      EN |       2 |
+----+---------+---------+
|  A |      IQ |       1 |
+----+---------+---------+
|  C |      RU |       3 |
+----+---------+---------+
|  D |      FR |       5 |
+----+---------+---------+
|  B |      FR |       5 |
+----+---------+---------+

Мне нужно принять максимальное значение страны счетчиком (или любым, если оно равно ) и удалите все дубликаты. Так должно выглядеть:

+----+---------+---------+
| id | country | counter |
+====+=========+=========+
|  A |      RU |       1 |
+----+---------+---------+
|  C |      RU |       3 |
+----+---------+---------+
|  D |      FR |       5 |
+----+---------+---------+
|  B |      FR |       5 |
+----+---------+---------+

Кто-нибудь может мне помочь?

1 Ответ

1 голос
/ 17 апреля 2020

Вы можете сначала drop duplicates на основе id и counter, затем взять max над окном id, и, наконец, выполнить фильтрацию, где counter равно Максимальному значению;

Если порядок id должен быть сохранен, нам потребуется монотонно увеличивающийся идентификатор, чтобы мы могли отсортировать его позже:

from pyspark.sql.window import Window
w = Window.partitionBy('id')

out =(df.withColumn('idx',F.monotonically_increasing_id())
        .drop_duplicates(['id','counter'])
        .withColumn("Maximum",F.max(F.col("counter"))
        .over(w)).filter("counter==Maximum").orderBy('idx')
        .drop(*['idx','Maximum']))

out.show()

+---+-------+-------+
| id|country|counter|
+---+-------+-------+
|  A|     RU|      1|
|  C|     RU|      3|
|  D|     FR|      5|
|  B|     FR|      5|
+---+-------+-------+

Если порядок id не является беспокойство, те же логи c, но дополнительный идентификатор не требуется:

from pyspark.sql.window import Window
w = Window.partitionBy('id')
out1 = (df.drop_duplicates(['id','counter']).withColumn("Maximum",F.max(F.col("counter"))
                                     .over(w)).filter("counter==Maximum")
                                     .drop('Maximum'))

out1.show()

+---+-------+-------+
| id|country|counter|
+---+-------+-------+
|  B|     FR|      5|
|  D|     FR|      5|
|  C|     RU|      3|
|  A|     RU|      1|
+---+-------+-------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...