Вы можете получить его, используя функции окна
scala> val df = Seq(("a",2,"2013-12-12","2999-12-31"),("b",3,"2011-12-14","2999-12-31"),("a",4,"2013-12-17","2999-12-31"),("b",8,"2011-12-19","2999-12-31"),("a",6,"2013-12-23","2999-12-31")).toDF("colA","colB","colC","colD")
df: org.apache.spark.sql.DataFrame = [colA: string, colB: int ... 2 more fields]
scala> val df2 = df.withColumn("colc",'colc.cast("date")).withColumn("cold",'cold.cast("date"))
df2: org.apache.spark.sql.DataFrame = [colA: string, colB: int ... 2 more fields]
scala> df2.createOrReplaceTempView("yash")
scala> spark.sql(""" select cola,colb,colc,cold, rank() over(partition by cola order by colc) c1, coalesce(date_sub(lead(colc) over(partition by cola order by colc),1),cold) as cold2 from yash """).show
+----+----+----------+----------+---+----------+
|cola|colb| colc| cold| c1| cold2|
+----+----+----------+----------+---+----------+
| b| 3|2011-12-14|2999-12-31| 1|2011-12-18|
| b| 8|2011-12-19|2999-12-31| 2|2999-12-31|
| a| 2|2013-12-12|2999-12-31| 1|2013-12-16|
| a| 4|2013-12-17|2999-12-31| 2|2013-12-22|
| a| 6|2013-12-23|2999-12-31| 3|2999-12-31|
+----+----+----------+----------+---+----------+
scala>
Удаление ненужных столбцов
scala> spark.sql(""" select cola,colb,colc, coalesce(date_sub(lead(colc) over(partition by cola order by colc),1),cold) as cold from yash """).show
+----+----+----------+----------+
|cola|colb| colc| cold|
+----+----+----------+----------+
| b| 3|2011-12-14|2011-12-18|
| b| 8|2011-12-19|2999-12-31|
| a| 2|2013-12-12|2013-12-16|
| a| 4|2013-12-17|2013-12-22|
| a| 6|2013-12-23|2999-12-31|
+----+----+----------+----------+
scala>