Учитывая список диапазонов дат, некоторые из которых перекрываются:
val df = Seq(
("Mike","2018-09-01","2018-09-10"), // range 1
("Mike","2018-09-05","2018-09-05"), // range 1
("Mike","2018-09-12","2018-09-12"), // range 1
("Mike","2018-09-11","2018-09-11"), // range 1
("Mike","2018-09-25","2018-09-29"), // range 4
("Mike","2018-09-21","2018-09-23"), // range 4
("Mike","2018-09-24","2018-09-24"), // range 4
("Mike","2018-09-14","2018-09-16"), // range 2
("Mike","2018-09-15","2018-09-17"), // range 2
("Mike","2018-09-05","2018-09-05"), // range 1
("Mike","2018-09-19","2018-09-19"), // range 3
("Mike","2018-09-19","2018-09-19"), // range 3
("Mike","2018-08-19","2018-08-20"), // range 5
("Mike","2018-10-01","2018-10-20"), // range 6
("Mike","2018-10-10","2018-10-30") // range 6
).toDF("name", "start", "end")
Я хотел бы сократить данные до минимального набора диапазонов дат, который полностью инкапсулирует вышеуказанные даты без дополнительных датдобавлено:
+----+----------+----------+
|name|start |end |
+----+----------+----------+
|Mike|2018-09-01|2018-09-12|
|Mike|2018-09-14|2018-09-17|
|Mike|2018-09-19|2018-09-19|
|Mike|2018-09-21|2018-09-29|
|Mike|2018-08-19|2018-08-20|
|Mike|2018-10-01|2018-10-30|
+----+----------+----------+
РЕДАКТИРОВАТЬ: Добавлено три новые записи в тестовые данные для учета новых крайних случаев.
Я не могу полагаться на даты в любом конкретном порядке.
Моя лучшая попытка сделать это:
- Разбить каждый диапазон дат на отдельные дни
- Объединить наборы в один большой набор всех дней
- Сортировка набора в список, чтобы дни были в порядке.
- Объединение отдельных дней обратно в список списков дней.
- . В качестве первого и последнего дня каждого списка укажитеновые диапазоны.
Код, такой как он есть:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import scala.collection.immutable.NumericRange
import java.time.LocalDate
case class MyRange(start:String, end:String)
val combineRanges = udf((ranges: Seq[Row]) => {
ranges.map(r => LocalDate.parse(r(0).toString).toEpochDay to LocalDate.parse(r(1).toString).toEpochDay)
.map(_.toIndexedSeq).reduce(_ ++ _).distinct.toList.sorted
.aggregate(List.empty[Vector[Long]])((ranges:List[Vector[Long]], d:Long) => {
ranges.lastOption.find(_.last + 1 == d) match {
case Some(r:Vector[Long]) => ranges.dropRight(1) :+ (r :+ d)
case None => ranges :+ Vector(d)
}
}, _ ++ _).map(v => MyRange(LocalDate.ofEpochDay(v.head).toString, LocalDate.ofEpochDay(v.last).toString))
})
df.groupBy("name")
.agg(combineRanges(collect_list(struct($"start", $"end"))) as "ranges")
.withColumn("ranges", explode($"ranges"))
.select($"name", $"ranges.start", $"ranges.end")
.show(false)
Кажется, он работает, но он очень уродлив и, вероятно, тратит время и память.
Я надеялся использовать класс Range Scala только для того, чтобы понять,союзник разбивает диапазоны дат на отдельные дни, но у меня есть ощущение, что операция сортировки заставляет руку scala и фактически создает список всех дат в памяти.
У кого-нибудь есть лучший способделать это?