Spark структурированные дубликаты потоковой передачи остаются последними - PullRequest
0 голосов
/ 24 декабря 2018

Я хотел бы сохранить потоковый фрейм данных, который получает "обновление".

Для этого я буду использовать dropDuplicates.

Но dropDuplicates отбросить последнее изменение.

Как сохранить только последнее?

1 Ответ

0 голосов
/ 24 декабря 2018

Предполагая, что вам нужно выбрать последнюю запись в столбце идентификатора, удалив другие дубликаты, вы можете использовать функции окна и выполнить фильтр по row_number = count.Проверьте это

scala> val df = Seq((120,34.56,"2018-10-11"),(120,65.73,"2018-10-14"),(120,39.96,"2018-10-20"),(122,11.56,"2018-11-20"),(122,24.56,"2018-10-20")).toDF("id","amt","dt")
df: org.apache.spark.sql.DataFrame = [id: int, amt: double ... 1 more field]

scala> val df2=df.withColumn("dt",'dt.cast("date"))
df2: org.apache.spark.sql.DataFrame = [id: int, amt: double ... 1 more field]

scala> df2.show(false)
+---+-----+----------+
|id |amt  |dt        |
+---+-----+----------+
|120|34.56|2018-10-11|
|120|65.73|2018-10-14|
|120|39.96|2018-10-20|
|122|11.56|2018-11-20|
|122|24.56|2018-10-20|
+---+-----+----------+


scala> df2.createOrReplaceTempView("ido")

scala> spark.sql(""" select id,amt,dt,row_number() over(partition by id order by dt) rw, count(*) over(partition by id) cw from ido """).show(false)
+---+-----+----------+---+---+
|id |amt  |dt        |rw |cw |
+---+-----+----------+---+---+
|122|24.56|2018-10-20|1  |2  |
|122|11.56|2018-11-20|2  |2  |
|120|34.56|2018-10-11|1  |3  |
|120|65.73|2018-10-14|2  |3  |
|120|39.96|2018-10-20|3  |3  |
+---+-----+----------+---+---+


scala> spark.sql(""" select id,amt,dt from (select id,amt,dt,row_number() over(partition by id order by dt) rw, count(*) over(partition by id) cw from ido) where rw=cw """).show(false)
+---+-----+----------+
|id |amt  |dt        |
+---+-----+----------+
|122|11.56|2018-11-20|
|120|39.96|2018-10-20|
+---+-----+----------+


scala>

Если вы хотите отсортировать по убыванию dt, вы можете просто указать «order by dt desc» в предложении over (0). Помогает ли это?

...