Используя Spark 2.3.1 с Scala, уменьшите произвольный список диапазонов дат в отдельные непересекающиеся диапазоны дат - PullRequest
0 голосов
/ 28 октября 2018

Учитывая список диапазонов дат, некоторые из которых перекрываются:

val df = Seq(
  ("Mike","2018-09-01","2018-09-10"), // range 1
  ("Mike","2018-09-05","2018-09-05"), // range 1
  ("Mike","2018-09-12","2018-09-12"), // range 1
  ("Mike","2018-09-11","2018-09-11"), // range 1
  ("Mike","2018-09-25","2018-09-29"), // range 4
  ("Mike","2018-09-21","2018-09-23"), // range 4
  ("Mike","2018-09-24","2018-09-24"), // range 4
  ("Mike","2018-09-14","2018-09-16"), // range 2
  ("Mike","2018-09-15","2018-09-17"), // range 2
  ("Mike","2018-09-05","2018-09-05"), // range 1
  ("Mike","2018-09-19","2018-09-19"), // range 3
  ("Mike","2018-09-19","2018-09-19"), // range 3
  ("Mike","2018-08-19","2018-08-20"), // range 5
  ("Mike","2018-10-01","2018-10-20"), // range 6
  ("Mike","2018-10-10","2018-10-30")  // range 6
).toDF("name", "start", "end")

Я хотел бы сократить данные до минимального набора диапазонов дат, который полностью инкапсулирует вышеуказанные даты без дополнительных датдобавлено:

+----+----------+----------+                                                    
|name|start     |end       |
+----+----------+----------+
|Mike|2018-09-01|2018-09-12|
|Mike|2018-09-14|2018-09-17|
|Mike|2018-09-19|2018-09-19|
|Mike|2018-09-21|2018-09-29|
|Mike|2018-08-19|2018-08-20|
|Mike|2018-10-01|2018-10-30|
+----+----------+----------+

РЕДАКТИРОВАТЬ: Добавлено три новые записи в тестовые данные для учета новых крайних случаев.

Я не могу полагаться на даты в любом конкретном порядке.

Моя лучшая попытка сделать это:

  1. Разбить каждый диапазон дат на отдельные дни
  2. Объединить наборы в один большой набор всех дней
  3. Сортировка набора в список, чтобы дни были в порядке.
  4. Объединение отдельных дней обратно в список списков дней.
  5. . В качестве первого и последнего дня каждого списка укажитеновые диапазоны.

Код, такой как он есть:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import scala.collection.immutable.NumericRange
import java.time.LocalDate

case class MyRange(start:String, end:String)

val combineRanges = udf((ranges: Seq[Row]) => {
  ranges.map(r => LocalDate.parse(r(0).toString).toEpochDay to LocalDate.parse(r(1).toString).toEpochDay)
    .map(_.toIndexedSeq).reduce(_ ++ _).distinct.toList.sorted
    .aggregate(List.empty[Vector[Long]])((ranges:List[Vector[Long]], d:Long) => {
    ranges.lastOption.find(_.last + 1 == d) match {
      case Some(r:Vector[Long]) => ranges.dropRight(1) :+ (r :+ d)
      case None => ranges :+ Vector(d)
    }
  }, _ ++ _).map(v => MyRange(LocalDate.ofEpochDay(v.head).toString, LocalDate.ofEpochDay(v.last).toString))
})

df.groupBy("name")
  .agg(combineRanges(collect_list(struct($"start", $"end"))) as "ranges")
  .withColumn("ranges", explode($"ranges"))
  .select($"name", $"ranges.start", $"ranges.end")
  .show(false)

Кажется, он работает, но он очень уродлив и, вероятно, тратит время и память.

Я надеялся использовать класс Range Scala только для того, чтобы понять,союзник разбивает диапазоны дат на отдельные дни, но у меня есть ощущение, что операция сортировки заставляет руку scala и фактически создает список всех дат в памяти.

