Дедупировать строки в Spark DataFrame по самой последней отметке времени - PullRequest
0 голосов
/ 20 сентября 2018

У меня есть DataFrame со следующей схемой:

root
|- documentId
|- timestamp
|- anotherField

Например,

"d1", "2018-09-20 10:00:00", "blah1"
"d2", "2018-09-20 09:00:00", "blah2"
"d1", "2018-09-20 10:01:00", "blahnew"

Обратите внимание, что для понимания (и моего удобства) я показываюотметка времени в виде строки.Фактически это long, представляющее миллисекунды с начала эпохи.

Как видно здесь, есть повторяющиеся строки (строки 1 и 3) с одинаковыми documentId, но разными timestamp (и, возможно, различными другими полями)).Я хочу дедуплицировать и сохранять только самую последнюю (на основе timestamp) строку для каждой documentId.

Простой df.groupBy("documentId").agg(max("timestamp), ...), похоже, не сработает здесь, потому что я не знаю, какоставьте остальные поля в строке, соответствующие тем, которые удовлетворяют max("timestamp").

Итак, я придумал сложный способ сделать это.

// first find the max timestamp corresponding to each documentId
val mostRecent = df
    .select("documentId", "timestamp")
      .groupBy("documentId")
        .agg(max("timestamp"))

// now join with the original df on timestamp to retain
val dedupedDf = df.join(mostRecent, Seq("documentId", "timestamp"), "inner")

В результате dedupedDfдолжны иметь только те строки, которые соответствуют самой последней записи для каждого documentId.

Хотя это работает, я не чувствую, что это правильный (или эффективный) подход, так как я использую join который кажется ненужным.

Как я могу сделать это лучше?Я ищу чистые решения на основе «DataFrame», а не подходы на основе RDD (поскольку сотрудники DataBricks неоднократно говорили нам на семинаре по работе с DataFrames, а не с RDD).

1 Ответ

0 голосов
/ 20 сентября 2018

См. Приведенный ниже код помогает вашей цели,

val df = Seq(
  ("d1", "2018-09-20 10:00:00", "blah1"),
  ("d2", "2018-09-20 09:00:00", "blah2"),
  ("d1", "2018-09-20 10:01:00", "blahnew")
).toDF("documentId","timestamp","anotherField")

import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"documentId").orderBy($"timestamp".desc)
val Resultdf = df.withColumn("rownum", row_number.over(w))
     .where($"rownum" === 1).drop("rownum")

Resultdf.show()

ввод:

+----------+-------------------+------------+
|documentId|          timestamp|anotherField|
+----------+-------------------+------------+
|        d1|2018-09-20 10:00:00|       blah1|
|        d2|2018-09-20 09:00:00|       blah2|
|        d1|2018-09-20 10:01:00|     blahnew|
+----------+-------------------+------------+

вывод:

+----------+-------------------+------------+
|documentId|          timestamp|anotherField|
+----------+-------------------+------------+
|        d2|2018-09-20 09:00:00|       blah2|
|        d1|2018-09-20 10:01:00|     blahnew|
+----------+-------------------+------------+
...