В Spark Scala, как проверить перекрывающиеся даты из соседних строк в кадре данных - PullRequest
0 голосов
/ 18 октября 2018

Как я могу проверить перекрывающиеся даты из соседних строк (предшествующих и следующих) в Dataframe.Это должно происходить на уровне ключа

У меня есть следующие данные после сортировки по ключу, даты

source_Df.show()
+-----+--------+------------+------------+
| key | code   | begin_dt   | end_dt     |
+-----+--------+------------+------------+
| 10  |  ABC   | 2018-01-01 | 2018-01-08 |
| 10  |  BAC   | 2018-01-03 | 2018-01-15 |
| 10  |  CAS   | 2018-01-03 | 2018-01-21 |
| 20  |  AAA   | 2017-11-12 | 2018-01-03 |
| 20  |  DAS   | 2018-01-01 | 2018-01-12 |
| 20  |  EDS   | 2018-02-01 | 2018-02-16 |
+-----+--------+------------+------------+

Когда есть совпадение дат (т.е. текущая строка begin_dt находится междудаты начала и окончания предыдущей строки), у меня должна быть самая низкая начальная дата во всех таких строках и самая высокая конечная дата.Вот результат, который мне нужен ..

final_Df.show()
+-----+--------+------------+------------+
| key | code   | begin_dt   | end_dt     |
+-----+--------+------------+------------+
| 10  |  ABC   | 2018-01-01 | 2018-01-21 |
| 10  |  BAC   | 2018-01-01 | 2018-01-21 |
| 10  |  CAS   | 2018-01-01 | 2018-01-21 |
| 20  |  AAA   | 2017-11-12 | 2018-01-12 |
| 20  |  DAS   | 2017-11-12 | 2018-01-12 |
| 20  |  EDS   | 2018-02-01 | 2018-02-16 |
+-----+--------+------------+------------+

Цените любые идеи для достижения этой цели.Заранее спасибо!

1 Ответ

0 голосов
/ 18 октября 2018

Вот один из подходов:

  1. Создать новый столбец group_id со значением null, если begin_dt находится в диапазоне дат от предыдущей строки;в противном случае уникальное целое число
  2. Backfill null s в group_id с ненулевым значением last
  3. Вычисление min(begin_dt) и max(end_dt) в каждом (key, group_id) разбиение

Пример ниже:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val df = Seq(
  (10, "ABC", "2018-01-01", "2018-01-08"),
  (10, "BAC", "2018-01-03", "2018-01-15"),
  (10, "CAS", "2018-01-03", "2018-01-21"),
  (20, "AAA", "2017-11-12", "2018-01-03"),
  (20, "DAS", "2018-01-01", "2018-01-12"),
  (20, "EDS", "2018-02-01", "2018-02-16")
).toDF("key", "code", "begin_dt", "end_dt")

val win1 = Window.partitionBy($"key").orderBy($"begin_dt", $"end_dt")
val win2 = Window.partitionBy($"key", $"group_id")

df.
  withColumn("group_id", when(
      $"begin_dt".between(lag($"begin_dt", 1).over(win1), lag($"end_dt", 1).over(win1)), null
    ).otherwise(monotonically_increasing_id)
  ).
  withColumn("group_id", last($"group_id", ignoreNulls=true).
      over(win1.rowsBetween(Window.unboundedPreceding, 0))
  ).
  withColumn("begin_dt2", min($"begin_dt").over(win2)).
  withColumn("end_dt2", max($"end_dt").over(win2)).
  orderBy("key", "begin_dt", "end_dt").
  show
// +---+----+----------+----------+-------------+----------+----------+
// |key|code|  begin_dt|    end_dt|     group_id| begin_dt2|   end_dt2|
// +---+----+----------+----------+-------------+----------+----------+
// | 10| ABC|2018-01-01|2018-01-08|1047972020224|2018-01-01|2018-01-21|
// | 10| BAC|2018-01-03|2018-01-15|1047972020224|2018-01-01|2018-01-21|
// | 10| CAS|2018-01-03|2018-01-21|1047972020224|2018-01-01|2018-01-21|
// | 20| AAA|2017-11-12|2018-01-03| 455266533376|2017-11-12|2018-01-12|
// | 20| DAS|2018-01-01|2018-01-12| 455266533376|2017-11-12|2018-01-12|
// | 20| EDS|2018-02-01|2018-02-16| 455266533377|2018-02-01|2018-02-16|
// +---+----+----------+----------+-------------+----------+----------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...