У кого-нибудь есть лучший способделать это?

Ответы [ 5 ]

0 голосов
/ 01 ноября 2018

Как насчет

  1. разнесения диапазона дат по дням с помощью udf
  2. с использованием аналитических функций для создания магии

Код:

import java.time.LocalDate
import java.time.format.DateTimeFormatter
def enumerateDays(start: LocalDate, end: LocalDate) = {
  Iterator.iterate(start)(d => d.plusDays(1L))
  .takeWhile(d => !d.isAfter(end))
  .toSeq
}
val udf_enumerateDays = udf( (start:String, end:String) => enumerateDays(LocalDate.parse(start), LocalDate.parse(end)).map(_.toString))

df.select($"name", explode(udf_enumerateDays($"start",$"end")).as("day"))
.distinct
.withColumn("day_prev", lag($"day",1).over(Window.partitionBy($"name").orderBy($"day")))
.withColumn("is_consecutive", coalesce(datediff($"day",$"day_prev"),lit(0))<=1)
.withColumn("group_nb", sum(when($"is_consecutive",lit(0)).otherwise(lit(1))).over(Window.partitionBy($"name").orderBy($"day")))
.groupBy($"name",$"group_nb").agg(min($"day").as("start"), max($"day").as("end"))
.drop($"group_nb")
.orderBy($"name",$"start")
.show

Результат:

+----+----------+----------+
|name|     start|       end|
+----+----------+----------+
|Mike|2018-08-19|2018-08-20|
|Mike|2018-09-01|2018-09-12|
|Mike|2018-09-14|2018-09-17|
|Mike|2018-09-19|2018-09-19|
|Mike|2018-09-21|2018-09-29|
|Mike|2018-10-01|2018-10-30|
+----+----------+----------+
0 голосов
/ 30 октября 2018

Вот еще одно решение, использующее DF без UDF.

val df = Seq(
  ("Mike","2018-09-01","2018-09-10"), // range 1
  ("Mike","2018-09-05","2018-09-05"), // range 1
  ("Mike","2018-09-12","2018-09-12"), // range 1
  ("Mike","2018-09-11","2018-09-11"), // range 1
  ("Mike","2018-09-25","2018-09-30"), // range 4
  ("Mike","2018-09-21","2018-09-23"), // range 4
  ("Mike","2018-09-24","2018-09-24"), // range 4
  ("Mike","2018-09-14","2018-09-16"), // range 2
  ("Mike","2018-09-15","2018-09-17"), // range 2
  ("Mike","2018-09-05","2018-09-05"), // range 1
  ("Mike","2018-09-19","2018-09-19"), // range 3
  ("Mike","2018-09-19","2018-09-19")  // range 3
).toDF("name", "start", "end").withColumn("start",'start.cast("date")).withColumn("end",'end.cast("date"))
df.printSchema()

