Рассчитать последовательности постоянно увеличивающихся дат Spark - PullRequest
0 голосов
/ 09 июня 2019

У меня есть датафрейм в Spark с именем столбца и датами. И я хотел бы найти все непрерывные последовательности постоянно увеличивающихся дат (день за днем) для каждого имени и рассчитать их продолжительность. Вывод должен содержать имя, дату начала (последовательности дат) и продолжительность такого периода времени (количество дней) Как я могу сделать это с функциями Spark?

Пример последовательности последовательностей дат:

2019-03-12
2019-03-13
2019-03-14
2019-03-15

Я определил такое решение, но оно рассчитывает общее количество дней по каждому имени и не делит его на последовательности:

val result = allDataDf
    .groupBy($"name")
    .agg(count($"date").as("timePeriod"))
    .orderBy($"timePeriod".desc)
    .head()

Кроме того, я пробовал с рангами, но столбец count имеет только 1 с, по некоторым причинам:

val names = Window
    .partitionBy($"name")
    .orderBy($"date")
 val result = allDataDf
    .select($"name", $"date", rank over names as "rank")
    .groupBy($"name", $"date", $"rank")
    .agg(count($"*") as "count")

Вывод выглядит так:

+-----------+----------+----+-----+
|stationName|      date|rank|count|
+-----------+----------+----+-----+
|       NAME|2019-03-24|   1|    1|
|       NAME|2019-03-25|   2|    1|
|       NAME|2019-03-27|   3|    1|
|       NAME|2019-03-28|   4|    1|
|       NAME|2019-01-29|   5|    1|
|       NAME|2019-03-30|   6|    1|
|       NAME|2019-03-31|   7|    1|
|       NAME|2019-04-02|   8|    1|
|       NAME|2019-04-05|   9|    1|
|       NAME|2019-04-07|  10|    1|
+-----------+----------+----+-----+

1 Ответ

2 голосов
/ 09 июня 2019

Поиск последовательных дат в SQL довольно прост.Вы можете сделать это с помощью запроса:

WITH s AS (
   SELECT
    stationName,
    date,
    date_add(date, -(row_number() over (partition by stationName order by date))) as discriminator
  FROM stations
)
SELECT
  stationName,
  MIN(date) as start,
  COUNT(1) AS duration
FROM s GROUP BY stationName, discriminator

К счастью, мы можем использовать SQL в spark.Давайте проверим, работает ли он (я использовал разные даты):

val df = Seq(
       ("NAME1", "2019-03-22"),
       ("NAME1", "2019-03-23"),
       ("NAME1", "2019-03-24"),
       ("NAME1", "2019-03-25"),

       ("NAME1", "2019-03-27"),
       ("NAME1", "2019-03-28"),

       ("NAME2", "2019-03-27"),
       ("NAME2", "2019-03-28"),

       ("NAME2", "2019-03-30"),
       ("NAME2", "2019-03-31"),

       ("NAME2", "2019-04-04"),
       ("NAME2", "2019-04-05"),
       ("NAME2", "2019-04-06")
  ).toDF("stationName", "date")
      .withColumn("date", date_format(col("date"), "yyyy-MM-dd"))

df.createTempView("stations");

  val result = spark.sql(
  """
     |WITH s AS (
     |   SELECT
     |    stationName,
     |    date,
     |    date_add(date, -(row_number() over (partition by stationName order by date)) + 1) as discriminator
     |  FROM stations
     |)
     |SELECT
     |  stationName,
     |  MIN(date) as start,
     |  COUNT(1) AS duration
     |FROM s GROUP BY stationName, discriminator
   """.stripMargin)

result.show()

Кажется, он выводит правильный набор данных:

+-----------+----------+--------+
|stationName|     start|duration|
+-----------+----------+--------+
|      NAME1|2019-03-22|       4|
|      NAME1|2019-03-27|       2|
|      NAME2|2019-03-27|       2|
|      NAME2|2019-03-30|       2|
|      NAME2|2019-04-04|       3|
+-----------+----------+--------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...