Вот один из подходов:
- Создать новый столбец
group_id
со значением null
, если begin_dt
находится в диапазоне дат от предыдущей строки;в противном случае уникальное целое число - Backfill
null
s в group_id
с ненулевым значением last
- Вычисление
min(begin_dt)
и max(end_dt)
в каждом (key, group_id)
разбиение
Пример ниже:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val df = Seq(
(10, "ABC", "2018-01-01", "2018-01-08"),
(10, "BAC", "2018-01-03", "2018-01-15"),
(10, "CAS", "2018-01-03", "2018-01-21"),
(20, "AAA", "2017-11-12", "2018-01-03"),
(20, "DAS", "2018-01-01", "2018-01-12"),
(20, "EDS", "2018-02-01", "2018-02-16")
).toDF("key", "code", "begin_dt", "end_dt")
val win1 = Window.partitionBy($"key").orderBy($"begin_dt", $"end_dt")
val win2 = Window.partitionBy($"key", $"group_id")
df.
withColumn("group_id", when(
$"begin_dt".between(lag($"begin_dt", 1).over(win1), lag($"end_dt", 1).over(win1)), null
).otherwise(monotonically_increasing_id)
).
withColumn("group_id", last($"group_id", ignoreNulls=true).
over(win1.rowsBetween(Window.unboundedPreceding, 0))
).
withColumn("begin_dt2", min($"begin_dt").over(win2)).
withColumn("end_dt2", max($"end_dt").over(win2)).
orderBy("key", "begin_dt", "end_dt").
show
// +---+----+----------+----------+-------------+----------+----------+
// |key|code| begin_dt| end_dt| group_id| begin_dt2| end_dt2|
// +---+----+----------+----------+-------------+----------+----------+
// | 10| ABC|2018-01-01|2018-01-08|1047972020224|2018-01-01|2018-01-21|
// | 10| BAC|2018-01-03|2018-01-15|1047972020224|2018-01-01|2018-01-21|
// | 10| CAS|2018-01-03|2018-01-21|1047972020224|2018-01-01|2018-01-21|
// | 20| AAA|2017-11-12|2018-01-03| 455266533376|2017-11-12|2018-01-12|
// | 20| DAS|2018-01-01|2018-01-12| 455266533376|2017-11-12|2018-01-12|
// | 20| EDS|2018-02-01|2018-02-16| 455266533377|2018-02-01|2018-02-16|
// +---+----+----------+----------+-------------+----------+----------+