val df2 = df.as("t1").join(df.as("t2"), $"t1.start" =!= $"t2.start" and $"t1.end" =!= $"t2.end")
  .withColumn("date_diff_start",datediff($"t1.start",$"t2.start"))
  .withColumn("date_diff_end",datediff($"t1.end",$"t2.end"))
  .withColumn("n1_start",when('date_diff_start===1,$"t2.start"))
  .withColumn("n1_end",when('date_diff_end === -1,$"t2.end"))
  .filter( 'n1_start.isNotNull or 'n1_end.isNotNull)
  .withColumn( "new_start", when('n1_start.isNotNull, $"n1_start").otherwise($"t1.start"))
  .withColumn( "new_end", when('n1_end.isNotNull, $"n1_end").otherwise($"t1.end"))
  .select("t1.name","new_start","new_end")
  .distinct

val df3= df2.alias("t3").join(df2.alias("t4"),$"t3.name" === $"t4.name")
  .withColumn("x1",when($"t3.new_end"=== $"t4.new_start",1)
    .when($"t3.new_start" === $"t4.new_end",1)
    .otherwise(0))
  .groupBy("t3.name","t3.new_start","t3.new_end")
  .agg( min( when('x1===1,$"t4.new_start" ).otherwise($"t3.new_start") ).as("ns"), max(when('x1===1,$"t4.new_end").otherwise($"t3.new_end")).as("ne"))
  .select("t3.name","ns","ne")
  .distinct

df3.show(false)

val num_combinations = df3.count

val df4 = df.filter('start==='end).distinct.select("name","start").alias("dup")
  .join(df3.alias("d4"), $"d4.name"===$"dup.name" , "leftOuter")
  .withColumn("cond", ! $"dup.start".between($"ns" , $"ne"))
  .filter('cond)
  .groupBy("d4.name","start" ).agg(count($"start").as("count"),collect_set('start).as("dup_s1"))
  .filter('count===num_combinations)
  .withColumn("start",explode('dup_s1))
  .withColumn("end",'start)
  .select("name","start","end")

df3.union(df4).show(false)

Результаты:

+----+----------+----------+
|name|ns        |ne        |
+----+----------+----------+
|Mike|2018-09-21|2018-09-30|
|Mike|2018-09-01|2018-09-12|
|Mike|2018-09-14|2018-09-17|
|Mike|2018-09-19|2018-09-19|
+----+----------+----------+
0 голосов
/ 30 октября 2018

Я думаю, что самый простой (и наиболее читаемый) способ - это разбить диапазоны на отдельные дни, а затем объединить обратно в интервалы.Поскольку количество дней не может быть слишком большим, я думаю, что взрыв здесь не является узким местом.Я показываю «чистое Scala» решение, которое затем используется внутри UDF, которое получает все интервалы от агрегации collect_list:

import java.time.LocalDate
import java.time.temporal.ChronoUnit

def enumerateDays(start: LocalDate, end: LocalDate) = {
  Iterator.iterate(start)(d => d.plusDays(1L))
    .takeWhile(d => !d.isAfter(end)) 
    .toList
}

implicit val localDateOrdering: Ordering[LocalDate] = Ordering.by(_.toEpochDay)

val combineRanges = udf((data: Seq[Row]) => {
  val dateEnumerated =
    data
      .toSet[Row] // use Set to save memory if many spans overlap
      // "explode" date spans into individual days
      .flatMap { case Row(start: String, end: String) => enumerateDays(LocalDate.parse(start), LocalDate.parse(end)) }
      .toVector
      .sorted

  // combine subsequent dates into Vectors
  dateEnumerated.tail
    // combine subsequent dates into Vectors
    .foldLeft(Vector(Vector(dateEnumerated.head)))((agg, curr) => {
    if (ChronoUnit.DAYS.between(agg.last.last, curr) == 1L) {
      agg.init :+ (agg.last :+ curr)
    } else {
      agg :+ Vector(curr)
    }
  })
    // now get min/max of dates per span
    .map(r => (r.min.toString, r.max.toString))
})

df.groupBy("name")
  .agg(combineRanges(collect_list(struct($"start", $"end"))) as "ranges")
  .withColumn("ranges", explode($"ranges"))
  .select($"name", $"ranges._1".as("start"), $"ranges._2".as("end"))
  .show(false)

дает

+----+----------+----------+
|name|start     |end       |
+----+----------+----------+
|Mike|2018-08-19|2018-08-20|
|Mike|2018-09-01|2018-09-12|
|Mike|2018-09-14|2018-09-17|
|Mike|2018-09-19|2018-09-19|
|Mike|2018-09-21|2018-09-29|
|Mike|2018-10-01|2018-10-30|
+----+----------+----------+

Я думаю, что этотакже выполнимо с большим количеством логики DataFrame API.Я бы все равно взорвался, используя UDF, но затем использовал Window-Functions и groupBy, чтобы построить новый блок на основе количества дней между двумя датами.Но я думаю, что вышеупомянутое решение также хорошо

0 голосов
/ 30 октября 2018

Вот альтернатива с DFs и SPARK SQL как непроцедурная, так и процедурная по определению.Вам нужно хорошо читать и сохранять.

// Aspects such as caching and re-partitioning for performance not considered. On the other hand it all happens under the bonnet wth DF's - so they say.
// Functional only.
import org.apache.spark.sql.functions._
import spark.implicits._
import java.time._
import org.apache.spark.sql.functions.{lead, lag}
import org.apache.spark.sql.expressions.Window

def toEpochDay(s: String) = LocalDate.parse(s).toEpochDay
val toEpochDayUdf = udf(toEpochDay(_: String))

val df = Seq(
("Betty","2018-09-05","2018-09-05"),  ("Betty","2018-09-05","2018-09-05"), 
("Betty","2018-09-05","2018-09-08"),  ("Betty","2018-09-07","2018-09-10"),  
("Betty","2018-09-07","2018-09-08"),  ("Betty","2018-09-06","2018-09-07"),  
("Betty","2018-09-10","2018-09-15"),  ("Betty","2017-09-10","2017-09-15"),
("XXX","2017-09-04","2017-09-10"),    ("XXX","2017-09-10","2017-09-15"),
("YYY","2017-09-04","2017-09-10"),    ("YYY","2017-09-11","2017-09-15"),
("Bob","2018-09-01","2018-09-02"),    ("Bob","2018-09-04","2018-09-05"),  
("Bob","2018-09-06","2018-09-07"),    ("Bob","2019-09-04","2019-09-05"),  
("Bob","2019-09-06","2019-09-07"),    ("Bob","2018-09-08","2018-09-22")   
           ).toDF("name", "start", "end")

// Remove any duplicates - pointless to n-process these!
val df2 = df.withColumn("s", toEpochDayUdf($"start")).withColumn("e", toEpochDayUdf($"end")).distinct  
df2.show(false) // The original input
df2.createOrReplaceTempView("ranges")

// Find those records encompassed by a broader time frame and hence not required for processing.
val q = spark.sql("""  SELECT * 
                         FROM ranges r1
                        WHERE EXISTS (SELECT r2.name                        
                                        FROM ranges r2
                                       WHERE r2.name = r1.name 
                                         AND ((r1.s >= r2.s AND r1.e < r2.e) OR 
                                              (r1.e <= r2.e AND r1.s > 2.s))
                                     ) 
                  """)   
//q.show(false)

val df3 = df2.except(q) // Overlapping or on their own / single range records left.
//df3.show(false)
df3.createOrReplaceTempView("ranges2")

// Find those ranges that have a gap between them and the next adjacent records, before or after, i.e. records that exist on their own and are in fact per de facto the first part of the answer.
val q2 = spark.sql("""  SELECT * 
                         FROM ranges2 r1
                        WHERE NOT EXISTS (SELECT r2.name                        
                                            FROM ranges2 r2
                                           WHERE r2.name = r1.name 
                                             AND (r2.e >= r1.s - 1 AND r2.s <= r1.s - 1 ) OR
                                                 (r2.s <= r1.e + 1 AND r2.e >= r1.e + 1 )) 
                                          ) 
                   """)

// Store the first set of records that exist on their own with some form of gap, first part of result overall result set.                                                    
val result1 = q2.select("name", "start", "end")
result1.show(false) 

// Get the records / ranges that have overlaps to process - the second remaining set of such records to process.
val df4 = df3.except(q2) 
//df4.show(false)

//Avoid Serialization errors with lag!
@transient val w = org.apache.spark.sql.expressions.Window.partitionBy("name").orderBy("e")
@transient val lag_y = lag("e", 1, -99999999).over(w)
//df.select(lag_y).map(f _).first
val df5 = df4.withColumn("new_col", lag_y)
//df5.show(false)

// Massage data to get results via easier queries, e.g. avoid issues with correlated sub-queries.
val myExpression = "s - new_col"
val df6 = df5.withColumn("result", when($"new_col" === 0, 0).otherwise(expr(myExpression)))
//df6.show(false)
df6.createOrReplaceTempView("ranges3")

val q3 = spark.sql("""  SELECT *, dense_rank() over (PARTITION BY name ORDER BY start ASC) as RANK
                          FROM ranges3
                          WHERE new_col = -99999999 OR result > 1
                   """)
q3.createOrReplaceTempView("rangesSTARTS")

val q4 = spark.sql("""  SELECT *
                          FROM ranges3
                         WHERE result <= 1 AND new_col <> -99999999 
                   """)
q4.createOrReplaceTempView("rangesFOLLOWERS")

val q5 = spark.sql("""  SELECT r1.*, r2.start as next_start
                          FROM rangesSTARTS r1 LEFT JOIN rangesSTARTS r2
                           ON r2.name = r1.name 
                          AND r2.RANK = r1.RANK + 1 
                   """)
//q5.show(false)

val q6 = q5.withColumn("end_period", when($"next_start".isNull, "2525-01-01").otherwise($"next_start"))
//q6.show(false)
q6.createOrReplaceTempView("rangesSTARTS2")

// Second and final set of results - the head and tail of such set of range records.
val result2 = spark.sql("""  SELECT r1.name, r1.start, MAX(r2.end) as end
                               FROM rangesFOLLOWERS r2, rangesSTARTS2 r1
                              WHERE r2.name = r1.name
                                AND r2.end >= r1.start 
                                AND r2.end <  r1.end_period
                           GROUP BY r1.name, r1.start """)   
result2.show(false)

val finalresult = result1.union(result2)
finalresult.show

возвращает:

+-----+----------+----------+
| name|     start|       end|
+-----+----------+----------+
|  Bob|2018-09-01|2018-09-02|
|Betty|2017-09-10|2017-09-15|
|  YYY|2017-09-04|2017-09-15|
|  Bob|2018-09-04|2018-09-22|
|  Bob|2019-09-04|2019-09-07|
|  XXX|2017-09-04|2017-09-15|
|Betty|2018-09-05|2018-09-15|
+-----+----------+----------+

Интересный контраст - что лучше для производительности и стиля?Мое последнее такое усилие на время.Интересуют ваши комментарии.Вы знаете аспекты программирования лучше, чем я, поэтому этот вопрос дает хорошее сравнение и хорошее образование.другие решения взрываются, не то, что я видел.

0 голосов
/ 29 октября 2018

Я думаю, что мой второй подход лучше, но все еще далек от совершенства.По крайней мере, он избегает повторения каждого дня в каждом диапазоне дат, хотя теперь он обрабатывает каждый диапазон несколько раз.Я думаю, что я в основном собираюсь обрабатывать несколько больших диапазонов вместо нескольких маленьких диапазонов, так что, может быть, это нормально.

Учитывая:

val ranges = Seq(
  ("Mike","2018-09-01","2018-09-10"),
  ("Mike","2018-09-05","2018-09-05"),
  ("Mike","2018-09-12","2018-09-12"),
  ("Mike","2018-09-11","2018-09-11"),
  ("Mike","2018-09-25","2018-09-30"),
  ("Mike","2018-09-21","2018-09-23"),
  ("Mike","2018-09-24","2018-09-24"),
  ("Mike","2018-09-14","2018-09-16"),
  ("Mike","2018-09-15","2018-09-17"),
  ("Mike","2018-09-05","2018-09-05"),
  ("Mike","2018-09-19","2018-09-19"),
  ("Mike","2018-09-19","2018-09-19"),
  ("Mike","2018-08-19","2018-08-20"),
  ("Mike","2018-10-01","2018-10-20"),
  ("Mike","2018-10-10","2018-10-30")
)
val df = ranges.toDF("name", "start", "end")

Я хочу:

+----+----------+----------+                                                    
|name|start     |end       |
+----+----------+----------+
|Mike|2018-09-01|2018-09-12|
|Mike|2018-09-21|2018-09-30|
|Mike|2018-09-14|2018-09-17|
|Mike|2018-09-19|2018-09-19|
|Mike|2018-08-19|2018-08-20|
|Mike|2018-10-01|2018-10-30|
+----+----------+----------+

(На этот раз они не в порядке. Я согласен с этим, так как это никогда не было требованием. Просто это был артефакт моего предыдущего подхода)

// very specific helper functions to convert a date string to and from a range
implicit class MyString(s:String) {
  def toFilteredInt: Int = s.filter(_.isDigit).toInt
  def to(s2:String): Range = s.toFilteredInt to s2.toFilteredInt
  // this only works for YYYYMMDD strings. very dangerous.
  def toDateStr = s"${s.slice(0,4)}-${s.slice(4,6)}-${s.slice(6,8)}"
}

// helper functions to combine two ranges
implicit class MyRange(r:Range) {
  def expand(i: Int): Range = r.head - i * r.step to r.last + i * r.step
  def containsPart(r2:Range): Boolean = r.contains(r2.head) || r.contains(r2.last)
  def intersects(r2:Range): Boolean = r.containsPart(r2) || r2.containsPart(r)
  def combine(r2:Range): Option[Range] = {
    if (r.step == r2.step && r.intersects(r2 expand 1)) {
      if (r.step > 0) Some(Math.min(r.head, r2.head) to Math.max(r.last, r2.last))
      else Some(Math.max(r.head, r2.head) to Math.min(r.last, r2.last))
    }
    else None
  }
  def toDateStrTuple: (String,String) = (r.start.toString.toDateStr, r.end.toString.toDateStr)
}

// combines a range to one of the ranges in a sequence if possible;
// adds it to the sequence if it can't be combined.
def addToRanges(rs:Seq[Range], r:Range): Seq[Range] = {
  if (rs.isEmpty) Seq(r)
  else r.combine(rs.last) match {
    case Some(r:Range) => rs.init :+ r
    case None => addToRanges(rs.init, r) :+ rs.last
  }
}

// tries to combine every range in the sequence with every other range
// does not handle the case where combining two ranges together allows
// them to be combined with a previous range in the sequence.
// however, if we call this and nothing has been combined, we know
// we are done
def collapseOnce(rs:Seq[Range]):Seq[Range] = {
  if (rs.size <= 1) rs
  else addToRanges(collapseOnce(rs.init), rs.last)
}

// keep collapsing the sequence of ranges until they can't collapse
// any further
def collapseAll(rs:Seq[Range]):Seq[Range] = {
  val collapsed = collapseOnce(rs)
  if (rs.size == collapsed.size) rs
  else collapseAll(collapsed)
}

// now our udf is much simpler
val combineRanges = udf((rows: Seq[Row]) => {
  val ranges  = rows.map(r => r(0).toString to r(1).toString)
  collapseAll(ranges).map(_.toDateStrTuple)
})


df.groupBy("name").agg(combineRanges(collect_list(struct($"start", $"end"))) as "ranges"
  ).withColumn("ranges", explode($"ranges")
  ).select($"name", $"ranges._1" as "start", $"ranges._2" as "end").show(false)

Комнатадля улучшения:

  • Я почти уверен, что большую часть времени я получу лучшую производительность, если выйду из collapseOnce, как только найду диапазон для объединения.Типичным вариантом использования будет добавление диапазона из одного дня к последнему диапазону в последовательности.
  • collapseOnce и addToRanges еще не являются хвостовыми рекурсивами.
  • Некоторые из даты в строку и строки в датуметоды в моих неявных классах, вероятно, должны быть отдельными методами.Они очень специфичны для моей проблемы и не заслуживают включения в классы String и Range.
...