Вы можете использовать функцию отставания, чтобы добавить столбец lagDate, используйте оператор if else, используя datediff , чтобы сравнить разницу:
import org.apache.spark.sql.functions.{when, col, lag, datediff, lag, date_add}
val windowId = Window.partitionBy(col("ID")).orderBy("Date")
val newDf = df.withColumn("dateLimit", date_add(col("Date"), 30))
.withColumn("lagDate1", lag(col("Date"), -1).over(windowId))
.withColumn("lagDate2", lag(col("Date"), -2).over(windowId))
.withColumn("lagDate",
when(datediff(col("lagDate1"), col("dateLimit")) >=0, col("lagDate1")).otherwise(col("lagDate2"))
)
.withColumn("correctNextDate",
when(datediff(col("NextDate"), col("Date")) > 30 || col("NextDate").isNull,
col("NextDate")).
otherwise(col("lagDate"))
)