Предполагая, что вам нужно выбрать последнюю запись в столбце идентификатора, удалив другие дубликаты, вы можете использовать функции окна и выполнить фильтр по row_number = count.Проверьте это
scala> val df = Seq((120,34.56,"2018-10-11"),(120,65.73,"2018-10-14"),(120,39.96,"2018-10-20"),(122,11.56,"2018-11-20"),(122,24.56,"2018-10-20")).toDF("id","amt","dt")
df: org.apache.spark.sql.DataFrame = [id: int, amt: double ... 1 more field]
scala> val df2=df.withColumn("dt",'dt.cast("date"))
df2: org.apache.spark.sql.DataFrame = [id: int, amt: double ... 1 more field]
scala> df2.show(false)
+---+-----+----------+
|id |amt |dt |
+---+-----+----------+
|120|34.56|2018-10-11|
|120|65.73|2018-10-14|
|120|39.96|2018-10-20|
|122|11.56|2018-11-20|
|122|24.56|2018-10-20|
+---+-----+----------+
scala> df2.createOrReplaceTempView("ido")
scala> spark.sql(""" select id,amt,dt,row_number() over(partition by id order by dt) rw, count(*) over(partition by id) cw from ido """).show(false)
+---+-----+----------+---+---+
|id |amt |dt |rw |cw |
+---+-----+----------+---+---+
|122|24.56|2018-10-20|1 |2 |
|122|11.56|2018-11-20|2 |2 |
|120|34.56|2018-10-11|1 |3 |
|120|65.73|2018-10-14|2 |3 |
|120|39.96|2018-10-20|3 |3 |
+---+-----+----------+---+---+
scala> spark.sql(""" select id,amt,dt from (select id,amt,dt,row_number() over(partition by id order by dt) rw, count(*) over(partition by id) cw from ido) where rw=cw """).show(false)
+---+-----+----------+
|id |amt |dt |
+---+-----+----------+
|122|11.56|2018-11-20|
|120|39.96|2018-10-20|
+---+-----+----------+
scala>
Если вы хотите отсортировать по убыванию dt, вы можете просто указать «order by dt desc» в предложении over (0). Помогает ли это?