Вы можете использовать datediff
вместе с оконной функцией lag
, чтобы проверить разрывы между текущими и предыдущими строками и вычислить недостающие диапазоны дат с некоторыми функциями дат :
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._
import java.sql.Date
val df = Seq(
(1, Date.valueOf("2018-01-01"), Date.valueOf("2018-01-31")),
(1, Date.valueOf("2018-02-16"), Date.valueOf("2018-02-28")),
(1, Date.valueOf("2018-03-01"), Date.valueOf("2018-03-31")),
(2, Date.valueOf("2018-07-01"), Date.valueOf("2018-07-31")),
(2, Date.valueOf("2018-08-16"), Date.valueOf("2018-08-31"))
).toDF("MemberId", "StartDate", "EndDate")
val win = Window.partitionBy("MemberId").orderBy("StartDate", "EndDate")
df.
withColumn("PrevEndDate", coalesce(lag($"EndDate", 1).over(win), date_sub($"StartDate", 1))).
withColumn("DayGap", datediff($"StartDate", $"PrevEndDate")).
where($"DayGap" > 1).
select($"MemberId", date_add($"PrevEndDate", 1).as("StartDateGap"), date_sub($"StartDate", 1).as("EndDateGap")).
show
// +--------+------------+----------+
// |MemberId|StartDateGap|EndDateGap|
// +--------+------------+----------+
// | 1| 2018-02-01|2018-02-15|
// | 2| 2018-08-01|2018-08-15|
// +--------+------------+----------+