У меня есть DataFrame
со следующей схемой:
root
|- documentId
|- timestamp
|- anotherField
Например,
"d1", "2018-09-20 10:00:00", "blah1"
"d2", "2018-09-20 09:00:00", "blah2"
"d1", "2018-09-20 10:01:00", "blahnew"
Обратите внимание, что для понимания (и моего удобства) я показываюотметка времени в виде строки.Фактически это long
, представляющее миллисекунды с начала эпохи.
Как видно здесь, есть повторяющиеся строки (строки 1 и 3) с одинаковыми documentId
, но разными timestamp
(и, возможно, различными другими полями)).Я хочу дедуплицировать и сохранять только самую последнюю (на основе timestamp
) строку для каждой documentId
.
Простой df.groupBy("documentId").agg(max("timestamp), ...)
, похоже, не сработает здесь, потому что я не знаю, какоставьте остальные поля в строке, соответствующие тем, которые удовлетворяют max("timestamp")
.
Итак, я придумал сложный способ сделать это.
// first find the max timestamp corresponding to each documentId
val mostRecent = df
.select("documentId", "timestamp")
.groupBy("documentId")
.agg(max("timestamp"))
// now join with the original df on timestamp to retain
val dedupedDf = df.join(mostRecent, Seq("documentId", "timestamp"), "inner")
В результате dedupedDf
должны иметь только те строки, которые соответствуют самой последней записи для каждого documentId
.
Хотя это работает, я не чувствую, что это правильный (или эффективный) подход, так как я использую join
который кажется ненужным.
Как я могу сделать это лучше?Я ищу чистые решения на основе «DataFrame», а не подходы на основе RDD (поскольку сотрудники DataBricks неоднократно говорили нам на семинаре по работе с DataFrames, а не с RDD).