Как удалить дубликаты из фрейма данных Spark, сохранив последние? - PullRequest
4 голосов
/ 13 апреля 2019

Я использую spark для загрузки файлов json из Amazon S3.Я хотел бы удалить дубликаты на основе двух столбцов фрейма данных, сохраняя самые новые (у меня есть столбец метки времени).Каков будет лучший способ сделать это?Обратите внимание, что дубликаты могут быть распределены по разделам.Можно ли удалить дубликаты с сохранением последней записи без перемешивания?Я имею дело с 1 ТБ данных.

Я думал о разбиении фрейма данных на эти два столбца таким образом, чтобы все дублирующиеся записи были «последовательно хешированы» в одном и том же разделе и, следовательно, на уровне раздела.сортировка с последующим удалением дубликатов удалит все дубликаты, оставив только один.Я не знаю, если это возможно.Любая информация приветствуется.

1 Ответ

3 голосов
/ 16 апреля 2019

Использование row_number () Возможно, оконная функция проще для вашей задачи, ниже c1 - столбец метки времени, c2, c3 - столбцы, используемые для разделения ваших данных:

from pyspark.sql import Window, functions as F

# create a win spec which is partitioned by c2, c3 and ordered by c1 in descending order
win = Window.partitionBy('c2', 'c3').orderBy(F.col('c1').desc())

# set rn with F.row_number() and filter the result by rn == 1
df_new = df.withColumn('rn', F.row_number().over(win)).where('rn = 1').drop('rn')
df_new.show()

Редактировать:

Если вам просто нужны дубликаты и отбрасывать уникальные строки, добавьте еще одно поле:

from pyspark.sql import Window, functions as F

# create a win spec which is partitioned by c2, c3 and ordered by c1 in descending order
win = Window.partitionBy('c2', 'c3').orderBy(F.col('c1').desc())

# window to cover all rows in the same partition
win2 = Window.partitionBy('c2', 'c3') \
             .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# set new columns: rn, cnt and filter the result by rn == 1 and cnt > 1
df_new = df.withColumn('rn', F.row_number().over(win)) \
           .withColumn('cnt', F.count('c1').over(win2)) \
           .where('rn = 1 and cnt > 1') \
           .drop('rn', 'cnt')
df_new.show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...