Spark dropduplicates, но выберите столбец с нулевым - PullRequest
0 голосов
/ 06 марта 2020

У меня есть таблица, которая выглядит следующим образом:

+---------+-------------+--------------+-----------+--------+--------------+--------------+
| cust_num|valid_from_dt|valid_until_dt|cust_row_id| cust_id|insert_load_dt|update_load_dt|
+---------+-------------+--------------+-----------+--------+--------------+--------------+
|950379405|   2018-08-24|    2018-08-24|   06885247|06885247|    2018-08-24|    2018-08-25|
|950379405|   2018-08-25|    2018-08-28|   06885247|06885247|    2018-08-25|    2018-08-29|
|950379405|   2018-08-29|    2019-12-16|   27344328|06885247|    2018-08-29|    2019-12-17|<- pair 1
|950379405|   2018-08-29|    2019-12-16|   27344328|06885247|    2018-08-29|              |<- pair 1
|950379405|   2019-12-17|    2019-12-24|   91778710|06885247|    2019-12-17|              |<- pair 2
|950379405|   2019-12-17|    2019-12-24|   91778710|06885247|    2019-12-17|    2019-12-25|<- pair 2
|950379405|   2019-12-25|    2019-12-25|   08396180|06885247|    2019-12-25|    2019-12-26|<- pair 3 
|950379405|   2019-12-25|    2019-12-25|   08396180|06885247|    2019-12-25|              |<- pair 3

Как вы можете видеть, у меня есть несколько дублированных строк в моей таблице, и они отличаются только в том, что update_load_dt пусто или с датой.
Я хотел бы удалить дубликаты в моем фрейме данных таким образом:

cable_dv_customer_fixed.dropDuplicates(['cust_num',
'valid_from_dt',
'valid_until_dt',
'cust_row_id',
'cust_id'])

, но я хотел бы сохранить строку с дополнительной информацией.
Я имею в виду, что хотел бы сохранить строку where update_load_dt <> ''

Можно ли изменить функцию dropduplicates (), чтобы я мог выбрать, какую строку выбрать из дубликатов? или есть другой (лучший) способ сделать это?

Ответы [ 2 ]

0 голосов
/ 07 марта 2020

Вот как я бы go сказал: F.max () сделает то, что вы хотите, и сохраните строку с самым высоким значением. (на дату col max () сохраняет последнюю запись даты, если есть несколько).

from pyspark.sql.window import Window
key_cols = ['cust_num','valid_from_dt','valid_until_dt','cust_row_id','cust_id']
w = Window.partitionBy(key_cols)

df.withColumn('update_load_dt', F.max('update_load_dt').over(w)).dropDuplicates(key_cols)

Я работаю с 1 миллиардом + строк, и это не медленно. Дайте мне знать, если это помогло!

0 голосов
/ 06 марта 2020

вы можете использовать оконные функции для этого. Однако с большими данными это может быть медленным.

import pyspark.sql.function as F
from pyspark.sql.window import Window

df.withColumn("row_number", F.row_number().over(Window.partitionBy(<cols>).orderBy(F.asc_null_last("update_load_dt"))))
.filter("row_number = 1")
.drop("row_number") # optional
...