Это должно работать для вас.Сначала назначьте минимальную дату оригинальному df, затем присоедините новый df2 к df.
import org.apache.spark.sql.expressions.Window
val df = Seq(
(1,1,0,1,"2019-01-31"),
(2,1,1,0,"2018-12-31"),
(3,1,1,1,"2018-10-31"),
(1,1,0,1,"2018-09-30"),
(2,1,1,0,"2018-08-31"),
(3,1,1,0,"2018-07-31"),
(3,1,1,1,"2019-05-31"))
.toDF("id" ,"def_a" , "def_b", "deb_c", "date")
val w = Window.partitionBy($"id").orderBy($"date".asc)
val df2 = df.withColumn("date", $"date".cast("date"))
.withColumn("min_date", min($"date").over(w))
.select("id", "min_date")
.distinct()
df.join(df2, df("id") === df2("id") && df("date") === df2("min_date"))
.select(df("*"))
.show
И результат должен быть:
+---+-----+-----+-----+----------+
| id|def_a|def_b|deb_c| date|
+---+-----+-----+-----+----------+
| 1| 1| 0| 1|2018-09-30|
| 2| 1| 1| 0|2018-08-31|
| 3| 1| 1| 0|2018-07-31|
+---+-----+-----+-----+----------+
Кстати, я полагаю, у вас было немногоошибка в ваших ожидаемых результатах.Это (3, 1, 1, 0, 2018-07-31)
не (3, 1, 1, 1, 2018-07-31)