Как удалить дубликаты записей с одинаковым значением в указанном столбце c и сохранить запись с самой высокой отметкой времени с помощью pyspark - PullRequest
0 голосов
/ 16 апреля 2020

Я попробовал код ниже. Идея сценария, приведенного ниже, состоит в том, чтобы упорядочить записи по id и отметке времени и расположить их в порядке убывания по процессу processing_timestamp, но когда я пытаюсь выполнить запрос, он не упорядочивает запись в порядке убывания по обработанной отметке времени. Он даже удаляет самую последнюю запись и сохраняет более старую запись, что не должно иметь место

df2 = df_concat.orderBy("id", "processed_timestamp", f.col("processed_timestamp").desc()).dropDuplicates(["id"])

Я также попробовал подход, описанный ниже, но когда я попытался преобразовать его обратно в фрейм данных, схема таблицы теперь отличается и теперь записи находятся в одном столбце и разделяются запятой. Он также удаляет последнюю запись и сохраняет более старую запись, что не должно иметь место

    def selectRowByTimeStamp(x,y):
        if x.processed_timestamp > y.processed_timestamp:
            return x
        return y

dataMap = df_concat.rdd.map(lambda x: (x.id, x))
newdata = dataMap.reduceByKey(selectRowByTimeStamp) 

Я не уверен, правильно ли я понимаю, как работает приведенный выше код.

1 Ответ

0 голосов
/ 16 апреля 2020

Если бы не простая ошибка, ваш код работал бы так, как ожидалось.

Не следует использовать имя столбца "processed_timestamp" дважды:

df2 = df_concat.orderBy(
    "id", f.col("processed_timestamp").desc()
).dropDuplicates(["id"])

Ваш код сортируется DataFrame на processed_timestamp в порядке возрастания, потому что необработанное имя столбца идет первым.

...