Я попробовал код ниже. Идея сценария, приведенного ниже, состоит в том, чтобы упорядочить записи по id и отметке времени и расположить их в порядке убывания по процессу processing_timestamp, но когда я пытаюсь выполнить запрос, он не упорядочивает запись в порядке убывания по обработанной отметке времени. Он даже удаляет самую последнюю запись и сохраняет более старую запись, что не должно иметь место
df2 = df_concat.orderBy("id", "processed_timestamp", f.col("processed_timestamp").desc()).dropDuplicates(["id"])
Я также попробовал подход, описанный ниже, но когда я попытался преобразовать его обратно в фрейм данных, схема таблицы теперь отличается и теперь записи находятся в одном столбце и разделяются запятой. Он также удаляет последнюю запись и сохраняет более старую запись, что не должно иметь место
def selectRowByTimeStamp(x,y):
if x.processed_timestamp > y.processed_timestamp:
return x
return y
dataMap = df_concat.rdd.map(lambda x: (x.id, x))
newdata = dataMap.reduceByKey(selectRowByTimeStamp)
Я не уверен, правильно ли я понимаю, как работает приведенный выше код.